This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c15f498f4 [INLONG-8226][Manager] Correct KafkaSink's partitionNum 
property type definition (#8227)
0c15f498f4 is described below

commit 0c15f498f4baa9ab5abed0af289020c137e07ef2
Author: chestnufang <[email protected]>
AuthorDate: Tue Jun 13 18:43:12 2023 +0800

    [INLONG-8226][Manager] Correct KafkaSink's partitionNum property type 
definition (#8227)
    
    Co-authored-by: chestnufang <[email protected]>
---
 .../inlong/manager/client/api/inner/ClientFactoryTest.java     |  2 +-
 .../org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java   |  2 +-
 .../apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java    |  2 +-
 .../inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java       |  2 +-
 .../inlong/manager/pojo/sort/node/provider/KafkaProvider.java  |  4 +---
 .../service/resource/sink/kafka/KafkaResourceOperator.java     | 10 +++++-----
 6 files changed, 10 insertions(+), 12 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 62378840d8..2430e21602 100644
--- 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -655,7 +655,7 @@ class ClientFactoryTest {
                         .id(6)
                         .sinkType(SinkType.KAFKA)
                         .topicName("test")
-                        .partitionNum("6")
+                        .partitionNum(6)
                         .build(),
                 PostgreSQLSink.builder()
                         .id(7)
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
index 851d394398..15e212d578 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
@@ -50,7 +50,7 @@ public class KafkaSink extends StreamSink {
     private String topicName;
 
     @ApiModelProperty("Partition number of the topic")
-    private String partitionNum;
+    private Integer partitionNum;
 
     @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
index 8deb703b0c..3916153cf3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
@@ -47,7 +47,7 @@ public class KafkaSinkDTO {
     private String topicName;
 
     @ApiModelProperty("Partition number of the topic")
-    private String partitionNum;
+    private Integer partitionNum;
 
     @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
index f7cd6513e2..899fba146a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
@@ -45,7 +45,7 @@ public class KafkaSinkRequest extends SinkRequest {
     private String topicName;
 
     @ApiModelProperty("Partition number of the topic")
-    private String partitionNum;
+    private Integer partitionNum;
 
     @ApiModelProperty("Data Serialization, support: json, canal, avro")
     private String serializationType = DataFormat.CANAL.getName();
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index bb355b30c6..e8315660b6 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -42,7 +42,6 @@ import 
org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -99,8 +98,7 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
         List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(kafkaSink.getSinkFieldList(), kafkaSink.getSinkName());
         List<FieldRelation> fieldRelations = 
parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap);
 
-        String partitionNum = kafkaSink.getPartitionNum();
-        Integer sinkParallelism = StringUtils.isNotBlank(partitionNum) ? 
Integer.parseInt(partitionNum) : null;
+        Integer sinkParallelism = kafkaSink.getPartitionNum();
         Format format = parseFormat(kafkaSink.getSerializationType());
 
         return new KafkaLoadNode(
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
index a84a484021..0e89b9867b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
@@ -65,15 +65,15 @@ public class KafkaResourceOperator implements 
SinkResourceOperator {
 
         KafkaSinkDTO kafkaInfo = 
KafkaSinkDTO.getFromJson(sinkInfo.getExtParams());
         String topicName = kafkaInfo.getTopicName();
-        String partitionNum = kafkaInfo.getPartitionNum();
+        Integer partitionNum = kafkaInfo.getPartitionNum();
         Preconditions.expectNotBlank(topicName, 
ErrorCodeEnum.INVALID_PARAMETER, "topic name cannot be empty");
-        Preconditions.expectNotBlank(partitionNum, 
ErrorCodeEnum.INVALID_PARAMETER, "partition cannot be empty");
+        Preconditions.expectNotNull(partitionNum, 
ErrorCodeEnum.INVALID_PARAMETER, "partition cannot be empty");
 
         try (Admin admin = getKafkaAdmin(kafkaInfo.getBootstrapServers())) {
             boolean topicExists = isTopicExists(admin, topicName, 
partitionNum);
             if (!topicExists) {
                 CreateTopicsResult result = 
admin.createTopics(Collections.singleton(
-                        new NewTopic(topicName, 
Optional.of(Integer.parseInt(partitionNum)), Optional.empty())));
+                        new NewTopic(topicName, Optional.of(partitionNum), 
Optional.empty())));
                 result.values().get(topicName).get();
             }
 
@@ -90,7 +90,7 @@ public class KafkaResourceOperator implements 
SinkResourceOperator {
     /**
      * Check whether the topic exists in the Kafka MQ
      */
-    private boolean isTopicExists(Admin admin, String topicName, String 
partitionNum) throws Exception {
+    private boolean isTopicExists(Admin admin, String topicName, Integer 
partitionNum) throws Exception {
         ListTopicsResult listResult = admin.listTopics();
         if (!listResult.namesToListings().get().containsKey(topicName)) {
             LOGGER.info("kafka topic {} not existed", topicName);
@@ -100,7 +100,7 @@ public class KafkaResourceOperator implements 
SinkResourceOperator {
         DescribeTopicsResult result = 
admin.describeTopics(Collections.singletonList(topicName));
         TopicDescription desc = result.values().get(topicName).get();
         String info = "kafka topic=%s already exist with partition num=%s";
-        if (desc.partitions().size() != Integer.parseInt(partitionNum)) {
+        if (desc.partitions().size() != partitionNum) {
             String errMsg = String.format(info + ", but the requested 
partition num=%s", topicName,
                     desc.partitions().size(), partitionNum);
             LOGGER.error(errMsg);

Reply via email to