This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 49ac549cf5ef [HUDI-9633] Implement ability to filter configs based on custom prefixes before passing down to the kafka consumer (#13604)
49ac549cf5ef is described below
commit 49ac549cf5ef7dd635d069dd44100ab98e6f19d1
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Sat Jul 26 07:45:33 2025 -0700
[HUDI-9633] Implement ability to filter configs based on custom prefixes before passing down to the kafka consumer (#13604)
---------
Co-authored-by: rmahindra123 <[email protected]>
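For context, the new knob is user-facing. A minimal sketch of how it might be set in a streamer properties file, assuming KafkaSourceConfig.PREFIX resolves to "hoodie.streamer.source.kafka." (the "custom1."/"custom2." prefixes below are hypothetical):

    hoodie.streamer.source.kafka.ignore.prefix.config.list=custom1.,custom2.

Any consumer config whose key starts with one of the listed prefixes is dropped before the properties are handed to the Kafka consumer; the default is the empty string, which disables filtering.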
---
.../hudi/utilities/config/KafkaSourceConfig.java | 7 +++
.../hudi/utilities/sources/AvroKafkaSource.java | 13 ++---
.../hudi/utilities/sources/JsonKafkaSource.java | 7 +--
.../apache/hudi/utilities/sources/KafkaSource.java | 68 ++++++++++++++++++++++
.../hudi/utilities/sources/ProtoKafkaSource.java | 10 ++--
.../hudi/utilities/sources/TestKafkaSource.java | 50 ++++++++++++++++
6 files changed, 135 insertions(+), 20 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index e9606442ecd6..e0e06549ac02 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -177,6 +177,13 @@ public class KafkaSourceConfig extends HoodieConfig {
+ "OffsetOutOfRange exception, as it is possible that the earliest
Kafka offsets may "
+ "expire soon while the job is progressing.");
+  public static final ConfigProperty<String> IGNORE_PREFIX_CONFIG_LIST = ConfigProperty
+      .key(PREFIX + "ignore.prefix.config.list")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Comma-separated list of prefixes for config keys that should be dropped from the configs when passed to the Kafka consumer.");
+
/**
* Kafka reset offset strategies.
*/
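For reference, a small sketch of reading the new property back out; the class name IgnorePrefixConfigExample is hypothetical, while the ConfigUtils call mirrors the one used in KafkaSource.createKafkaRDD later in this diff:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.ConfigUtils;
    import org.apache.hudi.utilities.config.KafkaSourceConfig;

    class IgnorePrefixConfigExample {
      // Resolve the comma-separated prefix list; the 'true' flag falls back to
      // the declared default ("") when the key is unset.
      static String ignoredPrefixes(TypedProperties props) {
        return ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.IGNORE_PREFIX_CONFIG_LIST, true);
      }
    }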
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 7e17b0ff0b2e..210627153c5e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -39,8 +39,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,14 +114,13 @@ public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> {
//Don't want kafka offsets here so we use originalSchemaProvider
      AvroConvertor convertor = new AvroConvertor(originalSchemaProvider.getSourceSchema());
-      kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).filter(obj -> obj.value() != null).map(obj ->
-          new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
-              obj.key(), convertor.fromAvroBinary(obj.value())));
+      JavaRDD<ConsumerRecord<String, byte[]>> kafkaRDDByteArray = createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
+      kafkaRDD = kafkaRDDByteArray.filter(obj -> obj.value() != null)
+          .map(obj -> new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(), obj.key(), convertor.fromAvroBinary(obj.value())));
} else {
-      kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent());
+      kafkaRDD = createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
}
+
    return maybeAppendKafkaOffsets(kafkaRDD.filter(consemerRec -> consemerRec.value() != null));
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 7c47691ac116..266020ba7761 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -45,8 +45,6 @@ import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,10 +92,7 @@ public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
@Override
protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) {
    String deserializerClass = props.getString(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP);
-    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext,
-        offsetGen.getKafkaParams(),
-        offsetRanges,
-        LocationStrategies.PreferConsistent())
+    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges)
.filter(x -> filterForNullValues(x.value(), deserializerClass));
return postProcess(maybeAppendKafkaOffsets(kafkaRDD, deserializerClass));
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index fea1cf5367f1..9417c5e5140f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -20,7 +20,9 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
@@ -32,21 +34,28 @@ import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
public abstract class KafkaSource<T> extends Source<T> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+ private static final String COMMA_DELIMITER = ",";
// these are native kafka's config. do not change the config names.
  protected static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
  protected static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
@@ -128,6 +137,29 @@ public abstract class KafkaSource<T> extends Source<T> {
    return new InputBatch<>(Option.of(newBatch), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
}
+  /**
+   * Creates a Kafka RDD with the specified key-value deserialization types.
+   *
+   * @param props        Configuration properties for the Kafka source, including topic, bootstrap servers, etc.
+   * @param sparkContext Spark context used to create the RDD.
+   * @param offsetGen    Generator that provides Kafka parameters and helps map offset ranges.
+   * @param offsetRanges Kafka offset ranges to read data from.
+   *
+   * @param <K> Type of the Kafka record key.
+   * @param <V> Type of the Kafka record value.
+   * @return JavaRDD containing Kafka ConsumerRecord entries with deserialized key-value pairs.
+   */
+  public static <K, V> JavaRDD<ConsumerRecord<K, V>> createKafkaRDD(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      KafkaOffsetGen offsetGen,
+      OffsetRange[] offsetRanges) {
+    Map<String, Object> kafkaParams =
+        filterKafkaParameters(offsetGen.getKafkaParams(), ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.IGNORE_PREFIX_CONFIG_LIST, true));
+    LOG.debug("Original kafka params " + offsetGen.getKafkaParams() + "\n After filtering kafka params " + kafkaParams);
+    return KafkaUtils.createRDD(sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent());
+  }
+
protected abstract T toBatch(OffsetRange[] offsetRanges);
@Override
@@ -137,6 +169,42 @@ public abstract class KafkaSource<T> extends Source<T> {
}
}
+  /**
+   * Utility method that removes configs with keys that match (start with) any one of the prefixes.
+   *
+   * @param kafkaParams The incoming kafka params
+   * @param commaSeparatedPrefixes all configs with keys starting with any one of these comma-separated prefixes will be ignored.
+   * @return a new set of kafkaParams with the configs matching the prefixes removed.
+   **/
+  @VisibleForTesting
+  static Map<String, Object> filterKafkaParameters(Map<String, Object> kafkaParams, String commaSeparatedPrefixes) {
+    if (commaSeparatedPrefixes == null || commaSeparatedPrefixes.isEmpty()) {
+      return kafkaParams;
+    }
+
+    String[] prefixes = Arrays.stream(commaSeparatedPrefixes.split(COMMA_DELIMITER))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .toArray(String[]::new);
+
+    Map<String, Object> filteredInParams = new HashMap<>();
+    for (Map.Entry<String, Object> entry : kafkaParams.entrySet()) {
+      boolean beginsWithAtleastOnePrefix = false;
+      for (String prefix : prefixes) {
+        if (entry.getKey().startsWith(prefix)) {
+          beginsWithAtleastOnePrefix = true;
+          break;
+        }
+      }
+
+      if (!beginsWithAtleastOnePrefix) {
+        filteredInParams.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return filteredInParams;
+ }
+
private boolean hasConfigException(Throwable e) {
if (e == null) {
return false;
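Worth noting: matching is a plain String.startsWith on the full key, not a dotted-segment match, so the prefix "custom1" also removes "custom1config.sourceprofile.refresh.mode" (the test below relies on this). A self-contained sketch of the same logic, under the assumption that it faithfully mirrors filterKafkaParameters above:

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixFilterDemo {
      // Keep an entry only if its key starts with none of the comma-separated prefixes.
      static Map<String, Object> filter(Map<String, Object> params, String prefixes) {
        Map<String, Object> kept = new HashMap<>();
        String[] split = prefixes.split(",");
        outer:
        for (Map.Entry<String, Object> e : params.entrySet()) {
          for (String p : split) {
            if (!p.trim().isEmpty() && e.getKey().startsWith(p.trim())) {
              continue outer; // drop configs matching any prefix
            }
          }
          kept.put(e.getKey(), e.getValue());
        }
        return kept;
      }

      public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("custom1.config.streamer", "offer");
        params.put("custom1config.sourceprofile.refresh.mode", "ENABLED");
        params.put("bootstrap.servers", "dns:port");
        // Prints {bootstrap.servers=dns:port}: both "custom1..." keys are dropped.
        System.out.println(filter(params, "custom1,custom2"));
      }
    }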
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index a0df75eb6655..b8008a2bc63f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -41,8 +41,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.io.Serializable;
@@ -93,11 +91,11 @@ public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
className.isPresent(),
        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present.");
ProtoDeserializer deserializer = new ProtoDeserializer(className.get());
-      return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
+      JavaRDD<ConsumerRecord<String, byte[]>> kafkaRDD = createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
+      return kafkaRDD.map(obj -> deserializer.parse(obj.value()));
    } else {
-      return KafkaUtils.<String, Message>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).map(ConsumerRecord::value);
+      JavaRDD<ConsumerRecord<String, Message>> kafkaRDD = createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
+      return kafkaRDD.map(ConsumerRecord::value);
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
new file mode 100644
index 000000000000..8265feb900fc
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestKafkaSource {
+
+ @Test
+ public void testFilterKafkaParameters() {
+ Map<String, Object> kafkaParams = new HashMap<>();
+ kafkaParams.put("custom1.config.streamer", "offer");
+ kafkaParams.put("boostrap.servers", "dns:port");
+ kafkaParams.put("custom2.config.capture", "s3://folder1");
+ kafkaParams.put("custom1config.sourceprofile.refresh.mode", "ENABLED");
+
+ // Case 1: No prefixes are configured.
+    assertEquals(kafkaParams, KafkaSource.filterKafkaParameters(kafkaParams, ""));
+    // Case 2: Only prefix matching is done; substrings within the config key are ignored and the config is passed down to the kafka consumer.
+    assertEquals(kafkaParams, KafkaSource.filterKafkaParameters(kafkaParams, "config.,port"));
+    // Case 3: There are no configs with the given prefixes.
+    assertEquals(kafkaParams, KafkaSource.filterKafkaParameters(kafkaParams, "custom3,custom4"));
+    // Case 4: Ensure only the appropriate configs are filtered out.
+    Map<String, Object> filteredParams = KafkaSource.filterKafkaParameters(kafkaParams, "custom1,custom2");
+    Map<String, Object> expectedParams = new HashMap<>();
+    expectedParams.put("boostrap.servers", "dns:port");
+    assertEquals(expectedParams, filteredParams);
+ }
+}
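End to end, one plausible way to exercise the new option is through HoodieStreamer's --hoodie-conf flag; the invocation below is illustrative only (not part of this commit) and the prefix values are hypothetical:

    spark-submit \
      --class org.apache.hudi.utilities.streamer.HoodieStreamer hudi-utilities-bundle.jar \
      ... \
      --hoodie-conf hoodie.streamer.source.kafka.ignore.prefix.config.list=custom1.,custom2.

A config such as custom1.endpoint would then be stripped before the Kafka consumer is created, e.g. to silence "supplied but isn't a known config" warnings from the consumer.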