This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49ac549cf5ef [HUDI-9633] Implement ability to filter configs based on custom prefixes before passing down to the kafka consumer (#13604)
49ac549cf5ef is described below

commit 49ac549cf5ef7dd635d069dd44100ab98e6f19d1
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Sat Jul 26 07:45:33 2025 -0700

    [HUDI-9633] Implement ability to filter configs based on custom prefixes before passing down to the kafka consumer (#13604)
    
    
    ---------
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../hudi/utilities/config/KafkaSourceConfig.java   |  7 +++
 .../hudi/utilities/sources/AvroKafkaSource.java    | 13 ++---
 .../hudi/utilities/sources/JsonKafkaSource.java    |  7 +--
 .../apache/hudi/utilities/sources/KafkaSource.java | 68 ++++++++++++++++++++++
 .../hudi/utilities/sources/ProtoKafkaSource.java   | 10 ++--
 .../hudi/utilities/sources/TestKafkaSource.java    | 50 ++++++++++++++++
 6 files changed, 135 insertions(+), 20 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index e9606442ecd6..e0e06549ac02 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -177,6 +177,13 @@ public class KafkaSourceConfig extends HoodieConfig {
           + "OffsetOutOfRange exception, as it is possible that the earliest 
Kafka offsets may "
           + "expire soon while the job is progressing.");
 
+  public static final ConfigProperty<String> IGNORE_PREFIX_CONFIG_LIST = 
ConfigProperty
+      .key(PREFIX + "ignore.prefix.config.list")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Comma-separated list of prefixes for config keys 
that should be dropped from the configs when passed to the Kafka consumer.");
+
   /**
    * Kafka reset offset strategies.
    */
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 7e17b0ff0b2e..210627153c5e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -39,8 +39,6 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,14 +114,13 @@ public class AvroKafkaSource extends 
KafkaSource<JavaRDD<GenericRecord>> {
 
       //Don't want kafka offsets here so we use originalSchemaProvider
       AvroConvertor convertor = new 
AvroConvertor(originalSchemaProvider.getSourceSchema());
-      kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, 
offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).filter(obj -> obj.value() != 
null).map(obj ->
-          new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
-              obj.key(), convertor.fromAvroBinary(obj.value())));
+      JavaRDD<ConsumerRecord<String, byte[]>> kafkaRDDByteArray = 
createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
+      kafkaRDD = kafkaRDDByteArray.filter(obj -> obj.value() != null)
+          .map(obj -> new ConsumerRecord<>(obj.topic(), obj.partition(), 
obj.offset(), obj.key(), convertor.fromAvroBinary(obj.value())));
     } else {
-      kafkaRDD = KafkaUtils.createRDD(sparkContext, 
offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent());
+      kafkaRDD = createKafkaRDD(this.props, sparkContext, offsetGen, 
offsetRanges);
     }
+
     return maybeAppendKafkaOffsets(kafkaRDD.filter(consemerRec -> 
consemerRec.value() != null));
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 7c47691ac116..266020ba7761 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -45,8 +45,6 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,10 +92,7 @@ public class JsonKafkaSource extends 
KafkaSource<JavaRDD<String>> {
   @Override
   protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) {
     String deserializerClass = 
props.getString(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP);
-    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = 
KafkaUtils.createRDD(sparkContext,
-            offsetGen.getKafkaParams(),
-            offsetRanges,
-            LocationStrategies.PreferConsistent())
+    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = 
createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges)
         .filter(x -> filterForNullValues(x.value(), deserializerClass));
     return postProcess(maybeAppendKafkaOffsets(kafkaRDD, deserializerClass));
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index fea1cf5367f1..9417c5e5140f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -20,7 +20,9 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
@@ -32,21 +34,28 @@ import org.apache.hudi.utilities.streamer.SourceProfile;
 import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 
 public abstract class KafkaSource<T> extends Source<T> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+  private static final String COMMA_DELIMITER = ",";
   // these are native kafka's config. do not change the config names.
   protected static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = 
"key.deserializer";
   protected static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = 
"value.deserializer";
@@ -128,6 +137,29 @@ public abstract class KafkaSource<T> extends Source<T> {
     return new InputBatch<>(Option.of(newBatch), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
   }
 
+  /**
+   * Creates a Kafka RDD with the specified key-value deserialization types.
+   *
+   * @param props        Configuration properties for the Kafka source, 
including topic, bootstrap servers, etc.
+   * @param sparkContext Spark context used to create the RDD.
+   * @param offsetGen    Generator that provides Kafka parameters and helps 
map offset ranges.
+   * @param offsetRanges Kafka offset ranges to read data from.
+   *
+   * @param <K>          Type of the Kafka record key.
+   * @param <V>          Type of the Kafka record value.
+   * @return JavaRDD containing Kafka ConsumerRecord entries with deserialized 
key-value pairs.
+   */
+  public static <K, V> JavaRDD<ConsumerRecord<K, V>> createKafkaRDD(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      KafkaOffsetGen offsetGen,
+      OffsetRange[] offsetRanges) {
+    Map<String, Object> kafkaParams =
+        filterKafkaParameters(offsetGen.getKafkaParams(), 
ConfigUtils.getStringWithAltKeys(props, 
KafkaSourceConfig.IGNORE_PREFIX_CONFIG_LIST, true));
+    LOG.debug("Original kafka params " + offsetGen.getKafkaParams() + "\n 
After filtering kafka params " + kafkaParams);
+    return KafkaUtils.createRDD(sparkContext, kafkaParams, offsetRanges, 
LocationStrategies.PreferConsistent());
+  }
+
   protected abstract T toBatch(OffsetRange[] offsetRanges);
 
   @Override
@@ -137,6 +169,42 @@ public abstract class KafkaSource<T> extends Source<T> {
     }
   }
 
+  /**
+  * Utility method that removes configs with keys that match (start with) any 
one of the prefixes.
+  *
+  * @param kafkaParams The incoming kafka params
+  * @param commaSeparatedPrefixes all configs with keys starting with any one 
of these comma-separated prefixes will be ignored.
+  * @return a new map of kafkaParams with the configs matching the prefixes removed; when no prefixes are given (null or empty string), the input map itself is returned unchanged.
+  **/
+  @VisibleForTesting
+  static Map<String, Object> filterKafkaParameters(Map<String, Object> 
kafkaParams, String commaSeparatedPrefixes) {
+    if (commaSeparatedPrefixes == null || commaSeparatedPrefixes.isEmpty()) {
+      return kafkaParams;
+    }
+
+    String[] prefixes = 
Arrays.stream(commaSeparatedPrefixes.split(COMMA_DELIMITER))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .toArray(String[]::new);
+
+    Map<String, Object> filteredInParams = new HashMap<>();
+    for (Map.Entry<String, Object> entry : kafkaParams.entrySet()) {
+      boolean beginsWithAtleastOnePrefix = false;
+      for (String prefix : prefixes) {
+        if (entry.getKey().startsWith(prefix)) {
+          beginsWithAtleastOnePrefix = true;
+          break;
+        }
+      }
+
+      if (!beginsWithAtleastOnePrefix) {
+        filteredInParams.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return filteredInParams;
+  }
+
   private boolean hasConfigException(Throwable e) {
     if (e == null) {
       return false;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index a0df75eb6655..b8008a2bc63f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -41,8 +41,6 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
 import java.io.Serializable;
@@ -93,11 +91,11 @@ public class ProtoKafkaSource extends 
KafkaSource<JavaRDD<Message>> {
           className.isPresent(),
           ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + 
" config must be present.");
       ProtoDeserializer deserializer = new ProtoDeserializer(className.get());
-      return KafkaUtils.<String, byte[]>createRDD(sparkContext, 
offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).map(obj -> 
deserializer.parse(obj.value()));
+      JavaRDD<ConsumerRecord<String, byte[]>> kafkaRDD = 
createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
+      return kafkaRDD.map(obj -> deserializer.parse(obj.value()));
     } else {
-      return KafkaUtils.<String, Message>createRDD(sparkContext, 
offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).map(ConsumerRecord::value);
+      JavaRDD<ConsumerRecord<String, Message>> kafkaRDD = 
createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
+      return kafkaRDD.map(ConsumerRecord::value);
     }
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
new file mode 100644
index 000000000000..8265feb900fc
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestKafkaSource {
+
+  @Test
+  public void testFilterKafkaParameters() {
+    Map<String, Object> kafkaParams = new HashMap<>();
+    kafkaParams.put("custom1.config.streamer", "offer");
+    kafkaParams.put("boostrap.servers", "dns:port");
+    kafkaParams.put("custom2.config.capture", "s3://folder1");
+    kafkaParams.put("custom1config.sourceprofile.refresh.mode", "ENABLED");
+
+    // Case 1: No prefixes are configured.
+    assertEquals(kafkaParams, KafkaSource.filterKafkaParameters(kafkaParams, 
""));
+    // Case 2: Only prefix matching is done; a configured prefix that merely occurs as a substring inside a config key does not match, so the config is still passed down to the kafka consumer.
+    assertEquals(kafkaParams, KafkaSource.filterKafkaParameters(kafkaParams, 
"config.,port"));
+    // Case 3: There are no configs with the given prefixes.
+    assertEquals(kafkaParams, KafkaSource.filterKafkaParameters(kafkaParams, 
"custom3,custom4"));
+    // Case 4: Ensure only the appropriate configs are filtered out.
+    Map<String, Object> filteredParams = 
KafkaSource.filterKafkaParameters(kafkaParams, "custom1,custom2");
+    Map<String, Object> expectedParams = new HashMap<>();
+    expectedParams.put("boostrap.servers", "dns:port");
+    assertEquals(expectedParams, filteredParams);
+  }
+}

Reply via email to