For illustration purposes, I quickly updated the draft PR that would
propagate idleness information to the Sink function, building on the recent
improvement provided by FLINK-18934:
https://github.com/streamnative/flink/pull/2
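
To make the semantics discussed in this thread concrete, here is a minimal, self-contained sketch in plain Java. It is not the code from the PR and it does not use Flink classes: `WatermarkStatusSketch`, `Valve`, and `Output` are made-up names, and the "output is idle only when every input is idle" passthrough rule is an assumption for illustration. It shows idleness tied to watermarks only: records never change a channel's status, a watermark switches the channel back to active, and idle channels are skipped when taking the minimum.

```java
import java.util.Arrays;

/** Hypothetical illustration only; none of these names are Flink APIs. */
final class WatermarkStatusSketch {

    /** Whoever sits downstream, e.g. an idleness-aware sink (assumed shape). */
    interface Output {
        void emitWatermark(long timestamp);
        void emitIdle(boolean idle);
    }

    /** Combines per-channel watermarks, skipping channels that are idle. */
    static final class Valve {
        private final long[] watermarks;
        private final boolean[] idle;
        private final Output output;
        private long lastEmitted = Long.MIN_VALUE;
        private boolean outputIdle = false;

        Valve(int channels, Output output) {
            this.watermarks = new long[channels];
            Arrays.fill(this.watermarks, Long.MIN_VALUE);
            this.idle = new boolean[channels];
            this.output = output;
        }

        /** Records do not touch the status; an idle channel may still deliver them. */
        void onRecord(int channel, Object record) {
            // intentionally no status change
        }

        /** A watermark re-asserts the channel as a participant in the clock. */
        void onWatermark(int channel, long timestamp) {
            idle[channel] = false;
            watermarks[channel] = Math.max(watermarks[channel], timestamp);
            advance();
        }

        /** The channel stops participating in the minimum until its next watermark. */
        void onIdle(int channel) {
            idle[channel] = true;
            advance();
        }

        private void advance() {
            long min = Long.MAX_VALUE;
            boolean allIdle = true;
            for (int i = 0; i < watermarks.length; i++) {
                if (!idle[i]) {
                    allIdle = false;
                    min = Math.min(min, watermarks[i]);
                }
            }
            // Assumed passthrough default: output flips only when the combined idleness flips.
            if (allIdle != outputIdle) {
                outputIdle = allIdle;
                output.emitIdle(allIdle);
            }
            // Only forward watermarks that move the downstream clock forward.
            if (!allIdle && min > lastEmitted) {
                lastEmitted = min;
                output.emitWatermark(min);
            }
        }
    }
}
```

A sink-like consumer would implement `Output` and decide for itself what to do when `emitIdle(true)` arrives, for example forwarding that information to the external system. An operator that wants to decouple its output status from its inputs would replace the passthrough rule in `advance()`.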

On Thu, Jun 10, 2021 at 11:34 AM Eron Wright <ewri...@streamnative.io>
wrote:

> Regarding records vs watermarks, I feel it is wrong to include records in
> the considerations, because the clearest definition of idleness (IMO) is
> 'active participation in advancing the event-time clock', and records don't
> directly affect the clock.  Of course, records indirectly influence the
> clock by stimulating a generator.
>
> Let's focus on the problem that Arvid mentioned about the need to briefly
> toggle idleness (as implemented by the AnnouncedStatus class).  Seems to me
> that the idleness of an operator's inputs need not strictly determine
> whether its output is idle.  The operator should be able to react to status
> changes on a given input (implemented in FLINK-18934), and this MAY cause a
> change to the output status at the operator's discretion.  The default
> behavior would be passthrough.  Meanwhile, when a given operator emits a
> watermark, it is re-asserting itself as a participant in advancing the
> downstream event time clock, and its output channel should transition to
> active and remain active.  An operator should also be able to mark its
> output channel(s) as idle, to complete the framework.
>
> In concept, a watermark generator somewhere in the pipeline could 'take
> control' of the event time clock when its input channel transitions to
> idle.  The upstream source is relinquishing control of the clock in that
> situation.
>
> BTW, I recommend looking at the PR of FLINK-18934 because it lays bare the
> whole pipeline.  Nice work there Dawid!  To better reflect the decoupling
> of input from output idleness, "AbstractStreamOperator::emitStreamStatus"
> should be named "processStreamStatus" and call an overridable method to
> emit the status change whenever the combined idleness flips.  This would
> facilitate an idleness-aware watermark generator and an idleness-aware sink.
>
>
> On Thu, Jun 10, 2021 at 3:31 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Thanks for providing these details Gordon. I have to admit that I do not
>> fully follow the reasoning why periodic watermark generators forced us to
>> define idleness for records. Is it because the idleness was generated
>> based on the non-availability of more data in the sources and not in the
>> watermark generators which are executed after the records have been read
>> from the external system? So was the problem where the stream status was
>> decided in the end?
>>
>> If there is a periodic watermark generator somewhere in the pipeline that
>> periodically generates watermarks, then we don't have to mark its output
>> channels as watermark idle because watermarks are being sent. Hence, given
>> that the watermark generation logic makes sense, the overall job should be
>> able to make progress. If the watermark generator is informed about its
>> input channel status, it could even decide whether to propagate the
>> watermark idleness and stop generating watermarks or not. Of course, this
>> leaves room for people to shoot themselves in the foot.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 10, 2021 at 5:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>> > Forgot to provide the link to the [1] reference:
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-5017
>> >
>> > On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> > wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > Sorry for chiming in late here.
>> > >
>> > > Regarding the topic of changing the definition of StreamStatus and
>> > > changing the name as well:
>> > > After digging into some of the roots of this implementation [1],
>> > > initially the StreamStatus was actually defined to mark "watermark
>> > > idleness", and not "record idleness" (in fact, the alternative name
>> > > "WatermarkStatus" was considered at the time).
>> > >
>> > > The concern at the time that caused us to alter the definition to
>> > > "record idleness" in the final implementation was the existence of
>> > > periodic timestamp / watermark generators within the pipeline. Those
>> > > would continue to generate non-increasing watermarks in the absence of
>> > > any input records from upstream. In this scenario, downstream operators
>> > > would not be able to consider that channel as idle and therefore
>> > > watermark progress is locked. We could consider a timeout-based approach
>> > > on those specific operators to toggle watermark idleness if the values
>> > > remain constant for a period of time, but then again, this is very
>> > > ill-defined and most likely wrong.
>> > >
>> > > I have not followed the newest changes to the watermark generator
>> > > operators and am not sure if this issue is still relevant.
>> > > Otherwise, I don't see other problems with changing the definition
>> > > here.
>> > >
>> > > Thanks,
>> > > Gordon
>> > >
>> > > On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:
>> > >
>> > >> Hi Eron,
>> > >>
>> > >> Again, to recap from the other thread:
>> > >> - You are right that idleness is correct with static assignment and
>> > >> fully active partitions. In this case, the source defines idleness.
>> > >> (case A)
>> > >> - For the more pressing use cases of idle, assigned partitions, the
>> > >> user defines an idleness threshold, and it becomes potentially
>> > >> incorrect when the partition becomes active again. (case B)
>> > >> - The same holds for dynamic assignment of splits. If a source without
>> > >> a split gets a split assigned dynamically, there is a realistic chance
>> > >> that the watermark has advanced past the first record of the newly
>> > >> assigned split. (case C)
>> > >> You can certainly insist that only the first case is valid (as it's
>> > >> correct), but we know that users use it in other ways and that was also
>> > >> the intent of the devs.
>> > >>
>> > >> Now the question could be whether it makes sense to distinguish these
>> > >> cases. Would you treat the idleness information differently (especially
>> > >> in the sink/source that motivated FLIP-167) if you knew that the
>> > >> idleness is guaranteed correct?
>> > >> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
>> > >> (case B).
>> > >>
>> > >> However, that would still leave case C, which probably would need to be
>> > >> solved completely differently. I could imagine that a source with
>> > >> dynamic assignments should never have IDLE subtasks and rather manage
>> > >> the idleness itself. For example, it could emit a watermark per
>> > >> second/minute that is directly fetched from the source system. I'm just
>> > >> not sure if the current WatermarkAssigner interface suffices in that
>> > >> regard...
>> > >>
>> > >>
>> > >> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> > Hi Eron,
>> > >> >
>> > >> > Can you elaborate a bit more on what you mean? I don’t understand what
>> > >> > you mean by a more general solution.
>> > >> >
>> > >> > As of now, a stream is marked idle by a source/watermark generator,
>> > >> > which has the effect of temporarily excluding this stream/partition
>> > >> > from the min watermark calculation in the downstream tasks. However,
>> > >> > the stream switches back to active when any record is emitted. This is
>> > >> > what’s causing the problems described by Arvid.
>> > >> >
>> > >> > The core of our proposal is very simple. Keep everything as it is,
>> > >> > except state that a stream will be changed back to active only once a
>> > >> > watermark is emitted again, not a record. In other words, disconnect
>> > >> > idleness from the presence of records, connect it only to the presence
>> > >> > or lack of watermarks, and allow records to be emitted while the
>> > >> > “stream status” is “idle”.
>> > >> >
>> > >> > Piotrek
>> > >> >
>> > >> >
>> > >> > > On 09.06.2021 at 06:01, Eron Wright <ewri...@streamnative.io.invalid>
>> > >> > > wrote:
>> > >> > >
>> > >> > > It seems to me that idleness was introduced to deal with a very
>> > >> > > specific issue.  In the pipeline, watermarks are aggregated not on a
>> > >> > > per-split basis but on a per-subtask basis.  This works well when
>> > >> > > each subtask has exactly one split.  When a subtask has multiple
>> > >> > > splits, various complications occur involving the commingling of
>> > >> > > watermarks.  And when a subtask has no splits, the pipeline stalls
>> > >> > > altogether.  To deal with the latter problem, idleness was
>> > >> > > introduced.  The subtask simply declares itself to be idle to be
>> > >> > > taken out of consideration for purposes of watermark aggregation.
>> > >> > >
>> > >> > > If we're looking for a more general solution, I would suggest we
>> > >> > > discuss how to track watermarks on a per-split basis.  Or, as Till
>> > >> > > mentioned recently, an alternate solution may be to dynamically
>> > >> > > adjust the parallelism of the task.
>> > >> > >
>> > >> > > I don't agree with the notion that idleness involves a correctness
>> > >> > > tradeoff.  The facility I described above has no impact on
>> > >> > > correctness.  Meanwhile, various watermark strategies rely on
>> > >> > > heuristics involving the processing-time domain, and the term
>> > >> > > idleness seems to have found purchase there too.  The connection
>> > >> > > among the concepts seems tenuous.
>> > >> > >
>> > >> > > -E
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org>
>> > >> > >> wrote:
>> > >> > >>
>> > >> > >> Hi Arvid,
>> > >> > >>
>> > >> > >> Thanks for writing down this summary and proposal. I think this
>> > >> > >> was the foundation of the disagreement in the FLIP-167 discussion.
>> > >> > >> Dawid was arguing that idleness is an intermittent, strictly
>> > >> > >> task-local concept and as such shouldn't be exposed in, for
>> > >> > >> example, sinks, while Eron and I thought that it's a concept
>> > >> > >> strictly connected to watermarks.
>> > >> > >>
>> > >> > >> 1. I'm a big +1 for changing the StreamStatus definition to stream
>> > >> > >> "providing watermark" and "not providing watermark". With respect
>> > >> > >> to that, I agree with Dawid that record-bound idleness *(if we
>> > >> > >> would ever need to define/expose it)* should be an intermittent
>> > >> > >> concept, like, for example, the existing input availability in the
>> > >> > >> Task/runtime (StreamTaskInput#isAvailable).
>> > >> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name.
>> > >> > >> But I also don't have any good ideas.
>> > >> > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>> > >> > >>
>> > >> > >> Best,
>> > >> > >> Piotrek
>> > >> > >>
>> > >> > >> On Tue, Jun 8, 2021 at 16:35 Arvid Heise <ar...@apache.org> wrote:
>> > >> > >>
>> > >> > >>> Hi devs,
>> > >> > >>>
>> > >> > >>> While discussing "Watermark propagation with Sink API" and during
>> > >> > >>> "[FLINK-18934] Idle stream does not advance watermark in connected
>> > >> > >>> stream", we noticed some drawbacks in how Flink currently defines
>> > >> > >>> idle partitions.
>> > >> > >>>
>> > >> > >>> To recap, idleness was always considered as a means to achieve
>> > >> > >>> progress in window operators with an idle partition in the source,
>> > >> > >>> at the risk of losing a bit of correctness. In particular, records
>> > >> > >>> could be considered late simply because of that idleness timeout
>> > >> > >>> and not because they arrived out of order. A potential
>> > >> > >>> reprocessing would not cause these records to be considered late,
>> > >> > >>> and we may end up with a different (correct) result.
>> > >> > >>>
>> > >> > >>> The drawbacks that we discovered are as follows:
>> > >> > >>> - We currently only use idleness to exclude respective upstream
>> > >> > >>> tasks from participating in watermark generation.
>> > >> > >>> - However, the definition is bound to records. [1] In particular,
>> > >> > >>> while a partition is idle, no records should be produced.
>> > >> > >>> - That brings us into quite a few edge cases where operators emit
>> > >> > >>> records while they are actually idling: think of timers, asyncIO
>> > >> > >>> operators, window operators based on timeouts, etc. that trigger
>> > >> > >>> on an operator ingesting an idle partition.
>> > >> > >>> - The proper solution would be to turn the operator active while
>> > >> > >>> emitting and to return to being idle afterwards (but when?).
>> > >> > >>> However, this has some unintended side effects depending on when
>> > >> > >>> you switch back:
>> > >> > >>>  - If you toggle stream status for each record, you get a huge
>> > >> > >>> overhead on stream status records and quite a bit of processing in
>> > >> > >>> downstream operators (that code path is not much optimized since
>> > >> > >>> switching is considered a rare thing).
>> > >> > >>>  - If you toggle after a certain time, you may get
>> > >> > >>> delays > idleness in the downstream window operators.
>> > >> > >>>  - You could turn back when you processed all pending mails, but
>> > >> > >>> if you have a self-replicating mail that would be never. A
>> > >> > >>> self-enqueueing, low timer would also produce a flood similar to
>> > >> > >>> the first case.
>> > >> > >>>
>> > >> > >>> All in all, the situation is quite unsatisfying because idleness
>> > >> > >>> implies no records. However, currently there is no need to have
>> > >> > >>> that implication: since we only use it for watermarks, we can
>> > >> > >>> easily allow records to be emitted (in fact that was the old
>> > >> > >>> behavior before FLINK-18934 in many cases) and still get the
>> > >> > >>> intended behavior with respect to watermarks:
>> > >> > >>> - A channel that is active is providing watermarks.
>> > >> > >>> - An idle channel is not providing any watermarks but can deliver
>> > >> > >>> records.
>> > >> > >>>
>> > >> > >>> Ultimately, that would mean that we are actually not talking
>> > >> > >>> about idle/active partitions anymore. We are talking more about
>> > >> > >>> whether a particular subtask should influence downstream watermark
>> > >> > >>> calculation or not. This leads to the following questions:
>> > >> > >>> 1. Do we want to change the definition as outlined?
>> > >> > >>> 2. Do you see any problem with emitting records on a subtask
>> > >> > >>> without explicit watermarks?
>> > >> > >>> 3. If we want to go this way, we may need to refine the
>> > >> > >>> names/definitions. Any ideas?
>> > >> > >>>
>> > >> > >>> I think idle partition should translate into something like
>> > >> > >>> automatic/implicit/passive watermarks; active partition into
>> > >> > >>> explicit/active watermarks. Then StreamStatus is more about
>> > >> > >>> WatermarkMode (not really happy with this one).
>> > >> > >>>
>> > >> > >>> Best,
>> > >> > >>>
>> > >> > >>> Arvid
>> > >> > >>>
>> > >> > >>> [1]
>> > >> > >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
