FWIW, there is a Beam presentation that has a very crisp set of rules
around watermarks. From memory it boils down to something like:

InputWatermark(stage) = min { OutputWatermark(stage') for stage' in Upstream(stage) }
OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }

OldestWork(stage) is the event time of the oldest message that the stage has
received but not yet processed.
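A minimal Java sketch of those two rules, for illustration only. The Stage class, its fields, and the use of long millisecond timestamps are assumptions, not Beam or Samza APIs. OldestWork is taken as the smallest event time among messages received but not yet processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stage model illustrating the two watermark rules; not a Beam or Samza API.
// Watermarks and event times are long epoch-millisecond timestamps (an assumption).
class Stage {
    final List<Stage> upstream = new ArrayList<>();
    // Event time -> count of messages received but not yet processed.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    // Only meaningful for source stages, which have no upstream stage.
    long sourceWatermark = Long.MIN_VALUE;

    void receive(long eventTime) {
        pending.merge(eventTime, 1, Integer::sum);
    }

    void finish(long eventTime) {
        // Drop one pending message with this event time; remove the key at zero.
        pending.computeIfPresent(eventTime, (t, n) -> n == 1 ? null : n - 1);
    }

    // InputWatermark(stage) = min { OutputWatermark(s) for s in Upstream(stage) }
    long inputWatermark() {
        if (upstream.isEmpty()) {
            return sourceWatermark;
        }
        long min = Long.MAX_VALUE;
        for (Stage s : upstream) {
            min = Math.min(min, s.outputWatermark());
        }
        return min;
    }

    // OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
    long outputWatermark() {
        // With no pending work, OldestWork does not constrain the output.
        long oldestWork = pending.isEmpty() ? Long.MAX_VALUE : pending.firstKey();
        return Math.min(inputWatermark(), oldestWork);
    }
}
```

Because a stage's output watermark never passes its oldest unprocessed message, a downstream stage is never told that event time has advanced beyond data that is still in flight.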

- Chris

On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Xinyu,
>
> Thanks for the proposal. I took a quick pass and had the following
> questions/comments:
>
> - message shuffling ==> data shuffling???
>
> - The proposal covers all types of control messages, not just
> end-of-stream, right? It would be better to define the scope and lay out
> the common requirements of control message delivery.
>
> - The dropped option should go to “Rejected alternatives”.
>
> - “Samza finds out the following intermediate streams that all the inputs
> have been end-of-stream”: what does this mean? Does the task consuming the
> input stream(s) reconcile the EoS messages from all input stream partitions
> and then propagate EoS messages to all partitions of the intermediate
> streams? This is not super clear to me.
>
> - In step 3, how does the consumer of an intermediate stream know how many
> EOS messages it should receive? We should also make it clear that there is
> one EOS per producer, and that the downstream consumer counts the unique
> EOS messages from all upstream producers.
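A minimal Java sketch of the EOS flow these two questions describe, under the reading suggested in them: a producer task broadcasts one EOS to every intermediate partition once all of its own input partitions have reached EOS, and each intermediate partition declares end-of-stream only after collecting a distinct EOS from every producer. EosProducer, EosConsumer, and the taskName/taskCount fields on EndOfStream are hypothetical names for illustration, not the SEP's API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical EOS control message; taskName/taskCount are assumed fields.
final class EndOfStream {
    final String taskName;  // producer task that emitted this EOS
    final int taskCount;    // total number of producer tasks for the stream

    EndOfStream(String taskName, int taskCount) {
        this.taskName = taskName;
        this.taskCount = taskCount;
    }
}

// Producer side: once every input partition this task consumes has reached EOS,
// broadcast one EOS to every partition of the intermediate stream (once only).
final class EosProducer {
    private final EndOfStream eos;
    private final Set<Integer> pendingInputs;
    private final int intermediatePartitions;
    private boolean broadcast = false;
    // Stand-in for a real system producer: records the target partitions only.
    final List<Integer> eosSentTo = new ArrayList<>();

    EosProducer(String taskName, int taskCount, Set<Integer> inputPartitions, int intermediatePartitions) {
        this.eos = new EndOfStream(taskName, taskCount);
        this.pendingInputs = new HashSet<>(inputPartitions);
        this.intermediatePartitions = intermediatePartitions;
    }

    void onInputEndOfStream(int inputPartition) {
        pendingInputs.remove(inputPartition);
        if (!broadcast && pendingInputs.isEmpty()) {
            broadcast = true;
            for (int p = 0; p < intermediatePartitions; p++) {
                send(p, eos);
            }
        }
    }

    private void send(int partition, EndOfStream message) {
        eosSentTo.add(partition);
    }
}

// Consumer side: one instance per intermediate-stream partition. EOS is declared
// only after a distinct EOS has arrived from each of taskCount producers, so a
// redelivered duplicate from the same producer is not double-counted.
final class EosConsumer {
    private final Set<String> producersDone = new HashSet<>();
    private int expectedProducers = -1; // unknown until the first EOS arrives

    boolean onEndOfStream(EndOfStream message) {
        expectedProducers = message.taskCount;
        producersDone.add(message.taskName);
        return isEndOfStream();
    }

    boolean isEndOfStream() {
        return expectedProducers > 0 && producersDone.size() == expectedProducers;
    }
}
```

Deduplicating by producer name makes a redelivered EOS (for example, after a producer restarts) harmless in this sketch, which is one possible reason for a message to carry the producer's identity and not only a count.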
>
> - In the comparison table, does “checkpoint the control messages received”
> refer to the partially accumulated upstream EOS messages?
>
> - Please clearly define “Watermark” and “EndOfStream”. Why are they
> different? Are they both control messages that require the same delivery
> pattern (i.e. broadcast downstream, reconcile at the consumer)? If so,
> should we make “Watermark” and “EndOfStream” subcategories of control
> message?
>
> - As for the serde for intermediate streams, I assume we will need an Avro
> envelope serde that wraps both the user message and the control message?
> So the user-defined serde now only applies to the “UserMessage”? And what
> is the message key in the message format?
>
> - A big question regarding watermark propagation: “When Samza receives
> watermark messages, it will emit a watermark with the earliest event time
> across all the stream partitions. No emission if the earliest event time
> doesn’t change.” Does the watermark propagation require
> synchronization/coordination between all producers at the source? Say the
> task consuming one input source emits watermarks at 1-minute intervals and
> the task consuming another input source emits watermarks at 5-minute
> intervals: how does the downstream consumer reconcile the watermarks?
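A minimal Java sketch of the quoted rule, assuming the consumer knows up front how many inputs (input partitions or upstream producer tasks) to expect. WatermarkAggregator and its method are hypothetical names, not a Samza API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical aggregator: emit the earliest watermark across all inputs, and
// emit nothing when that earliest value has not advanced.
final class WatermarkAggregator {
    private final int expectedInputs;
    private final Map<String, Long> latest = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkAggregator(int expectedInputs) {
        this.expectedInputs = expectedInputs;
    }

    OptionalLong onWatermark(String input, long timestamp) {
        // Keep the highest watermark seen per input; a late, lower one is ignored.
        latest.merge(input, timestamp, Long::max);
        if (latest.size() < expectedInputs) {
            return OptionalLong.empty(); // some input has not reported yet
        }
        long earliest = Long.MAX_VALUE;
        for (long t : latest.values()) {
            earliest = Math.min(earliest, t);
        }
        if (earliest <= lastEmitted) {
            return OptionalLong.empty(); // the earliest event time has not advanced
        }
        lastEmitted = earliest;
        return OptionalLong.of(earliest);
    }
}
```

Using minutes as timestamps: if the fast input reports 1, the slow input reports 5, the fast input then reports 2 through 6, and the slow input finally reports 10, this sketch emits 1, 2, 3, 4, 5, nothing when the fast input reaches 6, and then 6. In this model the producers need no coordination; the input that reports least often only adds latency to the combined watermark.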
>
> - The checkpoint message format seems to be designed only for watermark
> messages? Is there any streamId info that EoS carries over?
>
>
> Thanks a lot!
>
>
> -Yi
>
> On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Makes sense. I noticed that too and I dropped the ControlMessage type in
> > my code. I also moved taskName, taskCount to the parent ControlMessage
> > class.
> > Just updated the SEP-6. Please take a look again.
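A minimal Java sketch of the resulting type layout, assuming the nested ControlMessage.Type enum is what was dropped, so a single MessageType enum tags every message and the shared control fields live in the parent class. The enum constants and the WatermarkMessage, EndOfStreamMessage, and MessageTypes names are hypothetical.

```java
// Hypothetical layout: one MessageType enum tags every message, so ControlMessage
// needs no nested Type enum of its own.
enum MessageType { USER_MESSAGE, WATERMARK, END_OF_STREAM }

abstract class ControlMessage {
    final String taskName;  // producer task that sent this control message
    final int taskCount;    // total number of producer tasks

    ControlMessage(String taskName, int taskCount) {
        this.taskName = taskName;
        this.taskCount = taskCount;
    }

    abstract MessageType type();
}

final class WatermarkMessage extends ControlMessage {
    final long timestamp;   // event-time watermark

    WatermarkMessage(String taskName, int taskCount, long timestamp) {
        super(taskName, taskCount);
        this.timestamp = timestamp;
    }

    @Override
    MessageType type() { return MessageType.WATERMARK; }
}

final class EndOfStreamMessage extends ControlMessage {
    EndOfStreamMessage(String taskName, int taskCount) {
        super(taskName, taskCount);
    }

    @Override
    MessageType type() { return MessageType.END_OF_STREAM; }
}

final class MessageTypes {
    private MessageTypes() {}

    // Anything that is not a control message is treated as a user message.
    static MessageType of(Object message) {
        return message instanceof ControlMessage
                ? ((ControlMessage) message).type()
                : MessageType.USER_MESSAGE;
    }
}
```

With this layout the type tag is stated once, and code that handles any control message can read taskName and taskCount without casting to a subclass.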
> >
> > Thanks,
> > Xinyu
> >
> > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
> > cpett...@linkedin.com.invalid> wrote:
> >
> > > MessageType and ControlMessage.Type look redundant. You could either use
> > > "ControlMessage" as the type in MessageType or drop ControlMessage.Type.
> > >
> > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> > >
> > > > Thanks a lot for the comments. I updated the SEP with more details and
> > > > clarification. Please let me know if you have further questions.
> > > >
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
> > > > pmaheshw...@linkedin.com.invalid> wrote:
> > > >
> > > > > Hi Xinyu,
> > > > >
> > > > > Thanks for the proposal. Some requests for clarifications. Let's update
> > > > > the SEP directly instead of replying here.
> > > > >
> > > > > E.g., in "For any following intermediate stream whose input streams are
> > > > > all end-of-stream, it will be marked as pending EOS" - Should clarify
> > > > > that (IIUC) something is injecting EOS messages into all intermediate
> > > > > stream partitions once it receives EOS from all input stream partitions
> > > > > it's consuming. Should also clarify what that something is.
> > > > > Same for "declare end of stream once all the EOS messages have been
> > > > > received." - What does this declaration involve and who is doing this?
> > > > >
> > > > > In the pros for approach 2: it's not clear what this means - "The
> > > > > watermark can conclude the input messages before this watermark have
> > > > > been complete."
> > > > >
> > > > > For the cons of approach 2: "Complicated failure scenario of the second
> > > > > job. It needs to checkpoint all the watermark messages received, so when
> > > > > it recovered from failure, it can still count." - How is this related to
> > > > > EOS? How is this related to the checkpoint watermark section?
> > > > > Also, what is the "more messages required to write.. " referring to?
> > > > >
> > > > > "Samza needs to reconcile based on the task counts." - Please explain
> > > > > what reconciliation means, why it needs to happen, and why we need to
> > > > > track the producer task and total task count in the watermark message to
> > > > > do this.
> > > > >
> > > > > The checkpoint watermarks section is also unclear. What problem are we
> > > > > trying to solve here?
> > > > >
> > > > > Should also move the message format and the watermark message interface
> > > > > sections to the bottom, since they depend on details in the event time
> > > > > and checkpoint watermark sections.
> > > > >
> > > > > Thanks,
> > > > > Prateek
> > > > >
> > > > >
> > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I created SEP-6 for SAMZA-1260
> > > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support Watermark
> > > > > > Across Intermediate Streams for Batch Processing. The link to the SEP
> > > > > > is here:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
> > > > > >
> > > > > > Please review and comments are welcome!
> > > > > >
> > > > > > Thanks,
> > > > > > Xinyu
> > > > > >
> > > > >
> > > >
> > >
> >
>
