[ 
https://issues.apache.org/jira/browse/SAMZA-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13966903#comment-13966903
 ] 

Chris Riccomini commented on SAMZA-234:
---------------------------------------

bq. One way for SystemConsumer to signal that it has caught up would be for 
SystemConsumer.poll to return an empty list of IncomingMessageEnvelopes. 
However, that may be dangerous, because there could be reasons for it to return 
an empty list even if it hasn't caught up. A better solution would therefore be 
to add a method like isCaughtUp() to the SystemConsumer interface.

I don't think we can rely on an empty list meaning that the stream has caught 
up, since we allow a timeout. A timeout > 0 allows an empty list to be returned 
in cases where the stream/partition is NOT caught up, but no new messages are 
available (maybe because a fetch is happening on another thread). We had some 
discussion about this when we were building the state management restore 
feature.
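
A rough sketch of that failure mode, using made-up names (SketchConsumer is a stand-in, not the real SystemConsumer interface, and the offsets are simplified to plain counters). It only illustrates why an empty poll result and "caught up" have to be separate signals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a consumer; NOT the real Samza SystemConsumer.
class SketchConsumer {
    // Messages already fetched (e.g. by another thread) but not yet handed out.
    private final Queue<String> buffer = new ArrayDeque<>();
    // Number of messages handed out so far.
    private long nextOffset = 0;
    // Assumed total number of messages currently in the source.
    private final long headOffset;

    SketchConsumer(long headOffset) {
        this.headOffset = headOffset;
    }

    // Simulates a background fetch finishing and filling the local buffer.
    void fetchCompleted(List<String> messages) {
        buffer.addAll(messages);
    }

    // Returns whatever is buffered. An empty list only means "nothing was
    // buffered before the timeout expired", not "the source is exhausted".
    List<String> poll() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        nextOffset += out.size();
        return out;
    }

    // Explicit signal, independent of what poll() happened to return.
    boolean isCaughtUp() {
        return nextOffset >= headOffset;
    }
}
```

Here a poll() issued before the fetch completes returns an empty list while isCaughtUp() is still false.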

Some method on SystemConsumer seems reasonable to me. I'm also not sure how 
well this change is going to integrate into the DefaultChooser. Right now, the 
chooser has no knowledge of the SystemConsumer. Everything is signaled through 
the IncomingMessageEnvelope. Maybe isCaughtUp can go in the 
SystemStreamPartition's metadata instead?

bq. Should backlog be reported as a boolean (caught up or not), or as a number 
(a kind of progress meter, cf. SAMZA-228)?

As for how the backlog should be reported, a third approach would be to just 
have a count for the number of times the consumer has reached the head of the 
partition. This allows you to differentiate between the FIRST (isCaughtUp==1) 
time that the consumer reaches head, and subsequent times (isCaughtUp > 1). 
This would be useful in the DefaultChooser, where we want to ONLY read messages 
from a bootstrap stream until the FIRST time that the consumer reaches head for 
all partitions. This is something that can't be done with a boolean. I also like 
the progress meter idea that you've proposed, and think it provides a very 
close proxy to the counter. The only difference is that the counter also shows 
whether the stream was caught up in the past; if the progress meter shows a 
backlog, there's no indication of whether it was caught up before and then fell 
behind again. 
On the flip side, the counter gives no measure of lag.
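
To make the trade-off concrete, here is a minimal sketch that keeps all three signals side by side. HeadTracker and its methods are hypothetical names, and it assumes the system-specific consumer can report its lag as a number of messages:

```java
// Hypothetical per-partition tracker; illustrative only, not Samza API.
class HeadTracker {
    private int timesReachedHead = 0; // the counter described above
    private boolean atHead = false;   // what a plain boolean would report
    private long lag = -1;            // progress-meter value; -1 means unknown

    // Called with the lag (messages behind head) as reported by the
    // system-specific consumer. A lag of 0 means the consumer is at head.
    void update(long newLag) {
        boolean nowAtHead = (newLag == 0);
        // Count only transitions from "behind" to "at head".
        if (nowAtHead && !atHead) {
            timesReachedHead++;
        }
        atHead = nowAtHead;
        lag = newLag;
    }

    boolean isCaughtUp() { return atHead; }

    int timesReachedHead() { return timesReachedHead; }

    long lag() { return lag; }

    // Information that only the counter retains once the consumer
    // has fallen behind again.
    boolean hasEverCaughtUp() { return timesReachedHead > 0; }
}
```

After update(5), update(0), update(3), the boolean says "not caught up" and the lag says 3, but only the counter (1) records that head was reached before.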

bq. Should it report caught-up state for each partition individually, or for 
all partitions collectively?

Instinctively, I feel like we should do it per-partition, but it's definitely 
more cumbersome than per-stream. Right now, the DefaultChooser does its 
bootstrapping at the per-partition granularity.
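
As a sketch of what per-partition tracking could look like on the chooser side (BootstrapGate is a hypothetical helper, not the DefaultChooser code, and partitions are reduced to integer ids for brevity):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; illustrative only, not the DefaultChooser.
class BootstrapGate {
    // partition id -> whether that partition has reached head at least once
    private final Map<Integer, Boolean> reachedHeadOnce = new HashMap<>();

    BootstrapGate(int partitionCount) {
        for (int p = 0; p < partitionCount; p++) {
            reachedHeadOnce.put(p, false);
        }
    }

    // Record that a partition reached head; repeated calls are harmless.
    void markReachedHead(int partition) {
        if (!reachedHeadOnce.containsKey(partition)) {
            throw new IllegalArgumentException("unknown partition " + partition);
        }
        reachedHeadOnce.put(partition, true);
    }

    // The stream is still bootstrapping until EVERY partition has
    // reached head at least once.
    boolean isBootstrapping() {
        return reachedHeadOnce.containsValue(false);
    }
}
```

A per-stream signal would collapse this map into a single flag, which is simpler but hides which partition is still behind.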

> SystemConsumer should signal when it has consumed all available input
> ---------------------------------------------------------------------
>
>                 Key: SAMZA-234
>                 URL: https://issues.apache.org/jira/browse/SAMZA-234
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Martin Kleppmann
>            Assignee: Martin Kleppmann
>
> Extracting this out of SAMZA-179. As suggested by [~jghoman] in a 
> [comment|https://issues.apache.org/jira/browse/SAMZA-179?focusedCommentId=13947290&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13947290],
>  there should be a way for a SystemConsumer to indicate that it has caught up 
> (i.e. there currently isn't any data left to consume).
> This will be useful for anything that needs to change behaviour depending on 
> whether it is backlogged. For example, bootstrap streams (with 
> {{systems.\*.streams.\*.samza.bootstrap=true}}) are prioritised over all 
> other inputs until the stream has caught up. And SAMZA-179 is proposing 
> support for batch-style reprocessing jobs, which need to know when they are 
> done.
> This is currently done by comparing the offset on an IncomingMessageEnvelope 
> to the newest offset reported by the SystemAdmin. That works for Kafka, but 
> not for Databus (which doesn't currently have an API for getting the newest 
> offset), and not for files (if the offset is the byte offset of the start of 
> a record in the file, there can never be a record whose byte offset is equal 
> to the length of the file, assuming every record is more than zero bytes 
> long). This indicates that the logic for detecting how far behind a consumer 
> is should be specific to the system, not an offset calculation in samza-core.
> One way for SystemConsumer to signal that it has caught up would be for 
> SystemConsumer.poll to return an empty list of IncomingMessageEnvelopes. 
> However, that may be dangerous, because there could be reasons for it to 
> return an empty list even if it hasn't caught up. A better solution would 
> therefore be to add a method like isCaughtUp() to the SystemConsumer 
> interface.
> To discuss:
> * Should it report caught-up state for each partition individually, or for 
> all partitions collectively?
> * Should backlog be reported as a boolean (caught up or not), or as a number 
> (a kind of progress meter, cf. SAMZA-228)?



--
This message was sent by Atlassian JIRA
(v6.2#6252)
