This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eb779ff8ccbe3eae80148f29715aa5713e330d91 Author: Rui Fu <[email protected]> AuthorDate: Wed Mar 2 16:35:58 2022 +0800 [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491) (cherry picked from commit e6656e1407be80fdf4b6aaf424a57068687840cc) --- .../io/kafka/connect/PulsarOffsetBackingStore.java | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index 495c8b9..86905ad 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -30,6 +30,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -49,7 +51,7 @@ import org.apache.pulsar.client.api.Schema; @Slf4j public class PulsarOffsetBackingStore implements OffsetBackingStore { - private Map<ByteBuffer, ByteBuffer> data; + private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>(); private PulsarClient client; private String topic; private Producer<byte[]> producer; @@ -65,7 +67,6 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { public void configure(WorkerConfig workerConfig) { this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG); checkArgument(!isBlank(topic), "Offset storage topic must be specified"); - this.data = new HashMap<>(); log.info("Configure offset backing store on pulsar topic {}", topic); } @@ -126,10 +127,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { } void processMessage(Message<byte[]> message) { - synchronized (data) { + if (message.getKey() != null) { data.put( ByteBuffer.wrap(message.getKey().getBytes(UTF_8)), ByteBuffer.wrap(message.getValue())); + } else { + log.debug("Got message without key from the offset storage topic, skip it. message value: {}", + message.getValue()); } } @@ -149,10 +153,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { CompletableFuture<Void> endFuture = new CompletableFuture<>(); readToEnd(endFuture); - endFuture.join(); + endFuture.get(); } catch (PulsarClientException e) { log.error("Failed to setup pulsar producer/reader to cluster", e); throw new RuntimeException("Failed to setup pulsar producer/reader to cluster ", e); + } catch (ExecutionException | InterruptedException e) { + log.error("Failed to start PulsarOffsetBackingStore", e); + throw new RuntimeException("Failed to start PulsarOffsetBackingStore", e); } } @@ -180,6 +187,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { } reader = null; } + data.clear(); // do not close the client, it is provided by the sink context } @@ -191,10 +199,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { return endFuture.thenApply(ignored -> { Map<ByteBuffer, ByteBuffer> values = new HashMap<>(); for (ByteBuffer key : keys) { - ByteBuffer value; - synchronized (data) { - value = data.get(key); - } + ByteBuffer value = data.get(key); if (null != value) { values.put(key, value); }
