jadami10 commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1258265019
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java:
##########
@@ -61,12 +67,36 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
}
consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
- _consumer = new KafkaConsumer<>(consumerProp);
+ _consumer = createConsumer(consumerProp);
_topicPartition = new TopicPartition(_topic, _partition);
_consumer.assign(Collections.singletonList(_topicPartition));
_kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
}
+ private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+ // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+ // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
+ // like a good balance of not waiting too long for a retry, but also not retrying too many times.
+ int maxTries = 5;
+ int tries = 0;
+ while (true) {
+ try {
+ return new KafkaConsumer<>(consumerProp);
+ } catch (KafkaException e) {
+ tries++;
+ if (tries >= maxTries) {
+ LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+ throw e;
+ }
+ LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+ // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+ // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+ // stuck in ERROR state.
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
Review Comment:
Here's the stack trace. We're still seeing the failure daily, but it resolves within 1-2 retries. We're separately investigating why DNS resolution suddenly started failing.
```
[2023-07-10 13:14:13.087928] WARN [ClientUtils] [HelixTaskExecutor-message_handle_thread_16:25] Couldn't resolve server <kafka broker address> from bootstrap.servers as DNS resolution failed for <kafka broker address>
[2023-07-10 13:14:13.088247] WARN [KafkaPartitionLevelConnectionHandler] [HelixTaskExecutor-message_handle_thread_16:25] Caught exception while creating Kafka consumer, retrying 1/5
[2023-07-10 13:14:13.088268] org.apache.pinot.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
[2023-07-10 13:14:13.088284] at
org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088303] at
org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088322] at
org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088337] at
org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088354] at
org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.createConsumer(KafkaPartitionLevelConnectionHandler.java:84)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088371] at
org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.<init>(KafkaPartitionLevelConnectionHandler.java:70)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088393] at
org.apache.pinot.plugin.stream.kafka20.KafkaStreamMetadataProvider.<init>(KafkaStreamMetadataProvider.java:54)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088411] at
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.createPartitionMetadataProvider(KafkaConsumerFactory.java:43)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088426] at
org.apache.pinot.spi.stream.StreamMetadataProvider.computePartitionGroupMetadata(StreamMetadataProvider.java:83)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088442] at
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.setPartitionParameters(LLRealtimeSegmentDataManager.java:1532)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088476] at
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1442)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088493] at
org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:445)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088508] at
org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:219)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088528] at
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:168)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088553] at
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:83)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088561] at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
[2023-07-10 13:14:13.088570] at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
[2023-07-10 13:14:13.088579] at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
[2023-07-10 13:14:13.088585] at
java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
[2023-07-10 13:14:13.088602] at
org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088620] at
org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088636] at
org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088650] at
org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49)
~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088657] at
java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
[2023-07-10 13:14:13.088666] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
~[?:?]
[2023-07-10 13:14:13.088675] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
~[?:?]
[2023-07-10 13:14:13.088681] at
java.lang.Thread.run(Thread.java:829) [?:?]
[2023-07-10 13:14:13.088691] Caused by: org.apache.pinot.shaded.org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
[2023-07-10 13:14:13.088710] at
org.apache.pinot.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088724] at
org.apache.pinot.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088738] at
org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:731)
~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
[2023-07-10 13:14:13.088743] ... 26 more
```
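For anyone who wants to try the retry shape from the diff in isolation, here is a minimal sketch that needs neither Kafka nor Guava on the classpath. The names `RetrySketch` and `callWithRetries` are hypothetical and not part of Pinot. It catches `RuntimeException` where the PR catches `KafkaException`, and the sleep helper is a hand-written approximation of what Guava's `Uninterruptibles.sleepUninterruptibly` does, so treat it as an illustration rather than the PR's implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of Pinot or Kafka.
final class RetrySketch {
  private RetrySketch() {
  }

  // Calls op up to maxTries times with a fixed sleep between failed attempts.
  // The last failure is rethrown once the attempts are exhausted.
  static <T> T callWithRetries(Supplier<T> op, int maxTries, long sleepMillis) {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be at least 1");
    }
    int tries = 0;
    while (true) {
      try {
        return op.get();
      } catch (RuntimeException e) {
        tries++;
        if (tries >= maxTries) {
          throw e;
        }
        sleepUninterruptibly(sleepMillis);
      }
    }
  }

  // Sleeps for the full duration even if the thread is interrupted,
  // then restores the interrupt flag so callers can still observe it.
  static void sleepUninterruptibly(long millis) {
    boolean interrupted = false;
    try {
      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(millis);
      long end = System.nanoTime() + remainingNanos;
      while (true) {
        try {
          // TimeUnit.sleep returns immediately for non-positive durations.
          TimeUnit.NANOSECONDS.sleep(remainingNanos);
          return;
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

With `maxTries = 5` and `sleepMillis = 2000`, the worst case is four sleeps, i.e. about 8 seconds of waiting before the fifth attempt's failure is rethrown. Restoring the interrupt flag after sleeping keeps the interrupt visible to the caller while still preventing it from cutting the retry short.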