Aljoscha Krettek created FLINK-4207: ---------------------------------------
Summary: WindowOperator becomes very slow with allowed lateness
Key: FLINK-4207
URL: https://issues.apache.org/jira/browse/FLINK-4207
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.1.0
Reporter: Aljoscha Krettek

In this simple example the throughput (as measured by the count the window emits) becomes very low when an allowed lateness is set:

{code}
public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);

        env.addSource(new InfiniteTupleSource(100_000))
            .keyBy(0)
            .timeWindow(Time.seconds(3))
            .allowedLateness(Time.seconds(1))
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                }
            })
            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f0.startsWith("Tuple 0");
                }
            })
            .print();

        // execute program
        env.execute("WindowWordCount");
    }

    public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private int numGroups;

        public InfiniteTupleSource(int numGroups) {
            this.numGroups = numGroups;
        }

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
            long index = 0;
            while (true) {
                Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
                out.collect(tuple);
                index++;
            }
        }

        @Override
        public void cancel() {
        }
    }
}
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)