[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699384#comment-17699384 ]

Sagar Rao commented on KAFKA-14750:
-----------------------------------

Thanks [~morozov] for filing this and [~ChrisEgerton] for trying this out and 
for the great explanation. IIUC, the error occurs when we try to fetch the 
position of a topic partition of a deleted topic. At that point, the `position` 
API throws a `TimeoutException` after its default timeout of 60s.
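
For illustration, a minimal sketch of the call that blocks (hypothetical helper, consumer wiring omitted); `position` waits up to `default.api.timeout.ms` (60000 ms by default) before throwing:

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class PositionTimeoutSketch {
    // Hypothetical helper: "consumer" is the sink task's consumer that still owns tp.
    static long positionOf(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
        // If tp belongs to a deleted topic, this blocks for up to default.api.timeout.ms
        // (60000 ms by default) and then throws TimeoutException:
        // "Timeout of 60000ms expired before the position for partition ... could be determined"
        return consumer.position(tp);
    }
}
{code}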

Also, if I understood the flow of the Kafka Consumer correctly, the 
`onPartitionsRevoked` callback is first invoked for the consumer that lost the 
TP, and then `onPartitionsAssigned` is invoked for the new owner of the TP. And 
as Chris pointed out above, the error happens when trying to commit the offsets 
of a deleted TP.
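
As a generic illustration of that flow (not the actual Connect sink task code, which is more involved): offsets are typically committed in `onPartitionsRevoked`, which is exactly where a position lookup against a now-deleted TP can blow up:

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class RebalanceFlowSketch implements ConsumerRebalanceListener {
    private final Consumer<byte[], byte[]> consumer;

    public RebalanceFlowSketch(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Invoked first, on the member that is losing the partitions.
        // Committing offsets here needs the current position of each partition,
        // and that position lookup is what times out for a deleted topic.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Invoked afterwards, on the member that now owns the partitions.
    }
}
{code}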

I was thinking we could leverage `Consumer#partitionsFor` to first figure out 
whether the topic exists. The API returns an empty list (at least in newer 
versions) if the topic isn't found, as per the javadocs: 
[https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)].

And then we issue a call to the `position` API only if the topic exists. This 
way we can still prevent the connector from failing entirely (a rough sketch 
follows after the points below). Couple of noteworthy points =>

1) This approach entails making two remote calls to the broker, which can be 
considered sub-optimal. However, we would only do this when a consumer rebalance 
happens (which hopefully isn't too frequent, otherwise the real problem lies 
elsewhere). This seems a small tradeoff compared to a failed connector.

2) There is still a chance that the topic existed when `partitionsFor` was 
invoked but got deleted by the time `position` is invoked. I don't think 
there's any atomic way of ruling this out, but the chances of it happening 
seem fairly slim to me.

3) Lastly, FWIW, the KafkaBasedLog within Connect uses a [similar mechanism to 
verify that the topic 
exists|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L246-L258],
 and it too eventually fails if the topic doesn't get created (after some 
retries).
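
To make the proposal concrete, here is a rough sketch (hypothetical helper name; `partitionsFor` and `position` are the existing `Consumer` APIs) that also catches the `TimeoutException` as a fallback for the race described in point 2:

{code:java}
import java.util.List;
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class SafePositionSketch {
    /**
     * Hypothetical helper: returns the consumer position for tp, or empty if the
     * backing topic no longer exists and the position cannot be determined.
     */
    static OptionalLong safePosition(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
        // First remote call: metadata lookup. Newer clients return an empty list
        // when the topic does not exist.
        List<PartitionInfo> partitions = consumer.partitionsFor(tp.topic());
        if (partitions == null || partitions.isEmpty()) {
            return OptionalLong.empty();
        }
        try {
            // Second remote call: only issued if the topic still looked present.
            return OptionalLong.of(consumer.position(tp));
        } catch (TimeoutException e) {
            // Covers the (rare) race where the topic was deleted between the two calls.
            return OptionalLong.empty();
        }
    }
}
{code}

With something like this, the rebalance path could treat an empty result as "topic gone, skip the commit for this TP" instead of failing the task.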

WDYT?

> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Priority: Major
>
> Steps to reproduce:
> # In {{{}config/connect-standalone.properties{}}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check the 
> number of partitions to be > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms 
> expired before the position for partition connect-test-211-0 could be 
> determined
> {quote}


