[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368119#comment-17368119 ]

Stephan Ewen commented on FLINK-23011:
--------------------------------------

*About 1(b):*

In my understanding, that is covered by the following mechanism, which we 
already have: when the reader gets the {{noMoreSplits}} event and has finished 
all currently assigned splits, it emits a {{Long.MAX_VALUE}} watermark, thus 
fully and permanently unblocking the downstream event-time progress. Having 
fewer partitions than readers (e.g. in Kafka) means that some readers get the 
end-of-partition event directly, without ever getting any partition. That would 
be fine.
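As a rough illustration of that mechanism in plain Java (class and method names here are purely illustrative, not actual Flink source APIs):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the reader-side rule described above: once the reader
// has received noMoreSplits AND has finished every assigned split, it emits a
// Long.MAX_VALUE watermark, permanently unblocking downstream event time.
public class ReaderWatermarkSketch {
    private final Set<String> assignedSplits = new HashSet<>();
    private boolean noMoreSplits = false;

    public void addSplit(String splitId) { assignedSplits.add(splitId); }
    public void finishSplit(String splitId) { assignedSplits.remove(splitId); }
    public void onNoMoreSplits() { noMoreSplits = true; }

    // The watermark the reader would emit now; Long.MIN_VALUE means "none yet".
    public long currentWatermark() {
        if (noMoreSplits && assignedSplits.isEmpty()) {
            // Final watermark: downstream never waits on this reader again.
            return Long.MAX_VALUE;
        }
        return Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        ReaderWatermarkSketch reader = new ReaderWatermarkSketch();
        // A reader that never gets any split: the noMoreSplits event alone
        // is enough to emit the final watermark.
        reader.onNoMoreSplits();
        System.out.println(reader.currentWatermark() == Long.MAX_VALUE); // prints true
    }
}
```

Note that a reader that never receives a split still reaches the final watermark, which is exactly the "fewer partitions than readers" case above.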

This admittedly doesn't work well with Kafka partition discovery, in which 
case there would never be a {{noMoreSplits}} event. Then again, partition 
discovery breaks a lot of things (like per-key partitioning and ordering 
guarantees).

I guess to make this case work, we need something like coordinator-triggered 
idleness; that is the only thing I can see working well. The Kafka Source's 
split enumerator would see that it didn't assign splits to a reader, and, while 
in split discovery mode, it would send a "go idle" event to that reader. 
The reader alone cannot make that decision.
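A minimal sketch of such an enumerator-side decision (the "go idle" event and all names are hypothetical, not existing Flink API): after a discovery round, the enumerator knows which readers received zero splits and could signal exactly those to go idle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative enumerator-side bookkeeping for coordinator-triggered idleness:
// track how many splits each reader holds; readers with zero splits would be
// sent a hypothetical "go idle" event after a discovery round.
public class EnumeratorIdlenessSketch {
    // readerId -> number of splits currently assigned to that reader
    private final Map<Integer, Integer> splitsPerReader = new HashMap<>();

    public EnumeratorIdlenessSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            splitsPerReader.put(i, 0);
        }
    }

    public void assignSplit(int readerId) {
        splitsPerReader.merge(readerId, 1, Integer::sum);
    }

    // Readers that would receive a "go idle" event: those with no splits.
    public List<Integer> readersToIdle() {
        List<Integer> idle = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : splitsPerReader.entrySet()) {
            if (e.getValue() == 0) {
                idle.add(e.getKey());
            }
        }
        Collections.sort(idle);
        return idle;
    }

    public static void main(String[] args) {
        // 3 readers, 2 discovered partitions: reader 1 gets nothing.
        EnumeratorIdlenessSketch enumerator = new EnumeratorIdlenessSketch(3);
        enumerator.assignSplit(0);
        enumerator.assignSplit(2);
        System.out.println(enumerator.readersToIdle()); // prints [1]
    }
}
```

The point of the sketch is that only the enumerator has the global view needed for this decision; a reader with no splits cannot tell "no splits yet" apart from "no splits ever".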

*About 2:*

I kind of think about it in exactly the opposite way. I agree that idleness is 
not per reader, but it is per split (partition), and its main purpose is 
to cover the case where a long-living split (like a Kafka or Kinesis 
partition/shard) temporarily has no data.
To my understanding, the Kafka and Kinesis cases are the most common ones that 
we have to handle well.
I don't see how that can be covered by delegating the watermark assignment back 
to the enumerator.
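To make the per-split view concrete, here is a hypothetical sketch (all names illustrative, not Flink API): each split keeps its own watermark and the time it last saw a record, and splits that have been silent longer than an idle timeout are simply excluded from the min-combination, so one temporarily empty partition cannot stall the reader's overall watermark.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative per-split idleness: a split with no records for longer than
// idleTimeoutMs is considered idle and excluded from the min-watermark.
public class PerSplitIdlenessSketch {
    private static class SplitState {
        long watermark;       // highest timestamp seen on this split
        long lastRecordTime;  // processing time of the last record
    }

    private final Map<String, SplitState> splits = new HashMap<>();
    private final long idleTimeoutMs;

    public PerSplitIdlenessSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    public void onRecord(String splitId, long timestamp, long now) {
        SplitState s = splits.computeIfAbsent(splitId, k -> new SplitState());
        s.watermark = Math.max(s.watermark, timestamp);
        s.lastRecordTime = now;
    }

    // Min watermark over all non-idle splits; Long.MAX_VALUE means every split
    // is idle, i.e. the whole reader would look idle to downstream operators.
    public long combinedWatermark(long now) {
        long min = Long.MAX_VALUE;
        for (SplitState s : splits.values()) {
            if (now - s.lastRecordTime <= idleTimeoutMs) {
                min = Math.min(min, s.watermark);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        PerSplitIdlenessSketch reader = new PerSplitIdlenessSketch(1000);
        reader.onRecord("partition-0", 100, 0);
        reader.onRecord("partition-1", 500, 0);
        System.out.println(reader.combinedWatermark(500)); // prints 100
    }
}
```

Under this model, idleness is a purely local decision per split; no coordination with the enumerator is needed as long as every reader holds at least one split.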

We could solve this without idleness by always communicating the watermark back 
from the reader to the enumerator, merging it there, and then rebroadcasting it 
from there. But that seems pretty involved and inefficient in the common case. 
Maybe I am overlooking something there.
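For comparison, the round-trip alternative would amount to something like the following (hypothetical names, not Flink API): every reader reports its local watermark to the enumerator, which takes the minimum and rebroadcasts it.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the enumerator-side merge: readers periodically report
// their local watermarks; the enumerator takes the min across all readers and
// would broadcast that merged value back to every reader.
public class CoordinatorWatermarkMerge {
    private final Map<Integer, Long> readerWatermarks = new HashMap<>();

    public void report(int readerId, long watermark) {
        readerWatermarks.put(readerId, watermark);
    }

    // The merged watermark to rebroadcast: the minimum over all readers,
    // or Long.MIN_VALUE if no reader has reported yet.
    public long merged() {
        return readerWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        CoordinatorWatermarkMerge enumerator = new CoordinatorWatermarkMerge();
        enumerator.report(0, 100);
        enumerator.report(1, 50);
        System.out.println(enumerator.merged()); // prints 50
    }
}
```

Even in this toy form, the cost is visible: every watermark advance needs a reader-to-coordinator RPC plus a broadcast back, for something that is otherwise computed locally.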

What I agree with is that we shouldn't be thinking about idleness in the source 
design; that is what I was trying to motivate in the previous comment. 
Idleness is purely in the space of a partition-local {{WatermarkGenerator}} and 
is only handled between {{WatermarkGenerator}}s and downstream operators. The 
source framework never gets involved with idleness at all.

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>         Environment: 
>            Reporter: Piotr Nowojski
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators exclude the {{IDLE}} inputs when calculating 
> the input (min) watermark. 
> An extreme example of the problem this leads to: completely bogus results 
> if, for example, one FLIP-27 source subtask is slower than the others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks, while 
> the other source subtask (subTaskId == 0) emits very low watermarks. If a 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, while 
> the other input channel is still idle, it will take those high watermarks as 
> the combined input watermark for the whole {{WindowOperator}}. When the input 
> channel from the throttled source subtask finally receives its {{ACTIVE}} 
> status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting in 
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
