[ 
https://issues.apache.org/jira/browse/BEAM-14518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550040#comment-17550040
 ] 

Danny McCormick commented on BEAM-14518:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/21610

> Support for reading Kafka topics from any startReadTime in Java
> ---------------------------------------------------------------
>
>                 Key: BEAM-14518
>                 URL: https://issues.apache.org/jira/browse/BEAM-14518
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Balázs Németh
>            Priority: P2
>
> [https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198]
>  
> Right now the 'startReadTime' config for KafkaIO.Read looks up, in every 
> topic partition, an offset whose timestamp is newer than or equal to that 
> timestamp. The problem arises when the timestamp is so recent that a 
> partition has no message at or after it. In that case the code fails with 
> an exception. Failing makes no sense in some of these cases, because the 
> read could actually be made to work.
> If we don't get an offset from calling `consumer.offsetsForTimes`, we should 
> call `endOffsets`, and use the returned offset + 1. That is actually the 
> offset we will have to read next time.
> Even if `endOffsets` can't return an offset we could use 0 as the offset to 
> read from.
>  
> Am I missing something here? Is it okay to contribute this?
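
The fallback order proposed in the description can be sketched as a small pure function. This is only an illustration, not the KafkaIO or ConsumerSpEL code: the class `StartOffsetFallback` and the method `resolveStartOffset` are hypothetical names, and the two nullable arguments stand in for the per-partition results of `consumer.offsetsForTimes` and `endOffsets`. Note that the Kafka consumer Javadoc describes the end offset as the offset of the last record plus one, so the sketch uses the value returned by `endOffsets` directly as the next offset to read.

```java
// Hypothetical sketch of the proposed fallback; not part of Beam or the Kafka client.
class StartOffsetFallback {

    /**
     * Picks the offset to start reading from for one partition.
     *
     * @param offsetForTime result of the timestamp lookup for this partition, or
     *                      null when no record at or after the timestamp exists
     * @param endOffset     end offset reported for this partition, or null when
     *                      it could not be determined
     */
    static long resolveStartOffset(Long offsetForTime, Long endOffset) {
        if (offsetForTime != null) {
            // Normal case: a record at or after startReadTime exists.
            return offsetForTime;
        }
        if (endOffset != null) {
            // Timestamp is newer than every record: start at the end of the log.
            // Assumption: the end offset is already "last record + 1".
            return endOffset;
        }
        // Last resort suggested in the issue: start from offset 0.
        return 0L;
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(42L, 100L));  // timestamp lookup succeeded
        System.out.println(resolveStartOffset(null, 100L)); // fall back to end offset
        System.out.println(resolveStartOffset(null, null)); // fall back to 0
    }
}
```

In a real consumer the two arguments would come from the maps returned by the client for each topic partition. Whether silently falling back is preferable to failing is the design question the issue raises.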



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
