This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4930a31c05 [fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (#19756)
d4930a31c05 is described below

commit d4930a31c052dd8fcd5982b649898967a24f8961
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Mar 9 01:09:34 2023 -0800

    [fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (#19756)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 22 ++++++++++-
 .../kafka/connect/PulsarKafkaSinkTaskContext.java  | 14 +++++--
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 44 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 5 deletions(-)

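For context, the sketch below illustrates the idea behind this change using simplified, hypothetical names (TopicNameMapping, sanitize, desanitize, and the replacement rule are illustrative only, not the actual KafkaConnectSink API): a Kafka Connect task only ever sees a sanitized, Kafka-safe topic name, so before calling back into Pulsar's SinkContext the sink has to translate that name back to the original Pulsar topic.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only -- not the actual KafkaConnectSink code.
// Pulsar topic names such as "persistent://a-b/c-d/fake-topic.a" contain
// characters a Kafka connector may reject, so the sink hands the connector a
// sanitized alias and must map it back before calling SinkContext methods.
public class TopicNameMapping {

    // sanitized (Kafka-safe) name -> original Pulsar topic name
    private final Map<String, String> desanitized = new ConcurrentHashMap<>();

    // Hypothetical sanitizer: replace anything outside [A-Za-z0-9._-] with '_'.
    // The real rule in KafkaConnectSink may differ; this only shows the shape.
    public String sanitize(String pulsarTopic) {
        String safe = pulsarTopic.replaceAll("[^a-zA-Z0-9._-]", "_");
        desanitized.putIfAbsent(safe, pulsarTopic); // remember the reverse mapping once
        return safe;
    }

    // Used before seek()/pause()/resume(): fall back to the input if unknown.
    public String desanitize(String kafkaTopic) {
        return desanitized.getOrDefault(kafkaTopic, kafkaTopic);
    }
}

Falling back to the input name keeps behaviour unchanged when sanitization is disabled or the mapping is not known.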
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 37d0987e610..efbad2ef47a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -95,6 +95,12 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             CacheBuilder.newBuilder().maximumSize(1000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
 
+    // Can't really safely expire these entries.  If we do, we could end up with
+    // a sanitized topic name that is used in e.g. resume() after a long pause but can't be
+    // re-resolved into a form usable for Pulsar.
+    private final Cache<String, String> desanitizedTopicCache =
+            CacheBuilder.newBuilder().build();
+
     private int maxBatchBitsForOffset = 12;
     private boolean useIndexAsOffset = true;
 
@@ -184,7 +190,18 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         });
         task = (SinkTask) taskClass.getConstructor().newInstance();
         taskContext =
-                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open);
+                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> {
+                    if (sanitizeTopicName) {
+                        String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName);
+                        if (log.isDebugEnabled()) {
+                            log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}",
+                                    kafkaName, pulsarTopicName);
+                        }
+                        return pulsarTopicName != null ? pulsarTopicName : kafkaName;
+                    } else {
+                        return kafkaName;
+                    }
+                });
         task.initialize(taskContext);
         task.start(configs.get(0));
 
@@ -486,6 +503,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 if (sanitizedName.matches("^[^a-zA-Z_].*")) {
                     sanitizedName = "_" + sanitizedName;
                 }
+                // do this once, sanitize() can be called on an already sanitized name
+                // so avoid replacing with (sanitizedName -> sanitizedName).
+                desanitizedTopicCache.get(sanitizedName, () -> name);
                 return sanitizedName;
             });
         } catch (ExecutionException e) {
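A minimal, self-contained sketch of the get-if-absent pattern used by desanitizedTopicCache above (the topic names here are made-up examples, and DesanitizeCacheDemo is not part of the patch): Guava's Cache.get(key, loader) only invokes the loader when the key is missing, which is why sanitizing an already-sanitized name later does not overwrite the stored original with an identity mapping.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class DesanitizeCacheDemo {
    public static void main(String[] args) throws Exception {
        // No expiry on purpose: an evicted entry could never be recomputed from the sanitized name alone.
        Cache<String, String> cache = CacheBuilder.newBuilder().build();

        String original = "persistent://a-b/c-d/fake-topic.a";  // example Pulsar name
        String sanitized = "persistent___a_b_c_d_fake_topic_a"; // hypothetical Kafka-safe form

        cache.get(sanitized, () -> original);   // first call stores the real name
        cache.get(sanitized, () -> sanitized);  // repeat call finds the key, loader is not invoked

        System.out.println(cache.getIfPresent(sanitized)); // prints the original Pulsar name
    }
}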
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 99a8bf29082..7a908b553a8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -49,6 +50,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
     private final SinkContext ctx;
 
     private final OffsetBackingStore offsetStore;
+    private Function<String, String> desanitizeTopicName;
     private final String topicNamespace;
     private final Consumer<Collection<TopicPartition>> onPartitionChange;
     private final AtomicBoolean runRepartition = new AtomicBoolean(false);
@@ -57,11 +59,13 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
 
     public PulsarKafkaSinkTaskContext(Map<String, String> config,
                                       SinkContext ctx,
-                                      Consumer<Collection<TopicPartition>> onPartitionChange) {
+                                      Consumer<Collection<TopicPartition>> onPartitionChange,
+                                      Function<String, String> desanitizeTopicName) {
         this.config = config;
         this.ctx = ctx;
 
         offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
+        this.desanitizeTopicName = desanitizeTopicName;
         PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config);
         offsetStore.configure(pulsarKafkaWorkerConfig);
         offsetStore.start();
@@ -144,7 +148,9 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
 
     private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
         try {
-            ctx.seek(topicPartition.topic(), topicPartition.partition(), MessageIdUtils.getMessageId(offset));
+            ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
+                    topicPartition.partition(),
+                    MessageIdUtils.getMessageId(offset));
         } catch (PulsarClientException e) {
             log.error("Failed to seek topic {} partition {} offset {}",
                     topicPartition.topic(), topicPartition.partition(), 
offset, e);
@@ -202,7 +208,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
     public void pause(TopicPartition... topicPartitions) {
         for (TopicPartition tp: topicPartitions) {
             try {
-                ctx.pause(tp.topic(), tp.partition());
+                ctx.pause(desanitizeTopicName.apply(tp.topic()), tp.partition());
             } catch (PulsarClientException e) {
                 log.error("Failed to pause topic {} partition {}", tp.topic(), tp.partition(), e);
                 throw new RuntimeException("Failed to pause topic " + tp.topic() + " partition " + tp.partition(), e);
@@ -214,7 +220,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
     public void resume(TopicPartition... topicPartitions) {
         for (TopicPartition tp: topicPartitions) {
             try {
-                ctx.resume(tp.topic(), tp.partition());
+                ctx.resume(desanitizeTopicName.apply(tp.topic()), tp.partition());
             } catch (PulsarClientException e) {
                 log.error("Failed to resume topic {} partition {}", tp.topic(), tp.partition(), e);
                 throw new RuntimeException("Failed to resume topic " + tp.topic() + " partition " + tp.partition(), e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index fe2def25023..567562d338b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -314,6 +314,50 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         sink.close();
     }
 
+    @Test
+    public void seekPauseResumeWithSanitizeTest() throws Exception {
+        KafkaConnectSink sink = new KafkaConnectSink();
+        props.put("sanitizeTopicName", "true");
+        sink.open(props, context);
+
+        String pulsarTopicName = "persistent://a-b/c-d/fake-topic.a";
+
+        final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        final MessageId msgId = new MessageIdImpl(10, 10, 0);
+        when(msg.getMessageId()).thenReturn(msgId);
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName(pulsarTopicName)
+                .message(msg)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .schema(Schema.STRING)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        assertEquals(status.get(), 1);
+
+        final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
+        assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
+        assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
+
+        sink.taskContext.offset(tp, 0);
+        verify(context, times(1)).seek(pulsarTopicName,
+                tp.partition(), MessageIdUtils.getMessageId(0));
+        assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);
+
+        sink.taskContext.pause(tp);
+        verify(context, times(1)).pause(pulsarTopicName, tp.partition());
+        sink.taskContext.resume(tp);
+        verify(context, times(1)).resume(pulsarTopicName, tp.partition());
+
+        sink.close();
+    }
 
     @Test
     public void subscriptionTypeTest() throws Exception {
