[ https://issues.apache.org/jira/browse/SAMZA-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13969469#comment-13969469 ]

Martin Kleppmann commented on SAMZA-234:
----------------------------------------

bq. …a third approach would be to just have a count for the number of times the 
consumer has reached the head of the partition.

I don't quite follow what you mean. Whether a consumer is caught up or not is 
really a state, which may change at any time. How often the state changes may 
depend on various implementation details, such as how often we query the 
brokers to find the most recent offset (in the Kafka case). Are you proposing 
that we count the number of state transitions from "lagging" to "caught up"? Or 
the number of messages fetched while in a caught-up state? Or something else? 
The counter feels a bit ill-defined to me.
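To make the ambiguity concrete, here is a minimal Java sketch that computes two possible readings of such a counter over the same history of lag observations. Lag is taken to be the newest offset minus the consumer's offset at each broker query. The class and method names are hypothetical, not Samza APIs, and counting polls while caught up stands in for counting messages fetched in that state. The sketch also shows how querying the broker less often changes the result.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: two readings of "number of times the consumer has
 * reached the head of the partition", computed over one lag history.
 * Each element of lagSamples is the lag seen at one broker query; 0 = caught up.
 */
final class HeadCounterSketch {
    private HeadCounterSketch() {}

    /** Reading 1: count transitions from "lagging" (lag > 0) to "caught up" (lag == 0). */
    static int countTransitionsToCaughtUp(List<Long> lagSamples) {
        int transitions = 0;
        boolean wasCaughtUp = false; // assume the consumer starts out lagging
        for (long lag : lagSamples) {
            boolean caughtUp = lag == 0;
            if (caughtUp && !wasCaughtUp) {
                transitions++;
            }
            wasCaughtUp = caughtUp;
        }
        return transitions;
    }

    /** Reading 2: count the queries at which the consumer was caught up. */
    static int countSamplesWhileCaughtUp(List<Long> lagSamples) {
        int count = 0;
        for (long lag : lagSamples) {
            if (lag == 0) {
                count++;
            }
        }
        return count;
    }

    /** Simulates querying the broker less often by keeping every k-th sample. */
    static List<Long> sampleEvery(List<Long> lagSamples, int k) {
        List<Long> kept = new ArrayList<>();
        for (int i = 0; i < lagSamples.size(); i += k) {
            kept.add(lagSamples.get(i));
        }
        return kept;
    }
}
```

With the lag history 3, 0, 0, 2, 0, 1, 0, 0 the transition count is 3 and the caught-up sample count is 5. Keeping only every second sample (3, 0, 0, 0) drops the transition count to 1, even though nothing about the consumer changed, only how often it was observed.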

Another issue is that some systems may have a loose definition of "caught up", 
e.g. a system that does not report queue length may need to use a message 
timestamp as a proxy for backlog. While a system is on the boundary between 
"lagging" and "caught up", it could change state quite frequently.

For bootstrap stream purposes (ending the bootstrap stream's exclusivity period 
when it has caught up), I'd say that BootstrappingChooser should maintain the 
state of whether the bootstrapping has completed. I'm not so keen on leaking 
that into the general API for systems.
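A minimal Java sketch of that division of responsibility, assuming the chooser receives some per-partition caught-up signal (partitions are plain strings here for brevity). The class is hypothetical and is not Samza's BootstrappingChooser. The point it illustrates is that the "bootstrap finished" state is latched in the chooser, so a system later reporting that it has fallen behind again does not restart the exclusivity period.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch, not Samza's actual BootstrappingChooser: the chooser,
 * rather than the SystemConsumer API, remembers which bootstrap partitions
 * have caught up at least once.
 */
final class BootstrapProgressSketch {
    // Partitions that have not yet been observed caught up.
    private final Set<String> stillBootstrapping;

    BootstrapProgressSketch(Collection<String> bootstrapPartitions) {
        this.stillBootstrapping = new HashSet<>(bootstrapPartitions);
    }

    /** Records one observation of a partition's caught-up status. */
    void observe(String partition, boolean caughtUp) {
        if (caughtUp) {
            // Latch: once removed, a partition is never re-added, even if it lags again.
            stillBootstrapping.remove(partition);
        }
    }

    /** True once every bootstrap partition has caught up at least once. */
    boolean isBootstrapComplete() {
        return stillBootstrapping.isEmpty();
    }
}
```

Because the latch lives in the chooser, a system whose own notion of "caught up" flickers near the boundary cannot switch bootstrap prioritisation back on.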

> SystemConsumer should signal when it has consumed all available input
> ---------------------------------------------------------------------
>
>                 Key: SAMZA-234
>                 URL: https://issues.apache.org/jira/browse/SAMZA-234
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Martin Kleppmann
>            Assignee: Martin Kleppmann
>
> Extracting this out of SAMZA-179. As suggested by [~jghoman] in a 
> [comment|https://issues.apache.org/jira/browse/SAMZA-179?focusedCommentId=13947290&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13947290],
> there should be a way for a SystemConsumer to indicate that it has caught up 
> (i.e. there currently isn't any data left to consume).
> This will be useful for anything that needs to change behaviour depending on 
> whether it is backlogged. For example, bootstrap streams (with 
> {{systems.\*.streams.\*.samza.bootstrap=true}}) are prioritised over all 
> other inputs until the stream has caught up. And SAMZA-179 is proposing 
> support for batch-style reprocessing jobs, which need to know when they are 
> done.
> This is currently done by comparing the offset on an IncomingMessageEnvelope 
> to the newest offset reported by the SystemAdmin. That works for Kafka, but 
> not for Databus (which doesn't currently have an API for getting the newest 
> offset), and not for files (if the offset is the byte offset of the start of 
> a record in the file, there can never be a record whose byte offset is equal 
> to the length of the file, assuming every record is more than zero bytes 
> long). This indicates that the logic for detecting how far behind a consumer 
> is should be specific to the system, not an offset calculation in samza-core.
> One way for SystemConsumer to signal that it has caught up would be for 
> SystemConsumer.poll to return an empty list of IncomingMessageEnvelopes. 
> However, that may be dangerous, because there could be reasons for it to 
> return an empty list even if it hasn't caught up. A better solution would 
> therefore be to add a method like isCaughtUp() to the SystemConsumer 
> interface.
> To discuss:
> * Should it report caught-up state for each partition individually, or for 
> all partitions collectively?
> * Should backlog be reported as a boolean (caught up or not), or as a number 
> (a kind of progress meter, cf. SAMZA-228)?
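The proposal and the file-offset problem described above can be sketched in Java. Everything here is hypothetical: CaughtUpAware and FileConsumerSketch are illustrative names, not Samza APIs, and partitions are plain strings rather than a richer partition identifier. The file consumer's message offset is the start byte of each record, so the naive comparison against the file length never succeeds, while a check on the consumer's own read position does. The default method shows one possible answer to the first discussion point: a collective status derived from per-partition status.

```java
import java.util.Set;

/** Hypothetical consumer extension; the name and signatures are illustrative only. */
interface CaughtUpAware {
    /** True if there is currently no data left to consume in this partition. */
    boolean isCaughtUp(String partition);

    /** One possible collective answer: caught up on every given partition. */
    default boolean isCaughtUp(Set<String> partitions) {
        for (String p : partitions) {
            if (!isCaughtUp(p)) {
                return false;
            }
        }
        return true;
    }
}

/**
 * Hypothetical single-file consumer whose message offset is the byte offset
 * at which a record starts. Callers should check isCaughtUp before poll().
 */
final class FileConsumerSketch implements CaughtUpAware {
    private final long[] recordLengths; // every record is assumed to be > 0 bytes
    private final long fileLength;      // sum of all record lengths
    private int nextRecord = 0;
    private long position = 0;          // byte offset just past the last record read

    FileConsumerSketch(long[] recordLengths) {
        this.recordLengths = recordLengths.clone();
        long total = 0;
        for (long len : recordLengths) {
            total += len;
        }
        this.fileLength = total;
    }

    /** Reads the next record and returns its offset, i.e. its start byte. */
    long poll() {
        long start = position;
        position += recordLengths[nextRecord++];
        return start;
    }

    long fileLength() {
        return fileLength;
    }

    /** Mirrors the generic offset comparison: message offset == newest offset. */
    boolean naiveCaughtUp(long lastMessageOffset) {
        return lastMessageOffset == fileLength;
    }

    /** System-specific check: the read position has reached the end of the file. */
    @Override
    public boolean isCaughtUp(String partition) {
        return position == fileLength;
    }
}
```

For records of 10, 20 and 5 bytes, poll() returns offsets 0, 10 and 30 against a file length of 35, so naiveCaughtUp(30) is false even after the last record, whereas isCaughtUp reports true.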



--
This message was sent by Atlassian JIRA
(v6.2#6252)
