This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6485fa5c8a7 [HUDI-4625] Clean up KafkaOffsetGen: introduce retrying KafkaConsumer (#11664)
6485fa5c8a7 is described below

commit 6485fa5c8a79f3199e24dfdf721d9d1993f88fa2
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jul 30 07:16:54 2024 +0700

    [HUDI-4625] Clean up KafkaOffsetGen: introduce retrying KafkaConsumer (#11664)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../hudi/utilities/config/KafkaSourceConfig.java   | 31 ++++++-
 .../sources/HoodieRetryingKafkaConsumer.java       | 99 ++++++++++++++++++++++
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 31 +++----
 .../utilities/sources/BaseTestKafkaSource.java     |  3 +-
 .../sources/helpers/TestKafkaOffsetGen.java        |  2 +
 5 files changed, 144 insertions(+), 22 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 92f1f1cc507..b0954d85bd5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -76,7 +76,7 @@ public class KafkaSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Schema to deserialize the records.");
 
-
+  @Deprecated
   public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty
       .key(PREFIX + "fetch_partition.time.out")
       .defaultValue(300 * 1000L)
@@ -138,6 +138,35 @@ public class KafkaSourceConfig extends HoodieConfig {
       .sinceVersion("0.15.0")
       .withDocumentation("Kafka Proto Payload Deserializer Class");
 
+  public static final ConfigProperty<Long> INITIAL_RETRY_INTERVAL_MS = ConfigProperty
+      .key(PREFIX + "retry.initial_interval_ms")
+      .defaultValue(100L)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Amount of time (in ms) to wait before retrying operations on the KafkaConsumer.");
+
+  public static final ConfigProperty<Long> MAX_RETRY_INTERVAL_MS = ConfigProperty
+      .key(PREFIX + "retry.max_interval_ms")
+      .defaultValue(2000L)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Maximum amount of time (in ms) to wait before the next retry.");
+
+  public static final ConfigProperty<Integer> MAX_RETRY_COUNT = ConfigProperty
+      .key(PREFIX + "retry.max_count")
+      .defaultValue(4)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Maximum number of retry actions to perform, with 
exponential backoff.");
+
+  public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+      .key(PREFIX + "retry.exceptions")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("The class name of the Exception that needs to be 
retried, separated by commas. "
+          + "Default is empty which means retry all the IOException and 
RuntimeException from KafkaConsumer");
+
   /**
    * Kafka reset offset strategies.
    */
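
For reference, a minimal sketch (Java) of supplying the new retry settings through TypedProperties. The keys come from the ConfigProperty definitions above; the chosen values and the surrounding setup are illustrative only:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.KafkaSourceConfig;

    // Illustrative values; the defaults are 100 ms, 2000 ms, 4 retries, "".
    TypedProperties props = new TypedProperties();
    props.put(KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.key(), 200L);   // first backoff interval
    props.put(KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.key(), 5000L);      // cap on the backoff interval
    props.put(KafkaSourceConfig.MAX_RETRY_COUNT.key(), 6);                // retry up to 6 times
    props.put(KafkaSourceConfig.RETRY_EXCEPTIONS.key(),
        "org.apache.kafka.common.errors.TimeoutException");               // retry only timeouts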
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieRetryingKafkaConsumer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieRetryingKafkaConsumer.java
new file mode 100644
index 00000000000..234edd64dcf
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieRetryingKafkaConsumer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.RetryHelper;
+import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Retrying client for {@link KafkaConsumer} operations.
+ */
+public class HoodieRetryingKafkaConsumer extends KafkaConsumer {
+
+  private final long maxRetryIntervalMs;
+  private final int maxRetryCount;
+  private final long initialRetryIntervalMs;
+  private final String retryExceptionsList;
+
+  public HoodieRetryingKafkaConsumer(TypedProperties config, Map<String, Object> kafkaParams) {
+    super(kafkaParams);
+    this.maxRetryIntervalMs = config.getLong(KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.key(),
+        KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.defaultValue());
+    this.maxRetryCount = config.getInteger(KafkaSourceConfig.MAX_RETRY_COUNT.key(),
+        KafkaSourceConfig.MAX_RETRY_COUNT.defaultValue());
+    this.initialRetryIntervalMs = config.getLong(KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.key(),
+        KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.defaultValue());
+    this.retryExceptionsList = config.getString(KafkaSourceConfig.RETRY_EXCEPTIONS.key(),
+        KafkaSourceConfig.RETRY_EXCEPTIONS.defaultValue());
+  }
+
+  @Override
+  public Map<TopicPartition, Long> beginningOffsets(Collection partitions) {
+    return new RetryHelper<Map<TopicPartition, Long>, KafkaException>(
+        maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs, retryExceptionsList)
+        .start(() -> super.beginningOffsets(partitions));
+  }
+
+  @Override
+  public Map<TopicPartition, Long> endOffsets(Collection partitions) {
+    return new RetryHelper<Map<TopicPartition, Long>, KafkaException>(
+        maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs, retryExceptionsList)
+        .start(() -> super.endOffsets(partitions));
+  }
+
+  @Override
+  public List<PartitionInfo> partitionsFor(String topic) {
+    return new RetryHelper<List<PartitionInfo>, KafkaException>(
+        maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs, retryExceptionsList)
+        .start(() -> super.partitionsFor(topic));
+  }
+
+  @Override
+  public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map timestampsToSearch) {
+    return new RetryHelper<Map<TopicPartition, OffsetAndTimestamp>, KafkaException>(
+        maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs, retryExceptionsList)
+        .start(() -> super.offsetsForTimes(timestampsToSearch));
+  }
+
+  @Override
+  public Map<String, List<PartitionInfo>> listTopics() {
+    return new RetryHelper<Map<String, List<PartitionInfo>>, KafkaException>(
+        maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs, retryExceptionsList)
+        .start(() -> super.listTopics());
+  }
+
+  @Override
+  public OffsetAndMetadata committed(TopicPartition partition) {
+    return new RetryHelper<OffsetAndMetadata, KafkaException>(
+        maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs, retryExceptionsList)
+        .start(() -> super.committed(partition));
+  }
+}
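
A usage sketch of the new consumer: it is constructed from the Hudi TypedProperties (which carry the retry settings, as sketched after the KafkaSourceConfig hunk above) plus the plain Kafka params, then used like any KafkaConsumer; each metadata call overridden above is wrapped in a RetryHelper with exponential backoff. The bootstrap servers, group id, topic, and imports below are placeholders:

    // kafkaParams would normally come from KafkaOffsetGen.excludeHoodieConfigs(props);
    // the values here are illustrative only.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("group.id", "hudi-streamer");
    kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
    kafkaParams.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props, kafkaParams)) {
      // partitionsFor() now retries on KafkaException with exponential backoff
      // instead of relying on the old sleep-and-poll loop in KafkaOffsetGen.
      List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
    }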
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 028e44bbe50..8e5750752b6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -28,6 +28,7 @@ import org.apache.hudi.utilities.exception.HoodieStreamerException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
 
+import org.apache.hudi.utilities.sources.HoodieRetryingKafkaConsumer;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -49,7 +50,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.checkRequiredProperties;
+import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -274,7 +275,7 @@ public class KafkaOffsetGen {
     // Obtain current metadata for the topic
     Map<TopicPartition, Long> fromOffsets;
     Map<TopicPartition, Long> toOffsets;
-    try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+    try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props, kafkaParams)) {
       if (!checkTopicExists(consumer)) {
         throw new HoodieException("Kafka topic:" + topicName + " does not exist");
       }
@@ -324,22 +325,12 @@ public class KafkaOffsetGen {
    * @param topicName
    */
   private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
-    long timeout = getLongWithAltKeys(this.props, KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT);
-    long start = System.currentTimeMillis();
-
-    List<PartitionInfo> partitionInfos;
-    do {
-      // TODO(HUDI-4625) cleanup, introduce retrying client
-      partitionInfos = consumer.partitionsFor(topicName);
-      try {
-        if (partitionInfos == null) {
-          TimeUnit.SECONDS.sleep(10);
-        }
-      } catch (InterruptedException e) {
-        LOG.error("Sleep failed while fetching partitions");
-      }
-    } while (partitionInfos == null && (System.currentTimeMillis() <= (start + timeout)));
-
+    if (containsConfigProperty(this.props, KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT)) {
+      LOG.warn("{} is deprecated and no longer takes effect. Use {}, {} and {} to configure the retry behavior of the KafkaConsumer",
+          KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT.key(), KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.key(),
+          KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.key(), KafkaSourceConfig.MAX_RETRY_COUNT.key());
+    }
+    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
     if (partitionInfos == null) {
       throw new HoodieStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
     }
@@ -477,7 +468,7 @@ public class KafkaOffsetGen {
     return kafkaParams;
   }
 
-  private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
+  public static Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
     Map<String, Object> kafkaParams = new HashMap<>();
     props.keySet().stream().filter(prop -> {
       // In order to prevent printing unnecessary warn logs, here filter out the hoodie
@@ -500,7 +491,7 @@ public class KafkaOffsetGen {
     checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
     Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
     Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
-    try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+    try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props, kafkaParams)) {
       offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
       consumer.commitSync(offsetAndMetadataMap);
     } catch (CommitFailedException | TimeoutException e) {
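
The offset-commit path above now goes through the retrying consumer as well. A minimal sketch of the commit pattern (topic, partition, and offset are illustrative); note that only the metadata calls overridden in HoodieRetryingKafkaConsumer retry, while commitSync itself is inherited from KafkaConsumer unchanged:

    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    toCommit.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(42L));
    try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props, kafkaParams)) {
      consumer.commitSync(toCommit);  // inherited; not wrapped in RetryHelper
    }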
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 3227891df5a..e8368b0d590 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.exception.HoodieStreamerException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.streamer.SourceProfile;
 import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
@@ -214,7 +215,7 @@ public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarne
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
     // commit to kafka after first batch
     kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch());
-    try (KafkaConsumer consumer = new KafkaConsumer(props)) {
+    try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props, KafkaOffsetGen.excludeHoodieConfigs(props))) {
       consumer.assign(topicPartitions);
 
       OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition0);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index db8bcab42b0..bbc6422e4dc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -259,6 +259,8 @@ public class TestKafkaOffsetGen {
     assertEquals(300, nextOffsetRanges[0].untilOffset());
 
     props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 2L);
+    // set the deprecated config so the warn message can be checked manually
+    props.put(KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT.key(), 1L);
     kafkaOffsetGen = new KafkaOffsetGen(props);
     nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
