This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6485fa5c8a7 [HUDI-4625] Clean up KafkaOffsetGen: introduce retrying
KafkaConsumer (#11664)
6485fa5c8a7 is described below
commit 6485fa5c8a79f3199e24dfdf721d9d1993f88fa2
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jul 30 07:16:54 2024 +0700
[HUDI-4625] Clean up KafkaOffsetGen: introduce retrying KafkaConsumer
(#11664)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../hudi/utilities/config/KafkaSourceConfig.java | 31 ++++++-
.../sources/HoodieRetryingKafkaConsumer.java | 99 ++++++++++++++++++++++
.../utilities/sources/helpers/KafkaOffsetGen.java | 31 +++----
.../utilities/sources/BaseTestKafkaSource.java | 3 +-
.../sources/helpers/TestKafkaOffsetGen.java | 2 +
5 files changed, 144 insertions(+), 22 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 92f1f1cc507..b0954d85bd5 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -76,7 +76,7 @@ public class KafkaSourceConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Schema to deserialize the records.");
-
+ @Deprecated
public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT =
ConfigProperty
.key(PREFIX + "fetch_partition.time.out")
.defaultValue(300 * 1000L)
@@ -138,6 +138,35 @@ public class KafkaSourceConfig extends HoodieConfig {
.sinceVersion("0.15.0")
.withDocumentation("Kafka Proto Payload Deserializer Class");
+ public static final ConfigProperty<Long> INITIAL_RETRY_INTERVAL_MS =
ConfigProperty
+ .key(PREFIX + "retry.initial_interval_ms")
+ .defaultValue(100L)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Amount of time (in ms) to wait before retrying
operations on the KafkaConsumer.");
+
+ public static final ConfigProperty<Long> MAX_RETRY_INTERVAL_MS =
ConfigProperty
+ .key(PREFIX + "retry.max_interval_ms")
+ .defaultValue(2000L)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Maximum amount of time (in ms) to wait before the
next retry.");
+
+ public static final ConfigProperty<Integer> MAX_RETRY_COUNT = ConfigProperty
+ .key(PREFIX + "retry.max_count")
+ .defaultValue(4)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Maximum number of retry actions to perform, with
exponential backoff.");
+
+ public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+ .key(PREFIX + "retry.exceptions")
+ .defaultValue("")
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("The class names of the exceptions that need to be
retried, separated by commas. "
+ + "Default is empty, which means retry on any IOException or
RuntimeException thrown by the KafkaConsumer");
+
/**
* Kafka reset offset strategies.
*/
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieRetryingKafkaConsumer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieRetryingKafkaConsumer.java
new file mode 100644
index 00000000000..234edd64dcf
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieRetryingKafkaConsumer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.RetryHelper;
+import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Retrying client for {@link KafkaConsumer} operations.
+ */
+public class HoodieRetryingKafkaConsumer extends KafkaConsumer {
+
+ private final long maxRetryIntervalMs;
+ private final int maxRetryCount;
+ private final long initialRetryIntervalMs;
+ private final String retryExceptionsList;
+
+ public HoodieRetryingKafkaConsumer(TypedProperties config, Map<String,
Object> kafkaParams) {
+ super(kafkaParams);
+ this.maxRetryIntervalMs =
config.getLong(KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.key(),
+ KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.defaultValue());
+ this.maxRetryCount =
config.getInteger(KafkaSourceConfig.MAX_RETRY_COUNT.key(),
+ KafkaSourceConfig.MAX_RETRY_COUNT.defaultValue());
+ this.initialRetryIntervalMs =
config.getLong(KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.key(),
+ KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.defaultValue());
+ this.retryExceptionsList =
config.getString(KafkaSourceConfig.RETRY_EXCEPTIONS.key(),
+ KafkaSourceConfig.RETRY_EXCEPTIONS.defaultValue());
+ }
+
+ @Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection partitions) {
+ return new RetryHelper<Map<TopicPartition, Long>, KafkaException>(
+ maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs,
retryExceptionsList)
+ .start(() -> super.beginningOffsets(partitions));
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection partitions) {
+ return new RetryHelper<Map<TopicPartition, Long>, KafkaException>(
+ maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs,
retryExceptionsList)
+ .start(() -> super.endOffsets(partitions));
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return new RetryHelper<List<PartitionInfo>, KafkaException>(
+ maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs,
retryExceptionsList)
+ .start(() -> super.partitionsFor(topic));
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map
timestampsToSearch) {
+ return new RetryHelper<Map<TopicPartition, OffsetAndTimestamp>,
KafkaException>(
+ maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs,
retryExceptionsList)
+ .start(() -> super.offsetsForTimes(timestampsToSearch));
+ }
+
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics() {
+ return new RetryHelper<Map<String, List<PartitionInfo>>, KafkaException>(
+ maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs,
retryExceptionsList)
+ .start(() -> super.listTopics());
+ }
+
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition) {
+ return new RetryHelper<OffsetAndMetadata, KafkaException>(
+ maxRetryIntervalMs, maxRetryCount, initialRetryIntervalMs,
retryExceptionsList)
+ .start(() -> super.committed(partition));
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 028e44bbe50..8e5750752b6 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -28,6 +28,7 @@ import
org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
+import org.apache.hudi.utilities.sources.HoodieRetryingKafkaConsumer;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -49,7 +50,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.checkRequiredProperties;
+import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -274,7 +275,7 @@ public class KafkaOffsetGen {
// Obtain current metadata for the topic
Map<TopicPartition, Long> fromOffsets;
Map<TopicPartition, Long> toOffsets;
- try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+ try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props,
kafkaParams)) {
if (!checkTopicExists(consumer)) {
throw new HoodieException("Kafka topic:" + topicName + " does not
exist");
}
@@ -324,22 +325,12 @@ public class KafkaOffsetGen {
* @param topicName
*/
private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer,
String topicName) {
- long timeout = getLongWithAltKeys(this.props,
KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT);
- long start = System.currentTimeMillis();
-
- List<PartitionInfo> partitionInfos;
- do {
- // TODO(HUDI-4625) cleanup, introduce retrying client
- partitionInfos = consumer.partitionsFor(topicName);
- try {
- if (partitionInfos == null) {
- TimeUnit.SECONDS.sleep(10);
- }
- } catch (InterruptedException e) {
- LOG.error("Sleep failed while fetching partitions");
- }
- } while (partitionInfos == null && (System.currentTimeMillis() <= (start +
timeout)));
-
+ if (containsConfigProperty(this.props,
KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT)) {
+ LOG.warn("{} is deprecated and is not taking effect anymore. Use {}, {}
and {} for setting up retrying configuration of KafkaConsumer",
+ KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT.key(),
KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.key(),
+ KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.key(),
KafkaSourceConfig.MAX_RETRY_COUNT.key());
+ }
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
if (partitionInfos == null) {
throw new HoodieStreamerException(String.format("Can not find metadata
for topic %s from kafka cluster", topicName));
}
@@ -477,7 +468,7 @@ public class KafkaOffsetGen {
return kafkaParams;
}
- private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
+ public static Map<String, Object> excludeHoodieConfigs(TypedProperties
props) {
Map<String, Object> kafkaParams = new HashMap<>();
props.keySet().stream().filter(prop -> {
// In order to prevent printing unnecessary warn logs, here filter out
the hoodie
@@ -500,7 +491,7 @@ public class KafkaOffsetGen {
checkRequiredProperties(props,
Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
Map<TopicPartition, Long> offsetMap =
CheckpointUtils.strToOffsets(checkpointStr);
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new
HashMap<>(offsetMap.size());
- try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+ try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props,
kafkaParams)) {
offsetMap.forEach((topicPartition, offset) ->
offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
consumer.commitSync(offsetAndMetadataMap);
} catch (CommitFailedException | TimeoutException e) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 3227891df5a..e8368b0d590 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
@@ -214,7 +215,7 @@ public abstract class BaseTestKafkaSource extends
SparkClientFunctionalTestHarne
InputBatch<JavaRDD<GenericRecord>> fetch1 =
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
// commit to kafka after first batch
kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch());
- try (KafkaConsumer consumer = new KafkaConsumer(props)) {
+ try (KafkaConsumer consumer = new HoodieRetryingKafkaConsumer(props,
KafkaOffsetGen.excludeHoodieConfigs(props))) {
consumer.assign(topicPartitions);
OffsetAndMetadata offsetAndMetadata =
consumer.committed(topicPartition0);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index db8bcab42b0..bbc6422e4dc 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -259,6 +259,8 @@ public class TestKafkaOffsetGen {
assertEquals(300, nextOffsetRanges[0].untilOffset());
props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 2L);
+ // just to check the warn message manually when props contains the deprecated config
+ props.put(KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT.key(), 1L);
kafkaOffsetGen = new KafkaOffsetGen(props);
nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300,
metrics);
assertEquals(0, nextOffsetRanges[0].fromOffset());