[ 
https://issues.apache.org/jira/browse/SAMZA-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Kleppmann updated SAMZA-234:
-----------------------------------

    Assignee:     (was: Martin Kleppmann)

> SystemConsumer should signal when it has consumed all available input
> ---------------------------------------------------------------------
>
>                 Key: SAMZA-234
>                 URL: https://issues.apache.org/jira/browse/SAMZA-234
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Martin Kleppmann
>
> Extracting this out of SAMZA-179. As suggested by [~jghoman] in a 
> [comment|https://issues.apache.org/jira/browse/SAMZA-179?focusedCommentId=13947290&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13947290],
>  there should be a way for a SystemConsumer to indicate that it has caught up 
> (i.e. there currently isn't any data left to consume).
> This will be useful for anything that needs to change behaviour depending on 
> whether it is backlogged. For example, bootstrap streams (with 
> {{systems.\*.streams.\*.samza.bootstrap=true}}) are prioritised over all 
> other inputs until the stream has caught up. And SAMZA-179 is proposing 
> support for batch-style reprocessing jobs, which need to know when they are 
> done.
> Catch-up detection is currently done by comparing the offset on an IncomingMessageEnvelope 
> to the newest offset reported by the SystemAdmin. That works for Kafka, but 
> not for Databus (which doesn't currently have an API for getting the newest 
> offset), and not for files (if the offset is the byte offset of the start of 
> a record in the file, there can never be a record whose byte offset is equal 
> to the length of the file, assuming every record is more than zero bytes 
> long). This indicates that the logic for detecting how far behind a consumer 
> is should be specific to the system, not an offset calculation in samza-core.
> One way for SystemConsumer to signal that it has caught up would be for 
> SystemConsumer.poll to return an empty list of IncomingMessageEnvelopes. 
> However, that may be dangerous, because there could be reasons for it to 
> return an empty list even if it hasn't caught up. A better solution would 
> therefore be to add a method like isCaughtUp() to the SystemConsumer 
> interface.
> To discuss:
> * Should it report caught-up state for each partition individually, or for 
> all partitions collectively?
> * Should backlog be reported as a boolean (caught up or not), or as a number 
> (a kind of progress meter, cf. SAMZA-228)?
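
As a rough illustration of the proposal in the description, the sketch below shows a toy file-like consumer whose offsets are the byte offsets of record starts. It is only a sketch under stated assumptions: CaughtUpAware, FileLikeConsumer and pollOne are hypothetical names invented for this example and are not part of Samza; only the method name isCaughtUp() comes from the issue text. It shows that the offset of the last record never equals the file length, while a system-specific isCaughtUp() can still answer correctly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CaughtUpSketch {

    /** Hypothetical stand-in for the proposed addition; NOT the real SystemConsumer interface. */
    interface CaughtUpAware {
        boolean isCaughtUp();
    }

    /** Toy file-like consumer (assumption): a record's offset is the byte offset of its start. */
    static final class FileLikeConsumer implements CaughtUpAware {
        private final List<byte[]> records;
        private final long fileLength;
        private int nextIndex = 0;
        private long nextOffset = 0;

        FileLikeConsumer(List<byte[]> records) {
            this.records = new ArrayList<>(records);
            long total = 0;
            for (byte[] r : records) {
                total += r.length;
            }
            this.fileLength = total;
        }

        /** Consumes one record and returns its start offset, or -1 if nothing is left. */
        long pollOne() {
            if (nextIndex >= records.size()) {
                return -1;
            }
            long offset = nextOffset;
            nextOffset += records.get(nextIndex).length;
            nextIndex++;
            return offset;
        }

        long fileLength() {
            return fileLength;
        }

        /** System-specific check: caught up once the read position reaches the end of the file. */
        @Override
        public boolean isCaughtUp() {
            return nextOffset >= fileLength;
        }
    }

    public static void main(String[] args) {
        FileLikeConsumer consumer =
            new FileLikeConsumer(Arrays.asList(new byte[3], new byte[5]));
        long offset;
        while ((offset = consumer.pollOne()) >= 0) {
            // An offset comparison in core code would test offset == fileLength,
            // which cannot be true for records longer than zero bytes.
            System.out.println("offset=" + offset
                + " equalsLength=" + (offset == consumer.fileLength())
                + " isCaughtUp=" + consumer.isCaughtUp());
        }
    }
}
```

This sketch reports a single boolean for the whole input. Whether the real method should report per partition, or return a number instead of a boolean, is exactly what the "To discuss" points leave open.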



--
This message was sent by Atlassian JIRA
(v6.2#6252)
