[ https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen resolved KAFKA-9999.
--------------------------------
Resolution: Won't Fix
> Internal topic creation failure should be non-fatal and trigger explicit
> rebalance
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-9999
> URL: https://issues.apache.org/jira/browse/KAFKA-9999
> Project: Kafka
> Issue Type: Bug
> Components: admin, streams
> Affects Versions: 2.4.0
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> We spotted a system test failure in which the topics already exist but the
> admin client still attempts to recreate them:
>
> {code:java}
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic
> SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably
> marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic
> 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists.
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic
> SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic
> 'SmokeTest-uwin-cnt-changelog' already exists.
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic
> SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic
> 'SmokeTest-cntByCnt-changelog' already exists.
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics
> [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog,
> SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made
> ready with 5 retries left
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics
> after 5 retries. This can happen if the Kafka cluster is temporary not
> available. You can increase admin client config `retries` to be resilient
> against this error.
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,221] ERROR stream-thread
> [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered
> the following unexpected Kafka exception during processing, this usually
> indicate Streams internal errors:
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Could not create topics
> after 5 retries. This can happen if the Kafka cluster is temporary not
> available. You can increase admin client config `retries` to be resilient
> against this error.
> at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743){code}
> Looking closer, it seems that as of today we cannot tell whether a topic is
> pending deletion or healthy. We could discuss a follow-up effort to expose
> that information as part of the topic description result.
> The current solution is to explicitly trigger a rebalance when we run out of
> retries, so that the group is unblocked, since this kind of short-term
> unavailability is more likely to be on the broker side.
>
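The ambiguity described in the ticket can be illustrated with a small, self-contained model. The log shows the topic manager treating a `TopicExistsException` together with an unknown partition count as "probably marked for deletion". The sketch below contrasts that heuristic with a hypothetical topic description carrying an explicit pending-deletion flag, which is roughly what the suggested follow-up would add. `TopicInfo`, `pendingDeletion`, and `Action` are hypothetical names, not part of the Kafka Admin API; the code needs Java 16+ for records.

```java
import java.util.OptionalInt;

public class TopicStateSketch {

    // Hypothetical view of a topic description. The real Admin API has no
    // pendingDeletion flag; it models the follow-up suggested in the ticket.
    record TopicInfo(boolean exists, OptionalInt partitionCount, boolean pendingDeletion) {}

    enum Action { CREATE, READY, RETRY_LATER, WAIT_FOR_DELETION }

    // Heuristic matching the log: the topic exists but its partition count is
    // unknown, so assume it is being deleted and retry after a backoff.
    static Action heuristic(TopicInfo info) {
        if (!info.exists()) {
            return Action.CREATE;
        }
        return info.partitionCount().isPresent() ? Action.READY : Action.RETRY_LATER;
    }

    // With an explicit (hypothetical) flag, "pending deletion" and "exists but
    // partition count not yet known" can be told apart.
    static Action withDeletionFlag(TopicInfo info) {
        if (!info.exists()) {
            return Action.CREATE;
        }
        if (info.pendingDeletion()) {
            return Action.WAIT_FOR_DELETION;
        }
        return info.partitionCount().isPresent() ? Action.READY : Action.RETRY_LATER;
    }

    public static void main(String[] args) {
        TopicInfo healthyButUnknown = new TopicInfo(true, OptionalInt.empty(), false);
        TopicInfo deleting = new TopicInfo(true, OptionalInt.empty(), true);
        // The heuristic gives the same answer for both cases.
        System.out.println(heuristic(healthyButUnknown) + " " + heuristic(deleting));
        // The flag separates them.
        System.out.println(withDeletionFlag(healthyButUnknown) + " " + withDeletionFlag(deleting));
    }
}
```

The point of the model is that, without extra information in the description, both topics look identical to the retry logic, which matches the repeated "probably marked for deletion" messages for topics that in fact already exist.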
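The proposed change can be sketched as follows: instead of throwing a fatal `StreamsException` once retries run out, the group leader keeps track of the topics that are still not ready and requests another rebalance. This is a simplified, hypothetical model, not the actual `InternalTopicManager` or `StreamsPartitionAssignor` code: `MakeReadyResult`, `makeReady`'s signature, the `tryCreate` predicate, and the string returned by `assign` are all illustrative assumptions. The retry count (5) and backoff (100 ms) are taken from the log in the ticket. Java 16+ is assumed for records.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

public class NonFatalTopicCreation {

    // Hypothetical result type: the topics that could not be made ready.
    record MakeReadyResult(Set<String> notReady) {
        boolean allReady() {
            return notReady.isEmpty();
        }
    }

    // Tries to make every topic ready: one initial attempt plus `retries`
    // retries, sleeping `backoffMs` between attempts. Unlike the behavior in
    // the log, it never throws when retries run out; it returns what is left.
    static MakeReadyResult makeReady(Set<String> topics, Predicate<String> tryCreate,
                                     int retries, long backoffMs) {
        Set<String> pending = new HashSet<>(topics);
        for (int attempt = 0; attempt <= retries && !pending.isEmpty(); attempt++) {
            pending.removeIf(tryCreate); // tryCreate returns true once a topic is ready
            if (!pending.isEmpty() && attempt < retries) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                    break;
                }
            }
        }
        return new MakeReadyResult(pending);
    }

    // Hypothetical leader-side handling: if topics are still not ready, skip
    // this assignment and request another rebalance instead of failing the thread.
    static String assign(Set<String> topics, Predicate<String> tryCreate) {
        MakeReadyResult result = makeReady(topics, tryCreate, 5, 100L);
        if (result.allReady()) {
            return "assigned";
        }
        return "rebalance requested, not ready: " + result.notReady();
    }

    public static void main(String[] args) {
        Set<String> topics = Set.of("SmokeTest-uwin-cnt-changelog");
        System.out.println(assign(topics, t -> true));  // topic ready on the first attempt
        System.out.println(assign(topics, t -> false)); // never ready: rebalance instead of crashing
    }
}
```

Returning the leftover topics rather than a plain boolean lets the caller log exactly which topics blocked the assignment; how the follow-up rebalance is actually scheduled is left out of this sketch.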
--
This message was sent by Atlassian Jira
(v8.3.4#803005)