[
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15555737#comment-15555737
]
Raghu Angadi commented on BEAM-704:
-----------------------------------
[[email protected]] expanded a bit more on this. The issue you might be
pointing to is that there is no offset stored in the checkpoint for a partition
if the reader hasn't ever read a record. That should be easily fixable: the
reader can set the current offset in reader.start():
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L935
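
A minimal sketch of that idea, assuming a simplified per-partition reader. The
names StartOffsetSketch, PartitionReader, offer() and checkpointOffset() are
hypothetical stand-ins and not the actual KafkaIO code. The point is only that
start() records the resolved position before any record is read, so a
checkpoint taken on an idle partition still carries an offset:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified stand-in for a per-partition reader; not KafkaIO itself.
class StartOffsetSketch {

    static final class PartitionReader {
        static final long UNSET = -1L;

        private final int partition;
        // What "latest" (or "earliest") resolved to for this partition.
        private final long resolvedStartOffset;
        // Offsets of records that are available to read (simulates the broker).
        private final Queue<Long> pendingOffsets = new ArrayDeque<>();
        // Offset of the next record to read; this is what a checkpoint would store.
        private long nextOffset = UNSET;

        PartitionReader(int partition, long resolvedStartOffset) {
            this.partition = partition;
            this.resolvedStartOffset = resolvedStartOffset;
        }

        void offer(long offset) {
            pendingOffsets.add(offset);
        }

        boolean start() {
            // Record the position up front, even if no record is available yet.
            nextOffset = resolvedStartOffset;
            return advance();
        }

        boolean advance() {
            Long offset = pendingOffsets.poll();
            if (offset == null) {
                return false; // nothing to read; nextOffset stays where it was
            }
            nextOffset = offset + 1;
            return true;
        }

        int partition() {
            return partition;
        }

        long checkpointOffset() {
            return nextOffset;
        }
    }

    public static void main(String[] args) {
        PartitionReader idle = new PartitionReader(0, 42L);
        boolean hasRecord = idle.start();
        System.out.println("partition=" + idle.partition()
            + " hasRecord=" + hasRecord
            + " checkpointOffset=" + idle.checkpointOffset());
    }
}
```

Without the assignment in start(), checkpointOffset() in this sketch would stay
at UNSET until the first record arrives, which is the gap described above.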
> KafkaIO should handle "latest offset" evenly, and persist it as part of the
> CheckpointMark.
> -------------------------------------------------------------------------------------------
>
> Key: BEAM-704
> URL: https://issues.apache.org/jira/browse/BEAM-704
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Reporter: Amit Sela
>
> Currently, KafkaIO (when configured to "latest") resolves the latest offset
> on the worker. This means that each worker sees a "different" latest,
> depending on when it checks the partitions assigned to it.
> This also means that if a worker fails before starting to read, and new
> messages were added in between, they would be missed.
> I think we should consider checking the offsets (could be the same for
> "earliest") when running initialSplits (that's how Spark does it as well:
> one call from the driver for all topic-partitions).
> I'd also suggest we persist the latest offset as part of the CheckpointMark
> so that once latest is set, it is remembered until new messages arrive and it
> doesn't need to be resolved again (and if there were new messages available
> they won't be missed upon failure).
> For Spark this is even more important as state is passed in-between
> micro-batches and sparse partitions may skip messages until a message finally
> arrives within the read time-frame.
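
A rough sketch of the proposal quoted above, assuming an in-memory stand-in
for a topic. SplitTimeOffsetsSketch, FakeTopic, endOffsets(), readFrom() and
this initialSplits() are hypothetical names for illustration and not the
KafkaIO or Kafka client API. "Latest" is resolved once for all partitions at
split time, and the result plays the role of the offsets persisted in the
CheckpointMark, so records appended before a worker starts reading are not
skipped:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these types exist in Beam or Kafka.
class SplitTimeOffsetsSketch {

    /** In-memory stand-in for a topic: partition id -> records in offset order. */
    static final class FakeTopic {
        private final Map<Integer, List<String>> partitions = new LinkedHashMap<>();

        void append(int partition, String value) {
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
        }

        /** Single call that resolves "latest" for every known partition. */
        Map<Integer, Long> endOffsets() {
            Map<Integer, Long> result = new LinkedHashMap<>();
            partitions.forEach((p, records) -> result.put(p, (long) records.size()));
            return result;
        }

        /** Returns the records of an existing partition from the given offset on. */
        List<String> readFrom(int partition, long offset) {
            List<String> records = partitions.get(partition);
            return new ArrayList<>(records.subList((int) offset, records.size()));
        }
    }

    /** Resolve offsets once at split time; the result stands in for the persisted mark. */
    static Map<Integer, Long> initialSplits(FakeTopic topic) {
        return topic.endOffsets();
    }

    public static void main(String[] args) {
        FakeTopic topic = new FakeTopic();
        topic.append(0, "old-0");
        topic.append(1, "old-1");

        // Driver side: one lookup for all topic-partitions.
        Map<Integer, Long> marks = initialSplits(topic);

        // A record arrives after the split but before any worker starts reading.
        topic.append(0, "new-0");

        // Worker side: resume from the persisted offset instead of re-resolving "latest".
        System.out.println(topic.readFrom(0, marks.get(0)));
        System.out.println(topic.readFrom(1, marks.get(1)));
    }
}
```

If the worker instead called endOffsets() again when it started, partition 0
would resolve to the position after "new-0" and that record would be missed.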
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)