Repository: flink
Updated Branches:
  refs/heads/flip-6 04fbdb3c5 -> 31a091b93


http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 01776ed..957453a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.Preconditions;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
@@ -37,6 +34,7 @@ import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,8 +65,8 @@ public class TestingSerialRpcService implements RpcService {
        }
 
        @Override
-       public ExecutionContext getExecutionContext() {
-               return ExecutionContexts.fromExecutorService(executorService);
+       public Executor getExecutor() {
+               return executorService;
        }
 
        @Override
@@ -94,7 +92,7 @@ public class TestingSerialRpcService implements RpcService {
                        classLoader,
                        new Class<?>[]{
                                rpcEndpoint.getSelfGatewayType(),
-                               MainThreadExecutor.class,
+                               MainThreadExecutable.class,
                                StartStoppable.class,
                                RpcGateway.class
                        },
@@ -114,13 +112,13 @@ public class TestingSerialRpcService implements 
RpcService {
                        if (clazz.isAssignableFrom(gateway.getClass())) {
                                @SuppressWarnings("unchecked")
                                C typedGateway = (C) gateway;
-                               return Futures.successful(typedGateway);
+                               return 
FlinkCompletableFuture.completed(typedGateway);
                        } else {
-                               return Futures.failed(
+                               return 
FlinkCompletableFuture.completedExceptionally(
                                        new Exception("Gateway registered under 
" + address + " is not of type " + clazz));
                        }
                } else {
-                       return Futures.failed(new Exception("No gateway 
registered under that name"));
+                       return 
FlinkCompletableFuture.completedExceptionally(new Exception("No gateway 
registered under that name"));
                }
        }
 
@@ -141,20 +139,20 @@ public class TestingSerialRpcService implements 
RpcService {
                registeredConnections.clear();
        }
 
-       private static final class TestingSerialInvocationHandler<C extends 
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, 
MainThreadExecutor, StartStoppable {
+       private static final class TestingSerialInvocationHandler<C extends 
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, 
MainThreadExecutable, StartStoppable {
 
                private final T rpcEndpoint;
 
                /** default timeout for asks */
-               private final Timeout timeout;
+               private final Time timeout;
 
                private final String address;
 
                private TestingSerialInvocationHandler(String address, T 
rpcEndpoint) {
-                       this(address, rpcEndpoint, new Timeout(new 
FiniteDuration(10, TimeUnit.SECONDS)));
+                       this(address, rpcEndpoint, Time.seconds(10));
                }
 
-               private TestingSerialInvocationHandler(String address, T 
rpcEndpoint, Timeout timeout) {
+               private TestingSerialInvocationHandler(String address, T 
rpcEndpoint, Time timeout) {
                        this.rpcEndpoint = rpcEndpoint;
                        this.timeout = timeout;
                        this.address = address;
@@ -163,7 +161,7 @@ public class TestingSerialRpcService implements RpcService {
                @Override
                public Object invoke(Object proxy, Method method, Object[] 
args) throws Throwable {
                        Class<?> declaringClass = method.getDeclaringClass();
-                       if (declaringClass.equals(MainThreadExecutor.class) ||
+                       if (declaringClass.equals(MainThreadExecutable.class) ||
                                declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
                                declaringClass.equals(RpcGateway.class)) {
                                return method.invoke(this, args);
@@ -171,7 +169,7 @@ public class TestingSerialRpcService implements RpcService {
                                final String methodName = method.getName();
                                Class<?>[] parameterTypes = 
method.getParameterTypes();
                                Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
-                               Timeout futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
+                               Time futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
 
                                final Tuple2<Class<?>[], Object[]> 
filteredArguments = filterArguments(
                                        parameterTypes,
@@ -201,13 +199,13 @@ public class TestingSerialRpcService implements 
RpcService {
                private Object handleRpcInvocationSync(final String methodName,
                        final Class<?>[] parameterTypes,
                        final Object[] args,
-                       final Timeout futureTimeout) throws Exception {
+                       final Time futureTimeout) throws Exception {
                        final Method rpcMethod = lookupRpcMethod(methodName, 
parameterTypes);
                        Object result = rpcMethod.invoke(rpcEndpoint, args);
 
                        if (result instanceof Future) {
                                Future<?> future = (Future<?>) result;
-                               return Await.result(future, 
futureTimeout.duration());
+                               return future.get(futureTimeout.getSize(), 
futureTimeout.getUnit());
                        } else {
                                return result;
                        }
@@ -219,11 +217,11 @@ public class TestingSerialRpcService implements 
RpcService {
                }
 
                @Override
-               public <V> Future<V> callAsync(Callable<V> callable, Timeout 
callTimeout) {
+               public <V> Future<V> callAsync(Callable<V> callable, Time 
callTimeout) {
                        try {
-                               return Futures.successful(callable.call());
+                               return 
FlinkCompletableFuture.completed(callable.call());
                        } catch (Throwable e) {
-                               return Futures.failed(e);
+                               return 
FlinkCompletableFuture.completedExceptionally(e);
                        }
                }
 
@@ -281,18 +279,18 @@ public class TestingSerialRpcService implements 
RpcService {
                 *                             has been found
                 * @return Timeout extracted from the array of arguments or the 
default timeout
                 */
-               private static Timeout extractRpcTimeout(Annotation[][] 
parameterAnnotations, Object[] args,
-                       Timeout defaultTimeout) {
+               private static Time extractRpcTimeout(Annotation[][] 
parameterAnnotations, Object[] args,
+                       Time defaultTimeout) {
                        if (args != null) {
                                
Preconditions.checkArgument(parameterAnnotations.length == args.length);
 
                                for (int i = 0; i < 
parameterAnnotations.length; i++) {
                                        if 
(isRpcTimeout(parameterAnnotations[i])) {
-                                               if (args[i] instanceof 
FiniteDuration) {
-                                                       return new 
Timeout((FiniteDuration) args[i]);
+                                               if (args[i] instanceof Time) {
+                                                       return (Time) args[i];
                                                } else {
                                                        throw new 
RuntimeException("The rpc timeout parameter must be of type " +
-                                                               
FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+                                                               
Time.class.getName() + ". The type " + args[i].getClass().getName() +
                                                                " is not 
supported.");
                                                }
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a6ceb91..5624d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -30,13 +31,12 @@ import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class AkkaRpcActorTest extends TestLogger {
@@ -47,7 +47,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
        private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
 
-       private static Timeout timeout = new Timeout(10000, 
TimeUnit.MILLISECONDS);
+       private static Time timeout = Time.milliseconds(10000L);
 
        private static AkkaRpcService akkaRpcService =
                new AkkaRpcService(actorSystem, timeout);
@@ -69,7 +69,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                Future<DummyRpcGateway> futureRpcGateway = 
akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
 
-               DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, 
timeout.duration());
+               DummyRpcGateway rpcGateway = 
futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
                assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
        }
@@ -82,11 +82,12 @@ public class AkkaRpcActorTest extends TestLogger {
                Future<DummyRpcGateway> futureRpcGateway = 
akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
                try {
-                       DummyRpcGateway gateway = 
Await.result(futureRpcGateway, timeout.duration());
+                       DummyRpcGateway gateway = 
futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
                        fail("The rpc connection resolution should have 
failed.");
-               } catch (RpcConnectionException exception) {
+               } catch (ExecutionException exception) {
                        // we're expecting a RpcConnectionException
+                       assertTrue(exception.getCause() instanceof 
RpcConnectionException);
                }
        }
 
@@ -111,7 +112,7 @@ public class AkkaRpcActorTest extends TestLogger {
                // now process the rpc
                rpcEndpoint.start();
 
-               Integer actualValue = Await.result(result, timeout.duration());
+               Integer actualValue = result.get(timeout.getSize(), 
timeout.getUnit());
 
                assertThat("The new foobar value should have been returned.", 
actualValue, Is.is(expectedValue));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f55069e..4e9e518 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.TestLogger;
@@ -40,7 +40,7 @@ public class AkkaRpcServiceTest extends TestLogger {
        private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
 
        private static AkkaRpcService akkaRpcService =
-                       new AkkaRpcService(actorSystem, new Timeout(10000, 
TimeUnit.MILLISECONDS));
+                       new AkkaRpcService(actorSystem, 
Time.milliseconds(10000));
 
        @AfterClass
        public static void shutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ffafda..9ec1f7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -30,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertTrue;
 
 public class MainThreadValidationTest extends TestLogger {
@@ -48,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
                // actual test
                AkkaRpcService akkaRpcService = new AkkaRpcService(
                                AkkaUtils.createDefaultActorSystem(),
-                               new Timeout(10000, TimeUnit.MILLISECONDS));
+                               Time.milliseconds(10000));
 
                try {
                        TestEndpoint testEndpoint = new 
TestEndpoint(akkaRpcService);

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 9d2ed99..0d5dc28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -19,10 +19,11 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -32,13 +33,9 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -52,7 +49,7 @@ public class MessageSerializationTest extends TestLogger {
        private static AkkaRpcService akkaRpcService1;
        private static AkkaRpcService akkaRpcService2;
 
-       private static final FiniteDuration timeout = new FiniteDuration(10L, 
TimeUnit.SECONDS);
+       private static final Time timeout = Time.seconds(10L);
        private static final int maxFrameSize = 32000;
 
        @BeforeClass
@@ -63,8 +60,8 @@ public class MessageSerializationTest extends TestLogger {
                actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
                actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
 
-               akkaRpcService1 = new AkkaRpcService(actorSystem1, new 
Timeout(timeout));
-               akkaRpcService2 = new AkkaRpcService(actorSystem2, new 
Timeout(timeout));
+               akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
+               akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
        }
 
        @AfterClass
@@ -113,7 +110,7 @@ public class MessageSerializationTest extends TestLogger {
 
                Future<TestGateway> remoteGatewayFuture = 
akkaRpcService2.connect(address, TestGateway.class);
 
-               TestGateway remoteGateway = Await.result(remoteGatewayFuture, 
timeout);
+               TestGateway remoteGateway = 
remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
                remoteGateway.foobar(new Object());
 
@@ -134,7 +131,7 @@ public class MessageSerializationTest extends TestLogger {
 
                Future<TestGateway> remoteGatewayFuture = 
akkaRpcService2.connect(address, TestGateway.class);
 
-               TestGateway remoteGateway = Await.result(remoteGatewayFuture, 
timeout);
+               TestGateway remoteGateway = 
remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
                int expected = 42;
 
@@ -158,7 +155,7 @@ public class MessageSerializationTest extends TestLogger {
 
                Future<TestGateway> remoteGatewayFuture = 
akkaRpcService2.connect(address, TestGateway.class);
 
-               TestGateway remoteGateway = Await.result(remoteGatewayFuture, 
timeout);
+               TestGateway remoteGateway = 
remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
                int bufferSize = maxFrameSize + 1;
                byte[] buffer = new byte[bufferSize];

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a8d5bd7..09aab18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.NonHaServices;
@@ -29,8 +30,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
 
 import static org.junit.Assert.*;
@@ -56,7 +55,7 @@ public class TaskExecutorTest extends TestLogger {
                        taskManager.start();
 
                        verify(rmGateway, timeout(5000)).registerTaskExecutor(
-                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), any(Time.class));
                }
                finally {
                        rpc.stopService();
@@ -97,7 +96,7 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address1, leaderId1);
 
                        verify(rmGateway1, timeout(5000)).registerTaskExecutor(
-                                       eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(FiniteDuration.class));
+                                       eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
 
                        // cancel the leader 
@@ -107,7 +106,7 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address2, leaderId2);
 
                        verify(rmGateway2, timeout(5000)).registerTaskExecutor(
-                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), any(FiniteDuration.class));
+                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
                }
                finally {

Reply via email to