[
https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391927#comment-15391927
]
ASF GitHub Bot commented on FLINK-4207:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2277#discussion_r72067210
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -408,11 +410,22 @@ public void processWatermark(Watermark mark) throws Exception {
                 if (windowAssigner instanceof MergingWindowAssigner) {
                     mergingWindows = getMergingWindowSet();
                     W stateWindow = mergingWindows.getStateWindow(context.window);
    +                if (stateWindow == null) {
    +                    // then the window is already purged and this is a cleanup
    +                    // timer set due to allowed lateness that has nothing to clean,
    +                    // so it is safe to just ignore
    +                    continue;
    +                }
                     windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                 } else {
                     windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                 }
    +            if (windowState.get() == null) {
--- End diff --
This doubles the number of state accesses. For example, with RocksDB
that is two DB lookups instead of one. I think we can get the state contents
once and pass them to `fireOrContinue()` instead of the state object.
The same applies to the other instances of this snippet.
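
To illustrate the point about double lookups, here is a minimal, self-contained sketch. It does not use Flink's state API: `CountingState`, `fireWithStateObject`, `fireWithContents` and `emit` are hypothetical names made up for this example, and the map-backed store only stands in for a backend such as RocksDB so that lookups can be counted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed state backend; it only counts lookups.
class CountingState {
    private final Map<String, Integer> store = new HashMap<>();
    private int lookups = 0;

    void put(String window, Integer value) {
        store.put(window, value);
    }

    // Each call models one backend lookup (for example, one RocksDB read).
    Integer get(String window) {
        lookups++;
        return store.get(window);
    }

    int lookups() {
        return lookups;
    }
}

class StateAccessSketch {
    // Pattern criticized in the review: a null check on the state,
    // followed by a second read when the contents are actually used.
    static Integer fireWithStateObject(CountingState state, String window) {
        if (state.get(window) == null) {      // lookup 1
            return null;
        }
        return emit(state.get(window));       // lookup 2
    }

    // Suggested pattern: read the contents once and pass them on.
    static Integer fireWithContents(CountingState state, String window) {
        Integer contents = state.get(window); // single lookup
        if (contents == null) {
            return null;
        }
        return emit(contents);
    }

    // Placeholder for whatever the operator does with the window contents.
    static Integer emit(Integer contents) {
        return contents;
    }

    public static void main(String[] args) {
        CountingState a = new CountingState();
        a.put("w1", 7);
        fireWithStateObject(a, "w1");
        System.out.println("state object passed: " + a.lookups() + " lookups");

        CountingState b = new CountingState();
        b.put("w1", 7);
        fireWithContents(b, "w1");
        System.out.println("contents passed: " + b.lookups() + " lookups");
    }
}
```

For a window that still has state, the first variant reads the store twice and the second reads it once. For an already purged window both variants stop after a single read, so the saving applies only to windows that actually fire.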
> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
> Key: FLINK-4207
> URL: https://issues.apache.org/jira/browse/FLINK-4207
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
> Priority: Blocker
>
> In this simple example the throughput (as measured by the count the window
> emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.setParallelism(1);
>         env.addSource(new InfiniteTupleSource(100_000))
>             .keyBy(0)
>             .timeWindow(Time.seconds(3))
>             .allowedLateness(Time.seconds(1))
>             .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                 @Override
>                 public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
>                         Tuple2<String, Integer> value2) throws Exception {
>                     return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>                 }
>             })
>             .filter(new FilterFunction<Tuple2<String, Integer>>() {
>                 private static final long serialVersionUID = 1L;
>                 @Override
>                 public boolean filter(Tuple2<String, Integer> value) throws Exception {
>                     return value.f0.startsWith("Tuple 0");
>                 }
>             })
>             .print();
>         // execute program
>         env.execute("WindowWordCount");
>     }
>     public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
>         private static final long serialVersionUID = 1L;
>         private int numGroups;
>         public InfiniteTupleSource(int numGroups) {
>             this.numGroups = numGroups;
>         }
>         @Override
>         public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
>             long index = 0;
>             while (true) {
>                 Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
>                 out.collect(tuple);
>                 index++;
>             }
>         }
>         @Override
>         public void cancel() {
>         }
>     }
> }
> {code}