This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 860a2ba28cf [fix][io] Close the kafka source connector got stuck 
(#20698)
860a2ba28cf is described below

commit 860a2ba28cff8b51ff2a4e3c5b691a017fa33a83
Author: fengyubiao <[email protected]>
AuthorDate: Sat Jul 1 02:23:45 2023 +0800

    [fix][io] Close the kafka source connector got stuck (#20698)
    
    Motivation: 
https://github.com/apache/pulsar/discussions/19880#discussioncomment-6026807 
When Kafka connector is closing, it waits for the `runnerThread` to stop, but 
the task-close is running at the same thread, so it will be stuck.
    
    Modifications: run `close` in another thread.
    (cherry picked from commit c5237eaeeee1d1444deda94a69721a12b19c5ba1)
---
 pulsar-io/kafka/pom.xml                                    |  6 ++++++
 .../org/apache/pulsar/io/kafka/KafkaAbstractSource.java    | 14 ++++++++------
 .../pulsar/io/kafka/source/KafkaAbstractSourceTest.java    |  6 +++++-
 3 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index eaa9385ffc8..222d7972d09 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -93,6 +93,12 @@
       <version>${kafka.confluent.avroserializer.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index eeaa89c601e..3933375512b 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -183,12 +183,14 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         });
         runnerThread.setUncaughtExceptionHandler(
                 (t, e) -> {
-                    LOG.error("[{}] Error while consuming records", 
t.getName(), e);
-                    try {
-                        this.close();
-                    } catch (InterruptedException ex) {
-                        // The interrupted exception is thrown by the 
runnerThread itself. Ignore it.
-                    }
+                    new Thread(() -> {
+                        LOG.error("[{}] Error while consuming records", 
t.getName(), e);
+                        try {
+                            this.close();
+                        } catch (Exception ex) {
+                            LOG.error("[{}] Close kafka source error", 
t.getName(), e);
+                        }
+                    }, "Kafka Source Close Task Thread").start();
                 });
         runnerThread.setName("Kafka Source Thread");
         runnerThread.start();
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index d706c389907..ee573cacd77 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -173,7 +174,10 @@ public class KafkaAbstractSourceTest {
         Field runningField = 
KafkaAbstractSource.class.getDeclaredField("running");
         runningField.setAccessible(true);
 
-        Assert.assertFalse((boolean) runningField.get(source));
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse((boolean) runningField.get(source));
+            Assert.assertNull(consumerField.get(source));
+        });
     }
 
     private File getFile(String name) {

Reply via email to