This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0a53036c1f4 [improve][io] support kafka connect transforms and predicates (#24221)
0a53036c1f4 is described below
commit 0a53036c1f4f3fc5272d797b6294bd2da4ce3bd1
Author: Enrique Fernández <[email protected]>
AuthorDate: Wed Apr 30 09:50:50 2025 +0200
[improve][io] support kafka connect transforms and predicates (#24221)
(cherry picked from commit c0c5044e4023fc375ba719f7e87558c3708da029)
---
pulsar-io/kafka-connect-adaptor/pom.xml | 12 ++
.../kafka/connect/AbstractKafkaConnectSource.java | 2 +-
.../io/kafka/connect/KafkaConnectSource.java | 121 ++++++++++++++++++++-
.../io/kafka/connect/KafkaConnectSourceTest.java | 71 ++++++++++++
4 files changed, 203 insertions(+), 3 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 68039fb2ff2..9238b2b35e6 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -104,6 +104,18 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-transforms</artifactId>
+ <version>${kafka-client.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>jose4j</artifactId>
+ <groupId>org.bitbucket.b_c</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- pulsar-client is only needed for MessageId conversion (for seeking),
commons-lang3 and Netty buffer manipulation -->
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index a7bf32d9bc7..ad2e3d8001b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -176,7 +176,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
}
if (currentBatch.hasNext()) {
AbstractKafkaSourceRecord<T> processRecord =
processSourceRecord(currentBatch.next());
- if (processRecord.isEmpty()) {
+ if (processRecord == null || processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
continue;
} else {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index f2ee8a8e6ca..3d5d76d4230 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.io.kafka.connect;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.confluent.connect.avro.AvroData;
+import java.util.ArrayList;
import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -31,6 +34,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -51,6 +56,15 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
private boolean jsonWithEnvelope = false;
private static final String JSON_WITH_ENVELOPE_CONFIG =
"json-with-envelope";
+ private Map<String, Predicate<SourceRecord>> predicates = new HashMap<>();
+
+ private record PredicatedTransform(
+ Predicate<SourceRecord> predicate,
+ Transformation<SourceRecord> transform,
+ boolean negated
+ ) {}
+ private List<PredicatedTransform> transformations = new ArrayList<>();
+
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
jsonWithEnvelope =
Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
@@ -60,17 +74,120 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
}
log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
+ initPredicates(config);
+ initTransforms(config);
super.open(config, sourceContext);
}
+ private void initPredicates(Map<String, Object> config) {
+ Object predicatesListObj = config.get("predicates");
+ if (predicatesListObj != null) {
+ String predicatesList = predicatesListObj.toString();
+ for (String predicateName : predicatesList.split(",")) {
+ predicateName = predicateName.trim();
+ String prefix = "predicates." + predicateName + ".";
+ String typeKey = prefix + "type";
+ Object classNameObj = config.get(typeKey);
+ if (classNameObj == null) {
+ continue;
+ }
+ String className = classNameObj.toString();
+ try {
+ @SuppressWarnings("unchecked")
+ Class<Predicate<SourceRecord>> clazz =
+ (Class<Predicate<SourceRecord>>)
Class.forName(className);
+ Predicate<SourceRecord> predicate =
clazz.getDeclaredConstructor().newInstance();
+ java.util.Map<String, Object> predicateConfig =
config.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(prefix))
+ .collect(java.util.stream.Collectors.toMap(
+ e -> e.getKey().substring(prefix.length()),
+ java.util.Map.Entry::getValue
+ ));
+ log.info("predicate config: {}", predicateConfig);
+ predicate.configure(predicateConfig);
+ predicates.put(predicateName, predicate);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate
predicate: " + className, e);
+ }
+ }
+ }
+ }
+
+ private void initTransforms(Map<String, Object> config) {
+ transformations.clear();
+ Object transformsListObj = config.get("transforms");
+ if (transformsListObj != null) {
+ String transformsList = transformsListObj.toString();
+ for (String transformName : transformsList.split(",")) {
+ transformName = transformName.trim();
+ String prefix = "transforms." + transformName + ".";
+ String typeKey = prefix + "type";
+ Object classNameObj = config.get(typeKey);
+ if (classNameObj == null) {
+ continue;
+ }
+ String className = classNameObj.toString();
+ try {
+ @SuppressWarnings("unchecked")
+ Class<Transformation<SourceRecord>> clazz =
+ (Class<Transformation<SourceRecord>>)
Class.forName(className);
+ Transformation<SourceRecord> transform =
clazz.getDeclaredConstructor().newInstance();
+ java.util.Map<String, Object> transformConfig =
config.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(prefix))
+ .collect(java.util.stream.Collectors.toMap(
+ e -> e.getKey().substring(prefix.length()),
+ java.util.Map.Entry::getValue
+ ));
+ log.info("transform config: {}", transformConfig);
+ String predicateName = (String)
transformConfig.get("predicate");
+ boolean negated = Boolean.parseBoolean(
+ String.valueOf(transformConfig.getOrDefault("negate",
"false")));
+ Predicate<SourceRecord> predicate = null;
+ if (predicateName != null) {
+ predicate = predicates.get(predicateName);
+ if (predicate == null) {
+ log.warn("Transform {} references non-existent
predicate: {}",
+ transformName, predicateName);
+ }
+ }
+ transform.configure(transformConfig);
+ transformations.add(new PredicatedTransform(predicate,
transform, negated));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate SMT: " +
className, e);
+ }
+ }
+ }
+ }
+
+ private static final AvroData avroData = new AvroData(1000);
public synchronized KafkaSourceRecord processSourceRecord(final
SourceRecord srcRecord) {
- KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
+ SourceRecord transformedRecord = applyTransforms(srcRecord);
+
offsetWriter.offset(srcRecord.sourcePartition(),
srcRecord.sourceOffset());
+ if (transformedRecord == null) {
+ return null;
+ }
+
+ KafkaSourceRecord record = new KafkaSourceRecord(transformedRecord);
return record;
}
- private static final AvroData avroData = new AvroData(1000);
+ public SourceRecord applyTransforms(SourceRecord record) {
+ SourceRecord current = record;
+ for (PredicatedTransform pt : transformations) {
+ if (current == null) {
+ break;
+ }
+
+ if (pt.predicate != null && !(pt.negated !=
pt.predicate.test(current))) {
+ continue;
+ }
+
+ current = pt.transform.apply(current);
+ }
+ return current;
+ }
public class KafkaSourceRecord extends
AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
implements KVRecord<byte[], byte[]> {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 8852ba02b04..f00749ba7df 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.io.kafka.connect;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
import java.io.File;
import java.io.OutputStream;
import java.nio.file.Files;
@@ -31,6 +33,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceRecord;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.schema.KeyValue;
@@ -101,6 +104,74 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
testOpenAndReadTask(config);
}
+ @Test
+ void testTransformation() throws Exception {
+ Map<String, Object> config = setupTransformConfig(false, false);
+ runTransformTest(config, true);
+ }
+
+ @Test
+ void testTransformationWithPredicate() throws Exception {
+ Map<String, Object> config = setupTransformConfig(true, false);
+ runTransformTest(config, true);
+ }
+
+ @Test
+ void testTransformationWithNegatedPredicate() throws Exception {
+ Map<String, Object> config = setupTransformConfig(true, true);
+ runTransformTest(config, false);
+ }
+
+ private Map<String, Object> setupTransformConfig(boolean withPredicate,
boolean negated) {
+ Map<String, Object> config = getConfig();
+ config.put(TaskConfig.TASK_CLASS_CONFIG,
"org.apache.kafka.connect.file.FileStreamSourceTask");
+
+ if (withPredicate) {
+ config.put("predicates", "TopicMatch");
+ config.put("predicates.TopicMatch.type",
"org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
+ config.put("predicates.TopicMatch.pattern", "test-topic");
+ }
+
+ config.put("transforms", "Cast");
+ config.put("transforms.Cast.type",
"org.apache.kafka.connect.transforms.Cast$Value");
+ config.put("transforms.Cast.spec", "myField:int32");
+
+ if (withPredicate) {
+ config.put("transforms.Cast.predicate", "TopicMatch");
+ if (negated) {
+ config.put("transforms.Cast.negate", "true");
+ }
+ }
+
+ return config;
+ }
+
+ private void runTransformTest(Map<String, Object> config, boolean
expectTransformed) throws Exception {
+ kafkaConnectSource = new KafkaConnectSource();
+ kafkaConnectSource.open(config, context);
+
+ Map<String, Object> value = new HashMap<>();
+ value.put("myField", "42");
+ SourceRecord record = new SourceRecord(
+ null, null, "test-topic", null,
+ null, null, null, value
+ );
+
+ SourceRecord transformed = kafkaConnectSource.applyTransforms(record);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> transformedValue = (Map<String, Object>)
transformed.value();
+ assertNotNull(transformedValue);
+
+ if (expectTransformed) {
+ assertEquals(42,
((Number)transformedValue.get("myField")).intValue());
+ assertTrue(transformedValue.get("myField") instanceof Number);
+ } else {
+ assertEquals("42", transformedValue.get("myField"));
+ assertTrue(transformedValue.get("myField") instanceof String);
+ }
+ }
+
private Map<String, Object> getConfig() {
Map<String, Object> config = new HashMap<>();