[ 
https://issues.apache.org/jira/browse/FLINK-24623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478532#comment-17478532
 ] 

Till Rohrmann edited comment on FLINK-24623 at 1/19/22, 10:18 AM:
------------------------------------------------------------------

Hi Dario, sorry for the inactivity. Having such a pre-flight check could indeed 
be beneficial. What is not ideal is that we iterate over all predecessors for 
every operator that uses event time, which could have quadratic complexity in 
the number of operators. However, since this happens pre-flight, this might be 
ok. And it will improve Flink's usability.
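To make the complexity concern concrete: the total cost becomes linear if the answer for each transformation is computed once and cached, instead of re-walking all predecessors for every event-time operator. Below is a minimal sketch that assumes a hypothetical, simplified _Node_ type in place of Flink's _Transformation_. A _Node_ only records whether it assigns watermarks and whether that strategy is event-time based. _MemoizedEventTimeCheck_ and _upstreamIsEventTime_ are illustrative names, not Flink API.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a node in Flink's Transformation graph (not Flink API).
final class Node {
    // null: the node assigns no watermarks.
    // TRUE / FALSE: the node assigns watermarks with an event-time / non-event-time strategy.
    final Boolean eventTimeStrategy;
    final List<Node> inputs;

    Node(Boolean eventTimeStrategy, Node... inputs) {
        this.eventTimeStrategy = eventTimeStrategy;
        this.inputs = new ArrayList<>(Arrays.asList(inputs));
    }
}

// Memoized pre-flight check: every node's answer is computed once and cached,
// so checking many event-time windows costs O(nodes + edges) overall.
final class MemoizedEventTimeCheck {
    private final Map<Node, Boolean> cache = new IdentityHashMap<>();
    int computed = 0; // how many nodes were actually evaluated (cache misses)

    // False if some upstream path reaches a non-event-time assigner
    // before reaching an event-time one.
    boolean upstreamIsEventTime(Node node) {
        Boolean cached = cache.get(node);
        if (cached != null) {
            return cached;
        }
        computed++;
        boolean result;
        if (node.eventTimeStrategy != null) {
            // The nearest assigner on this path decides; older ones are ignored.
            result = node.eventTimeStrategy;
        } else {
            // No assigner here: all inputs must pass. A node without inputs
            // (e.g. a source) is accepted, since it may emit watermarks itself.
            result = true;
            for (Node input : node.inputs) {
                result &= upstreamIsEventTime(input);
            }
        }
        cache.put(node, result);
        return result;
    }
}
{code}

With 100 windows behind the same 1000-operator chain, each node is evaluated once, for 1101 evaluations in total rather than roughly 1000 per window. Checking every input of a multi-input node is a choice of this sketch. The proposal in the issue description stops at the first assigner it finds in a single predecessor list.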

I am also pulling in [~fpaul] for additional comments.



> Prevent usage of EventTimeWindows when EventTime is disabled
> ------------------------------------------------------------
>
>                 Key: FLINK-24623
>                 URL: https://issues.apache.org/jira/browse/FLINK-24623
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Dario Heinisch
>            Priority: Not a Priority
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The following pipeline never emits any values after the windowing step, because 
> event time has effectively been disabled via the watermark strategy:
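> {code:java}
> import java.util.Iterator;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class PlaygroundJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
>
>         // Unbounded source emitting (current time, i % 10) as fast as possible.
>         DataStreamSource<Tuple2<Long, Integer>> source =
>                 env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
>                     private volatile boolean running = true;
>
>                     @Override
>                     public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
>                         int i = 0;
>                         while (running) {
>                             sourceContext.collect(Tuple2.of(System.currentTimeMillis(), i++ % 10));
>                         }
>                     }
>
>                     @Override
>                     public void cancel() {
>                         running = false;
>                     }
>                 });
>
>         source.assignTimestampsAndWatermarks(
>                         // Replace noWatermarks() with forMonotonousTimestamps()
>                         // and the window results are printed.
>                         WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
>                                 .withTimestampAssigner((t, timestamp) -> t.f0))
>                 .keyBy(t -> t.f1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>                 .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
>                     @Override
>                     public void process(Integer key, Context context,
>                                         Iterable<Tuple2<Long, Integer>> iterable,
>                                         Collector<String> out) throws Exception {
>                         // Count the elements that fell into this window for the given key.
>                         int count = 0;
>                         Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
>                         while (iter.hasNext()) {
>                             count++;
>                             iter.next();
>                         }
>                         out.collect("Key: " + key + " count: " + count);
>                     }
>                 })
>                 .print();
>
>         env.execute();
>     }
> }{code}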
>  
> The issue is that the stream uses _noWatermarks()_, which never emits 
> watermarks and therefore effectively disables all event-time windowing. 
> As this pipeline can never emit any values, it is faulty, and Flink should throw 
> an exception at startup. 
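Here is why nothing is printed. An event-time window only fires once the watermark reaches the end of the window: Flink's _EventTimeTrigger_ fires when _window.maxTimestamp()_ is at most the current watermark. _noWatermarks()_ never advances the watermark. The plain-Java model below is hypothetical, not Flink code, and simulates this for 1-second tumbling windows. It updates the watermark on every element, whereas Flink emits watermarks periodically. It also approximates _forMonotonousTimestamps()_ as "largest timestamp seen minus 1 ms".

{code:java}
import java.util.TreeSet;

// Simplified model of event-time window firing (not Flink code).
final class WindowFiringModel {
    static final long WINDOW_SIZE_MS = 1000;

    // Counts tumbling windows that fire for the given ascending timestamps.
    // monotonousWatermarks == true mimics forMonotonousTimestamps(): watermark = max timestamp - 1.
    // monotonousWatermarks == false mimics noWatermarks(): the watermark never advances.
    static int firedWindows(long[] timestamps, boolean monotonousWatermarks) {
        long watermark = Long.MIN_VALUE;
        TreeSet<Long> pendingWindowEnds = new TreeSet<>();
        int fired = 0;
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, WINDOW_SIZE_MS); // window start, offset 0
            pendingWindowEnds.add(start + WINDOW_SIZE_MS);
            if (monotonousWatermarks) {
                watermark = Math.max(watermark, ts - 1);
            }
            // Like EventTimeTrigger: fire once maxTimestamp() (= end - 1) <= watermark.
            while (!pendingWindowEnds.isEmpty() && pendingWindowEnds.first() - 1 <= watermark) {
                pendingWindowEnds.pollFirst();
                fired++;
            }
        }
        return fired;
    }
}
{code}

For the timestamps 0, 500, 999, 1000, 1500 and 2500 (ms), the monotonous variant fires the windows ending at 1000 and 2000. The _noWatermarks()_ variant fires none, however long an unbounded source like the one above keeps running.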
>  
> --------------------
> Proposed change:
> We extend the interface 
> [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55]
>  with the method _boolean isEventTime()_.
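Here is a sketch of how the proposed flag could look. It assumes a hypothetical, trimmed-down _SketchWatermarkStrategy_ interface rather than Flink's real _WatermarkStrategy_, which has no _isEventTime()_ method today. Defaulting to _true_ keeps existing user-defined strategies valid, and only _noWatermarks()_ opts out. The repro above depends on one detail: _withTimestampAssigner(...)_ wraps the base strategy. The wrapper therefore has to forward the flag, or _noWatermarks().withTimestampAssigner(...)_ would look like an event-time strategy.

{code:java}
import java.util.function.ToLongFunction;

// Hypothetical, trimmed-down model of the proposal; NOT Flink's WatermarkStrategy.
interface SketchWatermarkStrategy<T> {

    // Proposed flag: true by default, so existing custom strategies keep working.
    default boolean isEventTime() {
        return true;
    }

    // Mimics the wrapping done by withTimestampAssigner(...); the assigner itself
    // is not modeled here, only the forwarding of isEventTime().
    default SketchWatermarkStrategy<T> withTimestampAssigner(ToLongFunction<T> assigner) {
        final SketchWatermarkStrategy<T> base = this;
        return new SketchWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return base.isEventTime(); // forward, so noWatermarks() stays detectable
            }
        };
    }

    static <T> SketchWatermarkStrategy<T> forMonotonousTimestamps() {
        return new SketchWatermarkStrategy<T>() {};
    }

    static <T> SketchWatermarkStrategy<T> noWatermarks() {
        return new SketchWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return false; // never emits watermarks, so event-time windows never fire
            }
        };
    }
}
{code}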
> We create a new class named _EventTimeWindowPreconditions_ and add the 
> following method to it, which makes use of _isEventTime()_:
>  
> {code:java}
> import java.util.List;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.dag.Transformation;
> import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
>
> public final class EventTimeWindowPreconditions {
>
>     public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
>         for (int i = predecessors.size() - 1; i >= 0; i--) {
>             final Transformation<?> pre = predecessors.get(i);
>             if (pre instanceof TimestampsAndWatermarksTransformation) {
>                 final TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
>                         (TimestampsAndWatermarksTransformation<?>) pre;
>                 final WatermarkStrategy<?> watermarkStrategy =
>                         timestampsAndWatermarksTransformation.getWatermarkStrategy();
>                 // Assert that the strategy generates event-time watermarks, or throw.
>                 // isEventTime() is the new method proposed above.
>                 if (!watermarkStrategy.isEventTime()) {
>                     // TODO: Custom exception
>                     throw new IllegalArgumentException(
>                             "Cannot use an EventTime window with a preceding watermark generator which"
>                                     + " does not ingest event times. Did you use noWatermarks() as the"
>                                     + " WatermarkStrategy together with EventTime windows such as"
>                                     + " TumblingEventTimeWindows/SlidingEventTimeWindows?"
>                                     + " These windows will never emit any values as your stream does not"
>                                     + " support event time.");
>                 }
>                 // Terminate the check here: we have found the most recent timestamp
>                 // assigner for this window and ensured that it produces event-time
>                 // watermarks. An earlier assigner in the chain, such as noWatermarks(),
>                 // can safely be ignored, because a valid event-time assigner exists
>                 // between it and this window.
>                 break;
>             }
>         }
>     }
> }
> {code}
>  
> Then we can update the constructors of 
> [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112]
>  and 
> [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79]
>  to:
> {code:java}
> if (windowAssigner.isEventTime()) {
>     EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
> }
> {code}
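The check and the constructor guard can be exercised without a Flink cluster by using stand-in types. This is a hypothetical model, not Flink code. _PreconditionModel.Step_ replaces _Transformation_, and a non-null _eventTime_ marks a watermark assigner. As in the proposal, the end of the list is treated as the most recent step.

{code:java}
import java.util.List;

// Hypothetical model of the proposed check with stand-in types (not Flink code).
final class PreconditionModel {

    // Stand-in for a Transformation: eventTime == null means "not a watermark assigner",
    // otherwise it records whether the assigner's strategy would report isEventTime().
    static final class Step {
        final String name;
        final Boolean eventTime;

        Step(String name, Boolean eventTime) {
            this.name = name;
            this.eventTime = eventTime;
        }
    }

    // Mirrors hasPrecedingEventTimeGenerator(): scan from the end of the list
    // (treated as the most recent step) and stop at the first assigner found.
    static void hasPrecedingEventTimeGenerator(List<Step> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Step pre = predecessors.get(i);
            if (pre.eventTime == null) {
                continue; // not an assigner, keep looking
            }
            if (!pre.eventTime) {
                throw new IllegalArgumentException(
                        "Event-time window preceded by a non-event-time watermark strategy: " + pre.name);
            }
            break; // most recent assigner is event-time; older ones do not matter
        }
    }

    // Mirrors the proposed constructor guard: only event-time windows are checked.
    static void checkWindow(boolean windowAssignerIsEventTime, List<Step> predecessors) {
        if (windowAssignerIsEventTime) {
            hasPrecedingEventTimeGenerator(predecessors);
        }
    }
}
{code}

A _noWatermarks()_ step followed by _forMonotonousTimestamps()_ passes, and processing-time windows are never checked. The real wiring has one caveat: _input.getTransformation().getInputs()_ only returns the direct inputs of the window's input. An operator such as a _map_ between the assigner and the _keyBy_ would therefore hide the assigner from the check.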
> This is the approach I currently have in mind, but I am not sure whether it is 
> the best one. 
> Best regards, 
> Dario
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
