[ https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391927#comment-15391927 ]

ASF GitHub Bot commented on FLINK-4207:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2277#discussion_r72067210
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -408,11 +410,22 @@ public void processWatermark(Watermark mark) throws Exception {
                                if (windowAssigner instanceof MergingWindowAssigner) {
                                        mergingWindows = getMergingWindowSet();
                                        W stateWindow = mergingWindows.getStateWindow(context.window);
    +                                   if (stateWindow == null) {
    +                                           // then the window is already purged and this is a cleanup
    +                                           // timer set due to allowed lateness that has nothing to clean,
    +                                           // so it is safe to just ignore
    +                                           continue;
    +                                   }
                                        windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                                } else {
                                        windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                                }
     
    +                           if (windowState.get() == null) {
    --- End diff --
    
    This doubles the number of state accesses. With RocksDB, for example, that is two DB lookups instead of one. I think we can read the state contents once and pass them to `fireOrContinue()` instead of the state object.
    
    The same applies to the other instances of this snippet.
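The reviewer's suggestion (read the state once, then pass the contents instead of the state handle) can be sketched in plain Java as below. This is a minimal, self-contained illustration and not Flink's actual `WindowOperator` code. `CountingState` is a hypothetical stand-in for a state handle where each `get()` models one backend lookup, as with RocksDB. The `fireWith*` and `fireOrContinueWith*` methods are hypothetical simplifications of `fireOrContinue()`, whose real signature differs.

```java
/**
 * Hypothetical sketch, not Flink's API. It compares two patterns:
 * checking the state handle and then letting the callee read it again,
 * versus reading once and passing the contents on.
 */
final class SingleStateReadSketch {

    /** Hypothetical state handle; every get() counts as one backend lookup. */
    static final class CountingState<T> {
        private final T value;
        int reads = 0;

        CountingState(T value) {
            this.value = value;
        }

        T get() {
            reads++; // models one lookup, e.g. a RocksDB get
            return value;
        }
    }

    /** Pattern from the diff: a null check on the handle, then the callee reads it again. */
    static <T> String fireWithHandle(CountingState<T> state) {
        if (state.get() == null) {          // lookup #1
            return "skip";                  // nothing to fire or clean up
        }
        return fireOrContinueWithHandle(state);
    }

    static <T> String fireOrContinueWithHandle(CountingState<T> state) {
        T contents = state.get();           // lookup #2 for the same value
        return "fire:" + contents;
    }

    /** Suggested pattern: a single lookup, then the contents are passed instead of the handle. */
    static <T> String fireWithContents(CountingState<T> state) {
        T contents = state.get();           // the only lookup
        if (contents == null) {
            return "skip";
        }
        return fireOrContinueWithContents(contents);
    }

    static <T> String fireOrContinueWithContents(T contents) {
        return "fire:" + contents;
    }

    public static void main(String[] args) {
        CountingState<Integer> a = new CountingState<>(42);
        System.out.println(fireWithHandle(a) + " reads=" + a.reads);   // fire:42 reads=2
        CountingState<Integer> b = new CountingState<>(42);
        System.out.println(fireWithContents(b) + " reads=" + b.reads); // fire:42 reads=1
    }
}
```

Both patterns give the same result, but the second performs half the lookups whenever the window has contents. For an empty (already purged) window, both do exactly one lookup.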


> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
>                 Key: FLINK-4207
>                 URL: https://issues.apache.org/jira/browse/FLINK-4207
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> In this simple example, the throughput (as measured by the count the window emits) becomes very low when an allowed lateness is set:
> {code}
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class WindowWordCount {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.setParallelism(1);
>
>         env.addSource(new InfiniteTupleSource(100_000))
>                 .keyBy(0)
>                 .timeWindow(Time.seconds(3))
>                 .allowedLateness(Time.seconds(1))
>                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                     @Override
>                     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
>                             Tuple2<String, Integer> value2) throws Exception {
>                         // sum the counts of two elements with the same key
>                         return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>                     }
>                 })
>                 .filter(new FilterFunction<Tuple2<String, Integer>>() {
>                     private static final long serialVersionUID = 1L;
>
>                     @Override
>                     public boolean filter(Tuple2<String, Integer> value) throws Exception {
>                         // keep only the window results for the key "Tuple 0"
>                         return value.f0.startsWith("Tuple 0");
>                     }
>                 })
>                 .print();
>
>         // execute program
>         env.execute("WindowWordCount");
>     }
>
>     /** Emits ("Tuple <n>", 1) forever, cycling through numGroups distinct keys. */
>     public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
>         private static final long serialVersionUID = 1L;
>
>         private final int numGroups;
>         private volatile boolean running = true;
>
>         public InfiniteTupleSource(int numGroups) {
>             this.numGroups = numGroups;
>         }
>
>         @Override
>         public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
>             long index = 0;
>             while (running) {
>                 Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
>                 out.collect(tuple);
>                 index++;
>             }
>         }
>
>         @Override
>         public void cancel() {
>             // stop the emit loop so the job can be cancelled
>             running = false;
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
