Aljoscha Krettek created FLINK-4207:
---------------------------------------

             Summary: WindowOperator becomes very slow with allowed lateness
                 Key: FLINK-4207
                 URL: https://issues.apache.org/jira/browse/FLINK-4207
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.1.0
            Reporter: Aljoscha Krettek


In this simple example, the throughput (as measured by the counts that the 
window emits) becomes very low when an allowed lateness is set:

{code}
/**
 * Minimal job showing the throughput of a keyed time window when an allowed
 * lateness is configured.
 *
 * <p>The job counts elements per key in 3-second time windows with an allowed
 * lateness of 1 second and prints the counts of keys starting with
 * {@code "Tuple 0"}. The printed count serves as the throughput measure.
 */
public class WindowWordCount {

    /**
     * Builds and runs the pipeline: infinite source, keyed 3-second time window
     * with 1 second of allowed lateness, summing reduce, filter, print.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);

        env.addSource(new InfiniteTupleSource(100_000))
                .keyBy(0)
                .timeWindow(Time.seconds(3))
                .allowedLateness(Time.seconds(1))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    // Sums the counts (f1) of two tuples, keeping the key of the first.
                    @Override
                    public Tuple2<String, Integer> reduce(
                            Tuple2<String, Integer> value1,
                            Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .filter(new FilterFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    // Keeps only results whose key starts with "Tuple 0" so that the
                    // printed output stays small.
                    @Override
                    public boolean filter(Tuple2<String, Integer> value) throws Exception {
                        return value.f0.startsWith("Tuple 0");
                    }
                })
                .print();

        // execute program
        env.execute("WindowWordCount");
    }

    /**
     * Source that emits tuples {@code ("Tuple " + (index % numGroups), 1)} for a
     * steadily increasing {@code index} until it is cancelled.
     */
    public static class InfiniteTupleSource implements
            ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        /** Divisor applied to the running index to derive the key suffix. */
        private final int numGroups;

        /**
         * Cleared by {@link #cancel()} to make {@link #run} return. Volatile
         * because cancel() may be invoked from a different thread than run().
         */
        private volatile boolean running = true;

        /**
         * @param numGroups number used as the modulus when generating keys
         */
        public InfiniteTupleSource(int numGroups) {
            this.numGroups = numGroups;
        }

        /**
         * Emits one tuple per loop iteration until {@link #cancel()} is called.
         *
         * @param out context used to emit the generated tuples
         * @throws Exception if emitting an element fails
         */
        @Override
        public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
            long index = 0;
            while (running) {
                Tuple2<String, Integer> tuple =
                        new Tuple2<>("Tuple " + (index % numGroups), 1);
                out.collect(tuple);
                index++;
            }
        }

        /** Signals the emit loop in {@link #run} to terminate. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to