Hi Eron,

To recap from the other thread:
- You are right that idleness is correct with static assignment and fully
active partitions. In this case, the source defines idleness. (case A)
- For the more pressing use case of idle, assigned partitions, the user
defines an idleness threshold, and the result can become incorrect when
the partition becomes active again. (case B)
- The same holds for dynamic assignment of splits. If a source without a
split gets a split assigned dynamically, there is a realistic chance that
the watermark has already advanced past the first record of the newly
assigned split. (case C)
You can certainly insist that only the first case is valid (as it's
correct), but we know that users use idleness in other ways, and that was
also the intent of the devs.

The question now is whether it makes sense to distinguish these cases.
Would you treat the idleness information differently (especially in the
sink/source that motivated FLIP-167) if you knew that the idleness is
guaranteed to be correct?
We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
(case B).
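
A minimal sketch of what such a three-valued status could look like, and how a downstream task might use it. All names here (WatermarkStatusSketch, InputState, combinedWatermark, exclusionsGuaranteedCorrect) are hypothetical and not existing Flink API; the only assumption taken from this thread is that inputs which are not ACTIVE are left out of the min watermark.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical sketch; none of these types exist in Flink. */
final class WatermarkStatusSketch {

    /** ACTIVE provides watermarks; IDLE is declared by the source (case A);
     *  TIMEOUT is inferred from a user-defined idleness threshold (case B). */
    enum WatermarkStatus { ACTIVE, IDLE, TIMEOUT }

    /** Last watermark and current status reported by one input. */
    static final class InputState {
        final long watermark;
        final WatermarkStatus status;

        InputState(long watermark, WatermarkStatus status) {
            this.watermark = watermark;
            this.status = status;
        }
    }

    /** Minimum watermark over ACTIVE inputs only; empty if no input is ACTIVE. */
    static OptionalLong combinedWatermark(List<InputState> inputs) {
        return inputs.stream()
                .filter(in -> in.status == WatermarkStatus.ACTIVE)
                .mapToLong(in -> in.watermark)
                .min();
    }

    /** True if no input was excluded because of a timeout heuristic. */
    static boolean exclusionsGuaranteedCorrect(List<InputState> inputs) {
        return inputs.stream()
                .noneMatch(in -> in.status == WatermarkStatus.TIMEOUT);
    }
}
```

A sink or source that cares about correctness could then check exclusionsGuaranteedCorrect before trusting the combined watermark, while everything else keeps treating IDLE and TIMEOUT alike.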

However, that would still leave case C, which probably would need to be
solved completely differently. I could imagine that a source with dynamic
assignments should never have IDLE subtasks and rather manage the idleness
itself. For example, it could emit a watermark per second/minute that is
directly fetched from the source system. I'm just not sure if the current
WatermarkAssigner interface suffices in that regard...
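
To make the idea concrete, here is a rough sketch of a source that manages this itself by periodically forwarding a watermark obtained from the source system. The class ExternalWatermarkSketch, the LongSupplier standing in for the source-system query, and the onPeriodicEmit method are all assumptions for illustration, not an existing Flink interface.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch; not an existing Flink interface. */
final class ExternalWatermarkSketch {

    // Assumed hook that asks the source system for its current watermark.
    private final LongSupplier sourceSystemWatermark;
    private long lastEmitted = Long.MIN_VALUE;

    ExternalWatermarkSketch(LongSupplier sourceSystemWatermark) {
        this.sourceSystemWatermark = sourceSystemWatermark;
    }

    /**
     * Meant to be called by a periodic timer (e.g. once per second).
     * Emits only if the fetched watermark advanced, so the emitted
     * sequence is strictly increasing. Returns true if a watermark was emitted.
     */
    boolean onPeriodicEmit(LongConsumer output) {
        long candidate = sourceSystemWatermark.getAsLong();
        if (candidate <= lastEmitted) {
            return false;
        }
        lastEmitted = candidate;
        output.accept(candidate);
        return true;
    }
}
```

With something like this, a subtask without splits would never have to declare itself idle, because it keeps emitting watermarks regardless of whether it has records.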


On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Eron,
>
> Can you elaborate a bit more on what you mean? I don't understand what
> you mean by a more general solution.
>
> As of now, a stream is marked idle by a source/watermark generator, which
> has the effect of temporarily excluding this stream/partition from the
> min watermark calculation in the downstream tasks. However, the stream
> switches back to active when any record is emitted. This is what's causing
> the problems described by Arvid.
>
> The core of our proposal is very simple: keep everything as it is, except
> that a stream is changed back to active only once a watermark is emitted
> again, not a record. In other words, we disconnect idleness from the
> presence of records, connect it only to the presence or lack of
> watermarks, and allow records to be emitted while the "stream status" is
> "idle".
>
> Piotrek
>
>
> > On 09.06.2021, at 06:01, Eron Wright <ewri...@streamnative.io.invalid>
> > wrote:
> >
> > It seems to me that idleness was introduced to deal with a very specific
> > issue.  In the pipeline, watermarks are aggregated not on a per-split
> > basis but on a per-subtask basis.  This works well when each subtask
> > has exactly one split.  When a sub-task has multiple splits, various
> > complications occur involving the commingling of watermarks.  And when
> > a sub-task has no splits, the pipeline stalls altogether.  To deal with
> > the latter problem, idleness was introduced.  The sub-task simply
> > declares itself to be idle to be taken out of consideration for
> > purposes of watermark aggregation.
> >
> > If we're looking for a more general solution, I would suggest we discuss
> > how to track watermarks on a per-split basis.  Or, as Till mentioned
> > recently, an alternate solution may be to dynamically adjust the
> > parallelism of the task.
> >
> > I don't agree with the notion that idleness involves a correctness
> > tradeoff.  The facility I described above has no impact on correctness.
> > Meanwhile, various watermark strategies rely on heuristics involving the
> > processing-time domain, and the term idleness seems to have found
> > purchase there too.  The connection among the concepts seems tenuous.
> >
> > -E
> >
> >
> >
> >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org>
> >> wrote:
> >>
> >> Hi Arvid,
> >>
> >> Thanks for writing down this summary and proposal. I think this was the
> >> foundation of the disagreement in the FLIP-167 discussion. Dawid was
> >> arguing that idleness is an intermittent, strictly task-local concept
> >> and as such shouldn't be exposed in, for example, sinks, while Eron and
> >> I thought that it's a concept strictly connected to watermarks.
> >>
> >> 1. I'm a big +1 for changing the StreamStatus definition to a stream
> >> "providing watermark" and "not providing watermark". With respect to
> >> that, I agree with Dawid that record-bound idleness *(if we would ever
> >> need to define/expose it)* should be an intermittent concept, like, for
> >> example, the existing input availability in the Task/runtime
> >> (StreamTaskInput#isAvailable).
> >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> >> I also don't have any good ideas.
> >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Tue, Jun 8, 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:
> >>
> >>> Hi devs,
> >>>
> >>> While discussing "Watermark propagation with Sink API" and during
> >>> "[FLINK-18934] Idle stream does not advance watermark in connected
> >>> stream", we noticed some drawbacks in how Flink currently defines idle
> >>> partitions.
> >>>
> >>> To recap, idleness was always considered a means to achieve progress
> >>> in window operators with an idle partition in the source, at the risk
> >>> of losing a bit of correctness. In particular, records could be
> >>> considered late simply because of that idleness timeout and not
> >>> because they arrived out of order. A potential reprocessing would not
> >>> cause these records to be considered late, and we may end up with a
> >>> different (correct) result.
> >>>
> >>> The drawbacks that we discovered are as follows:
> >>> - We currently only use idleness to exclude the respective upstream
> >>> tasks from participating in watermark generation.
> >>> - However, the definition is bound to records. [1] In particular,
> >>> while a partition is idle, no records should be produced.
> >>> - That brings us into quite a few edge cases where operators emit
> >>> records while they are actually idling: think of timers, asyncIO
> >>> operators, window operators based on timeouts, etc. that trigger on an
> >>> operator ingesting an idle partition.
> >>> - The proper solution would be to turn the operator active while
> >>> emitting and to return to being idle afterwards (but when?). However,
> >>> this has some unintended side effects depending on when you switch
> >>> back:
> >>>  - If you toggle the stream status for each record, you get a huge
> >>> overhead on stream status records and quite a bit of processing in
> >>> downstream operators (that code path is not much optimized since
> >>> switching is considered a rare thing).
> >>>  - If you toggle after a certain time, you may get delays > idleness
> >>> in the downstream window operators.
> >>>  - You could turn back when you have processed all pending mails, but
> >>> if you have a self-replicating mail, that would be never. A
> >>> self-enqueueing, low timer would also produce a flood similar to the
> >>> first case.
> >>>
> >>> All in all, the situation is quite unsatisfying because idleness
> >>> implies no records. However, currently there is no need to have that
> >>> implication: since we only use it for watermarks, we can easily allow
> >>> records to be emitted (in fact, that was the old behavior before
> >>> FLINK-18934 in many cases) and still get the intended behavior with
> >>> respect to watermarks:
> >>> - A channel that is active is providing watermarks.
> >>> - An idle channel is not providing any watermarks but can deliver
> >>> records.
> >>>
> >>> Ultimately, that would mean that we are actually not talking about
> >>> idle/active partitions anymore. We are talking more about whether a
> >>> particular subtask should influence downstream watermark calculation
> >>> or not. This leads to the following questions:
> >>> 1. Do we want to change the definition as outlined?
> >>> 2. Do you see any problem with emitting records on a subtask without
> >>> explicit watermarks?
> >>> 3. If we want to go this way, we may need to refine the
> >>> names/definitions. Any ideas?
> >>>
> >>> I think idle partition should translate into something like
> >>> automatic/implicit/passive watermarks; active partition into
> >>> explicit/active watermarks. Then StreamStatus is more about
> >>> WatermarkMode (not really happy with this one).
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>> [1]
> >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
> >>>
> >>
>
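
For reference, the core of the proposal quoted above (a channel returns to active only when a watermark arrives, while records may still flow during idleness) can be sketched roughly as follows. ChannelStatusSketch and its methods are hypothetical names chosen for illustration; this is not how Flink's runtime is implemented.

```java
/** Hypothetical per-channel bookkeeping; not Flink runtime code. */
final class ChannelStatusSketch {

    private boolean providingWatermarks = true;
    private long lastWatermark = Long.MIN_VALUE;
    private long recordsForwarded = 0;

    /** Upstream declared that it stops providing watermarks. */
    void onIdle() {
        providingWatermarks = false;
    }

    /** Records are forwarded in either state and never change the status. */
    void onRecord() {
        recordsForwarded++;
    }

    /** Only a watermark switches the channel back to active. */
    void onWatermark(long timestamp) {
        providingWatermarks = true;
        lastWatermark = Math.max(lastWatermark, timestamp);
    }

    boolean isProvidingWatermarks() {
        return providingWatermarks;
    }

    long lastWatermark() {
        return lastWatermark;
    }

    long recordsForwarded() {
        return recordsForwarded;
    }
}
```

The point of the sketch is that onRecord has no effect on the status, which avoids the toggling overhead described in the original mail.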
