http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index effa498..4abcdf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -27,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -59,7 +61,6 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; @@ -105,7 +106,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * TaskExecutor implementation. The task executor is responsible for the execution of multiple * {@link Task}. */ -public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { +public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { public static final String TASK_MANAGER_NAME = "taskmanager"; @@ -288,48 +289,51 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // Task lifecycle RPCs // ---------------------------------------------------------------------- - @RpcMethod - public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID jobManagerLeaderId) throws TaskSubmissionException { + @Override + public CompletableFuture<Acknowledge> submitTask( + TaskDeploymentDescriptor tdd, + UUID jobManagerLeaderId, + Time timeout) { - // first, deserialize the pre-serialized information - final JobInformation jobInformation; - final TaskInformation taskInformation; try { - jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); - taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); - } - catch (IOException | ClassNotFoundException e) { - throw new TaskSubmissionException("Could not deserialize the job or task information.", e); - } + // first, deserialize the pre-serialized information + final JobInformation jobInformation; + final TaskInformation taskInformation; + try { + jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); + taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new TaskSubmissionException("Could not deserialize the job or task information.", e); + } - final JobID jobId = jobInformation.getJobId(); - final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId); + final JobID jobId = jobInformation.getJobId(); + final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId); - if (jobManagerConnection == null) { - final String message = "Could not submit task because there is no JobManager " + - "associated for the job " + jobId + '.'; + if (jobManagerConnection == null) { + final String message = "Could not submit task because there is no JobManager " + + "associated for the job " + jobId + '.'; - log.debug(message); - throw new TaskSubmissionException(message); - } + log.debug(message); + throw new TaskSubmissionException(message); + } - if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) { - final String message = "Rejecting the task submission because the job manager leader id " + - jobManagerLeaderId + " does not match the expected job manager leader id " + - jobManagerConnection.getLeaderId() + '.'; + if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) { + final String message = "Rejecting the task submission because the job manager leader id " + + jobManagerLeaderId + " does not match the expected job manager leader id " + + jobManagerConnection.getLeaderId() + '.'; - log.debug(message); - throw new TaskSubmissionException(message); - } + log.debug(message); + throw new TaskSubmissionException(message); + } - if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) { - final String message = "No task slot allocated for job ID " + jobId + - " and allocation ID " + tdd.getAllocationId() + '.'; - log.debug(message); - throw new TaskSubmissionException(message); - } + if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) { + final String message = "No task slot allocated for job ID " + jobId + + " and allocation ID " + tdd.getAllocationId() + '.'; + log.debug(message); + throw new TaskSubmissionException(message); + } - TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob( + TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob( jobInformation.getJobId(), jobInformation.getJobName(), taskInformation.getJobVertexId(), @@ -338,7 +342,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { tdd.getSubtaskIndex(), tdd.getAttemptNumber()); - InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( + InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( jobManagerConnection.getLeaderId(), jobManagerConnection.getJobManagerGateway(), jobInformation.getJobId(), @@ -346,96 +350,100 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { tdd.getExecutionAttemptId(), taskManagerConfiguration.getTimeout()); - TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); - CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); - LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); - PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); - - Task task = new Task( - jobInformation, - taskInformation, - tdd.getExecutionAttemptId(), - tdd.getAllocationId(), - tdd.getSubtaskIndex(), - tdd.getAttemptNumber(), - tdd.getProducedPartitions(), - tdd.getInputGates(), - tdd.getTargetSlotNumber(), - tdd.getTaskStateHandles(), - memoryManager, - ioManager, - networkEnvironment, - broadcastVariableManager, - taskManagerActions, - inputSplitProvider, - checkpointResponder, - libraryCache, - fileCache, - taskManagerConfiguration, - taskMetricGroup, - resultPartitionConsumableNotifier, - partitionStateChecker, - getRpcService().getExecutor()); + TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); + CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); + LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); + PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); - log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); - - boolean taskAdded; + Task task = new Task( + jobInformation, + taskInformation, + tdd.getExecutionAttemptId(), + tdd.getAllocationId(), + tdd.getSubtaskIndex(), + tdd.getAttemptNumber(), + tdd.getProducedPartitions(), + tdd.getInputGates(), + tdd.getTargetSlotNumber(), + tdd.getTaskStateHandles(), + memoryManager, + ioManager, + networkEnvironment, + broadcastVariableManager, + taskManagerActions, + inputSplitProvider, + checkpointResponder, + libraryCache, + fileCache, + taskManagerConfiguration, + taskMetricGroup, + resultPartitionConsumableNotifier, + partitionStateChecker, + getRpcService().getExecutor()); + + log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); + + boolean taskAdded; - try { - taskAdded = taskSlotTable.addTask(task); - } catch (SlotNotFoundException | SlotNotActiveException e) { - throw new TaskSubmissionException("Could not submit task.", e); - } + try { + taskAdded = taskSlotTable.addTask(task); + } catch (SlotNotFoundException | SlotNotActiveException e) { + throw new TaskSubmissionException("Could not submit task.", e); + } - if (taskAdded) { - task.startTaskThread(); + if (taskAdded) { + task.startTaskThread(); - return Acknowledge.get(); - } else { - final String message = "TaskManager already contains a task for id " + - task.getExecutionId() + '.'; + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + final String message = "TaskManager already contains a task for id " + + task.getExecutionId() + '.'; - log.debug(message); - throw new TaskSubmissionException(message); + log.debug(message); + throw new TaskSubmissionException(message); + } + } catch (TaskSubmissionException e) { + return FutureUtils.completedExceptionally(e); } } - @RpcMethod - public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException { + @Override + public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { try { task.cancelExecution(); - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } catch (Throwable t) { - throw new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t); + return FutureUtils.completedExceptionally( + new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t)); } } else { final String message = "Cannot find task to stop for execution " + executionAttemptID + '.'; log.debug(message); - throw new TaskException(message); + return FutureUtils.completedExceptionally(new TaskException(message)); } } - @RpcMethod - public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException { + @Override + public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) { final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { try { task.stopExecution(); - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } catch (Throwable t) { - throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t); + return FutureUtils.completedExceptionally(new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t)); } } else { final String message = "Cannot find task to stop for execution " + executionAttemptID + '.'; log.debug(message); - throw new TaskException(message); + return FutureUtils.completedExceptionally(new TaskException(message)); } } @@ -443,8 +451,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // Partition lifecycle RPCs // ---------------------------------------------------------------------- - @RpcMethod - public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos) throws PartitionException { + @Override + public CompletableFuture<Acknowledge> updatePartitions( + final ExecutionAttemptID executionAttemptID, + Iterable<PartitionInfo> partitionInfos, + Time timeout) { final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { @@ -455,9 +466,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { if (singleInputGate != null) { // Run asynchronously because it might be blocking - getRpcService().execute(new Runnable() { - @Override - public void run() { + getRpcService().execute( + () -> { try { singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor()); } catch (IOException | InterruptedException e) { @@ -470,23 +480,22 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re); } } - } - }); + }); } else { - throw new PartitionException("No reader with ID " + - intermediateResultPartitionID + " for task " + executionAttemptID + - " was found."); + return FutureUtils.completedExceptionally( + new PartitionException("No reader with ID " + intermediateResultPartitionID + + " for task " + executionAttemptID + " was found.")); } } - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } else { log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID); - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } } - @RpcMethod + @Override public void failPartition(ExecutionAttemptID executionAttemptID) { log.info("Discarding the results produced by task execution {}.", executionAttemptID); @@ -504,12 +513,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // Heartbeat RPC // ---------------------------------------------------------------------- - @RpcMethod + @Override public void heartbeatFromJobManager(ResourceID resourceID) { jobManagerHeartbeatManager.requestHeartbeat(resourceID, null); } - @RpcMethod + @Override public void heartbeatFromResourceManager(ResourceID resourceID) { resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } @@ -518,8 +527,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // Checkpointing RPCs // ---------------------------------------------------------------------- - @RpcMethod - public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException { + @Override + public CompletableFuture<Acknowledge> triggerCheckpoint( + ExecutionAttemptID executionAttemptID, + long checkpointId, + long checkpointTimestamp, + CheckpointOptions checkpointOptions) { log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); final Task task = taskSlotTable.getTask(executionAttemptID); @@ -527,17 +540,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { if (task != null) { task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions); - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } else { final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.'; log.debug(message); - throw new CheckpointException(message); + return FutureUtils.completedExceptionally(new CheckpointException(message)); } } - @RpcMethod - public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException { + @Override + public CompletableFuture<Acknowledge> confirmCheckpoint( + ExecutionAttemptID executionAttemptID, + long checkpointId, + long checkpointTimestamp) { log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); final Task task = taskSlotTable.getTask(executionAttemptID); @@ -545,12 +561,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { if (task != null) { task.notifyCheckpointComplete(checkpointId); - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } else { final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.'; log.debug(message); - throw new CheckpointException(message); + return FutureUtils.completedExceptionally(new CheckpointException(message)); } } @@ -569,85 +585,90 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { * @throws SlotAllocationException if the slot allocation fails * @return answer to the slot request */ - @RpcMethod - public Acknowledge requestSlot( + @Override + public CompletableFuture<Acknowledge> requestSlot( final SlotID slotId, final JobID jobId, final AllocationID allocationId, final String targetAddress, - final UUID rmLeaderId) throws SlotAllocationException { + final UUID rmLeaderId, + final Time timeout) { // TODO: Filter invalid requests from the resource manager by using the instance/registration Id log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, rmLeaderId); - if (resourceManagerConnection == null) { - final String message = "TaskManager is not connected to a resource manager."; - log.debug(message); - throw new SlotAllocationException(message); - } - - if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) { - final String message = "The leader id " + rmLeaderId + - " does not match with the leader id of the connected resource manager " + - resourceManagerConnection.getTargetLeaderId() + '.'; + try { + if (resourceManagerConnection == null) { + final String message = "TaskManager is not connected to a resource manager."; + log.debug(message); + throw new SlotAllocationException(message); + } - log.debug(message); - throw new SlotAllocationException(message); - } + if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) { + final String message = "The leader id " + rmLeaderId + + " does not match with the leader id of the connected resource manager " + + resourceManagerConnection.getTargetLeaderId() + '.'; - if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) { - if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) { - log.info("Allocated slot for {}.", allocationId); - } else { - log.info("Could not allocate slot for {}.", allocationId); - throw new SlotAllocationException("Could not allocate slot."); + log.debug(message); + throw new SlotAllocationException(message); } - } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) { - final String message = "The slot " + slotId + " has already been allocated for a different job."; - log.info(message); + if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) { + if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) { + log.info("Allocated slot for {}.", allocationId); + } else { + log.info("Could not allocate slot for {}.", allocationId); + throw new SlotAllocationException("Could not allocate slot."); + } + } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) { + final String message = "The slot " + slotId + " has already been allocated for a different job."; - throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber())); - } + log.info(message); - if (jobManagerTable.contains(jobId)) { - offerSlotsToJobManager(jobId); - } else { - try { - jobLeaderService.addJob(jobId, targetAddress); - } catch (Exception e) { - // free the allocated slot + throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber())); + } + + if (jobManagerTable.contains(jobId)) { + offerSlotsToJobManager(jobId); + } else { try { - taskSlotTable.freeSlot(allocationId); - } catch (SlotNotFoundException slotNotFoundException) { - // slot no longer existent, this should actually never happen, because we've - // just allocated the slot. So let's fail hard in this case! - onFatalError(slotNotFoundException); - } + jobLeaderService.addJob(jobId, targetAddress); + } catch (Exception e) { + // free the allocated slot + try { + taskSlotTable.freeSlot(allocationId); + } catch (SlotNotFoundException slotNotFoundException) { + // slot no longer existent, this should actually never happen, because we've + // just allocated the slot. So let's fail hard in this case! + onFatalError(slotNotFoundException); + } - // sanity check - if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { - onFatalError(new Exception("Could not free slot " + slotId)); - } + // sanity check + if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { + onFatalError(new Exception("Could not free slot " + slotId)); + } - throw new SlotAllocationException("Could not add job to job leader service.", e); + throw new SlotAllocationException("Could not add job to job leader service.", e); + } } + } catch (SlotAllocationException slotAllocationException) { + return FutureUtils.completedExceptionally(slotAllocationException); } - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } // ---------------------------------------------------------------------- // Disconnection RPCs // ---------------------------------------------------------------------- - @RpcMethod + @Override public void disconnectJobManager(JobID jobId, Exception cause) { closeJobManagerConnection(jobId, cause); } - @RpcMethod + @Override public void disconnectResourceManager(Exception cause) { closeResourceManagerConnection(cause); } @@ -767,7 +788,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { reservedSlots.add(offer); } - CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots( + CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots( getResourceID(), reservedSlots, leaderId,
http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index e1144c9..3ca0327 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -528,7 +528,8 @@ public class ResourceManagerTest extends TestLogger { rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, - slotReport); + slotReport, + Time.milliseconds(0L)); RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); @@ -627,7 +628,8 @@ public class ResourceManagerTest extends TestLogger { jmLeaderId, jmResourceId, jobMasterAddress, - jobId); + jobId, + Time.milliseconds(0L)); RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof JobMasterRegistrationSuccess); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 267f10b..3814684 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -85,7 +85,7 @@ public class DispatcherTest extends TestLogger { dispatcher.start(); - DispatcherGateway dispatcherGateway = dispatcher.getSelf(); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index 4cc4f11..8d613ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -82,7 +82,7 @@ public class SlotPoolRpcTest { ); pool.start(UUID.randomUUID(), "foobar"); - CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1)); try { future.get(4, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 3e2293b..aeceb59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger { slotPool.registerTaskManager(resourceID); ScheduledUnit task = mock(ScheduledUnit.class); - CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -110,7 +110,7 @@ public class SlotPoolTest extends TestLogger { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); SimpleSlot slot = future.get(1, TimeUnit.SECONDS); assertTrue(future.isDone()); @@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); - CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); + CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); assertFalse(future1.isDone()); assertFalse(future2.isDone()); @@ -139,7 +139,7 @@ public class SlotPoolTest extends TestLogger { final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues(); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); @@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); assertFalse(future1.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -174,7 +174,7 @@ public class SlotPoolTest extends TestLogger { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); @@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger { // return this slot to pool slot1.releaseSlot(); - CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); // second allocation fulfilled by previous slot returning SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); @@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -210,30 +210,30 @@ public class SlotPoolTest extends TestLogger { // slot from unregistered resource AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertFalse(slotPool.offerSlot(invalid)); + assertFalse(slotPool.offerSlot(invalid).get()); AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE); // we'll also accept non requested slots - assertTrue(slotPool.offerSlot(notRequested)); + assertTrue(slotPool.offerSlot(notRequested).get()); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); // accepted slot - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); SimpleSlot slot = future.get(1, TimeUnit.SECONDS); assertTrue(future.isDone()); assertTrue(slot.isAlive()); // duplicated offer with using slot - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); assertTrue(future.isDone()); assertTrue(slot.isAlive()); // duplicated offer with free slot slot.releaseSlot(); assertTrue(slot.isReleased()); - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); } @Test @@ -241,17 +241,17 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); + CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot)); + assertTrue(slotPool.offerSlot(allocatedSlot).get()); SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index 7c58879..435c23d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -52,6 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -83,7 +84,7 @@ public class JobManagerRunnerMockTest extends TestLogger { jobManager = mock(JobMaster.class); jobManagerGateway = mock(JobMasterGateway.class); - when(jobManager.getSelf()).thenReturn(jobManagerGateway); + when(jobManager.getSelfGateway(eq(JobMasterGateway.class))).thenReturn(jobManagerGateway); when(jobManager.getRpcService()).thenReturn(mockRpc); PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 48a1d45..0c4d376 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -112,7 +112,7 @@ public class JobMasterTest extends TestLogger { jobMaster.start(jmLeaderId); // register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time - jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId); + jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, Time.milliseconds(0L)); ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); verify(scheduledExecutor, times(1)).scheduleAtFixedRate( http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 6480d75..10d6a72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -52,6 +52,8 @@ public class ResourceManagerJobMasterTest extends TestLogger { private TestingSerialRpcService rpcService; + private final Time timeout = Time.milliseconds(0L); + @Before public void setup() throws Exception { rpcService = new TestingSerialRpcService(); @@ -83,7 +85,8 @@ public class ResourceManagerJobMasterTest extends TestLogger { jmLeaderID, jmResourceId, jobMasterAddress, - jobID); + jobID, + timeout); RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS); assertTrue(response instanceof JobMasterRegistrationSuccess); @@ -114,7 +117,8 @@ public class ResourceManagerJobMasterTest extends TestLogger { jmLeaderID, jmResourceId, jobMasterAddress, - jobID); + jobID, + timeout); assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -146,7 +150,8 @@ public class ResourceManagerJobMasterTest extends TestLogger { differentLeaderSessionID, jmResourceId, jobMasterAddress, - jobID); + jobID, + timeout); assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -178,7 +183,8 @@ public class ResourceManagerJobMasterTest extends TestLogger { jmLeaderSessionId, jmResourceId, invalidAddress, - jobID); + jobID, + timeout); assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -210,7 +216,8 @@ public class ResourceManagerJobMasterTest extends TestLogger { jmLeaderSessionId, jmResourceId, jobMasterAddress, - unknownJobIDToHAServices); + unknownJobIDToHAServices, + timeout); RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof RegistrationResponse.Decline); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 4127cea..fc96f0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -90,13 +90,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { try { // test response successful CompletableFuture<RegistrationResponse> successfulFuture = - resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport); + resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); // test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor CompletableFuture<RegistrationResponse> duplicateFuture = - resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport); + resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); RegistrationResponse duplicateResponse = duplicateFuture.get(); assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess); assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId()); @@ -116,7 +116,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = - resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport); + resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { // test throw exception when receive a registration from taskExecutor which takes invalid address String invalidAddress = "/taskExecutor2"; CompletableFuture<RegistrationResponse> invalidAddressFuture = - resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport); + resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 4be5257..00762b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -67,7 +67,7 @@ public class AsyncCallsTest extends TestLogger { TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); testEndpoint.start(); - TestGateway gateway = testEndpoint.getSelf(); + TestGateway gateway = testEndpoint.getSelfGateway(TestGateway.class); // a bunch of gateway calls gateway.someCall(); @@ -108,7 +108,7 @@ public class AsyncCallsTest extends TestLogger { assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess()); assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get()); - akkaRpcService.stopServer(testEndpoint.getSelf()); + testEndpoint.shutDown(); } @Test @@ -174,7 +174,7 @@ public class AsyncCallsTest extends TestLogger { } @SuppressWarnings("unused") - public static class TestEndpoint extends RpcEndpoint<TestGateway> { + public static class TestEndpoint extends RpcEndpoint implements TestGateway { private final ReentrantLock lock; @@ -185,7 +185,7 @@ public class AsyncCallsTest extends TestLogger { this.lock = lock; } - @RpcMethod + @Override public void someCall() { boolean holdsLock = lock.tryLock(); if (holdsLock) { @@ -195,7 +195,7 @@ public class AsyncCallsTest extends TestLogger { } } - @RpcMethod + @Override public void anotherCall() { boolean holdsLock = lock.tryLock(); if (holdsLock) { http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java deleted file mode 100644 index 07dadae..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ /dev/null @@ -1,452 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.util.ReflectionUtil; -import org.apache.flink.util.TestLogger; -import org.junit.Test; -import org.reflections.Reflections; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Test which ensures that all classes of subtype {@link RpcEndpoint} implement - * the methods specified in the generic gateway type argument. - * - * {@code - * RpcEndpoint<GatewayTypeParameter extends RpcGateway> - * } - * - * Note, that the class hierarchy can also be nested. In this case the type argument - * always has to be the first argument, e.g. {@code - * - * // RpcClass needs to implement RpcGatewayClass' methods - * RpcClass extends RpcEndpoint<RpcGatewayClass> - * - * // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods - * RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter> - * RpcClass2 extends RpcClass<RpcGatewayClass,...> - * - * // needless to say, this can even be nested further - * ... - * } - * - */ -public class RpcCompletenessTest extends TestLogger { - - private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class); - - private static final Class<?> futureClass = CompletableFuture.class; - private static final Class<?> timeoutClass = Time.class; - - @Test - @SuppressWarnings({"rawtypes", "unchecked"}) - public void testRpcCompleteness() { - Reflections reflections = new Reflections("org.apache.flink"); - - Set<Class<? extends RpcEndpoint>> classes = reflections.getSubTypesOf(RpcEndpoint.class); - - Class<? extends RpcEndpoint> c; - - mainloop: - for (Class<? extends RpcEndpoint> rpcEndpoint : classes) { - c = rpcEndpoint; - - LOG.debug("-------------"); - LOG.debug("c: {}", c); - - // skip abstract classes - if (Modifier.isAbstract(c.getModifiers())) { - LOG.debug("Skipping abstract class"); - continue; - } - - // check for type parameter bound to RpcGateway - // skip if one is found because a subclass will provide the concrete argument - TypeVariable<? extends Class<? extends RpcEndpoint>>[] typeParameters = c.getTypeParameters(); - LOG.debug("Checking {} parameters.", typeParameters.length); - for (int i = 0; i < typeParameters.length; i++) { - for (Type bound : typeParameters[i].getBounds()) { - LOG.debug("checking bound {} of type parameter {}", bound, typeParameters[i]); - if (bound.toString().equals("interface " + RpcGateway.class.getName())) { - if (i > 0) { - fail("Type parameter for RpcGateway should come first in " + c); - } - LOG.debug("Skipping class with type parameter bound to RpcGateway."); - // Type parameter is bound to RpcGateway which a subclass will provide - continue mainloop; - } - } - } - - // check if this class or any super class contains the RpcGateway argument - Class<?> rpcGatewayType; - do { - LOG.debug("checking type argument of class: {}", c); - rpcGatewayType = ReflectionUtil.getTemplateType1(c); - LOG.debug("type argument is: {}", rpcGatewayType); - - c = (Class<? extends RpcEndpoint>) c.getSuperclass(); - - } while (!RpcGateway.class.isAssignableFrom(rpcGatewayType)); - - LOG.debug("Checking RRC completeness of endpoint '{}' with gateway '{}'", - rpcEndpoint.getSimpleName(), rpcGatewayType.getSimpleName()); - - checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType); - } - } - - @SuppressWarnings("rawtypes") - private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) { - List<Method> rpcMethodsFromGateway = getRpcMethodsFromGateway(rpcGateway); - Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new Method[rpcMethodsFromGateway.size()]); - Method[] serverMethods = rpcEndpoint.getMethods(); - - Map<String, Set<Method>> rpcMethods = new HashMap<>(); - Set<Method> unmatchedRpcMethods = new HashSet<>(); - - for (Method serverMethod : serverMethods) { - if (serverMethod.isAnnotationPresent(RpcMethod.class)) { - if (rpcMethods.containsKey(serverMethod.getName())) { - Set<Method> methods = rpcMethods.get(serverMethod.getName()); - methods.add(serverMethod); - - rpcMethods.put(serverMethod.getName(), methods); - } else { - Set<Method> methods = new HashSet<>(); - methods.add(serverMethod); - - rpcMethods.put(serverMethod.getName(), methods); - } - - unmatchedRpcMethods.add(serverMethod); - } - } - - for (Method gatewayMethod : gatewayMethods) { - assertTrue( - "The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " + - "annotated method with the same name and signature " + - generateEndpointMethodSignature(gatewayMethod) + ".", - rpcMethods.containsKey(gatewayMethod.getName())); - - checkGatewayMethod(gatewayMethod); - - if (!matchGatewayMethodWithEndpoint(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) { - fail("Could not find a RpcMethod annotated method in rpc endpoint " + - rpcEndpoint.getName() + " matching the rpc gateway method " + - generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " + - rpcGateway.getName() + "."); - } - } - - if (!unmatchedRpcMethods.isEmpty()) { - StringBuilder builder = new StringBuilder(); - - for (Method unmatchedRpcMethod : unmatchedRpcMethods) { - builder.append(unmatchedRpcMethod).append("\n"); - } - - fail("The rpc endpoint " + rpcEndpoint.getName() + " contains rpc methods which " + - "are not matched to gateway methods of " + rpcGateway.getName() + ":\n" + - builder.toString()); - } - } - - /** - * Checks whether the gateway method fulfills the gateway method requirements. - * <ul> - * <li>It checks whether the return type is void or a {@link CompletableFuture} wrapping the actual result. </li> - * <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li> - * </ul> - * - * @param gatewayMethod Gateway method to check - */ - private void checkGatewayMethod(Method gatewayMethod) { - if (!gatewayMethod.getReturnType().equals(Void.TYPE)) { - assertTrue( - "The return type of method " + gatewayMethod.getName() + " in the rpc gateway " + - gatewayMethod.getDeclaringClass().getName() + " is non void and not a " + - "future. Non-void return types have to be returned as a future.", - gatewayMethod.getReturnType().equals(futureClass)); - } - - Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations(); - Class<?>[] parameterTypes = gatewayMethod.getParameterTypes(); - int rpcTimeoutParameters = 0; - - for (int i = 0; i < parameterAnnotations.length; i++) { - if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) { - assertTrue( - "The rpc timeout has to be of type " + timeoutClass.getName() + ".", - parameterTypes[i].equals(timeoutClass)); - - rpcTimeoutParameters++; - } - } - - assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " + - "annotated parameter.", rpcTimeoutParameters <= 1); - } - - /** - * Checks whether we find a matching overloaded version for the gateway method among the methods - * with the same name in the rpc endpoint. - * - * @param gatewayMethod Gateway method - * @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway - * method - * @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far) - */ - private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) { - for (Method endpointMethod : endpointMethods) { - if (checkMethod(gatewayMethod, endpointMethod)) { - unmatchedRpcMethods.remove(endpointMethod); - return true; - } - } - - return false; - } - - private boolean checkMethod(Method gatewayMethod, Method endpointMethod) { - Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes(); - Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations(); - - Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes(); - - List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>(); - - assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length); - - // filter out the RpcTimeout parameters - for (int i = 0; i < gatewayParameterTypes.length; i++) { - if (!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) { - filteredGatewayParameterTypes.add(gatewayParameterTypes[i]); - } - } - - if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) { - return false; - } else { - // check the parameter types - for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) { - if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) { - return false; - } - } - - // check the return types - if (endpointMethod.getReturnType() == void.class) { - if (gatewayMethod.getReturnType() != void.class) { - return false; - } - } else { - // has return value. The gateway method should be wrapped in a future - Class<?> futureClass = gatewayMethod.getReturnType(); - - // sanity check that the return type of a gateway method must be void or a future - if (!futureClass.equals(RpcCompletenessTest.futureClass)) { - return false; - } else { - ReflectionUtil.FullTypeInfo fullValueTypeInfo = ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0); - - if (endpointMethod.getReturnType().equals(futureClass)) { - ReflectionUtil.FullTypeInfo fullRpcEndpointValueTypeInfo = ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0); - - // check if we have the same future value types - if (fullValueTypeInfo != null && fullRpcEndpointValueTypeInfo != null) { - Iterator<Class<?>> valueClasses = fullValueTypeInfo.getClazzIterator(); - Iterator<Class<?>> rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator(); - - while (valueClasses.hasNext() && rpcClasses.hasNext()) { - if (!checkType(valueClasses.next(), rpcClasses.next())) { - return false; - } - } - - // both should be empty - return !valueClasses.hasNext() && !rpcClasses.hasNext(); - } - } else { - if (fullValueTypeInfo != null && !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) { - return false; - } - } - } - } - - return gatewayMethod.getName().equals(endpointMethod.getName()); - } - } - - private boolean checkType(Class<?> firstType, Class<?> secondType) { - Class<?> firstResolvedType; - Class<?> secondResolvedType; - - if (firstType.isPrimitive()) { - firstResolvedType = RpcCompletenessTest.resolvePrimitiveType(firstType); - } else { - firstResolvedType = firstType; - } - - if (secondType.isPrimitive()) { - secondResolvedType = RpcCompletenessTest.resolvePrimitiveType(secondType); - } else { - secondResolvedType = secondType; - } - - return firstResolvedType.equals(secondResolvedType); - } - - /** - * Generates from a gateway rpc method signature the corresponding rpc endpoint signature. - * - * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway - * signature which is not relevant on the server side. - * - * @param method Method to generate the signature string for - * @return String of the respective server side rpc method signature - */ - private String generateEndpointMethodSignature(Method method) { - StringBuilder builder = new StringBuilder(); - - if (method.getReturnType().equals(Void.TYPE)) { - builder.append("void").append(" "); - } else if (method.getReturnType().equals(futureClass)) { - ReflectionUtil.FullTypeInfo fullTypeInfo = ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0); - - builder - .append(futureClass.getSimpleName()) - .append("<") - .append(fullTypeInfo != null ? fullTypeInfo.toString() : "") - .append(">"); - - if (fullTypeInfo != null) { - builder.append("/").append(fullTypeInfo); - } - - builder.append(" "); - } else { - return "Invalid rpc method signature."; - } - - builder.append(method.getName()).append("("); - - Class<?>[] parameterTypes = method.getParameterTypes(); - Annotation[][] parameterAnnotations = method.getParameterAnnotations(); - - assertEquals(parameterTypes.length, parameterAnnotations.length); - - for (int i = 0; i < parameterTypes.length; i++) { - // filter out the RpcTimeout parameters - if (!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) { - builder.append(parameterTypes[i].getName()); - - if (i < parameterTypes.length -1) { - builder.append(", "); - } - } - } - - builder.append(")"); - - return builder.toString(); - } - - private static boolean isRpcTimeout(Annotation[] annotations) { - for (Annotation annotation : annotations) { - if (annotation.annotationType().equals(RpcTimeout.class)) { - return true; - } - } - - return false; - } - - /** - * Returns the boxed type for a primitive type. - * - * @param primitveType Primitive type to resolve - * @return Boxed type for the given primitive type - */ - private static Class<?> resolvePrimitiveType(Class<?> primitveType) { - assert primitveType.isPrimitive(); - - TypeInformation<?> typeInformation = BasicTypeInfo.getInfoFor(primitveType); - - if (typeInformation != null) { - return typeInformation.getTypeClass(); - } else { - throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.'); - } - } - - /** - * Extract all rpc methods defined by the gateway interface - * - * @param interfaceClass the given rpc gateway interface - * @return all methods defined by the given interface - */ - private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) { - if(!interfaceClass.isInterface()) { - fail(interfaceClass.getName() + " is not a interface"); - } - - ArrayList<Method> allMethods = new ArrayList<>(); - // Methods defined in RpcGateway are native method - if(interfaceClass.equals(RpcGateway.class)) { - return allMethods; - } - - // Get all methods declared in current interface - Collections.addAll(allMethods, interfaceClass.getDeclaredMethods()); - - // Get all method inherited from super interface - for (Class<?> superClass : interfaceClass.getInterfaces()) { - @SuppressWarnings("unchecked") - Class<? extends RpcGateway> gatewayClass = (Class<? extends RpcGateway>) superClass; - allMethods.addAll(getRpcMethodsFromGateway(gatewayClass)); - } - return allMethods; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java new file mode 100644 index 0000000..b3e8ee6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorSystem; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for the RpcEndpoint and its self gateways. + */ +public class RpcEndpointTest extends TestLogger { + + private static final Time TIMEOUT = Time.seconds(10L); + private static ActorSystem actorSystem = null; + private static RpcService rpcService = null; + + @BeforeClass + public static void setup() { + actorSystem = AkkaUtils.createDefaultActorSystem(); + rpcService = new AkkaRpcService(actorSystem, TIMEOUT); + } + + @AfterClass + public static void teardown() throws Exception { + if (rpcService != null) { + rpcService.stopService(); + } + + if (actorSystem != null) { + actorSystem.shutdown(); + } + + if (rpcService != null) { + rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + } + + if (actorSystem != null) { + actorSystem.awaitTermination(new FiniteDuration(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)); + } + } + + /** + * Tests that we can obtain the self gateway from a RpcEndpoint and can interact with + * it via the self gateway. + */ + @Test + public void testSelfGateway() throws Exception { + int expectedValue = 1337; + BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue); + + try { + baseEndpoint.start(); + + BaseGateway baseGateway = baseEndpoint.getSelfGateway(BaseGateway.class); + + CompletableFuture<Integer> foobar = baseGateway.foobar(); + + assertEquals(Integer.valueOf(expectedValue), foobar.get()); + } finally { + baseEndpoint.shutDown(); + } + } + + /** + * Tests that we cannot accidentally obtain a wrong self gateway type which is + * not implemented by the RpcEndpoint. + */ + @Test(expected = RuntimeException.class) + public void testWrongSelfGateway() throws Exception { + int expectedValue = 1337; + BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue); + + try { + baseEndpoint.start(); + + DifferentGateway differentGateway = baseEndpoint.getSelfGateway(DifferentGateway.class); + + fail("Expected to fail with a RuntimeException since we requested the wrong gateway type."); + } finally { + baseEndpoint.shutDown(); + } + } + + /** + * Tests that we can extend existing RpcEndpoints and can communicate with them via the + * self gateways. + */ + @Test + public void testEndpointInheritance() throws Exception { + int foobar = 1; + int barfoo = 2; + String foo = "foobar"; + + ExtendedEndpoint endpoint = new ExtendedEndpoint(rpcService, foobar, barfoo, foo); + + try { + endpoint.start(); + + BaseGateway baseGateway = endpoint.getSelfGateway(BaseGateway.class); + ExtendedGateway extendedGateway = endpoint.getSelfGateway(ExtendedGateway.class); + DifferentGateway differentGateway = endpoint.getSelfGateway(DifferentGateway.class); + + assertEquals(Integer.valueOf(foobar), baseGateway.foobar().get()); + assertEquals(Integer.valueOf(foobar), extendedGateway.foobar().get()); + + assertEquals(Integer.valueOf(barfoo), extendedGateway.barfoo().get()); + assertEquals(foo, differentGateway.foo().get()); + } finally { + endpoint.shutDown(); + } + } + + public interface BaseGateway extends RpcGateway { + CompletableFuture<Integer> foobar(); + } + + public interface ExtendedGateway extends BaseGateway { + CompletableFuture<Integer> barfoo(); + } + + public interface DifferentGateway extends RpcGateway { + CompletableFuture<String> foo(); + } + + public static class BaseEndpoint extends RpcEndpoint implements BaseGateway { + + private final int foobarValue; + + protected BaseEndpoint(RpcService rpcService, int foobarValue) { + super(rpcService); + + this.foobarValue = foobarValue; + } + + @Override + public CompletableFuture<Integer> foobar() { + return CompletableFuture.completedFuture(foobarValue); + } + } + + public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway { + + private final int barfooValue; + + private final String fooString; + + protected ExtendedEndpoint(RpcService rpcService, int foobarValue, int barfooValue, String fooString) { + super(rpcService, foobarValue); + + this.barfooValue = barfooValue; + this.fooString = fooString; + } + + @Override + public CompletableFuture<Integer> barfoo() { + return CompletableFuture.completedFuture(barfooValue); + } + + @Override + public CompletableFuture<String> foo() { + return CompletableFuture.completedFuture(fooString); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 37349a1..cb38f6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; @@ -30,8 +29,8 @@ import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import java.util.BitSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -139,32 +138,32 @@ public class TestingSerialRpcService implements RpcService { } @Override - public void stopServer(RpcGateway selfGateway) { + public void stopServer(RpcServer selfGateway) { registeredConnections.remove(selfGateway.getAddress()); } @Override - public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) { + public <S extends RpcEndpoint & RpcGateway> RpcServer startServer(S rpcEndpoint) { final String address = UUID.randomUUID().toString(); InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint); ClassLoader classLoader = getClass().getClassLoader(); + Set<Class<? extends RpcGateway>> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()); + + implementedRpcGateways.add(RpcServer.class); + + @SuppressWarnings("unchecked") - C self = (C) Proxy.newProxyInstance( + RpcServer rpcServer = (RpcServer) Proxy.newProxyInstance( classLoader, - new Class<?>[]{ - rpcEndpoint.getSelfGatewayType(), - MainThreadExecutable.class, - StartStoppable.class, - RpcGateway.class - }, + implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler); // register self - registeredConnections.putIfAbsent(self.getAddress(), self); + registeredConnections.putIfAbsent(rpcServer.getAddress(), rpcServer); - return self; + return rpcServer; } @Override @@ -211,7 +210,7 @@ public class TestingSerialRpcService implements RpcService { registeredConnections.clear(); } - private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable { + private static final class TestingSerialInvocationHandler<T extends RpcEndpoint & RpcGateway> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable { private final T rpcEndpoint; @@ -234,7 +233,9 @@ public class TestingSerialRpcService implements RpcService { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); if (declaringClass.equals(MainThreadExecutable.class) || - declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || + declaringClass.equals(Object.class) || + declaringClass.equals(StartStoppable.class) || + declaringClass.equals(RpcServer.class) || declaringClass.equals(RpcGateway.class)) { return method.invoke(this, args); } else { @@ -243,22 +244,17 @@ public class TestingSerialRpcService implements RpcService { Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); - final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments( - parameterTypes, - parameterAnnotations, - args); - Class<?> returnType = method.getReturnType(); if (returnType.equals(CompletableFuture.class)) { try { - Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + Object result = handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout); return CompletableFuture.completedFuture(result); } catch (Throwable e) { return FutureUtils.completedExceptionally(e); } } else { - return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + return handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout); } } } @@ -380,61 +376,6 @@ public class TestingSerialRpcService implements RpcService { } /** - * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument - * list. - * - * @param parameterTypes Array of parameter types - * @param parameterAnnotations Array of parameter annotations - * @param args Arary of arguments - * @return Tuple of filtered parameter types and arguments which no longer contain the - * {@link RpcTimeout} annotated parameter types and arguments - */ - private static Tuple2<Class<?>[], Object[]> filterArguments( - Class<?>[] parameterTypes, - Annotation[][] parameterAnnotations, - Object[] args) { - - Class<?>[] filteredParameterTypes; - Object[] filteredArgs; - - if (args == null) { - filteredParameterTypes = parameterTypes; - filteredArgs = null; - } else { - Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length); - Preconditions.checkArgument(parameterAnnotations.length == args.length); - - BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length); - int numberRpcParameters = parameterTypes.length; - - for (int i = 0; i < parameterTypes.length; i++) { - if (isRpcTimeout(parameterAnnotations[i])) { - isRpcTimeoutParameter.set(i); - numberRpcParameters--; - } - } - - if (numberRpcParameters == parameterTypes.length) { - filteredParameterTypes = parameterTypes; - filteredArgs = args; - } else { - filteredParameterTypes = new Class<?>[numberRpcParameters]; - filteredArgs = new Object[numberRpcParameters]; - int counter = 0; - - for (int i = 0; i < parameterTypes.length; i++) { - if (!isRpcTimeoutParameter.get(i)) { - filteredParameterTypes[counter] = parameterTypes[i]; - filteredArgs[counter] = args[i]; - counter++; - } - } - } - } - return Tuple2.of(filteredParameterTypes, filteredArgs); - } - - /** * Checks whether any of the annotations is of type {@link RpcTimeout} * * @param annotations Array of annotations http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 793d292..56d17e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; @@ -105,7 +104,7 @@ public class AkkaRpcActorTest extends TestLogger { DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); - DummyRpcGateway rpcGateway = rpcEndpoint.getSelf(); + DummyRpcGateway rpcGateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class); // this message should be discarded and completed with an AkkaRpcException CompletableFuture<Integer> result = rpcGateway.foobar(); @@ -192,7 +191,7 @@ public class AkkaRpcActorTest extends TestLogger { ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService); rpcEndpoint.start(); - ExceptionalGateway rpcGateway = rpcEndpoint.getSelf(); + ExceptionalGateway rpcGateway = rpcEndpoint.getSelfGateway(ExceptionalGateway.class); CompletableFuture<Integer> result = rpcGateway.doStuff(); try { @@ -211,7 +210,7 @@ public class AkkaRpcActorTest extends TestLogger { ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService); rpcEndpoint.start(); - ExceptionalGateway rpcGateway = rpcEndpoint.getSelf(); + ExceptionalGateway rpcGateway = rpcEndpoint.getSelfGateway(ExceptionalGateway.class); CompletableFuture<Integer> result = rpcGateway.doStuff(); try { @@ -275,7 +274,7 @@ public class AkkaRpcActorTest extends TestLogger { void tell(String message); } - private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> { + private static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway { private volatile int _foobar = 42; @@ -283,9 +282,9 @@ public class AkkaRpcActorTest extends TestLogger { super(rpcService); } - @RpcMethod - public int foobar() { - return _foobar; + @Override + public CompletableFuture<Integer> foobar() { + return CompletableFuture.completedFuture(_foobar); } public void setFoobar(int value) { @@ -299,25 +298,25 @@ public class AkkaRpcActorTest extends TestLogger { CompletableFuture<Integer> doStuff(); } - private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> { + private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway { protected ExceptionalEndpoint(RpcService rpcService) { super(rpcService); } - @RpcMethod - public int doStuff() { + @Override + public CompletableFuture<Integer> doStuff() { throw new RuntimeException("my super specific test exception"); } } - private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> { + private static class ExceptionalFutureEndpoint extends RpcEndpoint implements ExceptionalGateway { protected ExceptionalFutureEndpoint(RpcService rpcService) { super(rpcService); } - @RpcMethod + @Override public CompletableFuture<Integer> doStuff() { final CompletableFuture<Integer> future = new CompletableFuture<>(); @@ -338,7 +337,7 @@ public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ - private static class SimpleRpcEndpoint extends RpcEndpoint<RpcGateway> { + private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway { protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) { super(rpcService, endpointId); @@ -352,7 +351,7 @@ public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ - private static class FailingPostStopEndpoint extends RpcEndpoint<RpcGateway> { + private static class FailingPostStopEndpoint extends RpcEndpoint implements RpcGateway { protected FailingPostStopEndpoint(RpcService rpcService, String endpointId) { super(rpcService, endpointId); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index 9f134d8..96a9ee4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.TestLogger; @@ -52,7 +51,7 @@ public class MainThreadValidationTest extends TestLogger { testEndpoint.start(); // this works, because it is executed as an RPC call - testEndpoint.getSelf().someConcurrencyCriticalFunction(); + testEndpoint.getSelfGateway(TestGateway.class).someConcurrencyCriticalFunction(); // this fails, because it is executed directly boolean exceptionThrown; @@ -65,7 +64,7 @@ public class MainThreadValidationTest extends TestLogger { } assertTrue("should fail with an assertion error", exceptionThrown); - akkaRpcService.stopServer(testEndpoint.getSelf()); + testEndpoint.shutDown(); } finally { akkaRpcService.stopService(); @@ -82,13 +81,13 @@ public class MainThreadValidationTest extends TestLogger { } @SuppressWarnings("unused") - public static class TestEndpoint extends RpcEndpoint<TestGateway> { + public static class TestEndpoint extends RpcEndpoint implements TestGateway { public TestEndpoint(RpcService rpcService) { super(rpcService); } - @RpcMethod + @Override public void someConcurrencyCriticalFunction() { validateRunsInMainThread(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index 34cf412..c722980 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.TestLogger; import org.hamcrest.core.Is; @@ -85,7 +84,7 @@ public class MessageSerializationTest extends TestLogger { TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue); testEndpoint.start(); - TestGateway testGateway = testEndpoint.getSelf(); + TestGateway testGateway = testEndpoint.getSelfGateway(TestGateway.class); NonSerializableObject expected = new NonSerializableObject(42); @@ -169,7 +168,7 @@ public class MessageSerializationTest extends TestLogger { void foobar(Object object) throws IOException, InterruptedException; } - private static class TestEndpoint extends RpcEndpoint<TestGateway> { + private static class TestEndpoint extends RpcEndpoint implements TestGateway { private final LinkedBlockingQueue<Object> queue; @@ -178,7 +177,7 @@ public class MessageSerializationTest extends TestLogger { this.queue = queue; } - @RpcMethod + @Override public void foobar(Object object) throws InterruptedException { queue.put(object); } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 53c435e..4c87671 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; @@ -167,7 +168,7 @@ public class TaskExecutorITCase extends TestLogger { any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); - rpcService.registerGateway(rmAddress, resourceManager.getSelf()); + rpcService.registerGateway(rmAddress, resourceManager.getSelfGateway(ResourceManagerGateway.class)); rpcService.registerGateway(jmAddress, jmGateway); final AllocationID allocationId = new AllocationID(); @@ -189,13 +190,14 @@ public class TaskExecutorITCase extends TestLogger { jmLeaderId, jmResourceId, jmAddress, - jobId); + jobId, + Time.milliseconds(0L)); RegistrationResponse registrationResponse = registrationResponseFuture.get(); assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess); - resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest); + resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest, Time.milliseconds(0L)); verify(jmGateway).offerSlots( eq(taskManagerResourceId),