This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c84ef2  [HUDI-1282] Check whether the topic exists before deltastrmer 
consumes Kafka (#2090)
6c84ef2 is described below

commit 6c84ef20ac737087ed76c12b2f3e40d9c3aa4b30
Author: liujinhui <[email protected]>
AuthorDate: Wed Sep 16 10:43:52 2020 +0800

    [HUDI-1282] Check whether the topic exists before deltastrmer consumes 
Kafka (#2090)
---
 .../hudi/utilities/sources/helpers/KafkaOffsetGen.java      | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 23d3c8d..e958f85 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -172,6 +172,9 @@ public class KafkaOffsetGen {
     Map<TopicPartition, Long> fromOffsets;
     Map<TopicPartition, Long> toOffsets;
     try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+      if (!checkTopicExists(consumer)) {
+        throw new HoodieException("Kafka topic:" + topicName + " does not 
exist");
+      }
       List<PartitionInfo> partitionInfoList;
       partitionInfoList = consumer.partitionsFor(topicName);
       Set<TopicPartition> topicPartitions = partitionInfoList.stream()
@@ -230,6 +233,16 @@ public class KafkaOffsetGen {
     return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
+  /**
+   * Check if topic exists.
+   * @param consumer kafka consumer
+   * @return
+   */
+  public boolean checkTopicExists(KafkaConsumer consumer)  {
+    Map<String, List<PartitionInfo>> result = consumer.listTopics();
+    return result.containsKey(topicName);
+  }
+
   public String getTopicName() {
     return topicName;
   }

Reply via email to