Repository: flink Updated Branches: refs/heads/flip-6 6dc228fcf -> 7041f9344 (forced update)
http://git-wip-us.apache.org/repos/asf/flink/blob/ae42bde8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 01776ed..957453a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -18,16 +18,13 @@ package org.apache.flink.runtime.rpc; -import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.Preconditions; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; @@ -37,6 +34,7 @@ import java.util.BitSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,8 +65,8 @@ public class TestingSerialRpcService implements RpcService { } @Override - public ExecutionContext getExecutionContext() { - return ExecutionContexts.fromExecutorService(executorService); + public Executor getExecutor() { + return executorService; } @Override @@ -94,7 +92,7 @@ public class TestingSerialRpcService implements RpcService { classLoader, new Class<?>[]{ rpcEndpoint.getSelfGatewayType(), - MainThreadExecutor.class, + MainThreadExecutable.class, StartStoppable.class, RpcGateway.class }, @@ -114,13 +112,13 @@ public class TestingSerialRpcService implements RpcService { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return Futures.successful(typedGateway); + return FlinkCompletableFuture.completed(typedGateway); } else { - return Futures.failed( + return FlinkCompletableFuture.completedExceptionally( new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return Futures.failed(new Exception("No gateway registered under that name")); + return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); } } @@ -141,20 +139,20 @@ public class TestingSerialRpcService implements RpcService { registeredConnections.clear(); } - private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { + private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable { private final T rpcEndpoint; /** default timeout for asks */ - private final Timeout timeout; + private final Time timeout; private final String address; private TestingSerialInvocationHandler(String address, T rpcEndpoint) { - this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + this(address, rpcEndpoint, Time.seconds(10)); } - private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) { + private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) { this.rpcEndpoint = rpcEndpoint; this.timeout = timeout; this.address = address; @@ -163,7 +161,7 @@ public class TestingSerialRpcService implements RpcService { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); - if (declaringClass.equals(MainThreadExecutor.class) || + if (declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(RpcGateway.class)) { return method.invoke(this, args); @@ -171,7 +169,7 @@ public class TestingSerialRpcService implements RpcService { final String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); - Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments( parameterTypes, @@ -201,13 +199,13 @@ public class TestingSerialRpcService implements RpcService { private Object handleRpcInvocationSync(final String methodName, final Class<?>[] parameterTypes, final Object[] args, - final Timeout futureTimeout) throws Exception { + final Time futureTimeout) throws Exception { final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); Object result = rpcMethod.invoke(rpcEndpoint, args); if (result instanceof Future) { Future<?> future = (Future<?>) result; - return Await.result(future, futureTimeout.duration()); + return future.get(futureTimeout.getSize(), futureTimeout.getUnit()); } else { return result; } @@ -219,11 +217,11 @@ public class TestingSerialRpcService implements RpcService { } @Override - public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) { + public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) { try { - return Futures.successful(callable.call()); + return FlinkCompletableFuture.completed(callable.call()); } catch (Throwable e) { - return Futures.failed(e); + return FlinkCompletableFuture.completedExceptionally(e); } } @@ -281,18 +279,18 @@ public class TestingSerialRpcService implements RpcService { * has been found * @return Timeout extracted from the array of arguments or the default timeout */ - private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, - Timeout defaultTimeout) { + private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, + Time defaultTimeout) { if (args != null) { Preconditions.checkArgument(parameterAnnotations.length == args.length); for (int i = 0; i < parameterAnnotations.length; i++) { if (isRpcTimeout(parameterAnnotations[i])) { - if (args[i] instanceof FiniteDuration) { - return new Timeout((FiniteDuration) args[i]); + if (args[i] instanceof Time) { + return (Time) args[i]; } else { throw new RuntimeException("The rpc timeout parameter must be of type " + - FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + Time.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ae42bde8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index a6ceb91..5624d12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; @@ -30,13 +31,12 @@ import org.apache.flink.util.TestLogger; import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class AkkaRpcActorTest extends TestLogger { @@ -47,7 +47,7 @@ public class AkkaRpcActorTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS); + private static Time timeout = Time.milliseconds(10000L); private static AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, timeout); @@ -69,7 +69,7 @@ public class AkkaRpcActorTest extends TestLogger { Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class); - DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration()); + DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit()); assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress()); } @@ -82,11 +82,12 @@ public class AkkaRpcActorTest extends TestLogger { Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class); try { - DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration()); + DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit()); fail("The rpc connection resolution should have failed."); - } catch (RpcConnectionException exception) { + } catch (ExecutionException exception) { // we're expecting a RpcConnectionException + assertTrue(exception.getCause() instanceof RpcConnectionException); } } @@ -111,7 +112,7 @@ public class AkkaRpcActorTest extends TestLogger { // now process the rpc rpcEndpoint.start(); - Integer actualValue = Await.result(result, timeout.duration()); + Integer actualValue = result.get(timeout.getSize(), timeout.getUnit()); assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue)); http://git-wip-us.apache.org/repos/asf/flink/blob/ae42bde8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index f55069e..4e9e518 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.util.TestLogger; @@ -40,7 +40,7 @@ public class AkkaRpcServiceTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); private static AkkaRpcService akkaRpcService = - new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); + new AkkaRpcService(actorSystem, Time.milliseconds(10000)); @AfterClass public static void shutdown() { http://git-wip-us.apache.org/repos/asf/flink/blob/ae42bde8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index 9ffafda..9ec1f7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -18,8 +18,7 @@ package org.apache.flink.runtime.rpc.akka; -import akka.util.Timeout; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -30,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertTrue; public class MainThreadValidationTest extends TestLogger { @@ -48,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger { // actual test AkkaRpcService akkaRpcService = new AkkaRpcService( AkkaUtils.createDefaultActorSystem(), - new Timeout(10000, TimeUnit.MILLISECONDS)); + Time.milliseconds(10000)); try { TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); http://git-wip-us.apache.org/repos/asf/flink/blob/ae42bde8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index 9d2ed99..0d5dc28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -19,10 +19,11 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; -import akka.util.Timeout; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; @@ -32,13 +33,9 @@ import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -52,7 +49,7 @@ public class MessageSerializationTest extends TestLogger { private static AkkaRpcService akkaRpcService1; private static AkkaRpcService akkaRpcService2; - private static final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); + private static final Time timeout = Time.seconds(10L); private static final int maxFrameSize = 32000; @BeforeClass @@ -63,8 +60,8 @@ public class MessageSerializationTest extends TestLogger { actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig); actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig); - akkaRpcService1 = new AkkaRpcService(actorSystem1, new Timeout(timeout)); - akkaRpcService2 = new AkkaRpcService(actorSystem2, new Timeout(timeout)); + akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout); + akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout); } @AfterClass @@ -113,7 +110,7 @@ public class MessageSerializationTest extends TestLogger { Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class); - TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout); + TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit()); remoteGateway.foobar(new Object()); @@ -134,7 +131,7 @@ public class MessageSerializationTest extends TestLogger { Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class); - TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout); + TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit()); int expected = 42; @@ -158,7 +155,7 @@ public class MessageSerializationTest extends TestLogger { Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class); - TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout); + TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit()); int bufferSize = maxFrameSize + 1; byte[] buffer = new byte[bufferSize]; http://git-wip-us.apache.org/repos/asf/flink/blob/ae42bde8/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index a8d5bd7..09aab18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.NonHaServices; @@ -29,8 +30,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; import static org.junit.Assert.*; @@ -56,7 +55,7 @@ public class TaskExecutorTest extends TestLogger { taskManager.start(); verify(rmGateway, timeout(5000)).registerTaskExecutor( - any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class)); } finally { rpc.stopService(); @@ -97,7 +96,7 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address1, leaderId1); verify(rmGateway1, timeout(5000)).registerTaskExecutor( - eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); // cancel the leader @@ -107,7 +106,7 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address2, leaderId2); verify(rmGateway2, timeout(5000)).registerTaskExecutor( - eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); } finally {
