[
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367037#comment-17367037
]
Dawid Wysakowicz commented on FLINK-23011:
------------------------------------------
After some investigation of FLINK-22926 I realized I accidentally changed the
behaviour for the situation where no splits are assigned. In 1.13 and older, if
there were no {{PartialWatermarks}} in {{WatermarkOutputMultiplexer}}, we did
not emit {{StreamStatus.IDLE}}.
I tried reverting that behaviour in:
https://github.com/apache/flink/pull/16221/commits/9841894efcb68d3c5d72695a19ca9b26dba08ef4
I now emit {{StreamStatus.IDLE}} only if there are no splits and the
{{Watermark}} has already progressed. If it is still at its initial value and no
splits have been assigned yet, we do not emit any {{StreamStatus}}. Effectively
that implements the workaround suggested by [~AHeise].
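A minimal stand-alone sketch of the rule described above, for the no-splits case only. This is not the code from the linked commit: the class and method names are hypothetical, and using {{Long.MIN_VALUE}} as the initial watermark value is an assumption made for illustration.

```java
/** Hypothetical model of the rule; none of these names come from the Flink code base. */
final class IdleStatusRule {

    /** Assumed initial watermark value, used here only for illustration. */
    static final long INITIAL_WATERMARK = Long.MIN_VALUE;

    /**
     * Decides whether IDLE should be emitted when no splits are assigned.
     * IDLE is emitted only if the watermark has moved past its initial value.
     */
    static boolean emitIdleWhenNoSplits(long currentWatermark) {
        return currentWatermark > INITIAL_WATERMARK;
    }

    public static void main(String[] args) {
        // No splits yet and the watermark never advanced: stay silent.
        System.out.println(emitIdleWhenNoSplits(INITIAL_WATERMARK));
        // No splits left, but the watermark advanced earlier: report IDLE.
        System.out.println(emitIdleWhenNoSplits(1_000L));
    }
}
```

How idleness is detected while splits are still assigned is outside the scope of this sketch.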
> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment:
> Reporter: Piotr Nowojski
> Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this
> happens, downstream operators exclude the {{IDLE}} inputs when calculating
> the input (min) watermark.
> An extreme example of the problem this leads to is completely bogus
> results if, for example, one FLIP-27 source subtask is slower than the others
> for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The
> other source subtask (subTaskId == 0) emits very low watermarks. If the
> watermark from the non-throttled subtask reaches the downstream
> {{WindowOperator}} first, while the other input channel is still idle, the
> operator will take those high watermarks as the combined input watermark for
> the whole {{WindowOperator}}. When the input channel from the throttled source
> subtask finally receives its {{ACTIVE}} status and a much lower watermark, it
> is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting in
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}
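The failure mode described in the quoted issue can be reproduced with a small stand-alone model of how a downstream operator combines input watermarks. This is only a sketch under stated assumptions: the class and method names are hypothetical, the watermark values are made up for illustration, and the model assumes only that idle channels are skipped when taking the minimum and that the combined watermark never moves backwards.

```java
import java.util.OptionalLong;

/** Hypothetical model of watermark combination; not Flink's implementation. */
final class CombinedWatermarkSketch {

    /** Minimum watermark over ACTIVE channels; idle channels are skipped. */
    static OptionalLong minOverActive(long[] watermarks, boolean[] active) {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    /** New combined watermark; it only ever moves forward. */
    static long advance(long current, long[] watermarks, boolean[] active) {
        OptionalLong candidate = minOverActive(watermarks, active);
        return candidate.isPresent() ? Math.max(current, candidate.getAsLong()) : current;
    }

    public static void main(String[] args) {
        // Channel 0 is the throttled subtask (still idle), channel 1 the fast one.
        long[] watermarks = {Long.MIN_VALUE, 1_000_000L};
        boolean[] active = {false, true};
        long combined = advance(Long.MIN_VALUE, watermarks, active);
        System.out.println("fast channel only: " + combined);

        // The throttled subtask becomes ACTIVE with a much lower watermark.
        watermarks[0] = 2_000L;
        active[0] = true;
        combined = advance(combined, watermarks, active);
        // The combined watermark cannot go back, so records from channel 0
        // with timestamps below it now arrive late.
        System.out.println("after slow channel joins: " + combined);
    }
}
```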
--
This message was sent by Atlassian Jira
(v8.3.4#803005)