David,

When it emits END_STREAM, WindowIdActivatedReservoir is replaced with the reservoir that is active for all subsequent windows. The logic you quoted below works hand in hand with the END_STREAM handling in GenericNode. So during recovery, GenericNode will always see END_STREAM as the first tuple, but the operator itself and the downstream nodes will never see it.
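To make the handoff concrete, here is a minimal sketch of the idea, not the actual Apex code. Every name in it (RecoverySketch, ActivationReservoir, run, the simplified Tuple and Type) is hypothetical, and I am assuming the node consumes END_STREAM itself and then reads straight from the underlying buffer:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the recovery handoff; not the Apex classes.
final class RecoverySketch {
    enum Type { BEGIN_WINDOW, PAYLOAD, END_WINDOW, END_STREAM }

    static final class Tuple {
        final Type type;
        final long windowId;

        Tuple(Type type, long windowId) {
            this.type = type;
            this.windowId = windowId;
        }

        @Override
        public String toString() {
            return type + "@" + windowId;
        }
    }

    // Stand-in for the activation reservoir: it discards replayed tuples up to
    // and including the checkpointed window, then signals with END_STREAM.
    static final class ActivationReservoir {
        private final Deque<Tuple> buffer;
        private final long checkpointWindowId;

        ActivationReservoir(Deque<Tuple> buffer, long checkpointWindowId) {
            this.buffer = buffer;
            this.checkpointWindowId = checkpointWindowId;
        }

        Tuple sweep() {
            Tuple t;
            while ((t = buffer.peek()) != null) {
                if (t.type == Type.BEGIN_WINDOW && t.windowId > checkpointWindowId) {
                    // The BEGIN_WINDOW stays in the buffer for the real reservoir.
                    return new Tuple(Type.END_STREAM, checkpointWindowId);
                }
                buffer.remove();
            }
            return null;
        }
    }

    // Stand-in for the node loop: returns only what the operator would see.
    static List<String> run(Deque<Tuple> upstream, long checkpointWindowId) {
        ActivationReservoir activation = new ActivationReservoir(upstream, checkpointWindowId);
        List<String> delivered = new ArrayList<>();
        Tuple signal = activation.sweep();
        if (signal == null) {
            return delivered; // nothing past the checkpoint has arrived yet
        }
        // The node swallows END_STREAM and switches to the underlying buffer.
        Tuple t;
        while ((t = upstream.poll()) != null) {
            delivered.add(t.toString());
        }
        return delivered;
    }

    public static void main(String[] args) {
        Deque<Tuple> upstream = new ArrayDeque<>();
        upstream.add(new Tuple(Type.BEGIN_WINDOW, 5));
        upstream.add(new Tuple(Type.END_WINDOW, 5));
        upstream.add(new Tuple(Type.BEGIN_WINDOW, 6));
        upstream.add(new Tuple(Type.END_WINDOW, 6));
        System.out.println(run(upstream, 5));
    }
}
```

In this sketch END_STREAM is only a signal between the activation reservoir and the node; it never reaches the list of tuples delivered to the operator.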
Not sure what you mean by:

>Is it safe to treat the END_STREAM tuple just as RESET_TUPLE at recovery
>from a checkpoint?

Chetan

On 12/7/15, 5:37 PM, "David Yan" <[email protected]> wrote:

>I traced to this piece of code in WindowIdActivatedReservoir.java below.
>Chetan, please let me know if you recall the idea behind this. The commit
>message of the EndStreamTuple constructor call says something about making
>a recovery test pass.
>
>@Override
>public Tuple sweep()
>{
>  Tuple t;
>  while ((t = reservoir.sweep()) != null) {
>    if (t.getType() == MessageType.BEGIN_WINDOW && t.getWindowId() > windowId) {
>      reservoir.setSink(sink);
>      return (est = new EndStreamTuple(windowId));
>    }
>    reservoir.remove();
>  }
>
>  return null;
>}
>
>On Mon, Dec 7, 2015 at 3:02 PM, David Yan <[email protected]> wrote:
>
>> Anybody have any idea? Is it safe to treat the END_STREAM tuple just as
>> RESET_TUPLE at recovery from a checkpoint?
>>
>> David
>>
>> On Fri, Dec 4, 2015 at 10:34 AM, David Yan <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> At recovery from a checkpoint, I see END_STREAM control tuple as the
>>> first tuple sent from upstream. That seems a little counter intuitive
>>> because a stream is actually starting. Can someone shed some light on
>>> this?
>>>
>>> Thanks!
>>>
>>> David
