This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9767a823cf0ad05197b948d8da1b022b8f46e97d Author: Neng Lu <[email protected]> AuthorDate: Fri Jul 23 13:13:36 2021 -0700 make KafkaSourceRecord ack() async to avoid deadlock (#11435) The `ack()` method of the `AbstractKafkaSourceRecord` should be non-blocking. Otherwise there'll be deadlock for pulsar-client-io thread and the main `public/default/debezium-mongodb-source-0` thread. And further blocks the whole debezium connector to work correctly. 1. remove the blocking `future.get()` call from `ack()` 2. move the commit logic into callbacks (cherry picked from commit b3892ee96945b93190d9d2b20e70cbec7382fcde) --- .../kafka/connect/AbstractKafkaConnectSource.java | 43 +++++++++++----------- .../io/kafka/connect/PulsarOffsetBackingStore.java | 3 +- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index eab0bca..1fbacfb 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -150,9 +150,15 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { } else { // there is no records any more, then waiting for the batch to complete writing // to sink and the offsets are committed as well, then do next round read. - flushFuture.get(); - flushFuture = null; - currentBatch = null; + try { + flushFuture.get(); + } catch (ExecutionException ex) { + // log the error, continue execution + log.error("execution exception while get flushFuture", ex); + } finally { + flushFuture = null; + currentBatch = null; + } } } } @@ -168,7 +174,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { private static Map<String, String> PROPERTIES = Collections.emptyMap(); private static Optional<Long> RECORD_SEQUENCE = Optional.empty(); - private static long FLUSH_TIMEOUT_MS = 2000; + private static long FLUSH_TIMEOUT_MS = 60000; public abstract class AbstractKafkaSourceRecord<T> implements Record { @Getter @@ -221,9 +227,17 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { offsetWriter.cancelFlush(); flushFuture.completeExceptionally(new Exception("No Offsets Added Error")); } else { - log.trace("Finished flushing offsets to storage"); - currentBatch = null; - flushFuture.complete(null); + try { + sourceTask.commit(); + + log.info("Finished flushing offsets to storage"); + currentBatch = null; + flushFuture.complete(null); + } catch (InterruptedException exception) { + log.warn("Flush of {} offsets interrupted, cancelling", this); + offsetWriter.cancelFlush(); + flushFuture.completeExceptionally(new Exception("Failed to commit offsets")); + } } } @@ -248,21 +262,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { flushFuture.completeExceptionally(new Exception("No Offsets Added Error")); return; } - - // Wait until the offsets are flushed - try { - doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS); - sourceTask.commit(); - } catch (InterruptedException e) { - log.warn("Flush of {} offsets interrupted, cancelling", this); - offsetWriter.cancelFlush(); - } catch (ExecutionException e) { - log.error("Flush of {} offsets threw an unexpected exception: ", this, e); - offsetWriter.cancelFlush(); - } catch (TimeoutException e) { - log.error("Timed out waiting to flush {} offsets to storage", this); - offsetWriter.cancelFlush(); - } } } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index e616e84..46da98a 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -66,8 +66,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`"); this.data = new HashMap<>(); - log.info("Configure offset backing store on pulsar topic {} at cluster {}", - topic, serviceUrl); + log.info("Configure offset backing store on pulsar topic {} at cluster {}", topic); } void readToEnd(CompletableFuture<Void> future) {
