This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bbff29d8ecc [fix][io] Kafka Source connector maybe stuck (#22511)
bbff29d8ecc is described below

commit bbff29d8ecc2f6c7ec91e0a48085fe14c8ffd6b8
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Tue Apr 16 08:04:11 2024 +0800

    [fix][io] Kafka Source connector maybe stuck (#22511)
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       | 28 ++++++-
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 89 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)
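
The core of the fix: the source previously blocked indefinitely on the per-record
flush futures, so one stuck or failed record could hold up the poll loop until the
consumer was evicted from the group. The patch bounds that wait to roughly 2/3 of
max.poll.interval.ms and adds a fail() callback so a nacked record surfaces as an
exception instead of hanging. A minimal sketch of the bounded-wait pattern (the
names flushFutures and maxPollIntervalMs here are illustrative; the real loop lives
in KafkaAbstractSource):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    public class BoundedFlushSketch {
        public static void main(String[] args) throws Exception {
            long maxPollIntervalMs = 300_000L; // Kafka's default max.poll.interval.ms

            // One future per record pushed downstream; completed on ack, failed on nack.
            CompletableFuture<?>[] flushFutures = {
                    CompletableFuture.completedFuture(null),
                    CompletableFuture.completedFuture(null)
            };

            // Wait at most ~2/3 of max.poll.interval.ms so the consumer keeps
            // polling in time; a stall now surfaces as a TimeoutException
            // instead of the consumer silently being kicked out of the group.
            CompletableFuture.allOf(flushFutures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
            System.out.println("all records acknowledged; safe to commitSync()");
        }
    }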

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 782f9d5d57d..7eba7438b2b 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -63,6 +64,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
     private volatile boolean running = false;
     private KafkaSourceConfig kafkaSourceConfig;
     private Thread runnerThread;
+    private long maxPollIntervalMs;
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
@@ -126,6 +128,13 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
+        if (props.containsKey(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) {
+            maxPollIntervalMs = Long.parseLong(props.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).toString());
+        } else {
+            maxPollIntervalMs = Long.parseLong(
+                    ConsumerConfig.configDef().defaultValues().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
+                            .toString());
+        }
         try {
             consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
         } catch (Exception ex) {
@@ -175,7 +184,9 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
                         index++;
                     }
                     if (!kafkaSourceConfig.isAutoCommitEnabled()) {
-                        CompletableFuture.allOf(futures).get();
+                        // Wait at most ~2/3 of maxPollIntervalMs, so a stalled
+                        // flush does not get the consumer kicked out of the group.
+                        CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
                         consumer.commitSync();
                     }
                 } catch (Exception e) {
@@ -253,6 +264,21 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
             completableFuture.complete(null);
         }
 
+        @Override
+        public void fail() {
+            completableFuture.completeExceptionally(
+                    new RuntimeException(
+                            String.format(
+                                    "Failed to process record with kafka topic: %s partition: %d offset: %d key: %s",
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    getKey()
+                            )
+                    )
+            );
+        }
+
         @Override
         public Schema<V> getSchema() {
             return schema;
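
For reference, the default used when max.poll.interval.ms is not configured is read
from the Kafka client's own config definition, exactly as the hunk above does. A
standalone sketch (assumes only kafka-clients on the classpath):

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class DefaultPollIntervalSketch {
        public static void main(String[] args) {
            // ConsumerConfig.configDef().defaultValues() maps each config key
            // to its built-in default; max.poll.interval.ms defaults to 300000.
            long maxPollIntervalMs = Long.parseLong(
                    ConsumerConfig.configDef()
                            .defaultValues()
                            .get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
                            .toString());
            System.out.println("default max.poll.interval.ms = " + maxPollIntervalMs);
        }
    }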
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 7675de0636e..6b4719709a1 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,12 +21,18 @@ package org.apache.pulsar.io.kafka.source;
 import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Arrays;
 import java.lang.reflect.Field;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -46,6 +52,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 
@@ -218,6 +225,88 @@ public class KafkaAbstractSourceTest {
         source.read();
     }
 
+    @Test
+    public final void throwExceptionBySendFail() throws Exception {
+        KafkaAbstractSource source = new DummySource();
+
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        kafkaSourceConfig.setAutoCommitEnabled(false);
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+        Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
+        defaultMaxPollIntervalMsField.setAccessible(true);
+        defaultMaxPollIntervalMsField.set(source, 300000);
+
+        Consumer consumer = mock(Consumer.class);
+        ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
+                "t-key", "t-value".getBytes(StandardCharsets.UTF_8));
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
+                new TopicPartition("topic", 0),
+                Arrays.asList(consumerRecord)));
+        Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+        source.start();
+
+        // Simulate a downstream send failure by nacking the record.
+        Record record = source.read();
+        record.fail();
+
+        // Reading again will throw a RuntimeException.
+        try {
+            source.read();
+            fail("Should throw exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("Failed to process record with kafka topic"));
+        }
+    }
+
+    @Test
+    public final void throwExceptionBySendTimeOut() throws Exception {
+        KafkaAbstractSource source = new DummySource();
+
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        kafkaSourceConfig.setAutoCommitEnabled(false);
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+        Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
+        defaultMaxPollIntervalMsField.setAccessible(true);
+        defaultMaxPollIntervalMsField.set(source, 1);
+
+        Consumer consumer = mock(Consumer.class);
+        ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
+                "t-key", "t-value".getBytes(StandardCharsets.UTF_8));
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
+                new TopicPartition("topic", 0),
+                Arrays.asList(consumerRecord)));
+        Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+        source.start();
+
+        // The record is never acked or failed, so this read just returns it.
+        source.read();
+
+        // Reading again will throw a TimeoutException.
+        try {
+            source.read();
+            fail("Should throw exception");
+        } catch (Exception e) {
+            assertTrue(e instanceof TimeoutException);
+        }
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());

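The second test drives the timeout path: the pushed record is never acknowledged,
so the bounded get(...) gives up. The mechanism can be reproduced in isolation (a
hypothetical sketch independent of the Pulsar test harness; with maxPollIntervalMs
set to 1, the wait of 1 * 2 / 3 ms rounds down to 0):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class TimeoutPathSketch {
        public static void main(String[] args) throws Exception {
            // A record that is never acked: its flush future never completes.
            CompletableFuture<Void> pending = new CompletableFuture<>();
            try {
                CompletableFuture.allOf(pending).get(1 * 2 / 3, TimeUnit.MILLISECONDS);
            } catch (TimeoutException expected) {
                System.out.println("stuck record surfaces as a TimeoutException");
            }
        }
    }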