This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49ee8a6bf45 [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
49ee8a6bf45 is described below

commit 49ee8a6bf4571d39adf0e942fc6bb04d9daa1290
Author: Nicolò Boschi <[email protected]>
AuthorDate: Mon Jun 6 20:49:17 2022 +0200

    [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
---
 .../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java   | 6 +++---
 .../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java    | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 445a74b3e3b..31f7cbf6399 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -243,7 +243,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
-        final String topic = sourceRecord.getTopicName().orElse(topicName);
+        final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
         final Object key;
         final Object value;
         final Schema keySchema;
@@ -300,7 +300,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             // keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
             timestamp = sourceRecord.getMessage().get().getPublishTime();
         }
-        return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
+        return new SinkRecord(topic,
                 partition,
                 keySchema,
                 key,
@@ -313,7 +313,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
     @VisibleForTesting
     protected long currentOffset(String topic, int partition) {
-        return taskContext.currentOffset(topic, partition);
+        return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
     }
 
     // Replace all non-letter, non-digit characters with underscore.
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1b767e22b52..23d9f1b5ce2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -78,6 +78,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -201,6 +202,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.write(record);
         sink.flush();
 
+        assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);
+
         assertEquals(status.get(), 1);
         assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");
 

Reply via email to