[ 
https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387644#comment-15387644
 ] 

ASF GitHub Bot commented on FLINK-4207:
---------------------------------------

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2277

    [FLINK-4207] WindowOperator becomes very slow with allowed lateness

    Fixes the slowdown due to the allowedLateness.
    The problem was the deleteCleanupTimer that was called on cleanup() for 
each window.
    This was causing a traversal of the whole list of timers in order to delete 
the correct one.
    
    Now we let the timer be processed as a regular timer (even if the state has 
already been cleaned) and if the state is already cleaned up then we do nothing 
and just ignore it.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink window_slowdown_inv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2277.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2277
    
----
commit 0454a211d7711d6ed484577e8e64bafb669336e5
Author: kl0u <[email protected]>
Date:   2016-07-18T09:37:06Z

    [FLINK-4207] WindowOperator becomes very slow with allowed lateness

----


> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
>                 Key: FLINK-4207
>                 URL: https://issues.apache.org/jira/browse/FLINK-4207
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> In this simple example the throughput (as measured by the count the window 
> emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
>       public static void main(String[] args) throws Exception {
>               final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>               env.setParallelism(1);
>               env.addSource(new InfiniteTupleSource(100_000))
>                               .keyBy(0)
>                               .timeWindow(Time.seconds(3))
>                               .allowedLateness(Time.seconds(1))
>                               .reduce(new ReduceFunction<Tuple2<String, 
> Integer>>() {
>                                       @Override
>                                       public Tuple2<String, Integer> 
> reduce(Tuple2<String, Integer> value1,
>                                                       Tuple2<String, Integer> 
> value2) throws Exception {
>                                               return Tuple2.of(value1.f0, 
> value1.f1 + value2.f1);
>                                       }
>                               })
>                               .filter(new FilterFunction<Tuple2<String, 
> Integer>>() {
>                                       private static final long 
> serialVersionUID = 1L;
>                                       @Override
>                                       public boolean filter(Tuple2<String, 
> Integer> value) throws Exception {
>                                               return 
> value.f0.startsWith("Tuple 0");
>                                       }
>                               })
>                               .print();
>               // execute program
>               env.execute("WindowWordCount");
>       }
>       public static class InfiniteTupleSource implements 
> ParallelSourceFunction<Tuple2<String, Integer>> {
>               private static final long serialVersionUID = 1L;
>               private int numGroups;
>               public InfiniteTupleSource(int numGroups) {
>                       this.numGroups = numGroups;
>               }
>               @Override
>               public void run(SourceContext<Tuple2<String, Integer>> out) 
> throws Exception {
>                       long index = 0;
>                       while (true) {
>                               Tuple2<String, Integer> tuple = new 
> Tuple2<>("Tuple " + (index % numGroups), 1);
>                               out.collect(tuple);
>                               index++;
>                       }
>               }
>               @Override
>               public void cancel() {
>               }
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to