[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368343#comment-17368343
 ] 

Stephan Ewen commented on FLINK-23011:
--------------------------------------

The dynamic partitioning case is certainly good to keep in mind. I was thinking 
about it in a similar way to the file source:
  - It means that we don't yet know all splits
  - Splits get discovered over time
  - But we can deduce (from metadata) the event time ranges of splits.

A Kinesis / Pulsar / Pravega source could also use the global watermark 
holdback: it would set the global watermark to the lowest value for which all 
overlapping splits (segments) are already assigned. So instead of emitting 
LONG_MAX, the empty reader would emit the watermark corresponding to the global 
holdback.

As a general mechanism, this would have roughly this shape:
  - When an enumerator starts, the global watermark holdback is LONG_MIN
  - Sources that eagerly discover and assign their splits (like Kafka w/o 
partition discovery) would set the global holdback to LONG_MAX immediately 
after assigning splits and sending the noMoreSplits signal.
  - Sources like the FileSource (or Kinesis/Pulsar/Pravega) would eagerly 
assign the first set of splits, then set the global watermark holdback to the 
start of the earliest remaining split. With each next split assignment, they 
would advance the global watermark holdback to the start of the lowest 
remaining split.
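
As a rough, self-contained sketch of that bookkeeping (this is not actual 
Flink API; the class and method names are made up for illustration):

{code:java}
import java.util.PriorityQueue;

/** Hypothetical enumerator-side tracker for the global watermark holdback. */
class HoldbackTracker {
    // Start timestamps of splits that are discovered but not yet assigned.
    private final PriorityQueue<Long> remainingStarts = new PriorityQueue<>();
    private boolean noMoreSplits = false;

    void discoverSplit(long startTimestamp) {
        remainingStarts.add(startTimestamp);
    }

    void assignSplit(long startTimestamp) {
        remainingStarts.remove(startTimestamp);
    }

    void signalNoMoreSplits() {
        noMoreSplits = true;
    }

    /**
     * LONG_MIN while nothing is known yet, LONG_MAX once all splits are
     * assigned and noMoreSplits was sent, otherwise the start of the
     * lowest remaining (unassigned) split.
     */
    long globalWatermarkHoldback() {
        if (!remainingStarts.isEmpty()) {
            return remainingStarts.peek();
        }
        return noMoreSplits ? Long.MAX_VALUE : Long.MIN_VALUE;
    }
}
{code}

An empty reader would then emit this holdback value instead of LONG_MAX.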

I think this is a bit nicer than going with idleness, because idleness 
semantically advances the watermark (locally) and then pulls it back, hoping 
that everything is still all right (that's why it is something I would only let 
users do explicitly, not the system).
In contrast, this approach advances the watermark step by step, never too far 
(even locally). I think that makes it easier to understand and better defined. 

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>         Environment: 
>            Reporter: Piotr Nowojski
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators exclude the {{IDLE}} inputs when calculating 
> the input (min) watermark. 
> An extreme example of the problem this leads to: completely bogus results 
> if, for example, one FLIP-27 source subtask is slower than the others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
> 
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> unthrottled subtask (subTaskId == 1) generates very high watermarks, while 
> the other source subtask (subTaskId == 0) emits very low watermarks. If the 
> unthrottled subtask's watermark reaches the downstream {{WindowOperator}} 
> first, while the other input channel is still idle, the operator takes those 
> high watermarks as the combined input watermark for the whole 
> {{WindowOperator}}. When the input channel from the throttled source subtask 
> finally receives its {{ACTIVE}} status and a much lower watermark, it is 
> already too late.
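> The idle-channel handling described above can be modeled with a small sketch 
> (simplified; this is not Flink's actual valve implementation, and all names 
> here are made up for illustration):
> {code:java}
> /** Toy model: the combined input watermark is the min over non-IDLE channels. */
> class ChannelStatus {
>     boolean idle = true;
>     long watermark = Long.MIN_VALUE;
> }
> 
> class WatermarkCombiner {
>     /** IDLE channels are excluded, so they do not hold the watermark back. */
>     static long combine(ChannelStatus[] channels) {
>         long min = Long.MAX_VALUE;
>         boolean anyActive = false;
>         for (ChannelStatus c : channels) {
>             if (c.idle) {
>                 continue;
>             }
>             anyActive = true;
>             min = Math.min(min, c.watermark);
>         }
>         return anyActive ? min : Long.MIN_VALUE;
>     }
> }
> {code}
> With channel 0 still idle and channel 1 at watermark 2000, the combined 
> watermark already jumps to 2000; when channel 0 later becomes active with 
> watermark 10, the downstream windows have already fired.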
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting in 
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
