Hi Eron, thank you very much for your feedback.

> Please mention that the "temporary status toggle" code will be removed.
>
This code is already removed, but there is still some automation of going idle when temporarily no splits are assigned. I will include it in the FLIP.

> I agree with adding the markActive() functionality, for symmetry. Speaking
> of symmetry, could we now include the minor enhancement we discussed in
> FLIP-167, the exposure of watermark status changes on the Sink interface.
> I drafted a PR and would be happy to revisit it.
>
> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70

I'm still not sure if that's a good idea.

If we have now refined idleness to be a user-specified, application-specific way to deal with temporarily stalled partitions, then downstream applications should actually implement their own idleness definition. Let's see what other devs think. I'm pinging the ones that have been most involved in the discussion: @Stephan Ewen <se...@apache.org> @Till Rohrmann <trohrm...@apache.org> @Dawid Wysakowicz <dwysakow...@apache.org>.

> The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> class. Should it be 'eventtime' package?
>
Are you proposing org.apache.flink.api.common.eventtime? I was simply suggesting to rename org.apache.flink.streaming.runtime.streamstatus, but I'm very open to other suggestions (given that there are only 2 classes in the package).

> Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> spell out what the new method names will be on each interface? May I
> suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> to help decouple the input's watermark status from the output's watermark
> status.
>
I haven't found org.apache.flink.streaming.api.operators.Input#emitStreamStatus in master. Could you double-check if I'm looking at the correct class?

The current idea was mainly to grep+replace /streamStatus/watermarkStatus/ and /StreamStatus/WatermarkStatus/. But again, I'm very open to more descriptive names. I can add an explicit list later. I'm assuming you are only interested in (semi-)public classes.

> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status. May I suggest
> we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
Can you add a motivation for that? @Dawid Wysakowicz <dwysakow...@apache.org>, I think you are the last person that touched the code. Do you have some example operators in mind that would change it?

> Maybe the FLIP should spell out the expected behavior of the generic
> watermark generator (TimestampsAndWatermarksOperator). Should the
> generator ignore the upstream idleness signal? I believe it propagates the
> signal, even though it also generates its own signals. Given that
> source-based and generic watermark generation shouldn't be combined, one
> could argue that the generic watermark generator should activate only when
> its input channel's watermark status is idle.
>
I will add a section. In general, we assume that we only have source-based watermark generators once FLIP-27 is properly adopted.

Best,

Arvid

On Wed, Jul 21, 2021 at 12:40 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:

> This proposal to narrow the definition of idleness to focus on the
> event-time clock is great.
>
> Please mention that the "temporary status toggle" code will be removed.
>
> I agree with adding the markActive() functionality, for symmetry. Speaking
> of symmetry, could we now include the minor enhancement we discussed in
> FLIP-167, the exposure of watermark status changes on the Sink interface.
> I drafted a PR and would be happy to revisit it.
>
> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>
> The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> class. Should it be 'eventtime' package?
>
> Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> spell out what the new method names will be on each interface? May I
> suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> to help decouple the input's watermark status from the output's watermark
> status.
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status. May I suggest
> we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> Maybe the FLIP should spell out the expected behavior of the generic
> watermark generator (TimestampsAndWatermarksOperator). Should the
> generator ignore the upstream idleness signal? I believe it propagates the
> signal, even though it also generates its own signals. Given that
> source-based and generic watermark generation shouldn't be combined, one
> could argue that the generic watermark generator should activate only when
> its input channel's watermark status is idle.
>
> Thanks again for this effort!
> -Eron
>
>
> On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Dear devs,
> >
> > We recently discovered that StreamStatus and Idleness is insufficiently
> > defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> > good to hear more opinions on that matter. Ideally, we can make the changes
> > to 1.14 as some newly added methods are affected.
> >
> > Best,
> >
> > Arvid
> >
> > [1] https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > [2] https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
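
For readers following the thread, the point about an operator deriving its output status from its input statuses can be sketched roughly as below. This is only an illustration, not the Flink implementation: WatermarkStatusSketch, Status, StatusAggregator and processWatermarkStatus are hypothetical names, and the sketch assumes that the combined status is idle only while every input is idle and that inputs start out active.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are Flink classes.
final class WatermarkStatusSketch {

    /** Stand-in for the proposed WatermarkStatus values. */
    enum Status { ACTIVE, IDLE }

    /** Derives one output status from the statuses of several inputs. */
    static final class StatusAggregator {
        private final Status[] inputs;
        private final List<Status> emitted = new ArrayList<>();

        StatusAggregator(int numInputs) {
            inputs = new Status[numInputs];
            Arrays.fill(inputs, Status.ACTIVE); // assumption: inputs start out active
        }

        /** Hypothetical input-side method, named "process" rather than "emit". */
        void processWatermarkStatus(Status status, int inputIndex) {
            Status before = combined();
            inputs[inputIndex] = status;
            Status after = combined();
            if (before != after) {
                emitted.add(after); // forward only actual changes downstream
            }
        }

        /** Assumption: the output is idle only while every input is idle. */
        private Status combined() {
            for (Status s : inputs) {
                if (s == Status.ACTIVE) {
                    return Status.ACTIVE;
                }
            }
            return Status.IDLE;
        }

        /** Statuses forwarded downstream so far, in order. */
        List<Status> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        StatusAggregator agg = new StatusAggregator(2);
        agg.processWatermarkStatus(Status.IDLE, 0);   // input 1 still active: nothing emitted
        agg.processWatermarkStatus(Status.IDLE, 1);   // all inputs idle: IDLE emitted
        agg.processWatermarkStatus(Status.ACTIVE, 0); // one input active again: ACTIVE emitted
        System.out.println(agg.emitted());            // prints [IDLE, ACTIVE]
    }
}
```

An operator that wanted to customize this behavior, as suggested in the thread, would replace the combined() rule with its own; the sketch keeps the rule in one private method to make that substitution point visible.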