FWIW, there is a Beam presentation that has a very crisp set of rules around watermarks. From memory it boils down to something like:

  InputWatermark(stage) = min { OutputWatermark(stage') for stage' in Upstream(stage) }
  OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }

OldestWork(stage) is the oldest message that has been received by the stage but not yet processed.

- Chris

On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Xinyu,
>
> Thanks for the proposal. I took a quick pass and had the following
> questions/comments:
>
> - message shuffling ==> data shuffling???
>
> - the proposal is for all types of control messages, not just for
> end-of-stream, right? Better to define the scope and layout the comment
> requirements of control message delivery.
>
> - dropped option should go to “Rejected alternatives”
>
> - “Samza finds out the following intermediate streams that all the inputs
> have been end-of-stream” what does it mean? The task consuming the input
> stream(s) reconcile all EoS from all input stream partitions and then
> propagate EoS messages to all partitions in intermediate streams? This is
> not super clear to me.
>
> - in step-3, how does the consumer of intermediate streams know how many
> EOS messages should be received? And we should make it clear that it should
> be EOS / producer and the count of the downstream consumer is counting on
> the number of unique EOS from all producers from the upstream.
>
> - In comparison table, “checkpoint the control messages received” ==> is it
> referring to the partially accumulated upstream EOS messages?
>
> - Please make a clear definition on “Watermark” and “EndOfStream”. Why are
> they different? Are they both control messages that requires the same
> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
> control message?
>
> - As for the serde for intermediate stream, I assume that we will need an
> envelope serde that is avro to wrap the user message and control message
> in?
> So, user-defined serde now only applies to the “UserMessage”? And
> what’s the message key in the message format?
>
> - A big question regarding to the watermark propagation: “When Samza
> receives watermark messages, it will emit a watermark with the earliest
> event time across all the stream partitions. No emission if the earliest
> event time doesn’t change.” Does the watermark propagation requires
> synchronization/coordination between all producers at the source? Say, if
> the task taking one input source emits watermark at 1min interval and the
> task taking another input source emits watermark at 5min interval, how does
> the downstream consumer reconcile the watermarks?
>
> - In the checkpoint message format, it seems that it is only design for
> watermark messages? Any streamId info that EoS is carrying over?
>
>
> Thanks a lot!
>
>
> -Yi
>
> On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Makes sense. I noticed that too and I dropped the ControlMessage type in
> > my code. I also moved taskName, taskCount to the parent ControlMessage
> > class. Just updated the SEP-6. Please take a look again.
> >
> > Thanks,
> > Xinyu
> >
> > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt
> > <cpett...@linkedin.com.invalid> wrote:
> >
> > > MessageType and ControlMessage.Type look redundant. You could either
> > > use "ControlMessage" as the type in MessageType or drop
> > > ControlMessage.Type.
> > >
> > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com>
> > > wrote:
> > >
> > > > Thanks a lot for the comments. I updated the SEP with more details
> > > > and clarification. Please let me know if you have further questions.
> > > >
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari
> > > > <pmaheshw...@linkedin.com.invalid> wrote:
> > > >
> > > > > Hi Xinyu,
> > > > >
> > > > > Thanks for the proposal. Some requests for clarifications.
> > > > > Let's update the SEP directly instead of replying here.
> > > > >
> > > > > E.g., in "For any following intermediate stream whose input streams
> > > > > are all end-of-stream, it will be marked as pending EOS" - Should
> > > > > clarify that (IIUC) something is injecting EOS messages in all
> > > > > intermediate stream partitions once it receives EOS from all input
> > > > > stream partitions it's consuming. Should also clarify what is that
> > > > > something.
> > > > > Same for "declare end of stream once all the EOS messages have been
> > > > > received." - What does this declaration involve and who is doing
> > > > > this?
> > > > >
> > > > > In pro for approach 2: Not clear what this means - "The watermark
> > > > > can conclude the input messages before this watermark have been
> > > > > complete."
> > > > >
> > > > > For the cons of approach 2: "Complicated failure scenario of the
> > > > > second job. It needs to checkpoint all the watermark messages
> > > > > received, so when it recovered from failure, it can still count." -
> > > > > How is this related to EOS? How is this related to the checkpoint
> > > > > watermark section?
> > > > > Also, what is the "more messages required to write.. " referring
> > > > > to?
> > > > >
> > > > > "Samza needs to reconcile based on the task counts." - Please
> > > > > explain what reconciliation means, why it needs to happen, and why
> > > > > we need to track the producer task and total task count in the
> > > > > watermark message to do this.
> > > > >
> > > > > Checkpoint watermarks section is also unclear. What problem are we
> > > > > trying to solve here?
> > > > >
> > > > > Should also move the message format and the watermark message
> > > > > interface sections to the bottom, since they depend on details in
> > > > > the event time and checkpoint watermark sections.
> > > > >
> > > > > Thanks,
> > > > > Prateek
> > > > >
> > > > >
> > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <xinyuliu...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I created SEP-6 for SAMZA-1260
> > > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support
> > > > > > Watermark Across Intermediate Streams for Batch Processing. The
> > > > > > link to the SEP is here:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
> > > > > >
> > > > > > Please review and comments are welcome!
> > > > > >
> > > > > > Thanks,
> > > > > > Xinyu
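
For illustration, the two watermark rules recalled at the top of this message can be sketched in a few lines of Python. This is only a sketch under stated assumptions, not Beam's or Samza's implementation: the `Stage` class, its `pending` and `source_watermark` fields, and the function names are all hypothetical. It also assumes that a stage with no upstream is handed its input watermark by the source, since the rules as quoted do not cover that case, and it treats event times as plain numbers.

```python
from dataclasses import dataclass, field
from typing import List

INF = float("inf")  # "no constraint": nothing is holding the watermark back


@dataclass
class Stage:
    """Hypothetical stage in a processing graph (illustration only)."""
    name: str
    upstream: List["Stage"] = field(default_factory=list)
    # Event times of messages received by this stage but not yet processed.
    pending: List[float] = field(default_factory=list)
    # Assumption: stages without upstream get their watermark from the source.
    source_watermark: float = INF


def oldest_work(stage: Stage) -> float:
    # OldestWork(stage): oldest received-but-unprocessed message, if any.
    return min(stage.pending, default=INF)


def input_watermark(stage: Stage) -> float:
    # InputWatermark(stage) = min over upstream stages of OutputWatermark.
    if not stage.upstream:
        return stage.source_watermark
    return min(output_watermark(s) for s in stage.upstream)


def output_watermark(stage: Stage) -> float:
    # OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
    return min(input_watermark(stage), oldest_work(stage))


# Two source stages feeding one downstream stage.
a = Stage("A", source_watermark=100, pending=[90])
b = Stage("B", source_watermark=120)
join = Stage("join", upstream=[a, b], pending=[95])
```

In this example stage A still holds an unprocessed message at time 90, so its output watermark is 90 even though its input watermark is 100. The downstream stage takes the minimum over A and B, so it cannot advance past 90 until A finishes that message, regardless of how far B has progressed.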