Hi everyone,

Sorry for chiming in late here.

Regarding changing both the definition and the name of StreamStatus:
After digging into some of the roots of this implementation [1], I found
that StreamStatus was initially defined to mark "watermark idleness", not
"record idleness" (in fact, the alternative name "WatermarkStatus" was
considered at the time).
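For readers less familiar with the distinction, here is a minimal, hypothetical Java model of the two definitions. The names `Status`, `Definition` and `StatusTracker` are made up for illustration and are not Flink classes. The only difference it models is whether emitting a record turns an idle stream back to active:

```java
// Hypothetical model contrasting the two definitions of StreamStatus
// discussed in this thread; not Flink's implementation.
enum Status { ACTIVE, IDLE }

enum Definition { RECORD_IDLENESS, WATERMARK_IDLENESS }

final class StatusTracker {
    private final Definition definition;
    private Status status = Status.ACTIVE;

    StatusTracker(Definition definition) {
        this.definition = definition;
    }

    Status status() {
        return status;
    }

    // A source or watermark generator declares the stream idle.
    void markIdle() {
        status = Status.IDLE;
    }

    // Record idleness: any emitted record makes the stream active again.
    // Watermark idleness: records may flow while the stream stays idle.
    void onRecord() {
        if (definition == Definition.RECORD_IDLENESS) {
            status = Status.ACTIVE;
        }
    }

    // In this model, emitting a watermark reactivates the stream under both definitions.
    void onWatermark() {
        status = Status.ACTIVE;
    }

    public static void main(String[] args) {
        for (Definition d : Definition.values()) {
            StatusTracker tracker = new StatusTracker(d);
            tracker.markIdle();
            tracker.onRecord(); // e.g. a timer firing while the input is idle
            System.out.println(d + ": status after record = " + tracker.status());
        }
    }
}
```

Running `main` prints ACTIVE for RECORD_IDLENESS and IDLE for WATERMARK_IDLENESS, i.e. under watermark idleness a record alone does not pull the stream out of idleness.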

The concern that led us to change the definition to "record idleness" in
the final implementation was the existence of periodic timestamp /
watermark generators within the pipeline. Those would continue to generate
non-increasing watermarks in the absence of any input records from
upstream. In this scenario, downstream operators would not be able to
consider that channel idle, and watermark progress would therefore be
blocked. We could consider a timeout-based approach on those specific
operators that toggles watermark idleness if the values remain constant for
a period of time, but then again, this is very ill-defined and most likely
wrong.
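A minimal, hypothetical sketch of this scenario in plain Java (not Flink's actual input processing; `Channel`, `combinedWatermark`, `applyTimeoutHeuristic` and `IDLE_TIMEOUT_MS` are illustrative names). A downstream operator takes the minimum watermark over its non-idle input channels. A periodic generator without input keeps re-emitting the same value, so its channel is never idle and pins the minimum. The timeout heuristic flags a channel as idle once its watermark has not advanced for a processing-time interval:

```java
import java.util.List;

// Hypothetical model of one input channel of a downstream operator.
final class Channel {
    long watermark = Long.MIN_VALUE;
    boolean idle = false;
    long lastAdvanceMs = 0L; // processing time at which the watermark last increased

    // Called whenever the upstream (e.g. a periodic generator) emits a watermark.
    void onWatermark(long wm, long nowMs) {
        if (wm > watermark) {
            watermark = wm;
            lastAdvanceMs = nowMs;
        }
    }
}

final class WatermarkStall {
    // Hypothetical threshold for the timeout heuristic.
    static final long IDLE_TIMEOUT_MS = 1_000L;

    // Minimum watermark over all non-idle channels; MIN_VALUE if all are idle.
    static long combinedWatermark(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : channels) {
            if (!c.idle) {
                anyActive = true;
                min = Math.min(min, c.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    // The ill-defined heuristic: treat a channel as idle if its watermark
    // has stayed constant for longer than IDLE_TIMEOUT_MS.
    static void applyTimeoutHeuristic(List<Channel> channels, long nowMs) {
        for (Channel c : channels) {
            c.idle = nowMs - c.lastAdvanceMs > IDLE_TIMEOUT_MS;
        }
    }

    public static void main(String[] args) {
        Channel busy = new Channel();
        Channel quiet = new Channel();
        List<Channel> inputs = List.of(busy, quiet);

        quiet.onWatermark(100L, 0L);
        for (long t = 0; t <= 5_000; t += 200) {
            busy.onWatermark(100L + t, t); // busy channel keeps advancing
            quiet.onWatermark(100L, t);    // periodic generator repeats the same value
        }
        System.out.println(combinedWatermark(inputs)); // prints 100

        applyTimeoutHeuristic(inputs, 5_000L);
        System.out.println(combinedWatermark(inputs)); // prints 5100
    }
}
```

The heuristic unblocks progress in this toy case, but it would equally mark a channel idle that still receives records whose watermark simply does not advance within the interval, which is one reason it is ill-defined.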

I have not followed the latest changes to the watermark generator operators
and am not sure whether this issue is still relevant.
Otherwise, I don't see any other problems with changing the definition here.

Thanks,
Gordon

On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Eron,
>
> again to recap from the other thread:
> - You are right that idleness is correct with static assignment and fully
> active partitions. In this case, the source defines idleness. (case A)
> - For the more pressing use case of idle, assigned partitions, the user
> defines an idleness threshold, and idleness becomes potentially incorrect
> when the partition becomes active again. (case B)
> - The same holds for dynamic assignment of splits. If a source without a
> split gets a split assigned dynamically, there is a realistic chance that
> the watermark has already advanced past the first record of the newly
> assigned split. (case C)
> You can certainly insist that only the first case is valid (as it's
> correct), but we know that users use it in other ways, and that was also
> the intent of the devs.
>
> Now the question could be whether it makes sense to distinguish these
> cases. Would you treat the idleness information differently (especially in
> the sink/source that motivated FLIP-167) if you knew that the idleness is
> guaranteed to be correct?
> We could have some WatermarkStatus with ACTIVE, IDLE (case A), and TIMEOUT
> (case B).
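A minimal sketch of such a three-valued status, assuming the names from the proposal above. This is not an existing Flink type, and `excludedFromMin` and `idlenessGuaranteed` are hypothetical helpers. Both IDLE and TIMEOUT would exclude the channel from the downstream minimum watermark, but only IDLE would tell a consumer such as the FLIP-167 sink that the idleness is guaranteed to be correct:

```java
// Hypothetical three-valued status following the proposal above;
// not an existing Flink type.
enum WatermarkStatus {
    // The channel provides watermarks and takes part in the min computation.
    ACTIVE,
    // Case A: idleness defined by the source itself (e.g. a subtask without
    // a split under static assignment); exclusion cannot make records late.
    IDLE,
    // Case B: a user-defined idleness threshold expired; records may still
    // arrive later and could then be considered late.
    TIMEOUT;

    // Whether downstream operators ignore this channel when computing the
    // minimum watermark.
    boolean excludedFromMin() {
        return this != ACTIVE;
    }

    // Whether the channel is idle with a correctness guarantee.
    boolean idlenessGuaranteed() {
        return this == IDLE;
    }
}

final class WatermarkStatusDemo {
    public static void main(String[] args) {
        for (WatermarkStatus s : WatermarkStatus.values()) {
            System.out.println(s + ": excludedFromMin=" + s.excludedFromMin()
                    + ", idlenessGuaranteed=" + s.idlenessGuaranteed());
        }
    }
}
```

Case C (dynamic split assignment) has no slot in this enum, which matches the point that it would need a different mechanism.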
>
> However, that would still leave case C, which would probably need to be
> solved completely differently. I could imagine that a source with dynamic
> assignments should never have IDLE subtasks and should rather manage
> idleness itself. For example, it could emit a watermark every second/minute
> that is fetched directly from the source system. I'm just not sure if the
> current WatermarkAssigner interface suffices in that regard...
>
>
> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
>
> > Hi Eron,
> >
> > Can you elaborate a bit more on what you mean? I don’t understand what
> > you mean by a more general solution.
> >
> > As of now, a stream is marked idle by a source/watermark generator, which
> > has the effect of temporarily excluding this stream/partition from the
> > min watermark calculation in the downstream tasks. However, the stream
> > switches back to active as soon as any record is emitted. This is what’s
> > causing the problems described by Arvid.
> >
> > The core of our proposal is very simple: keep everything as it is, except
> > that a stream is changed back to active only once a watermark is emitted
> > again, not a record. In other words, disconnect idleness from the presence
> > of records, connect it only to the presence or absence of watermarks, and
> > allow records to be emitted while the “stream status” is “idle”.
> >
> > Piotrek
> >
> >
> > > Message written by Eron Wright <ewri...@streamnative.io.invalid> on 09.06.2021, at 06:01:
> > >
> > > It seems to me that idleness was introduced to deal with a very specific
> > > issue. In the pipeline, watermarks are aggregated not on a per-split basis
> > > but on a per-subtask basis. This works well when each subtask has exactly
> > > one split. When a subtask has multiple splits, various complications occur
> > > involving the commingling of watermarks. And when a subtask has no splits,
> > > the pipeline stalls altogether. To deal with the latter problem, idleness
> > > was introduced. The subtask simply declares itself idle so that it is
> > > taken out of consideration for purposes of watermark aggregation.
> > >
> > > If we're looking for a more general solution, I would suggest we discuss
> > > how to track watermarks on a per-split basis. Or, as Till mentioned
> > > recently, an alternate solution may be to dynamically adjust the
> > > parallelism of the task.
> > >
> > > I don't agree with the notion that idleness involves a correctness
> > > tradeoff. The facility I described above has no impact on correctness.
> > > Meanwhile, various watermark strategies rely on heuristics involving the
> > > processing-time domain, and the term idleness seems to have found purchase
> > > there too. The connection among the concepts seems tenuous.
> > >
> > > -E
> > >
> > >
> > >
> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >>
> > >> Hi Arvid,
> > >>
> > >> Thanks for writing down this summary and proposal. I think this was the
> > >> foundation of the disagreement in the FLIP-167 discussion. Dawid was
> > >> arguing that idleness is intermittent, a strictly task-local concept, and
> > >> as such shouldn't be exposed in, for example, sinks, while Eron and I
> > >> thought that it's a concept strictly connected to watermarks.
> > >>
> > >> 1. I'm a big +1 for changing the StreamStatus definition to a stream
> > >> "providing watermarks" and "not providing watermarks". With respect to
> > >> that, I agree with Dawid that record-bound idleness *(if we ever need to
> > >> define/expose it)* should be an intermittent concept, like, for example,
> > >> the existing input availability in the Task/runtime
> > >> (StreamTaskInput#isAvailable).
> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> > >> I also don't have any good ideas.
> > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > >> On Tue, Jun 8, 2021 at 16:35 Arvid Heise <ar...@apache.org> wrote:
> > >>
> > >>> Hi devs,
> > >>>
> > >>> While discussing "Watermark propagation with Sink API" and during
> > >>> "[FLINK-18934] Idle stream does not advance watermark in connected
> > >>> stream", we noticed some drawbacks in how Flink currently defines idle
> > >>> partitions.
> > >>>
> > >>> To recap, idleness was always considered a means to achieve progress in
> > >>> window operators with idle partitions in the source, at the risk of
> > >>> losing a bit of correctness. In particular, records could be considered
> > >>> late simply because of that idleness timeout and not because they
> > >>> arrived out of order. A potential reprocessing would not cause these
> > >>> records to be considered late, and we may end up with a different
> > >>> (correct) result.
> > >>>
> > >>> The drawbacks that we discovered are as follows:
> > >>> - We currently only use idleness to exclude the respective upstream
> > >>> tasks from participating in watermark generation.
> > >>> - However, the definition is bound to records. [1] In particular, while
> > >>> a partition is idle, no records should be produced.
> > >>> - That leads to quite a few edge cases where operators emit records
> > >>> while they are actually idling: think of timers, asyncIO operators,
> > >>> timeout-based window operators, etc. that trigger on an operator
> > >>> ingesting an idle partition.
> > >>> - The proper solution would be to switch the operator to active while
> > >>> emitting and to return to idle afterwards (but when?). However, this has
> > >>> some unintended side-effects depending on when you switch back:
> > >>>  - If you toggle the stream status for each record, you get a huge
> > >>> overhead of stream status records and quite a bit of processing in
> > >>> downstream operators (that code path is not well optimized, since
> > >>> switching is considered a rare thing).
> > >>>  - If you toggle after a certain time, you may get delays longer than
> > >>> the idleness timeout in the downstream window operators.
> > >>>  - You could switch back once all pending mails have been processed, but
> > >>> with a self-replicating mail that would never happen. A self-enqueueing
> > >>> timer with a low interval would also produce a flood similar to the
> > >>> first case.
> > >>>
> > >>> All in all, the situation is quite unsatisfying because idleness
> > >>> implies no records. However, there is currently no need for that
> > >>> implication: since we only use idleness for watermarks, we can easily
> > >>> allow records to be emitted (in fact, that was the old behavior before
> > >>> FLINK-18934 in many cases) and still get the intended behavior with
> > >>> respect to watermarks:
> > >>> - A channel that is active is providing watermarks.
> > >>> - An idle channel is not providing any watermarks but can deliver
> > >>> records.
> > >>>
> > >>> Ultimately, that would mean that we are actually not talking about
> > >>> idle/active partitions anymore. We are talking more about whether a
> > >>> particular subtask should influence the downstream watermark calculation
> > >>> or not. This leads to the following questions:
> > >>> 1. Do we want to change the definition as outlined?
> > >>> 2. Do you see any problem with emitting records on a subtask without
> > >>> explicit watermarks?
> > >>> 3. If we want to go this way, we may need to refine the
> > >>> names/definitions. Any ideas?
> > >>>
> > >>> I think an idle partition should translate into something like
> > >>> automatic/implicit/passive watermarks, and an active partition into
> > >>> explicit/active watermarks. Then StreamStatus is more about a
> > >>> WatermarkMode (not really happy with this one).
> > >>>
> > >>> Best,
> > >>>
> > >>> Arvid
> > >>>
> > >>> [1]
> > >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
