[FLINK-4749] [streaming api] Remove redundant processing time timer sets from 
window operator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47e49774
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47e49774
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47e49774

Branch: refs/heads/master
Commit: 47e4977442f7045baf29ce7dc772b7f7aff65343
Parents: 9d24d51
Author: Stephan Ewen <[email protected]>
Authored: Tue Oct 4 23:48:31 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 20:04:34 2016 +0200

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       |  79 +++++------
 .../operators/windowing/WindowOperator.java     | 140 +++++++++----------
 .../runtime/tasks/TestTimeServiceProvider.java  |  72 +++++++++-
 .../operators/windowing/WindowOperatorTest.java |  24 ++--
 4 files changed, 175 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index a838faa..6609e4d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -262,60 +262,53 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
        @Override
        public void trigger(long time) throws Exception {
-               boolean fire;
+               Timer<K, W> timer;
 
-               //Remove information about the triggering task
-               processingTimeTimerFutures.remove(time);
-               processingTimeTimerTimestamps.remove(time, 
processingTimeTimerTimestamps.count(time));
+               while ((timer = processingTimeTimersQueue.peek()) != null && 
timer.timestamp <= time) {
 
-               do {
-                       Timer<K, W> timer = processingTimeTimersQueue.peek();
-                       if (timer != null && timer.timestamp <= time) {
-                               fire = true;
+                       processingTimeTimers.remove(timer);
+                       processingTimeTimersQueue.remove();
 
-                               processingTimeTimers.remove(timer);
-                               processingTimeTimersQueue.remove();
+                       context.key = timer.key;
+                       context.window = timer.window;
+                       setKeyContext(timer.key);
 
-                               context.key = timer.key;
-                               context.window = timer.window;
-                               setKeyContext(timer.key);
-
-                               ListState<StreamRecord<IN>> windowState;
-                               MergingWindowSet<W> mergingWindows = null;
-
-                               if (windowAssigner instanceof 
MergingWindowAssigner) {
-                                       mergingWindows = getMergingWindowSet();
-                                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
-                                       if (stateWindow == null) {
-                                               // then the window is already 
purged and this is a cleanup
-                                               // timer set due to allowed 
lateness that has nothing to clean,
-                                               // so it is safe to just ignore
-                                               continue;
-                                       }
-                                       windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-                               } else {
-                                       windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-                               }
+                       ListState<StreamRecord<IN>> windowState;
+                       MergingWindowSet<W> mergingWindows = null;
 
-                               Iterable<StreamRecord<IN>> contents = 
windowState.get();
-                               if (contents == null) {
-                                       // if we have no state, there is 
nothing to do
+                       if (windowAssigner instanceof MergingWindowAssigner) {
+                               mergingWindows = getMergingWindowSet();
+                               W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                               if (stateWindow == null) {
+                                       // then the window is already purged 
and this is a cleanup
+                                       // timer set due to allowed lateness 
that has nothing to clean,
+                                       // so it is safe to just ignore
                                        continue;
                                }
+                               windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+                       } else {
+                               windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+                       }
 
-                               TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
-                               if (triggerResult.isFire()) {
-                                       fire(context.window, contents);
-                               }
+                       Iterable<StreamRecord<IN>> contents = windowState.get();
+                       if (contents == null) {
+                               // if we have no state, there is nothing to do
+                               continue;
+                       }
 
-                               if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
-                                       cleanup(context.window, windowState, 
mergingWindows);
-                               }
+                       TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
+                       if (triggerResult.isFire()) {
+                               fire(context.window, contents);
+                       }
 
-                       } else {
-                               fire = false;
+                       if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
+                               cleanup(context.window, windowState, 
mergingWindows);
                        }
-               } while (fire);
+               }
+
+               if (timer != null) {
+                       nextTimer = 
getTimerService().registerTimer(timer.timestamp, this);
+               }
        }
 
        private void fire(W window, Iterable<StreamRecord<IN>> contents) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index f010822..de316e7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -151,7 +149,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
-       protected transient Map<Long, ScheduledFuture<?>> 
processingTimeTimerFutures;
+       protected transient ScheduledFuture<?> nextTimer;
 
        /**
         * To keep track of the current watermark so that we can immediately 
fire if a trigger
@@ -172,7 +170,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         */
        protected transient PriorityQueue<Timer<K, W>> 
processingTimeTimersQueue;
        protected transient Set<Timer<K, W>> processingTimeTimers;
-       protected transient Multiset<Long> processingTimeTimerTimestamps;
 
        /**
         * Current waiting watermark callbacks.
@@ -219,7 +216,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                inputSerializer = (TypeSerializer<IN>) 
type.createSerializer(executionConfig);
        }
 
-       @SuppressWarnings("unchecked")
        @Override
        public final void open() throws Exception {
                super.open();
@@ -237,13 +233,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
                if (processingTimeTimers == null) {
                        processingTimeTimers = new HashSet<>();
-                       processingTimeTimerTimestamps = HashMultiset.create();
                        processingTimeTimersQueue = new PriorityQueue<>(100);
                }
 
-               //ScheduledFutures are not checkpointed
-               processingTimeTimerFutures = new HashMap<>();
-
                context = new Context(null, null);
 
                windowAssignerContext = new 
WindowAssigner.WindowAssignerContext() {
@@ -261,6 +253,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        @Override
        public final void close() throws Exception {
                super.close();
+
+               if (nextTimer != null) {
+                       nextTimer.cancel(false);
+                       nextTimer = null;
+               }
+
                timestampedCollector = null;
                watermarkTimers = null;
                watermarkTimersQueue = null;
@@ -274,6 +272,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        @Override
        public void dispose() throws Exception {
                super.dispose();
+
+               if (nextTimer != null) {
+                       nextTimer.cancel(false);
+                       nextTimer = null;
+               }
+
                timestampedCollector = null;
                watermarkTimers = null;
                watermarkTimersQueue = null;
@@ -459,60 +463,53 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
        @Override
        public void trigger(long time) throws Exception {
-               boolean fire;
-
-               //Remove information about the triggering task
-               processingTimeTimerFutures.remove(time);
-               processingTimeTimerTimestamps.remove(time, 
processingTimeTimerTimestamps.count(time));
+               Timer<K, W> timer;
 
-               do {
-                       Timer<K, W> timer = processingTimeTimersQueue.peek();
-                       if (timer != null && timer.timestamp <= time) {
-                               fire = true;
+               while ((timer = processingTimeTimersQueue.peek()) != null && 
timer.timestamp <= time) {
 
-                               processingTimeTimers.remove(timer);
-                               processingTimeTimersQueue.remove();
+                       processingTimeTimers.remove(timer);
+                       processingTimeTimersQueue.remove();
 
-                               context.key = timer.key;
-                               context.window = timer.window;
-                               setKeyContext(timer.key);
+                       context.key = timer.key;
+                       context.window = timer.window;
+                       setKeyContext(timer.key);
 
-                               AppendingState<IN, ACC> windowState;
-                               MergingWindowSet<W> mergingWindows = null;
-
-                               if (windowAssigner instanceof 
MergingWindowAssigner) {
-                                       mergingWindows = getMergingWindowSet();
-                                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
-                                       if (stateWindow == null) {
-                                               // then the window is already 
purged and this is a cleanup
-                                               // timer set due to allowed 
lateness that has nothing to clean,
-                                               // so it is safe to just ignore
-                                               continue;
-                                       }
-                                       windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-                               } else {
-                                       windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-                               }
+                       AppendingState<IN, ACC> windowState;
+                       MergingWindowSet<W> mergingWindows = null;
 
-                               ACC contents = windowState.get();
-                               if (contents == null) {
-                                       // if we have no state, there is 
nothing to do
+                       if (windowAssigner instanceof MergingWindowAssigner) {
+                               mergingWindows = getMergingWindowSet();
+                               W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                               if (stateWindow == null) {
+                                       // then the window is already purged 
and this is a cleanup
+                                       // timer set due to allowed lateness 
that has nothing to clean,
+                                       // so it is safe to just ignore
                                        continue;
                                }
+                               windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+                       } else {
+                               windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+                       }
 
-                               TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
-                               if (triggerResult.isFire()) {
-                                       fire(context.window, contents);
-                               }
+                       ACC contents = windowState.get();
+                       if (contents == null) {
+                               // if we have no state, there is nothing to do
+                               continue;
+                       }
 
-                               if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
-                                       cleanup(context.window, windowState, 
mergingWindows);
-                               }
+                       TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
+                       if (triggerResult.isFire()) {
+                               fire(context.window, contents);
+                       }
 
-                       } else {
-                               fire = false;
+                       if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
+                               cleanup(context.window, windowState, 
mergingWindows);
                        }
-               } while (fire);
+               }
+
+               if (timer != null) {
+                       nextTimer = 
getTimerService().registerTimer(timer.timestamp, this);
+               }
        }
 
        /**
@@ -719,14 +716,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                @Override
                public void registerProcessingTimeTimer(long time) {
                        Timer<K, W> timer = new Timer<>(time, key, window);
+
                        // make sure we only put one timer per key into the 
queue
                        if (processingTimeTimers.add(timer)) {
+
+                               Timer<K, W> oldHead = 
processingTimeTimersQueue.peek();
+                               long nextTriggerTime = oldHead != null ? 
oldHead.timestamp : Long.MAX_VALUE; 
+
                                processingTimeTimersQueue.add(timer);
-                               //If this is the first timer added for this 
timestamp register a TriggerTask
-                               if (processingTimeTimerTimestamps.add(time, 1) 
== 0) {
-                                       ScheduledFuture<?> scheduledFuture = 
WindowOperator.this.getTimerService()
-                                               .registerTimer(time, 
WindowOperator.this);
-                                       processingTimeTimerFutures.put(time, 
scheduledFuture);
+
+                               // check if we need to re-schedule our timer to 
an earlier time
+                               if (time < nextTriggerTime) {
+                                       if (nextTimer != null) {
+                                               nextTimer.cancel(false);
+                                       }
+                                       nextTimer = 
getTimerService().registerTimer(time, WindowOperator.this);
                                }
                        }
                }
@@ -746,14 +750,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        if (processingTimeTimers.remove(timer)) {
                                processingTimeTimersQueue.remove(timer);
                        }
-
-                       //If there are no timers left for this timestamp, 
remove it from queue and cancel TriggerTask
-                       if (processingTimeTimerTimestamps.remove(time,1) == 1) {
-                               ScheduledFuture<?> triggerTaskFuture = 
processingTimeTimerFutures.remove(timer.timestamp);
-                               if (triggerTaskFuture != null && 
!triggerTaskFuture.isDone()) {
-                                       triggerTaskFuture.cancel(false);
-                               }
-                       }
                }
 
                @Override
@@ -904,12 +900,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        processingTimeTimers.add(timer);
                }
 
-               int numProcessingTimeTimerTimestamp = in.readInt();
-               processingTimeTimerTimestamps = HashMultiset.create();
-               for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) {
-                       long timestamp = in.readLong();
-                       int count = in.readInt();
-                       processingTimeTimerTimestamps.add(timestamp, count);
+               if (numProcessingTimeTimers > 0) {
+                       nextTimer = 
getTimerService().registerTimer(processingTimeTimersQueue.peek().timestamp, 
this);
                }
        }
 
@@ -927,12 +919,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        windowSerializer.serialize(timer.window, out);
                        out.writeLong(timer.timestamp);
                }
-
-               out.writeInt(processingTimeTimerTimestamps.entrySet().size());
-               for (Multiset.Entry<Long> timerTimestampCounts: 
processingTimeTimerTimestamps.entrySet()) {
-                       out.writeLong(timerTimestampCounts.getElement());
-                       out.writeInt(timerTimestampCounts.getCount());
-               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index 81faec9..f4bead9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -24,7 +24,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the
@@ -32,13 +36,14 @@ import java.util.concurrent.ScheduledFuture;
  * */
 public class TestTimeServiceProvider extends TimeServiceProvider {
 
-       private long currentTime = 0;
+       private volatile long currentTime = 0;
 
-       private boolean isTerminated = false;
+       private volatile boolean isTerminated;
 
        // sorts the timers by timestamp so that they are processed in the 
correct order.
-       private Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
+       private final Map<Long, List<Triggerable>> registeredTasks = new 
TreeMap<>();
 
+       
        public void setCurrentTime(long timestamp) throws Exception {
                this.currentTime = timestamp;
 
@@ -72,13 +77,26 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
 
        @Override
        public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
+               if (isTerminated) {
+                       throw new IllegalStateException("terminated");
+               }
+
+               if (timestamp <= currentTime) {
+                       try {
+                               target.trigger(timestamp);
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
                List<Triggerable> tasks = registeredTasks.get(timestamp);
                if (tasks == null) {
                        tasks = new ArrayList<>();
                        registeredTasks.put(timestamp, tasks);
                }
                tasks.add(target);
-               return null;
+
+               return new DummyFuture();
        }
 
        @Override
@@ -86,6 +104,11 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
                return isTerminated;
        }
 
+       @Override
+       public void shutdownService() throws Exception {
+               isTerminated = true;
+       }
+
        public int getNumRegisteredTimers() {
                int count = 0;
                for (List<Triggerable> tasks: registeredTasks.values()) {
@@ -94,8 +117,43 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
                return count;
        }
 
-       @Override
-       public void shutdownService() throws Exception {
-               this.isTerminated = true;
+       // 
------------------------------------------------------------------------
+
+       private static class DummyFuture implements ScheduledFuture<Object> {
+
+               @Override
+               public long getDelay(TimeUnit unit) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public int compareTo(Delayed o) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       return true;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public boolean isDone() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object get() throws InterruptedException, 
ExecutionException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+                       throw new UnsupportedOperationException();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index fd73bcc..e98bc91 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -876,6 +876,9 @@ public class WindowOperatorTest extends TestLogger {
                final int WINDOW_SLIDE = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+
+               TestTimeServiceProvider timer = new TestTimeServiceProvider();
 
                ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
                                new SumReducer(),
@@ -893,9 +896,10 @@ public class WindowOperatorTest extends TestLogger {
 
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO); ;
-
-               operator.setInputType(inputType, new ExecutionConfig());
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               operator, new 
ExecutionConfig(), timer,
+                                               new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+               
                testHarness.open();
 
                WindowOperator.Timer<String, TimeWindow> timer1 = new 
WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
@@ -907,12 +911,7 @@ public class WindowOperatorTest extends TestLogger {
                operator.processingTimeTimersQueue.add(timer1);
                operator.processingTimeTimersQueue.add(timer2);
                operator.processingTimeTimersQueue.add(timer3);
-
-               operator.processingTimeTimerTimestamps.add(1L, 10);
-               operator.processingTimeTimerTimestamps.add(2L, 5);
-               operator.processingTimeTimerTimestamps.add(3L, 1);
-
-
+               
                StreamStateHandle snapshot = testHarness.snapshot(0, 0);
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new 
WindowOperator<>(
@@ -926,9 +925,9 @@ public class WindowOperatorTest extends TestLogger {
                                0);
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> otherTestHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(otherOperator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               otherOperator.setInputType(inputType, new ExecutionConfig());
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               otherOperator, new 
ExecutionConfig(), timer,
+                                               new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                otherTestHarness.setup();
                otherTestHarness.restore(snapshot);
@@ -936,7 +935,6 @@ public class WindowOperatorTest extends TestLogger {
 
                Assert.assertEquals(operator.processingTimeTimers, 
otherOperator.processingTimeTimers);
                
Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), 
otherOperator.processingTimeTimersQueue.toArray());
-               Assert.assertEquals(operator.processingTimeTimerTimestamps, 
otherOperator.processingTimeTimerTimestamps);
        }
 
        @Test

Reply via email to