This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9767a823cf0ad05197b948d8da1b022b8f46e97d
Author: Neng Lu <[email protected]>
AuthorDate: Fri Jul 23 13:13:36 2021 -0700

    make KafkaSourceRecord ack() async to avoid deadlock (#11435)
    
    The `ack()` method of `AbstractKafkaSourceRecord` should be
    non-blocking. Otherwise, a deadlock occurs between the pulsar-client-io
    thread and the main `public/default/debezium-mongodb-source-0` thread,
    which in turn blocks the whole Debezium connector from working correctly.
    
    1. remove the blocking `future.get()` call from `ack()`
    2. move the commit logic into callbacks (a simplified sketch of this pattern follows the diffstat below)
    
    (cherry picked from commit b3892ee96945b93190d9d2b20e70cbec7382fcde)
---
 .../kafka/connect/AbstractKafkaConnectSource.java  | 43 +++++++++++-----------
 .../io/kafka/connect/PulsarOffsetBackingStore.java |  3 +-
 2 files changed, 22 insertions(+), 24 deletions(-)
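
As background for the change above, here is a minimal, self-contained sketch of the blocking-vs-callback pattern the commit message describes. It is not the connector's actual code: the OffsetFlusher class and the ackBlocking/ackAsync/startFlush/commitOffsets names are hypothetical stand-ins for AbstractKafkaSourceRecord.ack() and the offset writer's flush future.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OffsetFlusher {

    private final ScheduledExecutorService flushExecutor =
            Executors.newSingleThreadScheduledExecutor();

    // Before the fix (simplified): ack() parks the calling thread until the
    // flush completes. If completion depends on work scheduled on that same
    // thread, the get() never returns and the connector stalls.
    public void ackBlocking() throws Exception {
        CompletableFuture<Void> flushFuture = startFlush();
        flushFuture.get();   // blocking call of the kind removed by this commit
        commitOffsets();
    }

    // After the fix (simplified): ack() only starts the flush and registers the
    // commit as a callback, so the calling thread is never blocked.
    public void ackAsync() {
        startFlush().whenComplete((ignored, error) -> {
            if (error != null) {
                System.err.println("Offset flush failed: " + error);
                return;
            }
            commitOffsets();
        });
    }

    // Stand-in for the offset writer's flush: completes on another thread.
    private CompletableFuture<Void> startFlush() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        flushExecutor.schedule(() -> {
            future.complete(null);
        }, 10, TimeUnit.MILLISECONDS);
        return future;
    }

    private void commitOffsets() {
        System.out.println("offsets committed");
    }

    public static void main(String[] args) throws Exception {
        OffsetFlusher flusher = new OffsetFlusher();
        flusher.ackAsync();          // returns immediately
        Thread.sleep(100);           // give the async flush time to finish
        flusher.flushExecutor.shutdown();
    }
}

The key point is that ackAsync() returns immediately; the commit runs on whichever thread completes the flush, so the pulsar-client-io thread is never parked waiting for it.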

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index eab0bca..1fbacfb 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -150,9 +150,15 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
             } else {
                 // there is no records any more, then waiting for the batch to complete writing
                 // to sink and the offsets are committed as well, then do next round read.
-                flushFuture.get();
-                flushFuture = null;
-                currentBatch = null;
+                try {
+                    flushFuture.get();
+                } catch (ExecutionException ex) {
+                    // log the error and continue; the finally block resets the batch state
+                    log.error("Execution exception while getting flushFuture", ex);
+                } finally {
+                    flushFuture = null;
+                    currentBatch = null;
+                }
             }
         }
     }
@@ -168,7 +174,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
     private static Map<String, String> PROPERTIES = Collections.emptyMap();
     private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
-    private static long FLUSH_TIMEOUT_MS = 2000;
+    private static long FLUSH_TIMEOUT_MS = 60000;
 
     public abstract class AbstractKafkaSourceRecord<T> implements Record {
         @Getter
@@ -221,9 +227,17 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
                 offsetWriter.cancelFlush();
                flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
             } else {
-                log.trace("Finished flushing offsets to storage");
-                currentBatch = null;
-                flushFuture.complete(null);
+                try {
+                    sourceTask.commit();
+
+                    log.info("Finished flushing offsets to storage");
+                    currentBatch = null;
+                    flushFuture.complete(null);
+                } catch (InterruptedException exception) {
+                    log.warn("Flush of {} offsets interrupted, cancelling", this);
+                    offsetWriter.cancelFlush();
+                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets"));
+                }
             }
         }
 
@@ -248,21 +262,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
                     return;
                 }
-
-                // Wait until the offsets are flushed
-                try {
-                    doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                    sourceTask.commit();
-                } catch (InterruptedException e) {
-                    log.warn("Flush of {} offsets interrupted, cancelling", this);
-                    offsetWriter.cancelFlush();
-                } catch (ExecutionException e) {
-                    log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
-                    offsetWriter.cancelFlush();
-                } catch (TimeoutException e) {
-                    log.error("Timed out waiting to flush {} offsets to storage", this);
-                    offsetWriter.cancelFlush();
-                }
             }
         }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index e616e84..46da98a 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -66,8 +66,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
             + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`");
         this.data = new HashMap<>();
 
-        log.info("Configure offset backing store on pulsar topic {} at cluster {}",
-            topic, serviceUrl);
+        log.info("Configure offset backing store on pulsar topic {}", topic);
     }
 
     void readToEnd(CompletableFuture<Void> future) {
