This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0c15f498f4 [INLONG-8226][Manager] Correct KafkaSink's partitionNum
property type definition (#8227)
0c15f498f4 is described below
commit 0c15f498f4baa9ab5abed0af289020c137e07ef2
Author: chestnufang <[email protected]>
AuthorDate: Tue Jun 13 18:43:12 2023 +0800
[INLONG-8226][Manager] Correct KafkaSink's partitionNum property type
definition (#8227)
Co-authored-by: chestnufang <[email protected]>
---
.../inlong/manager/client/api/inner/ClientFactoryTest.java | 2 +-
.../org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java | 2 +-
.../apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java | 2 +-
.../inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java | 2 +-
.../inlong/manager/pojo/sort/node/provider/KafkaProvider.java | 4 +---
.../service/resource/sink/kafka/KafkaResourceOperator.java | 10 +++++-----
6 files changed, 10 insertions(+), 12 deletions(-)
diff --git
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 62378840d8..2430e21602 100644
---
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -655,7 +655,7 @@ class ClientFactoryTest {
.id(6)
.sinkType(SinkType.KAFKA)
.topicName("test")
- .partitionNum("6")
+ .partitionNum(6)
.build(),
PostgreSQLSink.builder()
.id(7)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
index 851d394398..15e212d578 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
@@ -50,7 +50,7 @@ public class KafkaSink extends StreamSink {
private String topicName;
@ApiModelProperty("Partition number of the topic")
- private String partitionNum;
+ private Integer partitionNum;
@ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
index 8deb703b0c..3916153cf3 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
@@ -47,7 +47,7 @@ public class KafkaSinkDTO {
private String topicName;
@ApiModelProperty("Partition number of the topic")
- private String partitionNum;
+ private Integer partitionNum;
@ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
index f7cd6513e2..899fba146a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
@@ -45,7 +45,7 @@ public class KafkaSinkRequest extends SinkRequest {
private String topicName;
@ApiModelProperty("Partition number of the topic")
- private String partitionNum;
+ private Integer partitionNum;
@ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType = DataFormat.CANAL.getName();
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index bb355b30c6..e8315660b6 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -42,7 +42,6 @@ import
org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
@@ -99,8 +98,7 @@ public class KafkaProvider implements ExtractNodeProvider,
LoadNodeProvider {
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(kafkaSink.getSinkFieldList(), kafkaSink.getSinkName());
List<FieldRelation> fieldRelations =
parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap);
- String partitionNum = kafkaSink.getPartitionNum();
- Integer sinkParallelism = StringUtils.isNotBlank(partitionNum) ?
Integer.parseInt(partitionNum) : null;
+ Integer sinkParallelism = kafkaSink.getPartitionNum();
Format format = parseFormat(kafkaSink.getSerializationType());
return new KafkaLoadNode(
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
index a84a484021..0e89b9867b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java
@@ -65,15 +65,15 @@ public class KafkaResourceOperator implements
SinkResourceOperator {
KafkaSinkDTO kafkaInfo =
KafkaSinkDTO.getFromJson(sinkInfo.getExtParams());
String topicName = kafkaInfo.getTopicName();
- String partitionNum = kafkaInfo.getPartitionNum();
+ Integer partitionNum = kafkaInfo.getPartitionNum();
Preconditions.expectNotBlank(topicName,
ErrorCodeEnum.INVALID_PARAMETER, "topic name cannot be empty");
- Preconditions.expectNotBlank(partitionNum,
ErrorCodeEnum.INVALID_PARAMETER, "partition cannot be empty");
+ Preconditions.expectNotNull(partitionNum,
ErrorCodeEnum.INVALID_PARAMETER, "partition cannot be empty");
try (Admin admin = getKafkaAdmin(kafkaInfo.getBootstrapServers())) {
boolean topicExists = isTopicExists(admin, topicName,
partitionNum);
if (!topicExists) {
CreateTopicsResult result =
admin.createTopics(Collections.singleton(
- new NewTopic(topicName,
Optional.of(Integer.parseInt(partitionNum)), Optional.empty())));
+ new NewTopic(topicName, Optional.of(partitionNum),
Optional.empty())));
result.values().get(topicName).get();
}
@@ -90,7 +90,7 @@ public class KafkaResourceOperator implements
SinkResourceOperator {
/**
* Check whether the topic exists in the Kafka MQ
*/
- private boolean isTopicExists(Admin admin, String topicName, String
partitionNum) throws Exception {
+ private boolean isTopicExists(Admin admin, String topicName, Integer
partitionNum) throws Exception {
ListTopicsResult listResult = admin.listTopics();
if (!listResult.namesToListings().get().containsKey(topicName)) {
LOGGER.info("kafka topic {} not existed", topicName);
@@ -100,7 +100,7 @@ public class KafkaResourceOperator implements
SinkResourceOperator {
DescribeTopicsResult result =
admin.describeTopics(Collections.singletonList(topicName));
TopicDescription desc = result.values().get(topicName).get();
String info = "kafka topic=%s already exist with partition num=%s";
- if (desc.partitions().size() != Integer.parseInt(partitionNum)) {
+ if (desc.partitions().size() != partitionNum) {
String errMsg = String.format(info + ", but the requested
partition num=%s", topicName,
desc.partitions().size(), partitionNum);
LOGGER.error(errMsg);