This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0a53036c1f4 [improve][io] support kafka connect transforms and predicates (#24221)
0a53036c1f4 is described below

commit 0a53036c1f4f3fc5272d797b6294bd2da4ce3bd1
Author: Enrique Fernández <[email protected]>
AuthorDate: Wed Apr 30 09:50:50 2025 +0200

    [improve][io] support kafka connect transforms and predicates (#24221)
    
    (cherry picked from commit c0c5044e4023fc375ba719f7e87558c3708da029)
---
 pulsar-io/kafka-connect-adaptor/pom.xml            |  12 ++
 .../kafka/connect/AbstractKafkaConnectSource.java  |   2 +-
 .../io/kafka/connect/KafkaConnectSource.java       | 121 ++++++++++++++++++++-
 .../io/kafka/connect/KafkaConnectSourceTest.java   |  71 ++++++++++++
 4 files changed, 203 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 68039fb2ff2..9238b2b35e6 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -104,6 +104,18 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-transforms</artifactId>
+      <version>${kafka-client.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>jose4j</artifactId>
+          <groupId>org.bitbucket.b_c</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!-- pulsar-client is only needed for MessageId conversion (for seeking), commons-lang3 and Netty buffer manipulation -->
     <dependency>
       <groupId>${project.groupId}</groupId>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index a7bf32d9bc7..ad2e3d8001b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -176,7 +176,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
             }
             if (currentBatch.hasNext()) {
                 AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
-                if (processRecord.isEmpty()) {
+                if (processRecord == null || processRecord.isEmpty()) {
                     outstandingRecords.decrementAndGet();
                     continue;
                 } else {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index f2ee8a8e6ca..3d5d76d4230 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.io.kafka.connect;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import io.confluent.connect.avro.AvroData;
+import java.util.ArrayList;
 import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +34,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -51,6 +56,15 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
     private boolean jsonWithEnvelope = false;
     private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
 
+    private Map<String, Predicate<SourceRecord>> predicates = new HashMap<>();
+
+    private record PredicatedTransform(
+            Predicate<SourceRecord> predicate,
+            Transformation<SourceRecord> transform,
+            boolean negated
+    ) {}
+    private List<PredicatedTransform> transformations = new ArrayList<>();
+
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
             jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
@@ -60,17 +74,120 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
         }
         log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
 
+        initPredicates(config);
+        initTransforms(config);
         super.open(config, sourceContext);
     }
 
+    private void initPredicates(Map<String, Object> config) {
+        Object predicatesListObj = config.get("predicates");
+        if (predicatesListObj != null) {
+            String predicatesList = predicatesListObj.toString();
+            for (String predicateName : predicatesList.split(",")) {
+                predicateName = predicateName.trim();
+                String prefix = "predicates." + predicateName + ".";
+                String typeKey = prefix + "type";
+                Object classNameObj = config.get(typeKey);
+                if (classNameObj == null) {
+                    continue;
+                }
+                String className = classNameObj.toString();
+                try {
+                    @SuppressWarnings("unchecked")
+                    Class<Predicate<SourceRecord>> clazz =
+                        (Class<Predicate<SourceRecord>>) Class.forName(className);
+                    Predicate<SourceRecord> predicate = clazz.getDeclaredConstructor().newInstance();
+                    java.util.Map<String, Object> predicateConfig = config.entrySet().stream()
+                        .filter(e -> e.getKey().startsWith(prefix))
+                        .collect(java.util.stream.Collectors.toMap(
+                            e -> e.getKey().substring(prefix.length()),
+                            java.util.Map.Entry::getValue
+                        ));
+                    log.info("predicate config: {}", predicateConfig);
+                    predicate.configure(predicateConfig);
+                    predicates.put(predicateName, predicate);
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to instantiate predicate: " + className, e);
+                }
+            }
+        }
+    }
+
+    private void initTransforms(Map<String, Object> config) {
+        transformations.clear();
+        Object transformsListObj = config.get("transforms");
+        if (transformsListObj != null) {
+            String transformsList = transformsListObj.toString();
+            for (String transformName : transformsList.split(",")) {
+                transformName = transformName.trim();
+                String prefix = "transforms." + transformName + ".";
+                String typeKey = prefix + "type";
+                Object classNameObj = config.get(typeKey);
+                if (classNameObj == null) {
+                    continue;
+                }
+                String className = classNameObj.toString();
+                try {
+                    @SuppressWarnings("unchecked")
+                    Class<Transformation<SourceRecord>> clazz =
+                        (Class<Transformation<SourceRecord>>) Class.forName(className);
+                    Transformation<SourceRecord> transform = clazz.getDeclaredConstructor().newInstance();
+                    java.util.Map<String, Object> transformConfig = config.entrySet().stream()
+                        .filter(e -> e.getKey().startsWith(prefix))
+                        .collect(java.util.stream.Collectors.toMap(
+                            e -> e.getKey().substring(prefix.length()),
+                            java.util.Map.Entry::getValue
+                        ));
+                    log.info("transform config: {}", transformConfig);
+                    String predicateName = (String) transformConfig.get("predicate");
+                    boolean negated = Boolean.parseBoolean(
+                        String.valueOf(transformConfig.getOrDefault("negate", "false")));
+                    Predicate<SourceRecord> predicate = null;
+                    if (predicateName != null) {
+                        predicate = predicates.get(predicateName);
+                        if (predicate == null) {
+                            log.warn("Transform {} references non-existent predicate: {}",
+                                    transformName, predicateName);
+                        }
+                    }
+                    transform.configure(transformConfig);
+                    transformations.add(new PredicatedTransform(predicate, transform, negated));
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to instantiate SMT: " + className, e);
+                }
+            }
+        }
+    }
+
+    private static final AvroData avroData = new AvroData(1000);
 
     public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord srcRecord) {
-        KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
+        SourceRecord transformedRecord = applyTransforms(srcRecord);
+
         offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
+        if (transformedRecord == null) {
+            return null;
+        }
+
+        KafkaSourceRecord record = new KafkaSourceRecord(transformedRecord);
         return record;
     }
 
-    private static final AvroData avroData = new AvroData(1000);
+    public SourceRecord applyTransforms(SourceRecord record) {
+        SourceRecord current = record;
+        for (PredicatedTransform pt : transformations) {
+            if (current == null) {
+                break;
+            }
+
+            if (pt.predicate != null && !(pt.negated != pt.predicate.test(current))) {
+                continue;
+            }
+
+            current = pt.transform.apply(current);
+        }
+        return current;
+    }
 
     public class KafkaSourceRecord extends AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
             implements KVRecord<byte[], byte[]> {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 8852ba02b04..f00749ba7df 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.io.kafka.connect;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 import java.io.File;
 import java.io.OutputStream;
 import java.nio.file.Files;
@@ -31,6 +33,7 @@ import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
 import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.schema.KeyValue;
@@ -101,6 +104,74 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         testOpenAndReadTask(config);
     }
 
+    @Test
+    void testTransformation() throws Exception {
+        Map<String, Object> config = setupTransformConfig(false, false);
+        runTransformTest(config, true);
+    }
+
+    @Test
+    void testTransformationWithPredicate() throws Exception {
+        Map<String, Object> config = setupTransformConfig(true, false);
+        runTransformTest(config, true);
+    }
+
+    @Test
+    void testTransformationWithNegatedPredicate() throws Exception {
+        Map<String, Object> config = setupTransformConfig(true, true);
+        runTransformTest(config, false);
+    }
+
+    private Map<String, Object> setupTransformConfig(boolean withPredicate, boolean negated) {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        if (withPredicate) {
+            config.put("predicates", "TopicMatch");
+            config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
+            config.put("predicates.TopicMatch.pattern", "test-topic");
+        }
+
+        config.put("transforms", "Cast");
+        config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value");
+        config.put("transforms.Cast.spec", "myField:int32");
+
+        if (withPredicate) {
+            config.put("transforms.Cast.predicate", "TopicMatch");
+            if (negated) {
+                config.put("transforms.Cast.negate", "true");
+            }
+        }
+
+        return config;
+    }
+
+    private void runTransformTest(Map<String, Object> config, boolean expectTransformed) throws Exception {
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        Map<String, Object> value = new HashMap<>();
+        value.put("myField", "42");
+        SourceRecord record = new SourceRecord(
+            null, null, "test-topic", null,
+            null, null, null, value
+        );
+
+        SourceRecord transformed = kafkaConnectSource.applyTransforms(record);
+
+        @SuppressWarnings("unchecked")
+        Map<String, Object> transformedValue = (Map<String, Object>) transformed.value();
+        assertNotNull(transformedValue);
+
+        if (expectTransformed) {
+            assertEquals(42, ((Number)transformedValue.get("myField")).intValue());
+            assertTrue(transformedValue.get("myField") instanceof Number);
+        } else {
+            assertEquals("42", transformedValue.get("myField"));
+            assertTrue(transformedValue.get("myField") instanceof String);
+        }
+    }
+
     private Map<String, Object> getConfig() {
         Map<String, Object> config = new HashMap<>();
 

Reply via email to