This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 1655ca66249 KCA: picking fixes from master (#19787)
1655ca66249 is described below
commit 1655ca66249008de41816e3f0b6c86a20f81eaa8
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sat Mar 11 21:47:09 2023 -0800
KCA: picking fixes from master (#19787)
---
.../kafka/connect/AbstractKafkaConnectSource.java | 48 ++++++++++++++++---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 22 ++++++++-
.../kafka/connect/PulsarKafkaSinkTaskContext.java | 14 ++++--
.../io/kafka/connect/KafkaConnectSinkTest.java | 44 ++++++++++++++++++
.../io/kafka/connect/KafkaConnectSourceTest.java | 54 ++++++++++++++--------
.../coordination/impl/LeaderElectionImplTest.java | 2 +-
6 files changed, 153 insertions(+), 31 deletions(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 97390a85fca..1d71d41b357 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.kafka.connect;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,7 +32,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
@@ -55,6 +59,7 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
// kafka connect related variables
private SourceTaskContext sourceTaskContext;
+ private SourceConnector connector;
@Getter
private SourceTask sourceTask;
public Converter keyConverter;
@@ -71,6 +76,8 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
// number of outstandingRecords that have been polled but not been acked
private final AtomicInteger outstandingRecords = new AtomicInteger(0);
+ public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";
+
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
Map<String, String> stringConfig = new HashMap<>();
@@ -80,12 +87,6 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
}
});
- // get the source class name from config and create source task from
reflection
- sourceTask =
Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
- .asSubclass(SourceTask.class)
- .getDeclaredConstructor()
- .newInstance();
-
topicNamespace =
stringConfig.get(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG);
// initialize the key and value converter
@@ -129,8 +130,36 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader,
pulsarKafkaWorkerConfig);
+ final Map<String, String> taskConfig;
+ if (config.get(CONNECTOR_CLASS) != null) {
+ String kafkaConnectorFQClassName =
config.get(CONNECTOR_CLASS).toString();
+ Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+ connector = (SourceConnector) clazz.getConstructor().newInstance();
+
+ Class<? extends Task> taskClass = connector.taskClass();
+ sourceTask = (SourceTask) taskClass.getConstructor().newInstance();
+
+ connector.initialize(new PulsarKafkaSinkContext());
+ connector.start(stringConfig);
+
+ List<Map<String, String>> configs = connector.taskConfigs(1);
+ checkNotNull(configs);
+ checkArgument(configs.size() == 1);
+ taskConfig = configs.get(0);
+ } else {
+ // for backward compatibility with old configuration
+ // that use the task directly
+
+ // get the source class name from config and create source task
from reflection
+ sourceTask =
Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+ .asSubclass(SourceTask.class)
+ .getDeclaredConstructor()
+ .newInstance();
+ taskConfig = stringConfig;
+ }
+
sourceTask.initialize(sourceTaskContext);
- sourceTask.start(stringConfig);
+ sourceTask.start(taskConfig);
}
@Override
@@ -178,6 +207,11 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
sourceTask = null;
}
+ if (connector != null) {
+ connector.stop();
+ connector = null;
+ }
+
if (offsetStore != null) {
offsetStore.stop();
offsetStore = null;
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 719642edf13..a8cb38b9480 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -96,6 +96,12 @@ public class KafkaConnectSink implements Sink<GenericObject>
{
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
+ // Can't really safely expire these entries. If we do, we could end up
with
+ // a sanitized topic name that is used in e.g. resume() after a long pause
but can't be
+ // re-resolved into a form usable for Pulsar.
+ private final Cache<String, String> desanitizedTopicCache =
+ CacheBuilder.newBuilder().build();
+
private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;
@@ -185,7 +191,18 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
});
task = (SinkTask) taskClass.getConstructor().newInstance();
taskContext =
- new PulsarKafkaSinkTaskContext(configs.get(0), ctx,
task::open);
+ new PulsarKafkaSinkTaskContext(configs.get(0), ctx,
task::open, kafkaName -> {
+ if (sanitizeTopicName) {
+ String pulsarTopicName =
desanitizedTopicCache.getIfPresent(kafkaName);
+ if (log.isDebugEnabled()) {
+ log.debug("desanitizedTopicCache got: kafkaName:
{}, pulsarTopicName: {}",
+ kafkaName, pulsarTopicName);
+ }
+ return pulsarTopicName != null ? pulsarTopicName :
kafkaName;
+ } else {
+ return kafkaName;
+ }
+ });
task.initialize(taskContext);
task.start(configs.get(0));
@@ -487,6 +504,9 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
if (sanitizedName.matches("^[^a-zA-Z_].*")) {
sanitizedName = "_" + sanitizedName;
}
+ // do this once, sanitize() can be called on an already sanitized
name
+ // so avoid replacing with (sanitizedName -> sanitizedName).
+ desanitizedTopicCache.get(sanitizedName, () -> name);
return sanitizedName;
});
} catch (ExecutionException e) {
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 8c4639a2373..c95af0363a6 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
private final SinkContext ctx;
private final OffsetBackingStore offsetStore;
+ private Function<String, String> desanitizeTopicName;
private final String topicNamespace;
private final Consumer<Collection<TopicPartition>> onPartitionChange;
private final AtomicBoolean runRepartition = new AtomicBoolean(false);
@@ -58,11 +60,13 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public PulsarKafkaSinkTaskContext(Map<String, String> config,
SinkContext ctx,
- Consumer<Collection<TopicPartition>>
onPartitionChange) {
+ Consumer<Collection<TopicPartition>>
onPartitionChange,
+ Function<String, String>
desanitizeTopicName) {
this.config = config;
this.ctx = ctx;
offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
+ this.desanitizeTopicName = desanitizeTopicName;
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new
PulsarKafkaWorkerConfig(config);
offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
@@ -145,7 +149,9 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
private void seekAndUpdateOffset(TopicPartition topicPartition, long
offset) {
try {
- ctx.seek(topicPartition.topic(), topicPartition.partition(),
MessageIdUtils.getMessageId(offset));
+ ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
+ topicPartition.partition(),
+ MessageIdUtils.getMessageId(offset));
} catch (PulsarClientException e) {
log.error("Failed to seek topic {} partition {} offset {}",
topicPartition.topic(), topicPartition.partition(),
offset, e);
@@ -203,7 +209,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public void pause(TopicPartition... topicPartitions) {
for (TopicPartition tp: topicPartitions) {
try {
- ctx.pause(tp.topic(), tp.partition());
+ ctx.pause(desanitizeTopicName.apply(tp.topic()),
tp.partition());
} catch (PulsarClientException e) {
log.error("Failed to pause topic {} partition {}", tp.topic(),
tp.partition(), e);
throw new RuntimeException("Failed to pause topic " +
tp.topic() + " partition " + tp.partition(), e);
@@ -215,7 +221,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public void resume(TopicPartition... topicPartitions) {
for (TopicPartition tp: topicPartitions) {
try {
- ctx.resume(tp.topic(), tp.partition());
+ ctx.resume(desanitizeTopicName.apply(tp.topic()),
tp.partition());
} catch (PulsarClientException e) {
log.error("Failed to resume topic {} partition {}",
tp.topic(), tp.partition(), e);
throw new RuntimeException("Failed to resume topic " +
tp.topic() + " partition " + tp.partition(), e);
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index d3e1a40550f..6aca149dc54 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -314,6 +314,50 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
sink.close();
}
+ @Test
+ public void seekPauseResumeWithSanitizeTest() throws Exception {
+ KafkaConnectSink sink = new KafkaConnectSink();
+ props.put("sanitizeTopicName", "true");
+ sink.open(props, context);
+
+ String pulsarTopicName = "persistent://a-b/c-d/fake-topic.a";
+
+ final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ final MessageId msgId = new MessageIdImpl(10, 10, 0);
+ when(msg.getMessageId()).thenReturn(msgId);
+
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName(pulsarTopicName)
+ .message(msg)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .schema(Schema.STRING)
+ .build();
+
+ sink.write(record);
+ sink.flush();
+
+ assertEquals(status.get(), 1);
+
+ final TopicPartition tp = new
TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
+ assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()),
MessageIdUtils.getOffset(msgId));
+
+ sink.taskContext.offset(tp, 0);
+ verify(context, times(1)).seek(pulsarTopicName,
+ tp.partition(), MessageIdUtils.getMessageId(0));
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);
+
+ sink.taskContext.pause(tp);
+ verify(context, times(1)).pause(pulsarTopicName, tp.partition());
+ sink.taskContext.resume(tp);
+ verify(context, times(1)).resume(pulsarTopicName, tp.partition());
+
+ sink.close();
+ }
@Test
public void subscriptionTypeTest() throws Exception {
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 26ecf21aa0b..39c7b0b7408 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-
import java.io.File;
import java.io.OutputStream;
import java.nio.file.Files;
@@ -47,7 +46,6 @@ import org.testng.annotations.Test;
@Slf4j
public class KafkaConnectSourceTest extends ProducerConsumerBase {
- private Map<String, Object> config = new HashMap<>();
private String offsetTopicName;
// The topic to publish data to, for kafkaSource
private String topicName;
@@ -62,18 +60,10 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
super.internalSetup();
super.producerBaseSetup();
- config.put(TaskConfig.TASK_CLASS_CONFIG,
"org.apache.kafka.connect.file.FileStreamSourceTask");
- config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.storage.StringConverter");
- config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.storage.StringConverter");
-
this.offsetTopicName =
"persistent://my-property/my-ns/kafka-connect-source-offset";
- config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
offsetTopicName);
-
this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
- config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
tempFile = File.createTempFile("some-file-name", null);
- config.put(FileStreamSourceConnector.FILE_CONFIG,
tempFile.getAbsoluteFile().toString());
- config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG,
String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+ tempFile.deleteOnExit();
this.context = mock(SourceContext.class);
this.client = PulsarClient.builder()
@@ -91,16 +81,44 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
tempFile.delete();
super.internalCleanup();
}
- protected void completedFlush(Throwable error, Void result) {
- if (error != null) {
- log.error("Failed to flush {} offsets to storage: ", this, error);
- } else {
- log.info("Finished flushing {} offsets to storage", this);
- }
+
+ @Test
+ public void testOpenAndReadConnectorConfig() throws Exception {
+ Map<String, Object> config = getConfig();
+ config.put(AbstractKafkaConnectSource.CONNECTOR_CLASS,
+ "org.apache.kafka.connect.file.FileStreamSourceConnector");
+
+ testOpenAndReadTask(config);
}
@Test
- public void testOpenAndRead() throws Exception {
+ public void testOpenAndReadTaskDirect() throws Exception {
+ Map<String, Object> config = getConfig();
+
+ config.put(TaskConfig.TASK_CLASS_CONFIG,
+ "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+ testOpenAndReadTask(config);
+ }
+
+ private Map<String, Object> getConfig() {
+ Map<String, Object> config = new HashMap<>();
+
+ config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+ "org.apache.kafka.connect.storage.StringConverter");
+ config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+ "org.apache.kafka.connect.storage.StringConverter");
+
+ config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
offsetTopicName);
+
+ config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
+ config.put(FileStreamSourceConnector.FILE_CONFIG,
tempFile.getAbsoluteFile().toString());
+ config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG,
+
String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+ return config;
+ }
+
+ private void testOpenAndReadTask(Map<String, Object> config) throws
Exception {
kafkaConnectSource = new KafkaConnectSource();
kafkaConnectSource.open(config, context);
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
index 027521d2ffc..8f70ab51958 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information