[ 
https://issues.apache.org/jira/browse/BEAM-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049968#comment-17049968
 ] 

Jozef Vilcek commented on BEAM-9420:
------------------------------------

[~iemejia], I do not see how. The timeout is raised outside of the consumer, here:

[https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L118]

Waiting for completion of this method:

[https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L625]

which is executed asynchronously. A comment in the code says that `consumer.seek()` can 
sometimes block forever, so the call runs as an async Future with a timeout. The 
default timeout may not be generous enough. This explicit timeout 
might not be needed with newer versions of the Kafka client thanks to

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior]
 and the new `default.api.timeout.ms` setting.
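
For illustration, the pattern described above (a blocking call submitted to an executor and awaited with a bounded `Future.get`) can be sketched roughly as follows. This is not the Beam code: `BoundedCall` and `callWithTimeout` are hypothetical names, and the real reader manages its own executor and error handling.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of Beam): bounds a possibly blocking call with a timeout. */
public class BoundedCall {

  /** Runs the task on a background thread and waits at most timeoutMs for its result. */
  public static <T> T callWithTimeout(Callable<T> task, long timeoutMs) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<T> future = executor.submit(task);
    try {
      // Throws TimeoutException if the task has not finished within timeoutMs.
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Ask the worker thread to stop; a truly stuck call may ignore the interrupt.
      future.cancel(true);
      throw e;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

If the timeout is shorter than the time the client needs to fail over to another bootstrap broker, the caller sees a `TimeoutException` even though the call would eventually have succeeded.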

 

> Configurable timeout for Kafka setupInitialOffset()
> ---------------------------------------------------
>
>                 Key: BEAM-9420
>                 URL: https://issues.apache.org/jira/browse/BEAM-9420
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.19.0
>            Reporter: Jozef Vilcek
>            Assignee: Jozef Vilcek
>            Priority: Major
>
> If the bootstrap brokers include an unhealthy server, it can break the start 
> of a whole Beam job. During startup, `KafkaUnboundedReader` waits for 
> `setupInitialOffset()`. The wait timeout is either twice `request.timeout.ms` 
> or a default constant. In both cases, that might not be enough 
> time for kafka-client to initiate fallback and retry metadata discovery via 
> another broker from the given bootstrap list.
> The client should be able to specify the timeout for `setupInitialOffset()` 
> explicitly as a setting on the KafkaIO read.
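
A minimal sketch of the timeout selection described in the issue, with the requested explicit override added on top. The class name, the `explicitTimeout` parameter, and the 60-second fallback are assumptions made for illustration; only the `request.timeout.ms` consumer property comes from the description, and the actual Beam constant may differ.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical sketch (not Beam code) of how the wait timeout could be chosen. */
public class OffsetTimeoutResolver {

  // Placeholder fallback; the real default constant in Beam is not stated in the issue.
  static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);

  /**
   * Returns the explicit timeout if one is given, else twice request.timeout.ms
   * if that property is present in the consumer config, else the default.
   */
  public static Duration resolve(Map<String, ?> consumerConfig, Duration explicitTimeout) {
    if (explicitTimeout != null) {
      return explicitTimeout;
    }
    Object requestTimeout = consumerConfig.get("request.timeout.ms");
    if (requestTimeout != null) {
      // Config values may be numbers or strings, so parse via toString().
      long millis = Long.parseLong(requestTimeout.toString().trim());
      return Duration.ofMillis(2 * millis);
    }
    return DEFAULT_TIMEOUT;
  }
}
```

With this shape, a user who knows that broker failover takes longer than twice `request.timeout.ms` could pass a larger explicit value without changing the request timeout itself.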



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
