This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 49ee8a6bf45 [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
49ee8a6bf45 is described below
commit 49ee8a6bf4571d39adf0e942fc6bb04d9daa1290
Author: Nicolò Boschi <[email protected]>
AuthorDate: Mon Jun 6 20:49:17 2022 +0200
[fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
---
.../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java | 6 +++---
.../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 3 +++
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 445a74b3e3b..31f7cbf6399 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -243,7 +243,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
- final String topic = sourceRecord.getTopicName().orElse(topicName);
+ final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
final Object key;
final Object value;
final Schema keySchema;
@@ -300,7 +300,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
// keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
timestamp = sourceRecord.getMessage().get().getPublishTime();
}
- return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
+ return new SinkRecord(topic,
partition,
keySchema,
key,
@@ -313,7 +313,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
@VisibleForTesting
protected long currentOffset(String topic, int partition) {
- return taskContext.currentOffset(topic, partition);
+ return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
}
// Replace all non-letter, non-digit characters with underscore.
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1b767e22b52..23d9f1b5ce2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -78,6 +78,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -201,6 +202,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
sink.write(record);
sink.flush();
+ assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);
+
assertEquals(status.get(), 1);
assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");