This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 73e470c1e71b3fe2aba6a9cb96d3b102f85e3969 Author: Andrey Yegorov <[email protected]> AuthorDate: Tue Mar 22 00:51:39 2022 -0700 Handle kafka sinks that return immutable maps as configs (#14780) (cherry picked from commit b56d7318e73fb6915208dbe1223446e759c2ed0b) --- .../pulsar/io/kafka/connect/KafkaConnectSink.java | 29 ++++++++++++++-------- .../connect/SchemaedFileStreamSinkConnector.java | 14 +++++++++++ 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index bd1971e75c6..7f89c106360 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -25,6 +25,19 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -44,17 +57,6 @@ import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; @@ -155,6 +157,11 @@ public class KafkaConnectSink implements Sink<GenericObject> { Preconditions.checkNotNull(configs); Preconditions.checkArgument(configs.size() == 1); + // configs may contain immutable/unmodifiable maps + configs = configs.stream() + .map(HashMap::new) + .collect(Collectors.toList()); + configs.forEach(x -> { x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl()); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java index a3cce924d1a..4a786617f75 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java @@ -22,6 +22,11 @@ package org.apache.pulsar.io.kafka.connect; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.file.FileStreamSinkConnector; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + /** * A FileStreamSinkConnector for testing that writes data other than just a value, i.e.: * key, value, key and value schemas. @@ -31,4 +36,13 @@ public class SchemaedFileStreamSinkConnector extends FileStreamSinkConnector { public Class<? extends Task> taskClass() { return SchemaedFileStreamSinkTask.class; } + + @Override + public List<Map<String, String>> taskConfigs(int maxTasks) { + // to test cases when task return immutable maps as configs + return super.taskConfigs(maxTasks) + .stream() + .map(Collections::unmodifiableMap) + .collect(Collectors.toList()); + } }
