This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 911fbf5fa2b [fix][io] Not restart instance when kafka source poll
exception. (#20795)
911fbf5fa2b is described below
commit 911fbf5fa2b49825d7dcbf2270f0329a5267a2fa
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 13 21:11:06 2023 +0800
[fix][io] Not restart instance when kafka source poll exception. (#20795)
---
.../pulsar/io/kafka/KafkaAbstractSource.java | 53 ++++++++--------------
.../io/kafka/source/KafkaAbstractSourceTest.java | 43 +++++++++++++-----
2 files changed, 52 insertions(+), 44 deletions(-)
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 012e4143744..3d4612c039f 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,7 +27,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -133,7 +132,6 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
throw new IllegalArgumentException("Unable to instantiate Kafka
consumer", ex);
}
this.start();
- running = true;
}
protected Properties beforeCreateConsumer(Properties props) {
@@ -158,47 +156,36 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
@SuppressWarnings("unchecked")
public void start() {
+ LOG.info("Starting subscribe kafka source on {}",
kafkaSourceConfig.getTopic());
+
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
runnerThread = new Thread(() -> {
- LOG.info("Starting kafka source on {}",
kafkaSourceConfig.getTopic());
-
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
while (running) {
- ConsumerRecords<Object, Object> consumerRecords =
consumer.poll(Duration.ofSeconds(1L));
- CompletableFuture<?>[] futures = new
CompletableFuture<?>[consumerRecords.count()];
- int index = 0;
- for (ConsumerRecord<Object, Object> consumerRecord :
consumerRecords) {
- KafkaRecord record = buildRecord(consumerRecord);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Write record {} {} {}", record.getKey(),
record.getValue(), record.getSchema());
+ try {
+ ConsumerRecords<Object, Object> consumerRecords =
consumer.poll(Duration.ofSeconds(1L));
+ CompletableFuture<?>[] futures = new
CompletableFuture<?>[consumerRecords.count()];
+ int index = 0;
+ for (ConsumerRecord<Object, Object> consumerRecord :
consumerRecords) {
+ KafkaRecord record = buildRecord(consumerRecord);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Write record {} {} {}",
record.getKey(), record.getValue(), record.getSchema());
+ }
+ consume(record);
+ futures[index] = record.getCompletableFuture();
+ index++;
}
- consume(record);
- futures[index] = record.getCompletableFuture();
- index++;
- }
- if (!kafkaSourceConfig.isAutoCommitEnabled()) {
- try {
+ if (!kafkaSourceConfig.isAutoCommitEnabled()) {
CompletableFuture.allOf(futures).get();
consumer.commitSync();
- } catch (InterruptedException ex) {
- break;
- } catch (ExecutionException ex) {
- LOG.error("Error while processing records", ex);
- break;
}
+ } catch (Exception e) {
+ LOG.error("Error while processing records", e);
+ notifyError(e);
+ break;
}
}
});
- runnerThread.setUncaughtExceptionHandler(
- (t, e) -> {
- new Thread(() -> {
- LOG.error("[{}] Error while consuming records",
t.getName(), e);
- try {
- this.close();
- } catch (Exception ex) {
- LOG.error("[{}] Close kafka source error",
t.getName(), e);
- }
- }, "Kafka Source Close Task Thread").start();
- });
+ running = true;
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
}
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 402727f4ec0..6911ec2a6bf 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.source;
import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.lang.reflect.Field;
@@ -31,7 +32,6 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
-import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -158,26 +158,47 @@ public class KafkaAbstractSourceTest {
assertEquals(config.getSslTruststorePassword(), "cert_pwd");
}
- @Test
- public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws
Exception {
+ @Test(expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = "Subscribe exception")
+ public final void throwExceptionBySubscribe() throws Exception {
KafkaAbstractSource source = new DummySource();
+
+ KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+ kafkaSourceConfig.setTopic("test-topic");
+ Field kafkaSourceConfigField =
KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+ kafkaSourceConfigField.setAccessible(true);
+ kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
Consumer consumer = mock(Consumer.class);
- Mockito.doThrow(new RuntimeException("Uncaught
exception")).when(consumer)
+ Mockito.doThrow(new RuntimeException("Subscribe
exception")).when(consumer)
.subscribe(Mockito.any(Collection.class));
Field consumerField =
KafkaAbstractSource.class.getDeclaredField("consumer");
consumerField.setAccessible(true);
consumerField.set(source, consumer);
-
+ // will throw RuntimeException.
source.start();
+ }
+
+ @Test(expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = "Pool exception")
+ public final void throwExceptionByPoll() throws Exception {
+ KafkaAbstractSource source = new DummySource();
- Field runningField =
KafkaAbstractSource.class.getDeclaredField("running");
- runningField.setAccessible(true);
+ KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+ kafkaSourceConfig.setTopic("test-topic");
+ Field kafkaSourceConfigField =
KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+ kafkaSourceConfigField.setAccessible(true);
+ kafkaSourceConfigField.set(source, kafkaSourceConfig);
- Awaitility.await().untilAsserted(() -> {
- Assert.assertFalse((boolean) runningField.get(source));
- Assert.assertNull(consumerField.get(source));
- });
+ Consumer consumer = mock(Consumer.class);
+ Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer)
+ .poll(Mockito.any(Duration.class));
+
+ Field consumerField =
KafkaAbstractSource.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(source, consumer);
+ source.start();
+ // will throw RuntimeException.
+ source.read();
}
private File getFile(String name) {