This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4930a31c05 [fix][io] KCA: 'desanitize' topic name for the pulsar's
ctx calls (#19756)
d4930a31c05 is described below
commit d4930a31c052dd8fcd5982b649898967a24f8961
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Mar 9 01:09:34 2023 -0800
[fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (#19756)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 22 ++++++++++-
.../kafka/connect/PulsarKafkaSinkTaskContext.java | 14 +++++--
.../io/kafka/connect/KafkaConnectSinkTest.java | 44 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 5 deletions(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 37d0987e610..efbad2ef47a 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -95,6 +95,12 @@ public class KafkaConnectSink implements Sink<GenericObject>
{
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
+ // Can't really safely expire these entries. If we do, we could end up
with
+ // a sanitized topic name that is used in e.g. resume() after a long pause
but can't be
+ // re-resolved into a form usable for Pulsar.
+ private final Cache<String, String> desanitizedTopicCache =
+ CacheBuilder.newBuilder().build();
+
private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;
@@ -184,7 +190,18 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
});
task = (SinkTask) taskClass.getConstructor().newInstance();
taskContext =
- new PulsarKafkaSinkTaskContext(configs.get(0), ctx,
task::open);
+ new PulsarKafkaSinkTaskContext(configs.get(0), ctx,
task::open, kafkaName -> {
+ if (sanitizeTopicName) {
+ String pulsarTopicName =
desanitizedTopicCache.getIfPresent(kafkaName);
+ if (log.isDebugEnabled()) {
+ log.debug("desanitizedTopicCache got: kafkaName:
{}, pulsarTopicName: {}",
+ kafkaName, pulsarTopicName);
+ }
+ return pulsarTopicName != null ? pulsarTopicName :
kafkaName;
+ } else {
+ return kafkaName;
+ }
+ });
task.initialize(taskContext);
task.start(configs.get(0));
@@ -486,6 +503,9 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
if (sanitizedName.matches("^[^a-zA-Z_].*")) {
sanitizedName = "_" + sanitizedName;
}
+ // Do this only once: sanitize() can be called on an already sanitized
name,
+ // so avoid replacing the entry with (sanitizedName -> sanitizedName).
+ desanitizedTopicCache.get(sanitizedName, () -> name);
return sanitizedName;
});
} catch (ExecutionException e) {
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 99a8bf29082..7a908b553a8 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -49,6 +50,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
private final SinkContext ctx;
private final OffsetBackingStore offsetStore;
+ private Function<String, String> desanitizeTopicName;
private final String topicNamespace;
private final Consumer<Collection<TopicPartition>> onPartitionChange;
private final AtomicBoolean runRepartition = new AtomicBoolean(false);
@@ -57,11 +59,13 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public PulsarKafkaSinkTaskContext(Map<String, String> config,
SinkContext ctx,
- Consumer<Collection<TopicPartition>>
onPartitionChange) {
+ Consumer<Collection<TopicPartition>>
onPartitionChange,
+ Function<String, String>
desanitizeTopicName) {
this.config = config;
this.ctx = ctx;
offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
+ this.desanitizeTopicName = desanitizeTopicName;
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new
PulsarKafkaWorkerConfig(config);
offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
@@ -144,7 +148,9 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
private void seekAndUpdateOffset(TopicPartition topicPartition, long
offset) {
try {
- ctx.seek(topicPartition.topic(), topicPartition.partition(),
MessageIdUtils.getMessageId(offset));
+ ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
+ topicPartition.partition(),
+ MessageIdUtils.getMessageId(offset));
} catch (PulsarClientException e) {
log.error("Failed to seek topic {} partition {} offset {}",
topicPartition.topic(), topicPartition.partition(),
offset, e);
@@ -202,7 +208,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public void pause(TopicPartition... topicPartitions) {
for (TopicPartition tp: topicPartitions) {
try {
- ctx.pause(tp.topic(), tp.partition());
+ ctx.pause(desanitizeTopicName.apply(tp.topic()),
tp.partition());
} catch (PulsarClientException e) {
log.error("Failed to pause topic {} partition {}", tp.topic(),
tp.partition(), e);
throw new RuntimeException("Failed to pause topic " +
tp.topic() + " partition " + tp.partition(), e);
@@ -214,7 +220,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public void resume(TopicPartition... topicPartitions) {
for (TopicPartition tp: topicPartitions) {
try {
- ctx.resume(tp.topic(), tp.partition());
+ ctx.resume(desanitizeTopicName.apply(tp.topic()),
tp.partition());
} catch (PulsarClientException e) {
log.error("Failed to resume topic {} partition {}",
tp.topic(), tp.partition(), e);
throw new RuntimeException("Failed to resume topic " +
tp.topic() + " partition " + tp.partition(), e);
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index fe2def25023..567562d338b 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -314,6 +314,50 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
sink.close();
}
+ @Test
+ public void seekPauseResumeWithSanitizeTest() throws Exception { // seek/pause/resume on a sanitized TopicPartition must reach the Pulsar ctx under the original Pulsar topic name
+ KafkaConnectSink sink = new KafkaConnectSink();
+ props.put("sanitizeTopicName", "true"); // NOTE(review): mutates the shared props; presumably reset before each test — confirm
+ sink.open(props, context);
+
+ String pulsarTopicName = "persistent://a-b/c-d/fake-topic.a"; // presumably chosen so that sanitization changes the name — verify
+
+ final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ final MessageId msgId = new MessageIdImpl(10, 10, 0);
+ when(msg.getMessageId()).thenReturn(msgId);
+
+ final AtomicInteger status = new AtomicInteger(0); // incremented on ack, decremented on fail
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName(pulsarTopicName)
+ .message(msg)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .schema(Schema.STRING)
+ .build();
+
+ sink.write(record); // NOTE(review): presumably sanitizes the topic name and records the reverse mapping — confirm
+ sink.flush();
+
+ assertEquals(status.get(), 1); // the record was acked exactly once
+
+ final TopicPartition tp = new
TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0); // Kafka-side (sanitized) topic name
+ assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()),
MessageIdUtils.getOffset(msgId));
+
+ sink.taskContext.offset(tp, 0); // expected to trigger ctx.seek (verified below)
+ verify(context, times(1)).seek(pulsarTopicName,
+ tp.partition(), MessageIdUtils.getMessageId(0)); // ctx must see the original Pulsar topic name, not the sanitized one
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);
+
+ sink.taskContext.pause(tp);
+ verify(context, times(1)).pause(pulsarTopicName, tp.partition()); // topic name desanitized before ctx.pause
+ sink.taskContext.resume(tp);
+ verify(context, times(1)).resume(pulsarTopicName, tp.partition()); // topic name desanitized before ctx.resume
+
+ sink.close();
+ }
@Test
public void subscriptionTypeTest() throws Exception {