[hotfix] [streaming api] Clean up watermark initialization in window operator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d24d51f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d24d51f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d24d51f Branch: refs/heads/master Commit: 9d24d51f1d19805e23cab370db0f4bbf1c0038bc Parents: dd3416f Author: Stephan Ewen <[email protected]> Authored: Tue Oct 4 23:13:53 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 19:36:14 2016 +0200 ---------------------------------------------------------------------- .../runtime/operators/windowing/WindowOperator.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9d24d51f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index ffdf334..f010822 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -63,7 +63,6 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; @@ -158,7 +157,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * To keep track of the current watermark so that we can immediately fire if a trigger * 
registers an event time callback for a timestamp that lies in the past. */ - protected transient long currentWatermark = Long.MIN_VALUE; + protected long currentWatermark = Long.MIN_VALUE; protected transient Context context = new Context(null, null); @@ -214,11 +213,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> setChainingStrategy(ChainingStrategy.ALWAYS); } - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - currentWatermark = -1; - } - @Override @SuppressWarnings("unchecked") public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { @@ -262,8 +256,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (windowAssigner instanceof MergingWindowAssigner) { mergingWindowsByKey = new HashMap<>(); } - - currentWatermark = Long.MIN_VALUE; } @Override
