This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6c84ef2 [HUDI-1282] Check whether the topic exists before deltastreamer
consumes Kafka (#2090)
6c84ef2 is described below
commit 6c84ef20ac737087ed76c12b2f3e40d9c3aa4b30
Author: liujinhui <[email protected]>
AuthorDate: Wed Sep 16 10:43:52 2020 +0800
[HUDI-1282] Check whether the topic exists before deltastreamer consumes
Kafka (#2090)
---
.../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 23d3c8d..e958f85 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -172,6 +172,9 @@ public class KafkaOffsetGen {
Map<TopicPartition, Long> fromOffsets;
Map<TopicPartition, Long> toOffsets;
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+ if (!checkTopicExists(consumer)) {
+ throw new HoodieException("Kafka topic:" + topicName + " does not exist");
+ }
List<PartitionInfo> partitionInfoList;
partitionInfoList = consumer.partitionsFor(topicName);
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
@@ -230,6 +233,16 @@ public class KafkaOffsetGen {
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
+ /**
+ * Check whether {@code topicName} is among the topics reported by {@code consumer.listTopics()}.
+ * @param consumer kafka consumer used to list topics
+ * @return {@code true} if the map returned by {@code listTopics()} contains {@code topicName} as a key
+ */
+ public boolean checkTopicExists(KafkaConsumer consumer) {
+ Map<String, List<PartitionInfo>> result = consumer.listTopics();
+ return result.containsKey(topicName);
+ }
+
public String getTopicName() {
return topicName;
}