[FLINK-5093] Fix bug about throwing ConcurrentModificationException when 
stopping TimerService.

[FLINK-5093] Remove useless import.

This closes #2828.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eefcbbde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eefcbbde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eefcbbde

Branch: refs/heads/master
Commit: eefcbbded159ab5819fe2b09606f8a33b9150254
Parents: c424900
Author: biao.liub <[email protected]>
Authored: Fri Nov 18 18:15:37 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:26 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/slot/TimerService.java | 16 ++++-
 .../taskexecutor/slot/TimerServiceTest.java     | 68 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eefcbbde/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index e28e801..14c9ab1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -60,9 +60,7 @@ public class TimerService<K> {
        }
 
        public void stop() {
-               for (K key: timeouts.keySet()) {
-                       unregisterTimeout(key);
-               }
+               unregisterAllTimeouts();
 
                timeoutListener = null;
 
@@ -101,6 +99,18 @@ public class TimerService<K> {
        }
 
        /**
+        * Unregister all timeouts.
+        */
+       protected void unregisterAllTimeouts() {
+               for (Timeout<K> timeout : timeouts.values()) {
+                       if (timeout != null) {
+                               timeout.cancel();
+                       }
+               }
+               timeouts.clear();
+       }
+
+       /**
         * Check whether the timeout for the given key and ticket is still 
valid (not yet unregistered
         * and not yet overwritten).
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/eefcbbde/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
new file mode 100644
index 0000000..9dd5f39
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TimerServiceTest {
+       /**
+        * Test all timeouts registered can be unregistered
+        * @throws Exception
+   */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testUnregisterAllTimeouts() throws Exception {
+               // Prepare all instances.
+               ScheduledExecutorService scheduledExecutorService = 
mock(ScheduledExecutorService.class);
+               ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
+               when(scheduledExecutorService.schedule(any(Runnable.class), 
anyLong(), any(TimeUnit.class)))
+                       .thenReturn(scheduledFuture);
+               TimerService<AllocationID> timerService = new 
TimerService<>(scheduledExecutorService);
+               TimeoutListener<AllocationID> listener = 
mock(TimeoutListener.class);
+
+               timerService.start(listener);
+
+               // Invoke register and unregister.
+               timerService.registerTimeout(new AllocationID(), 10, 
TimeUnit.SECONDS);
+               timerService.registerTimeout(new AllocationID(), 10, 
TimeUnit.SECONDS);
+
+               timerService.unregisterAllTimeouts();
+
+               // Verify.
+               Map<?, ?> timeouts = (Map<?, ?>) 
Whitebox.getInternalState(timerService, "timeouts");
+               assertTrue(timeouts.isEmpty());
+               verify(scheduledFuture, times(2)).cancel(true);
+       }
+
+}

Reply via email to