[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368643#comment-17368643 ]
Arvid Heise commented on FLINK-23011:
-------------------------------------
I think we have broad agreement here. I'd like to open a FLIP, but I would
still like some clarification before that:
* Does that mean we can completely remove StreamStatus and the related
mechanisms? Effectively, watermark aggregation is then as simple as it gets:
you take the min of all non-finished inputs. If all inputs are finished, the
watermark is LONG_MAX.
* For multiple sources: completely drained sources emit LONG_MAX and finish,
so they don't participate at all in watermark assignment.
* For multiple sources: if a source is completely idle but not yet finished
(bursty payloads), we may need to invent a watermark out of thin air. I think
the best course in the setup you described is to handle idleness in the
watermark generator on the enumerator, which periodically advances the global
watermark holdback. This is again approximate, and we may be able to avoid it
for sources with built-in watermarks in the first place, but it is also
explicitly user-defined and probably cannot be avoided.
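To make the first point concrete, here is a minimal sketch of the aggregation rule, assuming each input tracks only its latest watermark and a "finished" flag. The class and method names are hypothetical and this is not Flink's actual StatusWatermarkValve. The sketch also assumes an input starts at Long.MIN_VALUE, so a slow input that has not emitted anything yet holds back the combined watermark instead of being skipped as IDLE.

{code:java}
import java.util.List;

// Hypothetical sketch, not Flink's StatusWatermarkValve: combines per-input
// watermarks without any StreamStatus, as proposed above.
final class MinWatermarkAggregator {

    // Assumed per-input state: latest watermark plus a "finished" flag.
    static final class InputState {
        final long watermark;
        final boolean finished;

        InputState(long watermark, boolean finished) {
            this.watermark = watermark;
            this.finished = finished;
        }
    }

    // Min over all non-finished inputs; Long.MAX_VALUE if every input is finished.
    static long combine(List<InputState> inputs) {
        long min = Long.MAX_VALUE;
        for (InputState input : inputs) {
            if (!input.finished) {
                min = Math.min(min, input.watermark);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // A slow input that has not emitted a watermark yet (assumed to start at
        // Long.MIN_VALUE) holds back the result instead of being ignored.
        System.out.println(combine(List.of(
                new InputState(Long.MIN_VALUE, false),
                new InputState(3_999L, false)))); // -9223372036854775808
        // Finished inputs do not participate at all.
        System.out.println(combine(List.of(
                new InputState(400L, false),
                new InputState(Long.MAX_VALUE, true)))); // 400
        // All inputs finished: the combined watermark is Long.MAX_VALUE.
        System.out.println(combine(List.of(
                new InputState(Long.MAX_VALUE, true)))); // 9223372036854775807
    }
}
{code}

Because drained sources emit LONG_MAX before finishing, skipping finished inputs and taking a plain min over everything give the same result; the explicit flag just makes the intent visible.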
From an architectural point of view, we would shift idleness and implicit
watermark generation completely into the source enumerator and hide it from
the downstream tasks. That gives source implementers much more freedom at the
cost of more complexity. We can address the latter by providing building
blocks and documentation.
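A minimal sketch of what idleness in an enumerator-side watermark generator could look like, assuming the enumerator receives data-derived watermarks from its readers and is ticked periodically. The class, its methods, and the policy (advance the holdback by the processing time spent idle beyond a timeout) are hypothetical choices for illustration, not an existing Flink API; as said above, any such policy is approximate.

{code:java}
// Hypothetical sketch, not a Flink API: an enumerator-side generator that owns
// the global watermark holdback and invents progress while the source is idle.
final class IdleAdvancingHoldback {
    private final long idleTimeoutMs;
    private long holdback = Long.MIN_VALUE; // global watermark handed to readers
    private long base = Long.MIN_VALUE;     // holdback at the last real progress
    private long lastProgressMs;            // processing time of the last real progress

    IdleAdvancingHoldback(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastProgressMs = nowMs;
    }

    // Called when readers report a data-derived watermark; never moves backwards.
    void onReportedWatermark(long watermark, long nowMs) {
        holdback = Math.max(holdback, watermark);
        base = holdback;
        lastProgressMs = nowMs;
    }

    // Called periodically. After idleTimeoutMs without progress, the holdback is
    // advanced by the idle time beyond the timeout (an approximation that assumes
    // event time keeps pace with processing time while idle).
    long onPeriodicTick(long nowMs) {
        long idleFor = nowMs - lastProgressMs;
        if (base != Long.MIN_VALUE && idleFor > idleTimeoutMs) {
            long delta = idleFor - idleTimeoutMs;
            // Saturate instead of overflowing, e.g. once the holdback is Long.MAX_VALUE.
            long candidate = base > Long.MAX_VALUE - delta ? Long.MAX_VALUE : base + delta;
            holdback = Math.max(holdback, candidate);
        }
        return holdback;
    }

    public static void main(String[] args) {
        IdleAdvancingHoldback gen = new IdleAdvancingHoldback(1_000L, 0L);
        System.out.println(gen.onPeriodicTick(500L));   // -9223372036854775808 (no data yet)
        gen.onReportedWatermark(100L, 0L);
        System.out.println(gen.onPeriodicTick(500L));   // 100 (not idle long enough)
        System.out.println(gen.onPeriodicTick(1_500L)); // 600 (idle 500 ms past the timeout)
        gen.onReportedWatermark(400L, 1_600L);          // late report: holdback stays 600
        System.out.println(gen.onPeriodicTick(3_000L)); // 1000 (600 + 400)
    }
}
{code}

The late report at 1600 ms shows the cost of inventing watermarks: records behind the advanced holdback become late, which is why this would have to stay explicitly user-configured.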
> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.0
> Environment:
> Reporter: Piotr Nowojski
> Assignee: Dawid Wysakowicz
> Priority: Critical
> Fix For: 1.14.0
>
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and
> switch to {{ACTIVE}} only after emitting their first {{Watermark}}. Until
> this happens, downstream operators ignore {{IDLE}} inputs when calculating
> the input (min) watermark.
> An extreme example of the problem this leads to is completely bogus
> results if, for example, one FLIP-27 source subtask is slower than the
> others for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}), the
> non-throttled subtask (subTaskId == 1) generates very high watermarks, while
> the other source subtask (subTaskId == 0) emits very low watermarks. If the
> watermark from the non-throttled subtask reaches the downstream
> {{WindowOperator}} first, while the other input channel is still idle, the
> operator takes those high watermarks as the combined input watermark for the
> whole {{WindowOperator}}. When the input channel from the throttled source
> subtask finally receives its {{ACTIVE}} status and a much lower watermark,
> it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fit in
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}
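The race described in the issue can be reproduced with a simplified model of the current behavior, assuming that IDLE channels are excluded from the min and that the combined watermark never moves backwards. The class name and the watermark values are illustrative, not taken from Flink or from the job above.

{code:java}
import java.util.Arrays;

// Simplified model of the behavior described in the issue (not Flink's actual
// StatusWatermarkValve): IDLE channels are excluded from the min, and the
// combined watermark never moves backwards.
final class IdleExclusionRace {
    private final long[] watermarks;
    private final boolean[] active; // all channels start IDLE, like FLIP-27 sources
    private long combined = Long.MIN_VALUE;

    IdleExclusionRace(int channels) {
        watermarks = new long[channels];
        active = new boolean[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // A watermark on a channel also switches that channel to ACTIVE.
    long onWatermark(int channel, long watermark) {
        active[channel] = true;
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) { // IDLE channels do not hold back the min
                min = Math.min(min, watermarks[i]);
            }
        }
        combined = Math.max(combined, min); // monotonic combined watermark
        return combined;
    }

    public static void main(String[] args) {
        IdleExclusionRace valve = new IdleExclusionRace(2);
        // The non-throttled subtask 1 reports first while subtask 0 is still IDLE.
        System.out.println(valve.onWatermark(1, 3_999L)); // 3999
        // The throttled subtask 0 finally becomes ACTIVE with a much lower watermark,
        // but the combined watermark cannot go back, so windows that already fired
        // are missing subtask 0's records.
        System.out.println(valve.onWatermark(0, 400L));   // 3999
    }
}
{code}

With the min-of-non-finished-inputs rule from the comment above, channel 0 would instead hold the combined watermark back until it reports, so no window could fire without its records.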
--
This message was sent by Atlassian Jira
(v8.3.4#803005)