http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java new file mode 100644 index 0000000..464a261 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.jobmaster.JobMaster; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * {@link ResourceManager} rpc gateway interface. All methods declared here return their result wrapped in a {@link Future}. + */ +public interface ResourceManagerGateway extends RpcGateway { + + /** + * Register a {@link JobMaster} at the resource manager. This overload takes an explicit timeout for the returned future. 
+ * + * @param jobMasterRegistration Job master registration information + * @param timeout Timeout for the future to complete + * @return Future registration response + */ + Future<RegistrationResponse> registerJobMaster( + JobMasterRegistration jobMasterRegistration, + @RpcTimeout FiniteDuration timeout); + + /** + * Register a {@link JobMaster} at the resource manager. This overload declares no timeout parameter. + * + * @param jobMasterRegistration Job master registration information + * @return Future registration response + */ + Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration); + + /** + * Requests a slot from the resource manager. + * + * @param slotRequest Slot request + * @return Future slot assignment + */ + Future<SlotAssignment> requestSlot(SlotRequest slotRequest); +}
http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java new file mode 100644 index 0000000..86cd8b7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import java.io.Serializable; + +/** + * Serializable type used as the future value of {@link ResourceManagerGateway#requestSlot(SlotRequest)}. + * It declares no fields besides its {@code serialVersionUID}. + */ +public class SlotAssignment implements Serializable{ + private static final long serialVersionUID = -6990813455942742322L; +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java new file mode 100644 index 0000000..d8fe268 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import java.io.Serializable; + +/** + * Serializable type passed as the argument of {@link ResourceManagerGateway#requestSlot(SlotRequest)}. + * It declares no fields besides its {@code serialVersionUID}. + */ +public class SlotRequest implements Serializable{ + private static final long serialVersionUID = -6586877187990445986L; +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java new file mode 100644 index 0000000..cdfc3bd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import akka.dispatch.ExecutionContexts$; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcService; +import scala.concurrent.ExecutionContext; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +/** + * TaskExecutor implementation. The task executor is responsible for the execution of multiple + * {@link org.apache.flink.runtime.taskmanager.Task}. + * + * It offers the following methods as part of its rpc interface to interact with it remotely: + * <ul> + * <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li> + * <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li> + * </ul> + */ +public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { + private final ExecutionContext executionContext; + private final Set<ExecutionAttemptID> tasks = new HashSet<>(); + + /** + * Creates the task executor. + * + * @param rpcService Rpc service which is handed to the {@link RpcEndpoint} super constructor + * @param executorService Executor service which is wrapped into the {@link ExecutionContext} of this task executor + */ + public TaskExecutor(RpcService rpcService, ExecutorService executorService) { + super(rpcService); + this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService); + } + + /** + * Execute the given task on the task executor. The task is described by the provided + * {@link TaskDeploymentDescriptor}. The execution attempt id of the descriptor is recorded in the set of known tasks. + * + * @param taskDeploymentDescriptor Descriptor for the task to be executed + * @return Acknowledge the start of the task execution + */ + @RpcMethod + public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) { + tasks.add(taskDeploymentDescriptor.getExecutionId()); + return Acknowledge.get(); + } + + /** + * Cancel a task identified by its {@link ExecutionAttemptID}. 
If the task cannot be found, then + * the method throws an {@link Exception}. The id is not removed from the set of known tasks by this method. + * + * @param executionAttemptId Execution attempt ID identifying the task to be canceled. + * @return Acknowledge the task canceling + * @throws Exception if the task with the given execution attempt id could not be found + */ + @RpcMethod + public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception { + if (tasks.contains(executionAttemptId)) { + return Acknowledge.get(); + } else { + throw new Exception("Could not find task."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java new file mode 100644 index 0000000..450423e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.RpcGateway; +import scala.concurrent.Future; + +/** + * {@link TaskExecutor} rpc gateway interface. + */ +public interface TaskExecutorGateway extends RpcGateway { + /** + * Execute the given task on the task executor. The task is described by the provided + * {@link TaskDeploymentDescriptor}. + * + * @param taskDeploymentDescriptor Descriptor for the task to be executed + * @return Future acknowledge of the start of the task execution + */ + Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor); + + /** + * Cancel a task identified by its {@link ExecutionAttemptID}. If the task cannot be found, then + * the corresponding {@link TaskExecutor} method throws an {@link Exception} (presumably surfaced through the returned future; confirm against the rpc service). + * + * @param executionAttemptId Execution attempt ID identifying the task to be canceled. 
+ * @return Future acknowledge of the task canceling + */ + Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java index ca09634..ce57fe6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.TestingResourceManager; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java new file mode 100644 index 0000000..0ded25e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.reflections.Reflections; +import scala.concurrent.Future; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class RpcCompletenessTest extends TestLogger { + private static final Class<?> futureClass = Future.class; + + @Test + public void testRpcCompleteness() { + Reflections reflections = new Reflections("org.apache.flink"); + + Set<Class<? extends RpcEndpoint>> classes = reflections.getSubTypesOf(RpcEndpoint.class); + + Class<? extends RpcEndpoint> c; + + for (Class<? extends RpcEndpoint> rpcEndpoint :classes){ + c = rpcEndpoint; + Type superClass = c.getGenericSuperclass(); + + Class<?> rpcGatewayType = extractTypeParameter(superClass, 0); + + if (rpcGatewayType != null) { + checkCompleteness(rpcEndpoint, (Class<? 
extends RpcGateway>) rpcGatewayType); + } else { + fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName()); + } + } + } + + private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) { + Method[] gatewayMethods = rpcGateway.getDeclaredMethods(); + Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); + + Map<String, Set<Method>> rpcMethods = new HashMap<>(); + Set<Method> unmatchedRpcMethods = new HashSet<>(); + + for (Method serverMethod : serverMethods) { + if (serverMethod.isAnnotationPresent(RpcMethod.class)) { + if (rpcMethods.containsKey(serverMethod.getName())) { + Set<Method> methods = rpcMethods.get(serverMethod.getName()); + methods.add(serverMethod); + + rpcMethods.put(serverMethod.getName(), methods); + } else { + Set<Method> methods = new HashSet<>(); + methods.add(serverMethod); + + rpcMethods.put(serverMethod.getName(), methods); + } + + unmatchedRpcMethods.add(serverMethod); + } + } + + for (Method gatewayMethod : gatewayMethods) { + assertTrue( + "The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " + + "annotated method with the same name and signature " + + generateEndpointMethodSignature(gatewayMethod) + ".", + rpcMethods.containsKey(gatewayMethod.getName())); + + checkGatewayMethod(gatewayMethod); + + if (!matchGatewayMethodWithEndpoint(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) { + fail("Could not find a RpcMethod annotated method in rpc endpoint " + + rpcEndpoint.getName() + " matching the rpc gateway method " + + generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " + + rpcGateway.getName() + "."); + } + } + + if (!unmatchedRpcMethods.isEmpty()) { + StringBuilder builder = new StringBuilder(); + + for (Method unmatchedRpcMethod : unmatchedRpcMethods) { + builder.append(unmatchedRpcMethod).append("\n"); + } + + fail("The rpc endpoint " + 
rpcEndpoint.getName() + " contains rpc methods which " + + "are not matched to gateway methods of " + rpcGateway.getName() + ":\n" + + builder.toString()); + } + } + + /** + * Checks whether the gateway method fulfills the gateway method requirements. + * <ul> + * <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li> + * <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li> + * </ul> + * + * @param gatewayMethod Gateway method to check + */ + private void checkGatewayMethod(Method gatewayMethod) { + if (!gatewayMethod.getReturnType().equals(Void.TYPE)) { + assertTrue( + "The return type of method " + gatewayMethod.getName() + " in the rpc gateway " + + gatewayMethod.getDeclaringClass().getName() + " is non void and not a " + + "future. Non-void return types have to be returned as a future.", + gatewayMethod.getReturnType().equals(futureClass)); + } + + Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations(); + int rpcTimeoutParameters = 0; + + for (Annotation[] parameterAnnotation : parameterAnnotations) { + for (Annotation annotation : parameterAnnotation) { + if (annotation.equals(RpcTimeout.class)) { + rpcTimeoutParameters++; + } + } + } + + assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " + + "annotated parameter.", rpcTimeoutParameters <= 1); + } + + /** + * Checks whether we find a matching overloaded version for the gateway method among the methods + * with the same name in the rpc endpoint. 
+ * + * @param gatewayMethod Gateway method + * @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway + * method + * @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far) + */ + private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) { + for (Method endpointMethod : endpointMethods) { + if (checkMethod(gatewayMethod, endpointMethod)) { + unmatchedRpcMethods.remove(endpointMethod); + return true; + } + } + + return false; + } + + private boolean checkMethod(Method gatewayMethod, Method endpointMethod) { + Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes(); + Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations(); + + Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes(); + + List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>(); + + assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length); + + // filter out the RpcTimeout parameters + for (int i = 0; i < gatewayParameterTypes.length; i++) { + if (!isRpcTimeout(gatewayParameterAnnotations[i])) { + filteredGatewayParameterTypes.add(gatewayParameterTypes[i]); + } + } + + if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) { + return false; + } else { + // check the parameter types + for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) { + if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) { + return false; + } + } + + // check the return types + if (endpointMethod.getReturnType() == void.class) { + if (gatewayMethod.getReturnType() != void.class) { + return false; + } + } else { + // has return value. 
The gateway method should be wrapped in a future + Class<?> futureClass = gatewayMethod.getReturnType(); + + // sanity check that the return type of a gateway method must be void or a future + if (!futureClass.equals(RpcCompletenessTest.futureClass)) { + return false; + } else { + Class<?> valueClass = extractTypeParameter(futureClass, 0); + + if (endpointMethod.getReturnType().equals(futureClass)) { + Class<?> rpcEndpointValueClass = extractTypeParameter(endpointMethod.getReturnType(), 0); + + // check if we have the same future value types + if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) { + return false; + } + } else { + if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) { + return false; + } + } + } + } + + return gatewayMethod.getName().equals(endpointMethod.getName()); + } + } + + private boolean checkType(Class<?> firstType, Class<?> secondType) { + return firstType.equals(secondType); + } + + /** + * Generates from a gateway rpc method signature the corresponding rpc endpoint signature. + * + * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway + * signature which is not relevant on the server side. + * + * @param method Method to generate the signature string for + * @return String of the respective server side rpc method signature + */ + private String generateEndpointMethodSignature(Method method) { + StringBuilder builder = new StringBuilder(); + + if (method.getReturnType().equals(Void.TYPE)) { + builder.append("void").append(" "); + } else if (method.getReturnType().equals(futureClass)) { + Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0); + + builder + .append(futureClass.getSimpleName()) + .append("<") + .append(valueClass != null ? 
valueClass.getSimpleName() : "") + .append(">"); + + if (valueClass != null) { + builder.append("/").append(valueClass.getSimpleName()); + } + + builder.append(" "); + } else { + return "Invalid rpc method signature."; + } + + builder.append(method.getName()).append("("); + + Class<?>[] parameterTypes = method.getParameterTypes(); + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + + assertEquals(parameterTypes.length, parameterAnnotations.length); + + for (int i = 0; i < parameterTypes.length; i++) { + // filter out the RpcTimeout parameters + if (!isRpcTimeout(parameterAnnotations[i])) { + builder.append(parameterTypes[i].getName()); + + if (i < parameterTypes.length -1) { + builder.append(", "); + } + } + } + + builder.append(")"); + + return builder.toString(); + } + + private Class<?> extractTypeParameter(Type genericType, int position) { + if (genericType instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) genericType; + + Type[] typeArguments = parameterizedType.getActualTypeArguments(); + + if (position < 0 || position >= typeArguments.length) { + throw new IndexOutOfBoundsException("The generic type " + + parameterizedType.getRawType() + " only has " + typeArguments.length + + " type arguments."); + } else { + Type typeArgument = typeArguments[position]; + + if (typeArgument instanceof Class<?>) { + return (Class<?>) typeArgument; + } else { + return null; + } + } + } else { + return null; + } + } + + private boolean isRpcTimeout(Annotation[] annotations) { + for (Annotation annotation : annotations) { + if (annotation.annotationType().equals(RpcTimeout.class)) { + return true; + } + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java new file mode 100644 index 0000000..c5bac94 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorSystem; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.jobmaster.JobMaster; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AkkaRpcServiceTest extends TestLogger { + + /** + * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the + * {@link AkkaRpcService}. The two endpoints run on separate actor systems and the test polls + * {@code JobMaster#isConnected()} until a 20 second deadline expires. + * + * NOTE(review): neither the two actor systems nor the executor service created here are shut + * down by this test, and the endpoints are only shut down when the assertions pass. + */ + @Test + public void testJobMasterResourceManagerRegistration() throws Exception { + Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS); + ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); + AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout); + AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout); + ExecutorService executorService = new ForkJoinPool(); + + ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService); + JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService); + + resourceManager.start(); + + ResourceManagerGateway rm = resourceManager.getSelf(); + + assertTrue(rm instanceof AkkaGateway); + + AkkaGateway akkaClient = (AkkaGateway) rm; + + jobMaster.start(); + jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef())); + + // wait for successful registration + FiniteDuration timeout = new FiniteDuration(20, 
TimeUnit.SECONDS); + Deadline deadline = timeout.fromNow(); + + while (deadline.hasTimeLeft() && !jobMaster.isConnected()) { + Thread.sleep(100); + } + + assertFalse(deadline.isOverdue()); + + jobMaster.shutDown(); + resourceManager.shutDown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java new file mode 100644 index 0000000..c143527 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.util.DirectExecutorService; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.net.URL; +import java.util.Collections; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +public class TaskExecutorTest extends TestLogger { + + /** + * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions. The task + * executor methods are called directly on the object, using a mocked {@link RpcService}. + * + * NOTE(review): the {@code directExecutorService} handed to the constructor is {@code null}. + */ + @Test + public void testTaskExecution() throws Exception { + RpcService testingRpcService = mock(RpcService.class); + DirectExecutorService directExecutorService = null; + TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); + + TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( + new JobID(), + "Test job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<ExecutionConfig>(null), + "Test task", + 0, + 1, + 0, + new Configuration(), + new Configuration(), + "Invokable", + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0 + ); + + Acknowledge ack = taskExecutor.executeTask(tdd); + + ack = 
taskExecutor.cancelTask(tdd.getExecutionId()); + } + + /** + * Tests that cancelling a non-existing task throws an exception. The {@code fail} call is only + * reached if {@code cancelTask} returns normally. + */ + @Test(expected=Exception.class) + public void testWrongTaskCancellation() throws Exception { + RpcService testingRpcService = mock(RpcService.class); + DirectExecutorService directExecutorService = null; + TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); + + taskExecutor.cancelTask(new ExecutionAttemptID()); + + fail("The cancellation should have thrown an exception."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java new file mode 100644 index 0000000..1d7c971 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * {@link ExecutorService} that runs every task synchronously in the thread that submits it.
 *
 * <p>All futures handed out by this executor are already completed (or cancelled) when they
 * are returned. The shutdown flag is only recorded and reported; submitting work after
 * {@link #shutdown()} is not rejected.
 *
 * <p>Note that {@link #submit(Runnable)}, {@link #submit(Runnable, Object)} and
 * {@link #execute(Runnable)} let exceptions thrown by the runnable propagate to the caller,
 * whereas the {@link Callable} based methods capture them in the returned future.
 */
public class DirectExecutorService implements ExecutorService {
	private boolean _shutdown = false;

	/** Marks this executor as shut down. */
	@Override
	public void shutdown() {
		_shutdown = true;
	}

	/**
	 * Marks this executor as shut down.
	 *
	 * @return always an empty list, since tasks run inline and are never queued
	 */
	@Override
	public List<Runnable> shutdownNow() {
		_shutdown = true;
		return Collections.emptyList();
	}

	@Override
	public boolean isShutdown() {
		return _shutdown;
	}

	/** Nothing runs in the background, so the executor is terminated as soon as it is shut down. */
	@Override
	public boolean isTerminated() {
		return _shutdown;
	}

	/**
	 * Returns immediately without waiting.
	 *
	 * @return whether the executor has been shut down
	 */
	@Override
	public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
		return _shutdown;
	}

	/**
	 * Calls the task inline.
	 *
	 * @return completed future holding the result, or the exception thrown by the task
	 */
	@Override
	public <T> Future<T> submit(Callable<T> task) {
		return callToFuture(task);
	}

	/**
	 * Runs the task inline; an exception thrown by the task propagates to the caller.
	 *
	 * @return completed future holding {@code result}
	 */
	@Override
	public <T> Future<T> submit(Runnable task, T result) {
		task.run();

		return new CompletedFuture<T>(result, null);
	}

	/**
	 * Runs the task inline; an exception thrown by the task propagates to the caller.
	 *
	 * @return completed future holding {@code null}
	 */
	@Override
	public Future<?> submit(Runnable task) {
		task.run();
		return new CompletedFuture<Object>(null, null);
	}

	/**
	 * Calls all tasks inline, in iteration order.
	 *
	 * @return one completed future per task, in the same order
	 */
	@Override
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
		List<Future<T>> result = new ArrayList<>(tasks.size());

		for (Callable<T> task : tasks) {
			result.add(callToFuture(task));
		}
		return result;
	}

	/**
	 * Calls the tasks inline, in iteration order, until the timeout has elapsed. The timeout is
	 * only checked between tasks; a running task is never interrupted.
	 *
	 * @return one future per task, in the same order; tasks that were not started because the
	 *         timeout elapsed are represented by cancelled (and therefore done) futures
	 */
	@Override
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
		long end = deadlineMillis(timeout, unit);
		Iterator<? extends Callable<T>> iterator = tasks.iterator();
		List<Future<T>> result = new ArrayList<>(tasks.size());

		while (end > System.currentTimeMillis() && iterator.hasNext()) {
			result.add(callToFuture(iterator.next()));
		}

		// Everything that did not get a chance to run is reported as cancelled.
		while (iterator.hasNext()) {
			iterator.next();
			result.add(new CancelledFuture<T>());
		}

		return result;
	}

	/**
	 * Calls the tasks inline, in iteration order, and returns the result of the first one that
	 * completes without throwing.
	 *
	 * @throws ExecutionException if no task completed successfully; the cause is the exception
	 *                            of the last failed task (or {@code null} if there were no tasks)
	 */
	@Override
	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
		Exception exception = null;

		for (Callable<T> task : tasks) {
			try {
				return task.call();
			} catch (Exception e) {
				// try next task
				exception = e;
			}
		}

		throw new ExecutionException("No tasks finished successfully.", exception);
	}

	/**
	 * Timed variant of {@link #invokeAny(Collection)}. The timeout is only checked between tasks.
	 *
	 * @throws TimeoutException   if the timeout elapsed while there were still tasks left to try
	 * @throws ExecutionException if all tasks were tried and none completed successfully
	 */
	@Override
	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
		long end = deadlineMillis(timeout, unit);
		Exception exception = null;

		Iterator<? extends Callable<T>> iterator = tasks.iterator();

		while (end > System.currentTimeMillis() && iterator.hasNext()) {
			Callable<T> callable = iterator.next();

			try {
				return callable.call();
			} catch (Exception e) {
				// ignore exception and try next
				exception = e;
			}
		}

		if (iterator.hasNext()) {
			throw new TimeoutException("Could not finish execution of tasks within time.");
		} else {
			throw new ExecutionException("No tasks finished successfully.", exception);
		}
	}

	/** Runs the command inline; an exception thrown by the command propagates to the caller. */
	@Override
	public void execute(Runnable command) {
		command.run();
	}

	/** Calls the task and wraps its result, or the exception it threw, into a completed future. */
	private static <T> Future<T> callToFuture(Callable<T> task) {
		try {
			return new CompletedFuture<T>(task.call(), null);
		} catch (Exception e) {
			return new CompletedFuture<T>(null, e);
		}
	}

	/**
	 * Computes the wall-clock deadline (in epoch milliseconds) for the given timeout, saturating
	 * at {@link Long#MAX_VALUE} instead of wrapping around for very large timeouts.
	 */
	private static long deadlineMillis(long timeout, TimeUnit unit) {
		long now = System.currentTimeMillis();
		long timeoutMillis = unit.toMillis(timeout);
		long deadline = now + timeoutMillis;

		// A positive timeout that yields a deadline before "now" can only be an overflow.
		if (timeoutMillis > 0 && deadline < now) {
			return Long.MAX_VALUE;
		}
		return deadline;
	}

	/**
	 * Future that is already completed, either with a value or with an exception.
	 *
	 * @param <V> type of the value
	 */
	public static class CompletedFuture<V> implements Future<V> {
		private final V value;
		private final Exception exception;

		/**
		 * @param value     result to return from {@link #get()}; ignored if an exception is given
		 * @param exception failure to report from {@link #get()}, or {@code null} on success
		 */
		public CompletedFuture(V value, Exception exception) {
			this.value = value;
			this.exception = exception;
		}

		@Override
		public boolean cancel(boolean mayInterruptIfRunning) {
			return false;
		}

		@Override
		public boolean isCancelled() {
			return false;
		}

		@Override
		public boolean isDone() {
			return true;
		}

		/**
		 * @return the value this future was completed with
		 * @throws ExecutionException wrapping the exception this future was completed with
		 */
		@Override
		public V get() throws InterruptedException, ExecutionException {
			if (exception != null) {
				throw new ExecutionException(exception);
			} else {
				return value;
			}
		}

		@Override
		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
			return get();
		}
	}

	/**
	 * Future standing in for a task that was never started because a timeout elapsed. As
	 * required by the {@link Future} contract, a cancelled future also reports itself as done.
	 */
	private static final class CancelledFuture<V> implements Future<V> {

		@Override
		public boolean cancel(boolean mayInterruptIfRunning) {
			return false;
		}

		@Override
		public boolean isCancelled() {
			return true;
		}

		@Override
		public boolean isDone() {
			return true;
		}

		@Override
		public V get() throws InterruptedException, ExecutionException {
			throw new CancellationException("Task has been cancelled.");
		}

		@Override
		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
			throw new CancellationException("Task has been cancelled.");
		}
	}
}
<dependency> <groupId>org.reflections</groupId> <artifactId>reflections</artifactId> - <version>0.9.10</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0e8c3b7..f077ba9 100644 --- a/pom.xml +++ b/pom.xml @@ -405,6 +405,13 @@ under the License. <artifactId>jackson-annotations</artifactId> <version>${jackson.version}</version> </dependency> + + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + <version>0.9.10</version> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement>