[ https://issues.apache.org/jira/browse/BEAM-14518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550040#comment-17550040 ]
Danny McCormick commented on BEAM-14518:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/21610

> Support for reading Kafka topics from any startReadTime in Java
> ---------------------------------------------------------------
>
>                 Key: BEAM-14518
>                 URL: https://issues.apache.org/jira/browse/BEAM-14518
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Balázs Németh
>            Priority: P2
>
> [https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198]
>
> Right now the 'startReadTime' config for KafkaIO.Read looks up, in every
> topic partition, an offset whose timestamp is newer than or equal to the
> given timestamp. The problem arises when the timestamp is so recent that a
> partition has no message newer than or equal to it: in that case the code
> fails with an exception. In certain cases that failure makes no sense,
> because we could actually make the read work.
> If `consumer.offsetsForTimes` does not return an offset, we should call
> `endOffsets` and use the returned offset + 1. That is the offset we will
> have to read next.
> Even if `endOffsets` can't return an offset, we could use 0 as the offset
> to read from.
>
> Am I missing something here? Is it okay to contribute this?
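
The fallback order proposed in the issue can be sketched in plain Java. This is only an illustration under stated assumptions, not the code in ConsumerSpEL.java: the class and method names are hypothetical, and the two Kafka lookups are modelled as nullable `Long` values (a `null` stands for "the lookup returned no offset for this partition") so that the sketch runs without a broker. One deliberate difference from the proposal: the Kafka consumer documentation describes the end offset of a partition as the offset of the last available message + 1, so the sketch uses the end offset as-is instead of adding 1 again.

```java
// Hypothetical helper, not part of Beam or Kafka: it only models the
// fallback order described in the issue for a single partition.
final class StartOffsetFallback {

    private StartOffsetFallback() {}

    /**
     * Chooses the offset to start reading from.
     *
     * @param offsetForTime result of the timestamp lookup, or null when no
     *                      message is newer than or equal to startReadTime
     * @param endOffset     end offset of the partition (assumed to already be
     *                      "last message + 1"), or null when unavailable
     */
    static long resolveStartOffset(Long offsetForTime, Long endOffset) {
        if (offsetForTime != null) {
            // Normal case: a message at or after startReadTime exists.
            return offsetForTime;
        }
        if (endOffset != null) {
            // No such message yet: start at the next offset to be written.
            return endOffset;
        }
        // Last resort suggested in the issue: read from the beginning.
        return 0L;
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(42L, 100L));  // timestamp lookup succeeded
        System.out.println(resolveStartOffset(null, 100L)); // falls back to the end offset
        System.out.println(resolveStartOffset(null, null)); // falls back to 0
    }
}
```

In a real consumer the two arguments would come from `consumer.offsetsForTimes` and `consumer.endOffsets` for the partition in question; how missing results are represented there should be checked against the client version in use.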