This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 8f47bd12e25 [cherry-pick][branch-2.10] KCA: picking fixes from master (#19788)
8f47bd12e25 is described below

commit 8f47bd12e254ed943650720a362dab429b8255bd
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Tue Mar 14 00:20:33 2023 -0700

    [cherry-pick][branch-2.10] KCA: picking fixes from master (#19788)
---
 .../kafka/connect/AbstractKafkaConnectSource.java  | 48 ++++++++++++++++---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 22 ++++++++-
 .../kafka/connect/PulsarKafkaSinkTaskContext.java  | 14 ++++--
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 44 ++++++++++++++++++
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 54 ++++++++++++++--------
 5 files changed, 152 insertions(+), 30 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 8c0b2e3cc93..2364e1d62a6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,7 +32,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
@@ -56,6 +60,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
     // kafka connect related variables
     private SourceTaskContext sourceTaskContext;
+    private SourceConnector connector;
     @Getter
     private SourceTask sourceTask;
     public Converter keyConverter;
@@ -72,6 +77,8 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
     // number of outstandingRecords that have been polled but not been acked
     private final AtomicInteger outstandingRecords = new AtomicInteger(0);
 
+    public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";
+
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         Map<String, String> stringConfig = new HashMap<>();
@@ -81,12 +88,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
             }
         });
 
-        // get the source class name from config and create source task from reflection
-        sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
-                .asSubclass(SourceTask.class)
-                .getDeclaredConstructor()
-                .newInstance();
-
         topicNamespace = stringConfig.get(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG);
 
         // initialize the key and value converter
@@ -130,8 +131,36 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
         sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);
 
+        final Map<String, String> taskConfig;
+        if (config.get(CONNECTOR_CLASS) != null) {
+            String kafkaConnectorFQClassName = config.get(CONNECTOR_CLASS).toString();
+            Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+            connector = (SourceConnector) clazz.getConstructor().newInstance();
+
+            Class<? extends Task> taskClass = connector.taskClass();
+            sourceTask = (SourceTask) taskClass.getConstructor().newInstance();
+
+            connector.initialize(new PulsarKafkaSinkContext());
+            connector.start(stringConfig);
+
+            List<Map<String, String>> configs = connector.taskConfigs(1);
+            checkNotNull(configs);
+            checkArgument(configs.size() == 1);
+            taskConfig = configs.get(0);
+        } else {
+            // for backward compatibility with old configuration
+            // that uses the task directly
+
+            // get the source class name from config and create source task from reflection
+            sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+                    .asSubclass(SourceTask.class)
+                    .getDeclaredConstructor()
+                    .newInstance();
+            taskConfig = stringConfig;
+        }
+
         sourceTask.initialize(sourceTaskContext);
-        sourceTask.start(stringConfig);
+        sourceTask.start(taskConfig);
     }
 
     @Override
@@ -179,6 +208,11 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
             sourceTask = null;
         }
 
+        if (connector != null) {
+            connector.stop();
+            connector = null;
+        }
+
         if (offsetStore != null) {
             offsetStore.stop();
             offsetStore = null;
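
The hunk above adds a second configuration path: instead of naming a SourceTask
directly, the config can name a SourceConnector, which the adaptor instantiates,
starts, and asks for exactly one task config via taskConfigs(1). Below is a
minimal sketch of the two config styles, assuming the FileStream connector from
Kafka's connect-file module is on the classpath; the wrapper class and main()
are illustrative only, not part of the commit:

    import java.util.HashMap;
    import java.util.Map;

    public class KcaSourceConfigSketch {
        public static void main(String[] args) {
            // New style: the adaptor resolves the connector class, calls
            // connector.start(config), then starts the single generated task.
            Map<String, Object> viaConnector = new HashMap<>();
            viaConnector.put("kafkaConnectorSourceClass", // AbstractKafkaConnectSource.CONNECTOR_CLASS
                    "org.apache.kafka.connect.file.FileStreamSourceConnector");

            // Old style, kept for backward compatibility: the raw string config
            // is handed straight to SourceTask.start().
            Map<String, Object> viaTask = new HashMap<>();
            viaTask.put("task.class", // TaskConfig.TASK_CLASS_CONFIG
                    "org.apache.kafka.connect.file.FileStreamSourceTask");

            System.out.println("connector style: " + viaConnector);
            System.out.println("task style:      " + viaTask);
        }
    }
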
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 31f7cbf6399..a0c9acc6e2a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -91,6 +91,12 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             CacheBuilder.newBuilder().maximumSize(1000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
 
+    // Can't really safely expire these entries. If we do, we could end up with
+    // a sanitized topic name that is used in e.g. resume() after a long pause
+    // but can't be re-resolved into a form usable for Pulsar.
+    private final Cache<String, String> desanitizedTopicCache =
+            CacheBuilder.newBuilder().build();
+
     @Override
     public void write(Record<GenericObject> sourceRecord) {
         if (log.isDebugEnabled()) {
@@ -172,7 +178,18 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         });
         task = (SinkTask) taskClass.getConstructor().newInstance();
         taskContext =
-                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open);
+                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> {
+                    if (sanitizeTopicName) {
+                        String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName);
+                        if (log.isDebugEnabled()) {
+                            log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}",
+                                    kafkaName, pulsarTopicName);
+                        }
+                        return pulsarTopicName != null ? pulsarTopicName : kafkaName;
+                    } else {
+                        return kafkaName;
+                    }
+                });
         task.initialize(taskContext);
         task.start(configs.get(0));
 
@@ -329,6 +346,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 if (sanitizedName.matches("^[^a-zA-Z_].*")) {
                     sanitizedName = "_" + sanitizedName;
                 }
+                // do this once; sanitize() can be called on an already sanitized name,
+                // so avoid replacing with (sanitizedName -> sanitizedName).
+                desanitizedTopicCache.get(sanitizedName, () -> name);
                 return sanitizedName;
             });
         } catch (ExecutionException e) {
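
For reference, the round-trip that the new desanitizedTopicCache enables can be
sketched in isolation. This is a simplified stand-in, assuming the replacement
step (outside this hunk) maps characters that are illegal in Kafka topic names
(anything outside [a-zA-Z0-9._-]) to '_'; the real sink uses a Guava cache, not
a plain map:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class TopicNameRoundTrip {
        // sanitized Kafka-side name -> original Pulsar topic name
        static final Map<String, String> desanitized = new ConcurrentHashMap<>();

        static String sanitize(String name) {
            String sanitizedName = name.replaceAll("[^a-zA-Z0-9._-]", "_"); // assumed rule
            if (sanitizedName.matches("^[^a-zA-Z_].*")) {
                sanitizedName = "_" + sanitizedName;
            }
            // record only the first mapping, mirroring the "do this once" comment:
            // sanitize(sanitize(x)) must not overwrite the original Pulsar name
            desanitized.putIfAbsent(sanitizedName, name);
            return sanitizedName;
        }

        static String desanitize(String kafkaName) {
            return desanitized.getOrDefault(kafkaName, kafkaName);
        }

        public static void main(String[] args) {
            String kafka = sanitize("persistent://a-b/c-d/fake-topic.a");
            System.out.println(kafka);             // persistent___a-b_c-d_fake-topic.a
            System.out.println(desanitize(kafka)); // persistent://a-b/c-d/fake-topic.a
        }
    }
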
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 8c4639a2373..c95af0363a6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
     private final SinkContext ctx;
 
     private final OffsetBackingStore offsetStore;
+    private Function<String, String> desanitizeTopicName;
     private final String topicNamespace;
     private final Consumer<Collection<TopicPartition>> onPartitionChange;
     private final AtomicBoolean runRepartition = new AtomicBoolean(false);
@@ -58,11 +60,13 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
 
     public PulsarKafkaSinkTaskContext(Map<String, String> config,
                                       SinkContext ctx,
-                                      Consumer<Collection<TopicPartition>> onPartitionChange) {
+                                      Consumer<Collection<TopicPartition>> onPartitionChange,
+                                      Function<String, String> desanitizeTopicName) {
         this.config = config;
         this.ctx = ctx;
 
         offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
+        this.desanitizeTopicName = desanitizeTopicName;
         PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config);
         offsetStore.configure(pulsarKafkaWorkerConfig);
         offsetStore.start();
@@ -145,7 +149,9 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
 
     private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
         try {
-            ctx.seek(topicPartition.topic(), topicPartition.partition(), MessageIdUtils.getMessageId(offset));
+            ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
+                    topicPartition.partition(),
+                    MessageIdUtils.getMessageId(offset));
         } catch (PulsarClientException e) {
             log.error("Failed to seek topic {} partition {} offset {}",
                     topicPartition.topic(), topicPartition.partition(), offset, e);
@@ -203,7 +209,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
     public void pause(TopicPartition... topicPartitions) {
         for (TopicPartition tp: topicPartitions) {
             try {
-                ctx.pause(tp.topic(), tp.partition());
+                ctx.pause(desanitizeTopicName.apply(tp.topic()), tp.partition());
             } catch (PulsarClientException e) {
                 log.error("Failed to pause topic {} partition {}", tp.topic(), 
tp.partition(), e);
                 throw new RuntimeException("Failed to pause topic " + 
tp.topic() + " partition " + tp.partition(), e);
@@ -215,7 +221,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
     public void resume(TopicPartition... topicPartitions) {
         for (TopicPartition tp: topicPartitions) {
             try {
-                ctx.resume(tp.topic(), tp.partition());
+                ctx.resume(desanitizeTopicName.apply(tp.topic()), tp.partition());
             } catch (PulsarClientException e) {
                 log.error("Failed to resume topic {} partition {}", 
tp.topic(), tp.partition(), e);
                 throw new RuntimeException("Failed to resume topic " + 
tp.topic() + " partition " + tp.partition(), e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index d07b0f3979d..2cc5260f957 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -268,6 +268,50 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.close();
     }
 
+    @Test
+    public void seekPauseResumeWithSanitizeTest() throws Exception {
+        KafkaConnectSink sink = new KafkaConnectSink();
+        props.put("sanitizeTopicName", "true");
+        sink.open(props, context);
+
+        String pulsarTopicName = "persistent://a-b/c-d/fake-topic.a";
+
+        final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        final MessageId msgId = new MessageIdImpl(10, 10, 0);
+        when(msg.getMessageId()).thenReturn(msgId);
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName(pulsarTopicName)
+                .message(msg)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .schema(Schema.STRING)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        assertEquals(status.get(), 1);
+
+        final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
+        assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
+        assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
+
+        sink.taskContext.offset(tp, 0);
+        verify(context, times(1)).seek(pulsarTopicName,
+                tp.partition(), MessageIdUtils.getMessageId(0));
+        assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);
+
+        sink.taskContext.pause(tp);
+        verify(context, times(1)).pause(pulsarTopicName, tp.partition());
+        sink.taskContext.resume(tp);
+        verify(context, times(1)).resume(pulsarTopicName, tp.partition());
+
+        sink.close();
+    }
 
     @Test
     public void subscriptionTypeTest() throws Exception {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 26ecf21aa0b..39c7b0b7408 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import java.io.File;
 import java.io.OutputStream;
 import java.nio.file.Files;
@@ -47,7 +46,6 @@ import org.testng.annotations.Test;
 @Slf4j
 public class KafkaConnectSourceTest extends ProducerConsumerBase  {
 
-    private Map<String, Object> config = new HashMap<>();
     private String offsetTopicName;
     // The topic to publish data to, for kafkaSource
     private String topicName;
@@ -62,18 +60,10 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         super.internalSetup();
         super.producerBaseSetup();
 
-        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
-        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-
         this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
-        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);
-
         this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
-        config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
         tempFile = File.createTempFile("some-file-name", null);
-        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
-        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+        tempFile.deleteOnExit();
 
         this.context = mock(SourceContext.class);
         this.client = PulsarClient.builder()
@@ -91,16 +81,44 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         tempFile.delete();
         super.internalCleanup();
     }
-    protected void completedFlush(Throwable error, Void result) {
-        if (error != null) {
-            log.error("Failed to flush {} offsets to storage: ", this, error);
-        } else {
-            log.info("Finished flushing {} offsets to storage", this);
-        }
+
+    @Test
+    public void testOpenAndReadConnectorConfig() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(AbstractKafkaConnectSource.CONNECTOR_CLASS,
+                "org.apache.kafka.connect.file.FileStreamSourceConnector");
+
+        testOpenAndReadTask(config);
     }
 
     @Test
-    public void testOpenAndRead() throws Exception {
+    public void testOpenAndReadTaskDirect() throws Exception {
+        Map<String, Object> config = getConfig();
+
+        config.put(TaskConfig.TASK_CLASS_CONFIG,
+                "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        testOpenAndReadTask(config);
+    }
+
+    private Map<String, Object> getConfig() {
+        Map<String, Object> config = new HashMap<>();
+
+        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                "org.apache.kafka.connect.storage.StringConverter");
+        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                "org.apache.kafka.connect.storage.StringConverter");
+
+        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);
+
+        config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
+        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG,
+                String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+        return config;
+    }
+
+    private void testOpenAndReadTask(Map<String, Object> config) throws Exception {
         kafkaConnectSource = new KafkaConnectSource();
         kafkaConnectSource.open(config, context);
 
