navina commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1257083702


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java:
##########
@@ -61,12 +67,36 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    _consumer = new KafkaConsumer<>(consumerProp);
+    _consumer = createConsumer(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
     _kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
   }
 
+  private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+    // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
+    // like a good balance of not waiting too long for a retry, but also not retrying too many times.
+    int maxTries = 5;
+    int tries = 0;
+    while (true) {
+      try {
+        return new KafkaConsumer<>(consumerProp);
+      } catch (KafkaException e) {
+        tries++;
+        if (tries >= maxTries) {
+          LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+          throw e;
+        }
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+        // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+        // stuck in ERROR state.
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);

Review Comment:
   @jadami10 Since this is used when instantiating the segment data manager,
   exponential backoff could end up blocking the Helix thread that handles the
   state transition (offline -> consuming), which is usually where the table
   and segment data managers get created. So I would recommend just using a
   backoff with a max upper bound. The drawback of constant backoff is the
   possibility of thrashing the source system, but in this case I don't think
   it will be an issue.

   Btw, can you share the exception/stack trace you see in the absence of this
   retry loop?
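
   To make the trade-off concrete, here is a minimal sketch (not the PR's code)
   that compares how long the calling thread can sleep in the worst case under
   constant, capped, and uncapped exponential backoff. The class and method
   names (`BoundedRetry`, `cappedDelayMs`, `worstCaseSleepMs`, `callWithRetry`)
   are hypothetical, and it uses plain `Thread.sleep` instead of Guava's
   `Uninterruptibles.sleepUninterruptibly` so that it stays dependency-free.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for illustration only; not part of Pinot or Kafka.
final class BoundedRetry {
  private BoundedRetry() {
  }

  /** Delay before retry number {@code attempt} (1-based): doubles each time, never above maxDelayMs. */
  static long cappedDelayMs(int attempt, long baseDelayMs, long maxDelayMs) {
    // Clamp the exponent so the shift stays small for large attempt counts.
    int shift = Math.min(attempt - 1, 20);
    return Math.min(baseDelayMs << shift, maxDelayMs);
  }

  /** Total sleep time if every try fails. There is no sleep after the final failed try. */
  static long worstCaseSleepMs(int maxTries, long baseDelayMs, long maxDelayMs) {
    long total = 0;
    for (int attempt = 1; attempt < maxTries; attempt++) {
      total += cappedDelayMs(attempt, baseDelayMs, maxDelayMs);
    }
    return total;
  }

  /** Same loop shape as the diff above, with the fixed sleep replaced by a capped delay. */
  static <T> T callWithRetry(Callable<T> action, int maxTries, long baseDelayMs, long maxDelayMs)
      throws Exception {
    int tries = 0;
    while (true) {
      try {
        return action.call();
      } catch (Exception e) {
        tries++;
        if (tries >= maxTries) {
          throw e;
        }
        // Interruptible sleep; the PR deliberately uses an uninterruptible one instead.
        Thread.sleep(cappedDelayMs(tries, baseDelayMs, maxDelayMs));
      }
    }
  }
}
```

   With 5 tries and a 2 second base delay, `worstCaseSleepMs(5, 2000, 2000)`
   (constant backoff) gives 8000 ms, a 4 second cap gives 14000 ms, and an
   effectively uncapped exponential backoff (`Long.MAX_VALUE` as the cap) gives
   30000 ms. That gap is the extra time the state-transition thread could stay
   blocked.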


