This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a125e159c2d [fix][io] Close the kafka source connector if there is 
uncaught exception (#20424)
a125e159c2d is described below

commit a125e159c2db7c673dea795bc061dd6bfaeabd52
Author: Zike Yang <[email protected]>
AuthorDate: Mon May 29 21:03:34 2023 +0800

    [fix][io] Close the kafka source connector if there is uncaught exception 
(#20424)
    
    Fixes #19880
    
    ### Motivation
    
    If an unexpected, uncaught exception is thrown in the `runnerThread` 
of the Kafka source connector, the connector only logs the error and then gets 
stuck: it neither exits nor does any further work.
    
    ### Modifications
    
    * Close the connector if there is an uncaught exception
    
    Signed-off-by: Zike Yang <[email protected]>
    (cherry picked from commit 6079a9186ac8d1c96c97828a289d7865c5890ebd)
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  9 ++++++++-
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 23 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 565c3604747..8d2cbd8e74e 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -189,7 +189,14 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
             }
         });
         runnerThread.setUncaughtExceptionHandler(
-                (t, e) -> LOG.error("[{}] Error while consuming records", 
t.getName(), e));
+                (t, e) -> {
+                    LOG.error("[{}] Error while consuming records", 
t.getName(), e);
+                    try {
+                        this.close();
+                    } catch (InterruptedException ex) {
+                        // The interrupted exception is thrown by the 
runnerThread itself. Ignore it.
+                    }
+                });
         runnerThread.setName("Kafka Source Thread");
         runnerThread.start();
     }
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 612cf0bc6d2..bc06c3e1935 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.io.kafka.source;
 
 
 import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
 import java.util.Collections;
+import java.lang.reflect.Field;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -28,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -153,6 +157,25 @@ public class KafkaAbstractSourceTest {
         assertEquals(config.getSslTruststorePassword(), "cert_pwd");
     }
 
+    @Test
+    public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws 
Exception {
+        KafkaAbstractSource source = new DummySource();
+        Consumer consumer = mock(Consumer.class);
+        Mockito.doThrow(new RuntimeException("Uncaught 
exception")).when(consumer)
+                .subscribe(Mockito.any(Collection.class));
+
+        Field consumerField = 
KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+
+        source.start();
+
+        Field runningField = 
KafkaAbstractSource.class.getDeclaredField("running");
+        runningField.setAccessible(true);
+
+        Assert.assertFalse((boolean) runningField.get(source));
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());

Reply via email to