This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2e865155e [INLONG-7026][Manager] Fixed the problem of not checked
before creating the kafka topic (#7027)
2e865155e is described below
commit 2e865155e236fdeb59cb8a8154c063e5d05b3a99
Author: fuweng11 <[email protected]>
AuthorDate: Thu Dec 22 20:22:08 2022 +0800
[INLONG-7026][Manager] Fixed the problem of not checked before creating the
kafka topic (#7027)
---
.../inlong/manager/service/resource/queue/kafka/KafkaOperator.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 6691b55d0..7e626ec10 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -49,6 +49,11 @@ public class KafkaOperator {
NewTopic topic = new NewTopic(topicName,
inlongKafkaInfo.getNumPartitions(),
inlongKafkaInfo.getReplicationFactor());
+ // Topic will be returned if it exists, and created if it does not
exist
+ if (topicIsExists(kafkaClusterInfo, topicName)) {
+ LOGGER.warn("kafka topic={} already exists", topicName);
+ return;
+ }
CreateTopicsResult result =
adminClient.createTopics(Collections.singletonList(topic));
// To prevent the client from disconnecting too quickly and causing
the Topic to not be created successfully
Thread.sleep(500);