This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 10a18d55b9 retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java (#253) (#11040)
10a18d55b9 is described below
commit 10a18d55b9d27851fa2bba1f77eda3e9d600aa37
Author: Johan Adami <[email protected]>
AuthorDate: Sun Jul 9 05:32:18 2023 -0400
retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java (#253) (#11040)
Co-authored-by: Johan Adami <[email protected]>
---
.../KafkaPartitionLevelConnectionHandler.java | 32 +++++++++++++++++++++-
1 file changed, 31 insertions(+), 1 deletion(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index ca6290d59a..00371acaba 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -19,17 +19,22 @@
package org.apache.pinot.plugin.stream.kafka20;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -39,6 +44,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
*/
public abstract class KafkaPartitionLevelConnectionHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
protected final KafkaPartitionLevelStreamConfig _config;
protected final String _clientId;
protected final int _partition;
@@ -61,12 +67,36 @@ public abstract class KafkaPartitionLevelConnectionHandler {
consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
}
consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
- _consumer = new KafkaConsumer<>(consumerProp);
+ _consumer = createConsumer(consumerProp);
_topicPartition = new TopicPartition(_topic, _partition);
_consumer.assign(Collections.singletonList(_topicPartition));
_kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
}
+  private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+    // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
+    // like a good balance of not waiting too long for a retry, but also not retrying too many times.
+    int maxTries = 5;
+    int tries = 0;
+    while (true) {
+      try {
+        return new KafkaConsumer<>(consumerProp);
+      } catch (KafkaException e) {
+        tries++;
+        if (tries >= maxTries) {
+          LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+          throw e;
+        }
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+        // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+        // stuck in ERROR state.
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+      }
+    }
+  }
+
public void close()
throws IOException {
_consumer.close();
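
For context on the pattern added above: below is a minimal, standalone sketch (not part of this commit) of the same bounded-retry approach around a factory that may throw KafkaException, using Guava's Uninterruptibles. The class, method, and parameter names here are illustrative assumptions, not code from the Pinot repository.

// Standalone sketch, illustrative only. Assumes Guava and kafka-clients are on the classpath,
// as they are for the pinot-kafka-2.0 plugin; names such as RetrySketch and retry are hypothetical.
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.common.KafkaException;

public final class RetrySketch {
  // Invokes the factory up to maxTries times; rethrows the last KafkaException once retries are exhausted.
  static <T> T retry(Supplier<T> factory, int maxTries, long sleepSeconds) {
    int tries = 0;
    while (true) {
      try {
        return factory.get();
      } catch (KafkaException e) {
        if (++tries >= maxTries) {
          throw e;
        }
        // Sleep without honoring interrupts, mirroring the choice made in the commit's comment.
        Uninterruptibles.sleepUninterruptibly(sleepSeconds, TimeUnit.SECONDS);
      }
    }
  }
}

The key design choice, as stated in the commit's inline comments, is sleeping uninterruptibly so that a stray thread interrupt during consumer creation cannot abort the retry loop and leave the table stuck in ERROR state.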
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]