This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new bbff29d8ecc  [fix][io] Kafka Source connector maybe stuck (#22511)
bbff29d8ecc is described below

commit bbff29d8ecc2f6c7ec91e0a48085fe14c8ffd6b8
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Tue Apr 16 08:04:11 2024 +0800

    [fix][io] Kafka Source connector maybe stuck (#22511)
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       | 28 ++++++-
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 89 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 782f9d5d57d..7eba7438b2b 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -63,6 +64,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
     private volatile boolean running = false;
     private KafkaSourceConfig kafkaSourceConfig;
     private Thread runnerThread;
+    private long maxPollIntervalMs;
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
@@ -126,6 +128,13 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
+        if (props.containsKey(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) {
+            maxPollIntervalMs = Long.parseLong(props.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).toString());
+        } else {
+            maxPollIntervalMs = Long.parseLong(
+                    ConsumerConfig.configDef().defaultValues().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
+                            .toString());
+        }
         try {
             consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
         } catch (Exception ex) {
@@ -175,7 +184,9 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
                     index++;
                 }
                 if (!kafkaSourceConfig.isAutoCommitEnabled()) {
-                    CompletableFuture.allOf(futures).get();
+                    // Wait about 2/3 of the time of maxPollIntervalMs.
+                    // so as to avoid waiting for the timeout to be kicked out of the consumer group.
+                    CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
                     consumer.commitSync();
                 }
             } catch (Exception e) {
@@ -253,6 +264,21 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
             completableFuture.complete(null);
         }
 
+        @Override
+        public void fail() {
+            completableFuture.completeExceptionally(
+                    new RuntimeException(
+                            String.format(
+                                    "Failed to process record with kafka topic: %s partition: %d offset: %d key: %s",
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    getKey()
+                            )
+                    )
+            );
+        }
+
         @Override
         public Schema<V> getSchema() {
             return schema;
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 7675de0636e..6b4719709a1 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,12 +21,18 @@ package org.apache.pulsar.io.kafka.source;
 import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Arrays;
 import java.lang.reflect.Field;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -46,6 +52,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 
@@ -218,6 +225,88 @@ public class KafkaAbstractSourceTest {
         source.read();
     }
 
+    @Test
+    public final void throwExceptionBySendFail() throws Exception {
+        KafkaAbstractSource source = new DummySource();
+
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        kafkaSourceConfig.setAutoCommitEnabled(false);
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+        Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
+        defaultMaxPollIntervalMsField.setAccessible(true);
+        defaultMaxPollIntervalMsField.set(source, 300000);
+
+        Consumer consumer = mock(Consumer.class);
+        ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
+                "t-key", "t-value".getBytes(StandardCharsets.UTF_8));
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
+                new TopicPartition("topic", 0),
+                Arrays.asList(consumerRecord)));
+        Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+        source.start();
+
+        // Mock send message fail
+        Record record = source.read();
+        record.fail();
+
+        // read again will throw RuntimeException.
+        try {
+            source.read();
+            fail("Should throw exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("Failed to process record with kafka topic"));
+        }
+    }
+
+    @Test
+    public final void throwExceptionBySendTimeOut() throws Exception {
+        KafkaAbstractSource source = new DummySource();
+
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        kafkaSourceConfig.setAutoCommitEnabled(false);
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+        Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
+        defaultMaxPollIntervalMsField.setAccessible(true);
+        defaultMaxPollIntervalMsField.set(source, 1);
+
+        Consumer consumer = mock(Consumer.class);
+        ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
+                "t-key", "t-value".getBytes(StandardCharsets.UTF_8));
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
+                new TopicPartition("topic", 0),
+                Arrays.asList(consumerRecord)));
+        Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+        source.start();
+
+        // Mock send message fail, just read do noting.
+        source.read();
+
+        // read again will throw TimeOutException.
+        try {
+            source.read();
+            fail("Should throw exception");
+        } catch (Exception e) {
+            assertTrue(e instanceof TimeoutException);
+        }
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());
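
For readers skimming the change: the core of the fix is that the source now bounds the wait on the per-record futures at roughly 2/3 of max.poll.interval.ms instead of waiting indefinitely, and a failed record completes its future exceptionally so the poll loop surfaces the error instead of hanging. The standalone Java sketch below is not part of the commit; the class name and the shortened timeout value are illustrative only, and it simply reproduces the difference between an unbounded and a bounded wait on a future that never completes.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedAckWaitSketch {
    public static void main(String[] args) throws Exception {
        // In the connector this value is derived from max.poll.interval.ms
        // (Kafka client default: 300000 ms); a small value is used here only
        // so the sketch finishes quickly.
        long maxPollIntervalMs = 3_000L;

        // Simulates a record whose ack()/fail() callback never fires, e.g.
        // because the downstream send stalled.
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();

        try {
            // Before the fix the equivalent call was get() with no timeout,
            // which could block the poll loop until the group coordinator
            // evicted the consumer.
            CompletableFuture.allOf(neverCompleted)
                    .get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("Stalled record surfaced after "
                    + (maxPollIntervalMs * 2 / 3) + " ms instead of hanging indefinitely");
        }
    }
}

Bounding the wait at 2/3 of max.poll.interval.ms keeps the failure visible to the connector framework while still leaving headroom before the Kafka group coordinator would consider the consumer dead.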