Martin Kleppmann created SAMZA-234:
--------------------------------------

             Summary: SystemConsumer should signal when it has consumed all available input
                 Key: SAMZA-234
                 URL: https://issues.apache.org/jira/browse/SAMZA-234
             Project: Samza
          Issue Type: Improvement
            Reporter: Martin Kleppmann


Extracting this out of SAMZA-179. As suggested by [~jghoman] in a 
[comment|https://issues.apache.org/jira/browse/SAMZA-179?focusedCommentId=13947290&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13947290],
there should be a way for a SystemConsumer to indicate that it has caught up 
(i.e. there is currently no data left to consume).

This will be useful for anything that needs to change behaviour depending on 
whether it is backlogged. For example, bootstrap streams (with 
{{systems.\*.streams.\*.samza.bootstrap=true}}) are prioritised over all other 
inputs until the stream has caught up. And SAMZA-179 is proposing support for 
batch-style reprocessing jobs, which need to know when they are done.

This is currently done by comparing the offset of each IncomingMessageEnvelope 
with the newest offset reported by the SystemAdmin. That works for Kafka, but 
not for Databus (which currently has no API for getting the newest offset), 
and not for files: if the offset is the byte offset of the start of a record 
in the file, and the newest offset is taken to be the length of the file, then 
no record can ever have that offset (assuming every record is at least one byte 
long), so the comparison never reports that the consumer has caught up. 
This suggests that the logic for detecting how far behind a consumer is should 
be specific to each system, rather than an offset calculation in samza-core.
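To make the file case concrete, here is a minimal, self-contained sketch of the 
offset comparison described above. It does not use Samza classes: 
{{OffsetComparison}} and its methods are hypothetical, and it assumes that the 
"newest offset" for a Kafka-like log is the offset of the last message, while 
for a file it is taken to be the file length.

```java
import java.util.List;

// Hypothetical, simplified model of the samza-core check: a partition counts as
// caught up once a message's offset equals the newest offset from the SystemAdmin.
// None of these names are Samza's real API.
final class OffsetComparison {
    static boolean caughtUpByOffset(String messageOffset, String newestOffset) {
        return messageOffset.equals(newestOffset);
    }

    // Kafka-like log: offsets are consecutive message indices and the newest
    // offset is the index of the last message, so the check eventually fires.
    static boolean kafkaLikeEverCaughtUp(int messageCount) {
        String newest = Integer.toString(messageCount - 1);
        for (int i = 0; i < messageCount; i++) {
            if (caughtUpByOffset(Integer.toString(i), newest)) {
                return true;
            }
        }
        return false;
    }

    // File-like input: offsets are the byte positions where records start.
    // Assumption: the "newest offset" is the file length. Since every record is
    // at least one byte long, no record starts there and the check never fires.
    static boolean fileLikeEverCaughtUp(List<Integer> recordLengths) {
        long fileLength = 0;
        for (int len : recordLengths) {
            fileLength += len;
        }
        String newest = Long.toString(fileLength);
        long start = 0;
        for (int len : recordLengths) {
            if (caughtUpByOffset(Long.toString(start), newest)) {
                return true;
            }
            start += len;
        }
        return false;
    }
}
```

For a five-message Kafka-like log the comparison succeeds on the last message, 
whereas for a file with records of 10, 20 and 5 bytes the record offsets are 0, 
10 and 30 and never equal the file length of 35.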

One way for SystemConsumer to signal that it has caught up would be for 
SystemConsumer.poll to return an empty list of IncomingMessageEnvelopes. 
However, that may be dangerous, because there could be reasons for it to return 
an empty list even if it hasn't caught up. A better solution would therefore be 
to add a method like isCaughtUp() to the SystemConsumer interface.
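A rough sketch of what such a method could look like follows. The interface is 
deliberately simplified and does not mirror the real SystemConsumer signatures; 
{{CatchUpAwareConsumer}} and {{InMemoryFileConsumer}} are hypothetical names, and 
the "file" is just an in-memory string of newline-terminated records whose 
offsets are character positions. The point is that the consumer answers 
{{isCaughtUp()}} from its own knowledge of the source, independently of whether 
the last {{poll}} happened to return an empty list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified consumer interface (not Samza's real SystemConsumer).
interface CatchUpAwareConsumer {
    List<String> poll(int maxMessages);

    // System-specific answer to "is there any data left to consume right now?"
    boolean isCaughtUp();
}

// Hypothetical file-like consumer over an in-memory string of newline-terminated
// records. Offsets are character positions, standing in for byte offsets.
final class InMemoryFileConsumer implements CatchUpAwareConsumer {
    private final String contents;
    private int position = 0; // offset of the start of the next unread record

    InMemoryFileConsumer(String contents) {
        this.contents = contents;
    }

    @Override
    public List<String> poll(int maxMessages) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxMessages && position < contents.length()) {
            int end = contents.indexOf('\n', position);
            if (end < 0) {
                end = contents.length(); // last record has no trailing newline
            }
            out.add(contents.substring(position, end));
            position = Math.min(end + 1, contents.length());
        }
        return out;
    }

    @Override
    public boolean isCaughtUp() {
        // Only this consumer knows that "end of data" means position == length.
        return position >= contents.length();
    }
}
```

For example, a consumer over {{"a\nbb\nccc\n"}} is not caught up after polling 
two records, and is caught up once {{poll}} has returned {{"ccc"}}.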

To discuss:

* Should it report caught-up state for each partition individually, or for all 
partitions collectively?
* Should backlog be reported as a boolean (caught up or not), or as a number (a 
kind of progress meter, cf. SAMZA-228)?
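One way to see how the two questions interact is a hypothetical per-partition, 
numeric variant, from which both the boolean and the collective answers can be 
derived. {{BacklogReporter}} is an illustrative name, partitions are keyed by 
plain strings rather than SystemStreamPartition, and the backlog unit (messages 
or bytes) is left open.

```java
import java.util.Map;

// Hypothetical per-partition backlog reporting (not part of Samza's API).
// Each value is the amount of data not yet consumed in that partition.
interface BacklogReporter {
    Map<String, Long> backlogByPartition();

    // Boolean, per-partition view derived from the numeric backlog.
    default boolean isCaughtUp(String partition) {
        Long backlog = backlogByPartition().get(partition);
        return backlog != null && backlog == 0L;
    }

    // Collective view: caught up only when every partition has zero backlog.
    default boolean allCaughtUp() {
        return backlogByPartition().values().stream().allMatch(b -> b == 0L);
    }
}
```

A numeric, per-partition report subsumes the other options, but it assumes the 
system can measure how far behind it is; a system without a newest-offset API, 
such as Databus, might only be able to report the boolean.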



--
This message was sent by Atlassian JIRA
(v6.2#6252)
