[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699384#comment-17699384 ]
Sagar Rao commented on KAFKA-14750:
-----------------------------------

Thanks [~morozov] for filing this and [~ChrisEgerton] for trying this out and for your great explanation.

IIUC, the error arises when we try to fetch the position of a topic partition (TP) belonging to a deleted topic. At that point, the `position` API throws a `TimeoutException` once its default timeout of 60s expires. Also, if I understood the flow of the Kafka Consumer correctly, the `onPartitionsRevoked` callback is first called for the consumer which lost the TP, and then `onPartitionsAssigned` is called for the new owner of the TP. And as Chris pointed out above, the error happens when trying to commit the offsets of a deleted TP.

I was thinking we could leverage `Consumer#partitionsFor` to first figure out whether the topic exists. The API returns an empty list (at least in newer versions) if the topic isn't found, as per the javadocs: [https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)]. We would then issue a call to the `position` API only if the topic exists. This way we can still prevent the connector from totally failing.

A couple of noteworthy points:

1) This method entails making 2 remote calls to the broker, which can be considered sub-optimal. However, we do this only when a consumer rebalance event happens (which hopefully shouldn't be too frequent, or otherwise the real problems lie elsewhere). This could be a small tradeoff to pay instead of a failed connector.

2) There is still a chance that the topic existed when `partitionsFor` was invoked but got deleted by the time `position` is invoked. I don't think there's any atomic way of finding this out, and the chances of this happening seem slim to me.

3) Lastly, and FWIW, the KafkaBasedLog within Connect uses a [similar mechanism to check that the topic exists|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L246-L258], and eventually fails the connector (after some retries) if the topic has still not been created.

WDYAT?

> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --create \
>       --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that
> the number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>   --bootstrap-server localhost:9092 \
>   --group connect-local-file-sink \
>   --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --delete \
>       --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> expired before the position for partition connect-test-211-0 could be
> determined
> {quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
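
For illustration only, the guard proposed in the comment (call `partitionsFor` first, and call `position` only if the topic still exists) could be sketched roughly as below. This is not the Connect runtime code: `TopicClient` is a hypothetical stand-in for the two `KafkaConsumer` methods so that the sketch runs without a broker, `FakeClient` and `positionIfTopicExists` are made-up names, and treating a `null` result as "topic missing" is a defensive assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

public class GuardedPosition {

    // Hypothetical stand-in for the two consumer calls discussed in the comment.
    interface TopicClient {
        // Modeled on Consumer#partitionsFor: partition ids, empty if the topic is not found.
        List<Integer> partitionsFor(String topic);

        // Modeled on Consumer#position: for a deleted topic the real call can time out.
        long position(String topic, int partition);
    }

    // Returns the position only if the topic is still known; otherwise skips the lookup.
    static OptionalLong positionIfTopicExists(TopicClient client, String topic, int partition) {
        List<Integer> partitions = client.partitionsFor(topic);
        if (partitions == null || partitions.isEmpty()) {
            // Topic appears to be gone: do not call position() at all.
            return OptionalLong.empty();
        }
        return OptionalLong.of(client.position(topic, partition));
    }

    // In-memory fake with a single partition (0) per topic, used only for the demo.
    static final class FakeClient implements TopicClient {
        private final Map<String, Long> positions = new HashMap<>();

        void addTopic(String topic, long position) {
            positions.put(topic, position);
        }

        void deleteTopic(String topic) {
            positions.remove(topic);
        }

        @Override
        public List<Integer> partitionsFor(String topic) {
            return positions.containsKey(topic)
                    ? Collections.singletonList(0)
                    : Collections.<Integer>emptyList();
        }

        @Override
        public long position(String topic, int partition) {
            Long p = positions.get(topic);
            if (p == null) {
                // Simulates the failure seen in the issue for a deleted topic.
                throw new IllegalStateException("position lookup failed for deleted topic " + topic);
            }
            return p;
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        client.addTopic("connect-test-1", 42L);
        System.out.println(positionIfTopicExists(client, "connect-test-1", 0).isPresent());

        client.deleteTopic("connect-test-1");
        System.out.println(positionIfTopicExists(client, "connect-test-1", 0).isPresent());
    }
}
```

As point 2 of the comment notes, the topic can still be deleted between the two calls, so a real implementation would presumably still need to handle the `TimeoutException` from `position`.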