This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2af98303d3 [HUDI-4122] Fix NPE caused by adding kafka nodes (#5632)
2af98303d3 is described below
commit 2af98303d3881e5d1da7d2e08f904b18f8b79488
Author: wangxianghu <[email protected]>
AuthorDate: Sat May 21 07:12:53 2022 +0400
[HUDI-4122] Fix NPE caused by adding kafka nodes (#5632)
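
    KafkaConsumer#partitionsFor can return null while topic metadata is
    unavailable, e.g. right after new Kafka nodes join the cluster. The
    previous code dereferenced that result directly, so the null surfaced
    as an NPE. A minimal sketch of the failure mode, reconstructed from
    the removed lines in the diff below:

        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName);
        // NPE here whenever partitionsFor returned null
        Set<TopicPartition> topicPartitions = partitionInfoList.stream()
            .map(x -> new TopicPartition(x.topic(), x.partition()))
            .collect(Collectors.toSet());

    The fix retries the metadata fetch every 10 seconds until a new,
    configurable timeout elapses, then fails with an explicit
    HoodieDeltaStreamerException. The timeout can be tuned via the new
    property, e.g. (hypothetical value):

        hoodie.deltastreamer.source.kafka.fetch_partition.time.out=60000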
---
.../utilities/sources/helpers/KafkaOffsetGen.java | 47 ++++++++++++++++++----
1 file changed, 39 insertions(+), 8 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 564c5e2058..1abd2616b9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -22,17 +22,17 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
-
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
+
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@@ -48,6 +48,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -169,9 +170,14 @@ public class KafkaOffsetGen {
.withDocumentation("Kafka topic name.");
  public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
- .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
- .defaultValue("string")
- .withDocumentation("Kafka chepoint type.");
+ .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
+ .defaultValue("string")
+ .withDocumentation("Kafka checkpoint type.");
+
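+  // Bounds how long fetchPartitionInfos (added below) retries topic-metadata lookups before failing.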
+  public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty
+      .key("hoodie.deltastreamer.source.kafka.fetch_partition.time.out")
+      .defaultValue(300 * 1000L)
+      .withDocumentation("Timeout, in ms, for fetching partitions. 5 min by default.");
public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET =
ConfigProperty
.key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
@@ -236,8 +242,7 @@ public class KafkaOffsetGen {
if (!checkTopicExists(consumer)) {
      throw new HoodieException("Kafka topic:" + topicName + " does not exist");
}
- List<PartitionInfo> partitionInfoList;
- partitionInfoList = consumer.partitionsFor(topicName);
+    List<PartitionInfo> partitionInfoList = fetchPartitionInfos(consumer, topicName);
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
        .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
@@ -287,6 +292,32 @@ public class KafkaOffsetGen {
    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
}
+  /**
+   * Fetch partition infos for the given topic, retrying until metadata is available or the configured timeout elapses.
+   *
+   * @param consumer  consumer to fetch partition metadata with
+   * @param topicName name of the topic whose partitions are fetched
+   * @return the partition infos of the topic
+   */
+  private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
+    long timeout = this.props.getLong(Config.KAFKA_FETCH_PARTITION_TIME_OUT.key(), Config.KAFKA_FETCH_PARTITION_TIME_OUT.defaultValue());
+ long start = System.currentTimeMillis();
+
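+    // Retry the metadata fetch: partitionsFor may return null until the new nodes' metadata propagates.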
+ List<PartitionInfo> partitionInfos;
+    do {
+      partitionInfos = consumer.partitionsFor(topicName);
+      if (partitionInfos == null) {
+        try {
+          TimeUnit.SECONDS.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.error("Sleep failed while fetching partitions");
+        }
+      }
+    } while (partitionInfos == null && (System.currentTimeMillis() <= (start + timeout)));
+
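+    // Metadata still missing after the timeout: fail explicitly instead of letting callers hit an NPE.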
+ if (partitionInfos == null) {
+      throw new HoodieDeltaStreamerException(String.format("Cannot find metadata for topic %s from Kafka cluster", topicName));
+ }
+ return partitionInfos;
+ }
+
/**
* Fetch checkpoint offsets for each partition.
* @param consumer instance of {@link KafkaConsumer} to fetch offsets from.