[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService 
interface. So
henceforth all RpcService implementations have to provide a 
ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the 
ActorSystem's
internal scheduler.

Introduce ScheduledExecutor interface to hide service methods from the 
ScheduledExecutorService

This closes #3310.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf458dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf458dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf458dd

Branch: refs/heads/master
Commit: ccf458dd4d173b3370257177c2bbd9680baa6511
Parents: 5983069
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 14 16:50:43 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 24 14:48:52 2017 +0100

----------------------------------------------------------------------
 .../runtime/concurrent/ScheduledExecutor.java   |  92 ++++++++++
 .../ScheduledExecutorServiceAdapter.java        |  64 +++++++
 .../apache/flink/runtime/rpc/RpcService.java    |  15 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 178 +++++++++++++++++++
 .../runtime/rpc/TestingSerialRpcService.java    |  34 ++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    | 160 +++++++++++++++++
 6 files changed, 543 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
new file mode 100644
index 0000000..c1b47e2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extension of the {@link Executor} interface which is enriched by methods for scheduling tasks
+ * in the future.
+ */
+public interface ScheduledExecutor extends Executor {
+
+       /**
+        * Executes the given command after the given delay.
+        *
+        * @param command the task to execute in the future
+        * @param delay the time from now to delay the execution
+        * @param unit the time unit of the delay parameter
+        * @return a ScheduledFuture representing the completion of the scheduled task
+        */
+       ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit);
+
+       /**
+        * Executes the given callable after the given delay. The result of the callable is returned
+        * as a {@link ScheduledFuture}.
+        *
+        * @param callable the callable to execute
+        * @param delay the time from now to delay the execution
+        * @param unit the time unit of the delay parameter
+        * @param <V> result type of the callable
+        * @return a ScheduledFuture which holds the future value of the given callable
+        */
+       <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit);
+
+       /**
+        * Executes the given command periodically. The first execution is started after the
+        * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
+        * the third after {@code initialDelay + 2*period} and so on.
+        * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
+        * is cancelled.
+        *
+        * @param command the task to be executed periodically
+        * @param initialDelay the time from now until the first execution is triggered
+        * @param period the time after which the next execution is triggered
+        * @param unit the time unit of the initial delay and the period parameter
+        * @return a ScheduledFuture representing the periodic task. This future never completes
+        * unless an execution of the given task fails or if the future is cancelled
+        */
+       ScheduledFuture<?> scheduleAtFixedRate(
+               Runnable command,
+               long initialDelay,
+               long period,
+               TimeUnit unit);
+
+       /**
+        * Executes the given command repeatedly with the given delay between the end of an execution
+        * and the start of the next execution.
+        * The task is executed repeatedly until either an exception occurs or if the returned
+        * {@link ScheduledFuture} is cancelled.
+        *
+        * @param command the task to execute repeatedly
+        * @param initialDelay the time from now until the first execution is triggered
+        * @param delay the time between the end of the current and the start of the next execution
+        * @param unit the time unit of the initial delay and the delay parameter
+        * @return a ScheduledFuture representing the repeatedly executed task. This future never
+        * completes unless the execution of the given task fails or if the future is cancelled
+        */
+       ScheduledFuture<?> scheduleWithFixedDelay(
+               Runnable command,
+               long initialDelay,
+               long delay,
+               TimeUnit unit);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
new file mode 100644
index 0000000..7662c35
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Adapter class for a {@link ScheduledExecutorService} which shall be used as 
a
+ * {@link ScheduledExecutor}.
+ */
+public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {
+
+       private final ScheduledExecutorService scheduledExecutorService;
+
+       public ScheduledExecutorServiceAdapter(ScheduledExecutorService 
scheduledExecutorService) {
+               this.scheduledExecutorService = 
Preconditions.checkNotNull(scheduledExecutorService);
+       }
+
+       @Override
+       public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
+               return scheduledExecutorService.schedule(command, delay, unit);
+       }
+
+       @Override
+       public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
+               return scheduledExecutorService.schedule(callable, delay, unit);
+       }
+
+       @Override
+       public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+               return scheduledExecutorService.scheduleAtFixedRate(command, 
initialDelay, period, unit);
+       }
+
+       @Override
+       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long 
initialDelay, long delay, TimeUnit unit) {
+               return scheduledExecutorService.scheduleWithFixedDelay(command, 
initialDelay, delay, unit);
+       }
+
+       @Override
+       public void execute(Runnable command) {
+               scheduledExecutorService.execute(command);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 4b9100a..2d2019a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
 import java.util.concurrent.Callable;
@@ -98,6 +99,20 @@ public interface RpcService {
        Executor getExecutor();
 
        /**
+        * Gets a scheduled executor from the RPC service. This executor can be 
used to schedule
+        * tasks to be executed in the future.
+        *
+        * <p><b>IMPORTANT:</b> This executor does not isolate the method 
invocations against
+        * any concurrent invocations and is therefore not suitable to run 
completion methods of futures
+        * that modify state of an {@link RpcEndpoint}. For such operations, 
one needs to use the
+        * {@link RpcEndpoint#getMainThreadExecutor() 
MainThreadExecutionContext} of that
+        * {@code RpcEndpoint}.
+        *
+        * @return The RPC service provided scheduled executor
+        */
+       ScheduledExecutor getScheduledExecutor();
+
+       /**
         * Execute the runnable in the execution context of this RPC Service, 
as returned by
         * {@link #getExecutor()}, after a scheduled delay.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6e3fb40..6a6a85d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -34,6 +35,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
@@ -43,18 +45,24 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -81,6 +89,8 @@ public class AkkaRpcService implements RpcService {
 
        private final String address;
 
+       private final ScheduledExecutor internalScheduledExecutor;
+
        private volatile boolean stopped;
 
        public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
@@ -101,6 +111,8 @@ public class AkkaRpcService implements RpcService {
                } else {
                        address = "";
                }
+
+               internalScheduledExecutor = new 
InternalScheduledExecutorImpl(actorSystem);
        }
 
        @Override
@@ -259,6 +271,10 @@ public class AkkaRpcService implements RpcService {
                return actorSystem.dispatcher();
        }
 
+       /**
+        * Returns the scheduled executor that was created for this service's actor system.
+        *
+        * @return the scheduled executor of this RPC service
+        */
+       @Override
+       public ScheduledExecutor getScheduledExecutor() {
+               return internalScheduledExecutor;
+       }
+
        @Override
        public void scheduleRunnable(Runnable runnable, long delay, TimeUnit 
unit) {
                checkNotNull(runnable, "runnable");
@@ -279,4 +295,166 @@ public class AkkaRpcService implements RpcService {
 
                return new FlinkFuture<>(scalaFuture);
        }
+
+       /**
+        * Helper class to expose the internal scheduling logic via a {@link ScheduledExecutor}.
+        *
+        * <p>Delayed and periodic tasks are registered with the scheduler of the given
+        * {@link ActorSystem} and are run on that actor system's dispatcher.
+        */
+       private static final class InternalScheduledExecutorImpl implements ScheduledExecutor {
+
+               private final ActorSystem actorSystem;
+
+               private InternalScheduledExecutorImpl(ActorSystem actorSystem) {
+                       this.actorSystem = Preconditions.checkNotNull(actorSystem, "actorSystem");
+               }
+
+               /**
+                * Runs the given command once after the given delay.
+                *
+                * @param command the task to run
+                * @param delay the time from now until the task is triggered
+                * @param unit the time unit of {@code delay}
+                * @return a future representing the pending task
+                */
+               @Override
+               @Nonnull
+               public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
+                       // store the absolute trigger time (not the raw delay) so that getDelay() and
+                       // compareTo() of the returned future report the remaining time
+                       ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+                               command,
+                               triggerTime(unit.toNanos(delay)),
+                               0L);
+
+                       Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
+
+                       scheduledFutureTask.setCancellable(cancellable);
+
+                       return scheduledFutureTask;
+               }
+
+               /**
+                * Runs the given callable once after the given delay.
+                *
+                * @param callable the callable to run
+                * @param delay the time from now until the callable is triggered
+                * @param unit the time unit of {@code delay}
+                * @param <V> result type of the callable
+                * @return a future holding the result of the callable
+                */
+               @Override
+               @Nonnull
+               public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
+                       // see schedule(Runnable, ...): the task expects an absolute trigger time
+                       ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(
+                               callable,
+                               triggerTime(unit.toNanos(delay)),
+                               0L);
+
+                       Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
+
+                       scheduledFutureTask.setCancellable(cancellable);
+
+                       return scheduledFutureTask;
+               }
+
+               /**
+                * Registers the command as a repeating job with the actor system's scheduler.
+                *
+                * @param command the task to run periodically
+                * @param initialDelay the time from now until the first run is triggered
+                * @param period the interval handed to the actor system's scheduler
+                * @param unit the time unit of {@code initialDelay} and {@code period}
+                * @return a future representing the periodic task
+                */
+               @Override
+               @Nonnull
+               public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
+                       // a positive period marks the task as fixed-rate
+                       ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+                               command,
+                               triggerTime(unit.toNanos(initialDelay)),
+                               unit.toNanos(period));
+
+                       Cancellable cancellable = actorSystem.scheduler().schedule(
+                               new FiniteDuration(initialDelay, unit),
+                               new FiniteDuration(period, unit),
+                               scheduledFutureTask,
+                               actorSystem.dispatcher());
+
+                       scheduledFutureTask.setCancellable(cancellable);
+
+                       return scheduledFutureTask;
+               }
+
+               /**
+                * Runs the command after the initial delay; the task re-schedules itself with
+                * {@code delay} after every successful run.
+                *
+                * @param command the task to run repeatedly
+                * @param initialDelay the time from now until the first run is triggered
+                * @param delay the time between the end of one run and the start of the next
+                * @param unit the time unit of {@code initialDelay} and {@code delay}
+                * @return a future representing the repeatedly executed task
+                */
+               @Override
+               @Nonnull
+               public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
+                       // a negative period marks the task as fixed-delay (see ScheduledFutureTask#run)
+                       ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+                               command,
+                               triggerTime(unit.toNanos(initialDelay)),
+                               unit.toNanos(-delay));
+
+                       Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);
+
+                       scheduledFutureTask.setCancellable(cancellable);
+
+                       return scheduledFutureTask;
+               }
+
+               @Override
+               public void execute(@Nonnull Runnable command) {
+                       actorSystem.dispatcher().execute(command);
+               }
+
+               /**
+                * Schedules a single execution of the runnable on the actor system's dispatcher.
+                */
+               private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
+                       return actorSystem.scheduler().scheduleOnce(
+                               new FiniteDuration(delay, unit),
+                               runnable,
+                               actorSystem.dispatcher());
+               }
+
+               private long now() {
+                       return System.nanoTime();
+               }
+
+               /**
+                * Converts a relative delay in nanoseconds into an absolute {@link System#nanoTime()}
+                * based trigger time.
+                */
+               private long triggerTime(long delay) {
+                       return now() + delay;
+               }
+
+               /**
+                * Future handed out for scheduled tasks. It couples the {@link FutureTask} state with
+                * the {@link Cancellable} handle returned by the actor system's scheduler.
+                */
+               private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
+
+                       /** Absolute trigger time of the next run, in {@link System#nanoTime()} units. */
+                       private long time;
+
+                       /** 0: one-shot task, positive: fixed-rate period, negative: negated fixed delay (nanoseconds). */
+                       private final long period;
+
+                       /** Handle of the currently registered scheduler job; null until it has been installed. */
+                       private volatile Cancellable cancellable;
+
+                       ScheduledFutureTask(Callable<V> callable, long time, long period) {
+                               super(callable);
+                               this.time = time;
+                               this.period = period;
+                       }
+
+                       ScheduledFutureTask(Runnable runnable, long time, long period) {
+                               super(runnable, null);
+                               this.time = time;
+                               this.period = period;
+                       }
+
+                       /**
+                        * Installs the scheduler handle. If the task was cancelled before the handle was
+                        * available, the handle is cancelled right away so that the job does not keep firing.
+                        */
+                       public void setCancellable(Cancellable newCancellable) {
+                               this.cancellable = newCancellable;
+
+                               if (isCancelled()) {
+                                       newCancellable.cancel();
+                               }
+                       }
+
+                       @Override
+                       public void run() {
+                               if (!isPeriodic()) {
+                                       super.run();
+                               } else if (runAndReset()){
+                                       if (period > 0L) {
+                                               // fixed rate: the scheduler job repeats by itself, only track the next trigger time
+                                               time += period;
+                                       } else {
+                                               // fixed delay: register the next run relative to the end of this one
+                                               cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);
+
+                                               // check whether we have been cancelled concurrently
+                                               if (isCancelled()) {
+                                                       cancellable.cancel();
+                                               } else {
+                                                       time = triggerTime(-period);
+                                               }
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public boolean cancel(boolean mayInterruptIfRunning) {
+                               boolean result = super.cancel(mayInterruptIfRunning);
+
+                               // the handle is not installed yet if we are cancelled right after scheduling;
+                               // setCancellable() takes care of cancelling it in that case
+                               Cancellable currentCancellable = cancellable;
+
+                               return result && (currentCancellable == null || currentCancellable.cancel());
+                       }
+
+                       @Override
+                       public long getDelay(@Nonnull  TimeUnit unit) {
+                               return unit.convert(time - now(), TimeUnit.NANOSECONDS);
+                       }
+
+                       @Override
+                       public int compareTo(@Nonnull Delayed o) {
+                               if (o == this) {
+                                       return 0;
+                               }
+
+                               long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
+                               return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
+                       }
+
+                       @Override
+                       public boolean isPeriodic() {
+                               return period != 0L;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 1d30ea4..07edfef 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.Preconditions;
@@ -31,10 +33,13 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.BitSet;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,13 +51,19 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class TestingSerialRpcService implements RpcService {
 
        private final DirectExecutorService executorService;
+       private final ScheduledExecutorService scheduledExecutorService;
        private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
        private final CompletableFuture<Void> terminationFuture;
 
+       private final ScheduledExecutor scheduledExecutorServiceAdapter;
+
        public TestingSerialRpcService() {
                executorService = new DirectExecutorService();
+               scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
                this.registeredConnections = new ConcurrentHashMap<>(16);
                this.terminationFuture = new FlinkCompletableFuture<>();
+
+               this.scheduledExecutorServiceAdapter = new 
ScheduledExecutorServiceAdapter(scheduledExecutorService);
        }
 
        @Override
@@ -86,9 +97,32 @@ public class TestingSerialRpcService implements RpcService {
                return executorService;
        }
 
+       /**
+        * Returns the adapter around this service's scheduled thread pool.
+        *
+        * @return the scheduled executor of this testing RPC service
+        */
+       @Override
+       public ScheduledExecutor getScheduledExecutor() {
+               return scheduledExecutorServiceAdapter;
+       }
+
        @Override
        public void stopService() {
                executorService.shutdown();
+
+               scheduledExecutorService.shutdown(); // initiate an orderly shutdown first
+
+               boolean terminated = false;
+
+               try {
+                       terminated = 
scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+               } catch (InterruptedException e) {
+                       Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+               }
+
+               if (!terminated) {
+                       List<Runnable> runnables = 
scheduledExecutorService.shutdownNow();
+
+                       for (Runnable runnable : runnables) {
+                               runnable.run(); // run the tasks returned by shutdownNow() in the calling thread
+                       }
+               }
+
                registeredConnections.clear();
                terminationFuture.complete(null);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7c8defa..eb71287 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
 
@@ -30,13 +31,16 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class AkkaRpcServiceTest extends TestLogger {
 
@@ -149,4 +153,160 @@ public class AkkaRpcServiceTest extends TestLogger {
 
                terminationFuture.get();
        }
+
+       /**
+        * Verifies that a one-shot runnable handed to the RPC service's scheduled executor has
+        * run by the time its future completes.
+        */
+       @Test(timeout = 1000)
+       public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException {
+               final OneShotLatch executed = new OneShotLatch();
+
+               final Runnable triggerLatch = new Runnable() {
+                       @Override
+                       public void run() {
+                               executed.trigger();
+                       }
+               };
+
+               final ScheduledFuture<?> scheduled =
+                       akkaRpcService.getScheduledExecutor().schedule(triggerLatch, 10L, TimeUnit.MILLISECONDS);
+
+               // block until the scheduled task has finished
+               scheduled.get();
+
+               // a completed future implies that the runnable has triggered the latch
+               assertTrue(executed.isTriggered());
+       }
+
+       /**
+        * Tests that the RPC service's scheduled executor service can execute runnables at a fixed
+        * rate.
+        */
+       @Test(timeout = 1000)
+       public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException {
+               ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+               final int tries = 4;
+               final long delay = 10L;
+               final CountDownLatch countDownLatch = new CountDownLatch(tries);
+
+               long currentTime = System.nanoTime();
+
+               ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(
+                       new Runnable() {
+                               @Override
+                               public void run() {
+                                       countDownLatch.countDown();
+                               }
+                       },
+                       delay,
+                       delay,
+                       TimeUnit.MILLISECONDS);
+
+               try {
+                       assertFalse(future.isDone());
+
+                       countDownLatch.await();
+
+                       // the future should not complete since we have a periodic task
+                       assertFalse(future.isDone());
+
+                       long finalTime = System.nanoTime() - currentTime;
+
+                       // the processing should have taken at least delay times the number of count downs;
+                       // finalTime is measured in nanoseconds, so the bound has to be converted as well
+                       assertTrue(finalTime >= TimeUnit.MILLISECONDS.toNanos(tries * delay));
+               } finally {
+                       // always stop the periodic task, also if one of the assertions fails
+                       future.cancel(true);
+               }
+       }
+
+       /**
+        * Tests that the RPC service's scheduled executor service can execute runnable with a fixed
+        * delay.
+        */
+       @Test(timeout = 1000)
+       public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException, InterruptedException {
+               ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+               final int tries = 4;
+               final long delay = 10L;
+               final CountDownLatch countDownLatch = new CountDownLatch(tries);
+
+               long currentTime = System.nanoTime();
+
+               ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
+                       new Runnable() {
+                               @Override
+                               public void run() {
+                                       countDownLatch.countDown();
+                               }
+                       },
+                       delay,
+                       delay,
+                       TimeUnit.MILLISECONDS);
+
+               try {
+                       assertFalse(future.isDone());
+
+                       countDownLatch.await();
+
+                       // the future should not complete since we have a periodic task
+                       assertFalse(future.isDone());
+
+                       long finalTime = System.nanoTime() - currentTime;
+
+                       // the processing should have taken at least delay times the number of count downs;
+                       // finalTime is measured in nanoseconds, so the bound has to be converted as well
+                       assertTrue(finalTime >= TimeUnit.MILLISECONDS.toNanos(tries * delay));
+               } finally {
+                       // always stop the repeating task, also if one of the assertions fails
+                       future.cancel(true);
+               }
+       }
+
+       /**
+        * Tests that canceling the returned future will stop the execution of 
the scheduled runnable.
+        */
+       @Test
+       public void testScheduledExecutorServiceCancelWithFixedDelay() throws 
InterruptedException {
+               ScheduledExecutor scheduledExecutor = 
akkaRpcService.getScheduledExecutor();
+
+               long delay = 10L;
+
+               final OneShotLatch futureTask = new OneShotLatch();
+               final OneShotLatch latch = new OneShotLatch();
+               final OneShotLatch shouldNotBeTriggeredLatch = new 
OneShotLatch();
+
+               ScheduledFuture<?> future = 
scheduledExecutor.scheduleWithFixedDelay(
+                       new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               if (!futureTask.isTriggered()) {
+                                                       // first run
+                                                       futureTask.trigger();
+                                                       latch.await();
+                                               } else {
+                                                       
shouldNotBeTriggeredLatch.trigger();
+                                               }
+                                       } catch (InterruptedException e) {
+                                               // ignore
+                                       }
+                               }
+                       },
+                       delay,
+                       delay,
+                       TimeUnit.MILLISECONDS);
+
+               // wait until we're in the runnable
+               futureTask.await();
+
+               // cancel the scheduled future
+               future.cancel(false);
+
+               latch.trigger();
+
+               try {
+                       shouldNotBeTriggeredLatch.await(5 * delay, 
TimeUnit.MILLISECONDS);
+                       fail("The shouldNotBeTriggeredLatch should never be 
triggered.");
+               } catch (TimeoutException e) {
+                       // expected
+               }
+       }
 }

Reply via email to