[ https://issues.apache.org/jira/browse/SAMZA-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martin Kleppmann updated SAMZA-234:
-----------------------------------
Assignee: (was: Martin Kleppmann)
> SystemConsumer should signal when it has consumed all available input
> ---------------------------------------------------------------------
>
> Key: SAMZA-234
> URL: https://issues.apache.org/jira/browse/SAMZA-234
> Project: Samza
> Issue Type: Improvement
> Reporter: Martin Kleppmann
>
> Extracting this out of SAMZA-179. As suggested by [~jghoman] in a
> [comment|https://issues.apache.org/jira/browse/SAMZA-179?focusedCommentId=13947290&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13947290],
> there should be a way for a SystemConsumer to indicate that it has caught up
> (i.e. there currently isn't any data left to consume).
> This will be useful for anything that needs to change behaviour depending on
> whether it is backlogged. For example, bootstrap streams (with
> {{systems.\*.streams.\*.samza.bootstrap=true}}) are prioritised over all
> other inputs until the stream has caught up. SAMZA-179 also proposes
> support for batch-style reprocessing jobs, which need to know when they are
> done.
> Caught-up state is currently detected by comparing the offset on an IncomingMessageEnvelope
> to the newest offset reported by the SystemAdmin. That works for Kafka, but
> not for Databus (which doesn't currently have an API for getting the newest
> offset), and not for files (if the offset is the byte offset of the start of
> a record in the file, there can never be a record whose byte offset is equal
> to the length of the file, assuming every record is more than zero bytes
> long). This indicates that the logic for detecting how far behind a consumer
> is should be specific to the system, not an offset calculation in samza-core.
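
The file case above can be checked with a little arithmetic. The following is a minimal sketch, not Samza code: the class and method names are hypothetical, and it assumes the "newest offset" for a file would be reported as the file length, as the description supposes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Samza.
public class FileOffsetExample {

    /** Byte offset at which each record starts, given each record's length in bytes. */
    public static List<Long> recordStartOffsets(long[] recordLengths) {
        List<Long> offsets = new ArrayList<>();
        long position = 0;
        for (long length : recordLengths) {
            offsets.add(position);   // a record's offset is where it begins
            position += length;      // the next record begins after this one ends
        }
        return offsets;
    }

    /** Total file length: the sum of all record lengths. */
    public static long fileLength(long[] recordLengths) {
        long total = 0;
        for (long length : recordLengths) {
            total += length;
        }
        return total;
    }

    /** Offset-equality check in the style described above (assumed, simplified). */
    public static boolean offsetEqualsNewest(long messageOffset, long newestOffset) {
        return messageOffset == newestOffset;
    }

    public static void main(String[] args) {
        long[] lengths = {10, 25, 7};
        List<Long> starts = recordStartOffsets(lengths);
        long newest = fileLength(lengths);
        for (long start : starts) {
            // Every start offset is strictly less than the file length,
            // so the equality check never reports "caught up".
            System.out.println("offset " + start + " caughtUp=" + offsetEqualsNewest(start, newest));
        }
    }
}
```

With record lengths of 10, 25 and 7 bytes, the records start at offsets 0, 10 and 35, while the file is 42 bytes long, so no record offset ever equals the file length.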
> One way for SystemConsumer to signal that it has caught up would be for
> SystemConsumer.poll to return an empty list of IncomingMessageEnvelopes.
> However, that may be dangerous, because there could be reasons for it to
> return an empty list even if it hasn't caught up. A better solution would
> therefore be to add a method like isCaughtUp() to the SystemConsumer
> interface.
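
To make the difference between the two signals concrete, here is a rough sketch. SimpleConsumer is a hypothetical, stripped-down stand-in for SystemConsumer (the real interface has a different poll signature), and the "throttled" flag is just one assumed reason why poll might return nothing while data remains.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; SimpleConsumer is NOT the real Samza SystemConsumer interface.
public class CaughtUpSketch {

    public interface SimpleConsumer {
        /** Returns the next batch of messages; may be empty for many reasons. */
        List<String> poll();

        /** Proposed explicit signal: true only when no input is left to consume. */
        boolean isCaughtUp();
    }

    /** In-memory consumer that can be throttled, so poll() returns nothing despite a backlog. */
    public static class InMemoryConsumer implements SimpleConsumer {
        private final List<String> pending;
        private boolean throttled = false;

        public InMemoryConsumer(List<String> messages) {
            this.pending = new ArrayList<>(messages);
        }

        public void setThrottled(boolean throttled) {
            this.throttled = throttled;
        }

        @Override
        public List<String> poll() {
            List<String> batch = new ArrayList<>();
            if (!throttled && !pending.isEmpty()) {
                batch.add(pending.remove(0)); // hand out one message per poll
            }
            return batch;
        }

        @Override
        public boolean isCaughtUp() {
            return pending.isEmpty();
        }
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        input.add("m1");
        input.add("m2");
        InMemoryConsumer consumer = new InMemoryConsumer(input);

        consumer.setThrottled(true);
        // An empty poll here does not mean the consumer has caught up.
        System.out.println("empty poll: " + consumer.poll().isEmpty()
                + ", caught up: " + consumer.isCaughtUp());

        consumer.setThrottled(false);
        while (!consumer.isCaughtUp()) {
            System.out.println("consumed " + consumer.poll());
        }
    }
}
```

In this sketch an empty poll result and the caught-up state can disagree, which is the ambiguity that a separate isCaughtUp() method would avoid.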
> To discuss:
> * Should it report caught-up state for each partition individually, or for
> all partitions collectively?
> * Should backlog be reported as a boolean (caught up or not), or as a number
> (a kind of progress meter, cf. SAMZA-228)?
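
One way to think about the two open questions is that a per-partition number is the most detailed form, and the other forms can be derived from it. The sketch below is only an illustration under that assumption: BacklogSketch and its methods are hypothetical, partitions are plain ints, and it presumes the system can actually count how far behind it is, which the description says is not the case for every system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of backlog reporting; not part of any Samza API.
public class BacklogSketch {
    private final Map<Integer, Long> backlogByPartition = new HashMap<>();

    /** Records how many messages the consumer is behind on one partition. */
    public void reportBacklog(int partition, long messagesBehind) {
        if (messagesBehind < 0) {
            throw new IllegalArgumentException("backlog cannot be negative");
        }
        backlogByPartition.put(partition, messagesBehind);
    }

    /** Per-partition boolean view; partitions with no report count as not caught up. */
    public boolean isCaughtUp(int partition) {
        Long backlog = backlogByPartition.get(partition);
        return backlog != null && backlog == 0L;
    }

    /** Collective boolean view: at least one report, and every reported backlog is zero. */
    public boolean allCaughtUp() {
        return !backlogByPartition.isEmpty()
                && backlogByPartition.values().stream().allMatch(b -> b == 0L);
    }

    /** Collective numeric view: total messages behind across reported partitions. */
    public long totalBacklog() {
        long total = 0;
        for (long backlog : backlogByPartition.values()) {
            total += backlog;
        }
        return total;
    }

    public static void main(String[] args) {
        BacklogSketch backlog = new BacklogSketch();
        backlog.reportBacklog(0, 0L);
        backlog.reportBacklog(1, 5L);
        System.out.println("partition 0 caught up: " + backlog.isCaughtUp(0));
        System.out.println("all caught up: " + backlog.allCaughtUp());
        System.out.println("total backlog: " + backlog.totalBacklog());
    }
}
```

The boolean views can always be computed from the numbers, but not the other way round, so a system that can only say "caught up or not" would need the boolean form to be part of the interface in its own right.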
--
This message was sent by Atlassian JIRA
(v6.2#6252)