[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368014#comment-17368014 ]
Arvid Heise commented on FLINK-23011:
-------------------------------------
Thanks for sharing your thoughts.
I agree with 1(a) but I'm torn on (b).
If we don't use idleness in (b), we would not advance watermarks in cases where
you have fewer partitions than readers. That can happen permanently (fewer
Kafka partitions than readers) or temporarily (Kinesis currently has fewer
shards, but that can change later). So I think we should also use idleness here,
but it would be nice to differentiate the cause of idleness.
I agree that the general framework should never switch between idle and active
and that this should be left to the source implementation. The only exception
is readers that received `noMoreSplits` and drained all their splits, but that
is effectively covered by END_OF_INPUT and the subsequent closing of the reader.
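To illustrate the fewer-partitions-than-readers case above, here is a minimal,
stdlib-only sketch. It assumes a simplified model in which a downstream
operator's input watermark is the minimum over all non-idle input channels, and
it uses Long.MIN_VALUE to mean "no watermark yet". The class `IdlenessExample`
and its method are hypothetical and are not Flink's actual implementation.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model (not Flink's implementation): a downstream
// operator's input watermark is the minimum over all channels that are not idle.
// Long.MIN_VALUE stands for "this channel has not produced a watermark yet".
final class IdlenessExample {

    /** Min watermark over non-idle channels, or empty if every channel is idle. */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Two readers but only one partition: reader 1 never gets a split,
        // so it never produces a watermark.
        long[] watermarks = {10_000L, Long.MIN_VALUE};

        // Without idleness, the split-less reader holds the watermark back forever.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}).getAsLong());
        // prints -9223372036854775808 (Long.MIN_VALUE)

        // Marking the split-less reader idle lets the combined watermark advance.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}).getAsLong());
        // prints 10000
    }
}
```

Note that the boolean idle flag in this model carries no information about why
a channel is idle, which is the distinction the comment would like to make.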
For 2), I think we see the same fundamental design issue with the current
idleness, which is decided per reader.
Piotr and I did a bit of brainstorming and thought that a complete
alternative to the current solution would be to always generate watermarks in
the sources and, in the case of idleness, let the enumerator generate the
watermarks. In certain systems, it may be easier to calculate a lower bound and
propagate it to all readers.
That would also allow the enumerator to not advance the watermark when it knows
that splits are still in the backlog, and to cover situations where partitions
are dynamically split/merged. I think this pretty much aligns with your idea.
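A rough sketch of the enumerator-driven alternative, under stated assumptions:
the enumerator tracks the watermark reported for each assigned split and the
earliest possible timestamp of each split still in its backlog, and it
broadcasts the minimum of both to all readers, including readers without
splits. `EnumeratorWatermark` and all of its methods are hypothetical (they are
not part of the `SplitEnumerator` API), and event timestamps are assumed to be
non-negative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enumerator-side watermark tracker (not a Flink API).
// Assumes event timestamps are >= 0, so "minTimestamp - 1" cannot overflow.
final class EnumeratorWatermark {
    private final Map<String, Long> assignedSplitWatermarks = new HashMap<>();
    private final Map<String, Long> backlogMinTimestamps = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    /** A discovered split that no reader has picked up yet. */
    void addToBacklog(String splitId, long minTimestamp) {
        backlogMinTimestamps.put(splitId, minTimestamp);
    }

    /** Moves a split from the backlog to a reader; it starts just below its earliest timestamp. */
    void assign(String splitId) {
        Long start = backlogMinTimestamps.remove(splitId);
        assignedSplitWatermarks.put(splitId, start == null ? Long.MIN_VALUE : start - 1);
    }

    /** Progress reported by the reader that owns the split; never moves a split backwards. */
    void reportWatermark(String splitId, long watermark) {
        assignedSplitWatermarks.merge(splitId, watermark, Math::max);
    }

    /** Lower bound over assigned and backlog splits, broadcast to every reader; monotonic. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : assignedSplitWatermarks.values()) {
            min = Math.min(min, wm);
        }
        for (long ts : backlogMinTimestamps.values()) {
            // A backlog split may still contain records at ts, so the bound stays below it.
            min = Math.min(min, ts - 1);
        }
        if (min != Long.MAX_VALUE) {
            lastEmitted = Math.max(lastEmitted, min);
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        EnumeratorWatermark enumerator = new EnumeratorWatermark();
        enumerator.addToBacklog("partition-0", 0L);
        enumerator.addToBacklog("partition-1", 5_000L);
        enumerator.assign("partition-0");
        enumerator.reportWatermark("partition-0", 8_000L);
        // partition-1 is still in the backlog, so the bound is held at 4999.
        System.out.println(enumerator.currentWatermark()); // prints 4999
        enumerator.assign("partition-1");
        enumerator.reportWatermark("partition-1", 6_000L);
        System.out.println(enumerator.currentWatermark()); // prints 6000
    }
}
```

Holding the bound below the earliest backlog timestamp is what lets the
enumerator avoid advancing the watermark while splits are still pending, which a
per-reader idleness decision cannot know about.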
For me, the big question is whether we then need idleness in its current form
at all. If the enumerators of all sources have a meaningful way to advance the
watermark and all source tasks emit these watermarks, we could get rid of
StreamStatus and simplify the whole logic. I'm just not sure if we can assume
that all sources could implement such a behavior. I'm especially struggling
with the continuous FileSource, but it feels like watermarks are not terribly
useful there anyway.
> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.0
> Environment:
> Reporter: Piotr Nowojski
> Assignee: Dawid Wysakowicz
> Priority: Critical
> Fix For: 1.14.0
>
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they
> switch to {{ACTIVE}} only after emitting their first {{Watermark}}. Until this
> happens, downstream operators exclude {{IDLE}} inputs when calculating
> the input (min) watermark.
> An extreme example of the problem this leads to is completely bogus
> results if, for example, one FLIP-27 source subtask is slower than the others
> for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 // Throttle only subtask 0.
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> // Each source subtask has its own counter, so timestamps run 0, 1, 2, ... per subtask.
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The other
> source subtask (subTaskId == 0) emits very low watermarks. If the watermark of
> the non-throttled subtask reaches the downstream {{WindowOperator}} first, while
> the other input channel is still idle, the operator takes those high watermarks
> as the combined input watermark for the whole {{WindowOperator}}. When the input
> channel from the throttled source subtask finally receives its {{ACTIVE}}
> status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fall into
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}
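The race described in the issue can be reduced to a few lines. This is a
hypothetical, simplified model (not Flink's actual code), assuming that every
input channel starts IDLE, that the first watermark on a channel switches it to
ACTIVE, and that the combined watermark is the min over ACTIVE channels and
never moves backwards. The class name `IdleStartRace` is illustrative only.

```java
import java.util.Arrays;

// Hypothetical model of the downstream input-watermark logic from the issue.
final class IdleStartRace {
    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    IdleStartRace(int channels) {
        channelWatermarks = new long[channels];
        idle = new boolean[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        // Models the reported FLIP-27 behavior: every input starts IDLE.
        Arrays.fill(idle, true);
    }

    /** A watermark on a channel also switches that channel to ACTIVE. */
    long onWatermark(int channel, long watermark) {
        idle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (int i = 0; i < idle.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // The combined watermark is monotonic: it can never move back.
        lastEmitted = Math.max(lastEmitted, min);
        return lastEmitted;
    }

    public static void main(String[] args) {
        IdleStartRace operator = new IdleStartRace(2);
        // Non-throttled subtask 1 reports first while subtask 0 is still IDLE.
        System.out.println(operator.onWatermark(1, 2_000L)); // prints 2000
        // Throttled subtask 0 becomes ACTIVE with a much lower watermark, but the
        // combined watermark stays at 2000, so its records up to 2000 are now late.
        System.out.println(operator.onWatermark(0, 300L)); // prints 2000
    }
}
```

If the two calls arrive in the opposite order, the combined watermark stays at
300, so the result depends on which channel happens to report first. In the
example program, records from the throttled subtask that arrive behind the high
watermark would be treated as late, which is consistent with the lower window
sums in the actual output.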
--
This message was sent by Atlassian Jira
(v8.3.4#803005)