[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559473#comment-15559473 ]

Amit Sela commented on BEAM-704:
--------------------------------

I forgot about the consumer group.
I didn't mean that Beam currently asks UnboundedSources for a "start checkpoint"; 
I suggested that it could. Instead of assigning topic/partitions and asking for 
offsets on the worker, the Source would have all the information 
(topic-partition-offset) prior to splitting, and the offsets would be a property 
of the Source rather than the Reader (if there is a previous CheckpointMark, the 
Reader would ignore them). I do understand how this might be problematic if the 
driver runs in an environment that can't connect to the Kafka cluster (Spark can 
run the driver on the cluster as well).
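To make the suggestion concrete, here is a minimal, hypothetical sketch (not the actual KafkaIO or UnboundedSource API) of what "offsets as a property of the Source" could look like: the driver resolves all topic-partition-offsets once, before splitting, and each split carries its own fixed triples. The class and method names (`PartitionOffset`, `KafkaSourceSketch.split`) are invented for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical: one (topic, partition, start offset) triple, resolved on the
// driver before splitting, so every split Source already knows where to start.
class PartitionOffset implements Serializable {
    final String topic;
    final int partition;
    final long startOffset;

    PartitionOffset(String topic, int partition, long startOffset) {
        this.topic = topic;
        this.partition = partition;
        this.startOffset = startOffset;
    }
}

class KafkaSourceSketch {
    // One call from the driver covers all topic-partitions; the workers then
    // receive splits whose offsets are fixed, rather than resolving "latest"
    // independently (and inconsistently) per worker.
    static List<List<PartitionOffset>> split(List<PartitionOffset> all, int numSplits) {
        List<List<PartitionOffset>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        // Round-robin assignment of partitions to splits.
        for (int i = 0; i < all.size(); i++) {
            splits.get(i % numSplits).add(all.get(i));
        }
        return splits;
    }
}
```

A Reader constructed from such a split would consult these offsets only when it has no prior CheckpointMark, which is the behavior described above.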

> KafkaIO should handle "latest offset" evenly, and persist it as part of the 
> CheckpointMark.
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-704
>                 URL: https://issues.apache.org/jira/browse/BEAM-704
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Amit Sela
>            Assignee: Raghu Angadi
>
> Currently, KafkaIO (when configured to "latest") checks the latest offset on 
> the worker. This means that each worker sees a "different" latest offset at 
> the time it checks the partitions assigned to it.
> It also means that if a worker fails before it starts reading, and new 
> messages arrive in the meantime, they will be missed.
> I think we should consider checking the offsets (it could be the same for 
> "earliest") when running initialSplits (that's how Spark does it as well: 
> one call from the driver for all topic-partitions).
> I'd also suggest persisting the latest offset as part of the CheckpointMark, 
> so that once "latest" is resolved it is remembered until new messages arrive 
> and doesn't need to be resolved again (and if new messages became available, 
> they won't be missed upon failure).
> For Spark this is even more important, since state is passed between 
> micro-batches, and sparse partitions may skip messages until a message 
> finally arrives within the read time-frame.
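The "persist latest as part of the CheckpointMark" idea can be sketched as follows. This is a hypothetical data structure, not Beam's actual `KafkaCheckpointMark`: once "latest" is resolved for a partition, the next offset to read is persisted, so recovery resumes from it instead of re-resolving "latest" (and silently skipping messages that arrived in between).

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical CheckpointMark sketch: maps "topic-partition" to the next
// offset to read, updated as records are consumed.
class OffsetCheckpointMark implements Serializable {
    final Map<String, Long> nextOffsets = new HashMap<>();

    // Record that 'consumedOffset' was read; resume from the one after it.
    void advance(String topicPartition, long consumedOffset) {
        nextOffsets.put(topicPartition, consumedOffset + 1);
    }

    // Prefer the persisted offset; fall back to the resolved "latest" only
    // when this partition has never been checkpointed.
    long startFor(String topicPartition, long resolvedLatest) {
        return nextOffsets.getOrDefault(topicPartition, resolvedLatest);
    }
}
```

With this shape, a restarted reader that had consumed up to offset 41 of a partition resumes at 42 even if "latest" has since moved past the unread messages.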



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
