This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 860a2ba28cf [fix][io] Close the kafka source connector got stuck
(#20698)
860a2ba28cf is described below
commit 860a2ba28cff8b51ff2a4e3c5b691a017fa33a83
Author: fengyubiao <[email protected]>
AuthorDate: Sat Jul 1 02:23:45 2023 +0800
[fix][io] Close the kafka source connector got stuck (#20698)
Motivation:
https://github.com/apache/pulsar/discussions/19880#discussioncomment-6026807
When Kafka connector is closing, it waits for the `runnerThread` to stop, but
the task-close is running at the same thread, so it will be stuck.
Modifications: run `close` in another thread.
(cherry picked from commit c5237eaeeee1d1444deda94a69721a12b19c5ba1)
---
pulsar-io/kafka/pom.xml | 6 ++++++
.../org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 14 ++++++++------
.../pulsar/io/kafka/source/KafkaAbstractSourceTest.java | 6 +++++-
3 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index eaa9385ffc8..222d7972d09 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -93,6 +93,12 @@
<version>${kafka.confluent.avroserializer.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index eeaa89c601e..3933375512b 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -183,12 +183,14 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
});
runnerThread.setUncaughtExceptionHandler(
(t, e) -> {
- LOG.error("[{}] Error while consuming records",
t.getName(), e);
- try {
- this.close();
- } catch (InterruptedException ex) {
- // The interrupted exception is thrown by the
runnerThread itself. Ignore it.
- }
+ new Thread(() -> {
+ LOG.error("[{}] Error while consuming records",
t.getName(), e);
+ try {
+ this.close();
+ } catch (Exception ex) {
+ LOG.error("[{}] Close kafka source error",
t.getName(), e);
+ }
+ }, "Kafka Source Close Task Thread").start();
});
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index d706c389907..ee573cacd77 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -173,7 +174,10 @@ public class KafkaAbstractSourceTest {
Field runningField =
KafkaAbstractSource.class.getDeclaredField("running");
runningField.setAccessible(true);
- Assert.assertFalse((boolean) runningField.get(source));
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse((boolean) runningField.get(source));
+ Assert.assertNull(consumerField.get(source));
+ });
}
private File getFile(String name) {