[FLINK-7387] [rpc] Require RpcEndpoints to directly implement RpcGateways This commit changes the relation between RpcEndpoints and RpcGateways. From now on, the RpcEndpoints have to implement the RpcGateways they want to support instead of coupling them loosely via a type parameter. In order to obtain the self gateway, a new method RpcEndpoint#getSelfGateway(Class) has been introduced. This method can be used to obtain the RpcGateway type at run time to talk to the RpcEndpoint asynchronously.
All existing RpcEndpoints have been adapted to the new model. This basically means that they now return a CompletableFuture<X> instead of X. Add RpcEndpointTest This closes #4498. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d95d20eb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d95d20eb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d95d20eb Branch: refs/heads/master Commit: d95d20eb45185151d731c9b41e9b1ac22cf81334 Parents: 9f790d3 Author: Till Rohrmann <[email protected]> Authored: Tue Aug 8 14:43:47 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Aug 11 13:49:44 2017 +0200 ---------------------------------------------------------------------- .../MesosResourceManagerTest.java | 6 +- .../flink/runtime/dispatcher/Dispatcher.java | 32 +- .../apache/flink/runtime/instance/SlotPool.java | 81 ++-- .../flink/runtime/instance/SlotPoolGateway.java | 3 +- .../runtime/jobmaster/JobManagerRunner.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 212 +++++---- .../runtime/jobmaster/JobMasterGateway.java | 3 +- .../resourcemanager/ResourceManager.java | 66 +-- .../runtime/rpc/MainThreadValidatorUtil.java | 4 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 107 ++--- .../org/apache/flink/runtime/rpc/RpcMethod.java | 37 -- .../org/apache/flink/runtime/rpc/RpcServer.java | 34 ++ .../apache/flink/runtime/rpc/RpcService.java | 10 +- .../org/apache/flink/runtime/rpc/RpcUtils.java | 52 +++ .../apache/flink/runtime/rpc/SelfGateway.java | 34 -- .../runtime/rpc/akka/AkkaInvocationHandler.java | 88 +--- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 3 +- .../flink/runtime/rpc/akka/AkkaRpcService.java | 28 +- .../runtime/taskexecutor/TaskExecutor.java | 351 +++++++------- .../clusterframework/ResourceManagerTest.java | 6 +- .../runtime/dispatcher/DispatcherTest.java | 2 +- .../flink/runtime/instance/SlotPoolRpcTest.java | 2 +- 
.../flink/runtime/instance/SlotPoolTest.java | 34 +- .../jobmaster/JobManagerRunnerMockTest.java | 3 +- .../flink/runtime/jobmaster/JobMasterTest.java | 2 +- .../ResourceManagerJobMasterTest.java | 17 +- .../ResourceManagerTaskExecutorTest.java | 8 +- .../flink/runtime/rpc/AsyncCallsTest.java | 10 +- .../flink/runtime/rpc/RpcCompletenessTest.java | 452 ------------------- .../flink/runtime/rpc/RpcEndpointTest.java | 195 ++++++++ .../runtime/rpc/TestingSerialRpcService.java | 95 +--- .../runtime/rpc/akka/AkkaRpcActorTest.java | 29 +- .../rpc/akka/MainThreadValidationTest.java | 9 +- .../rpc/akka/MessageSerializationTest.java | 7 +- .../taskexecutor/TaskExecutorITCase.java | 8 +- .../runtime/taskexecutor/TaskExecutorTest.java | 41 +- 36 files changed, 920 insertions(+), 1153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e63b4ab..f9e35a9 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -122,6 +122,8 @@ public class MesosResourceManagerTest extends TestLogger { private static ActorSystem system; + private static final Time timeout = Time.seconds(10L); + @Before public void setup() { system = AkkaUtils.createLocalActorSystem(flinkConfig); @@ -415,7 +417,7 @@ public class MesosResourceManagerTest extends TestLogger { */ public void registerJobMaster(MockJobMaster jobMaster) throws Exception { 
CompletableFuture<RegistrationResponse> registration = resourceManager.registerJobManager( - rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID); + rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout); assertTrue(registration.get() instanceof JobMasterRegistrationSuccess); } @@ -589,7 +591,7 @@ public class MesosResourceManagerTest extends TestLogger { // send registration message CompletableFuture<RegistrationResponse> successfulFuture = - resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport); + resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport, timeout); RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 2eb0e36..9fc1fc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -20,11 +20,13 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.client.JobSubmissionException; 
import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; @@ -37,7 +39,6 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -47,6 +48,7 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * Base class for the Dispatcher component. The Dispatcher component is responsible @@ -54,7 +56,7 @@ import java.util.Map; * the jobs and to recover them in case of a master failure. Furthermore, it knows * about the state of the Flink session cluster. 
*/ -public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> { +public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway { public static final String DISPATCHER_NAME = "dispatcher"; @@ -131,8 +133,8 @@ public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> { // RPCs //------------------------------------------------------ - @RpcMethod - public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException { + @Override + public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { final JobID jobId = jobGraph.getJobID(); log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); @@ -143,7 +145,8 @@ public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> { jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); } catch (IOException e) { log.warn("Cannot retrieve job status for {}.", jobId, e); - throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e); + return FutureUtils.completedExceptionally( + new JobSubmissionException(jobId, "Could not retrieve the job status.", e)); } if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) { @@ -151,7 +154,8 @@ public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> { submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null)); } catch (Exception e) { log.warn("Cannot persist JobGraph.", e); - throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e); + return FutureUtils.completedExceptionally( + new JobSubmissionException(jobId, "Could not persist JobGraph.", e)); } final JobManagerRunner jobManagerRunner; @@ -180,22 +184,24 @@ public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> { e.addSuppressed(t); } - throw new JobSubmissionException(jobId, "Could not start JobManager.", e); + return FutureUtils.completedExceptionally( + new JobSubmissionException(jobId, "Could not start 
JobManager.", e)); } jobManagerRunners.put(jobId, jobManagerRunner); - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } else { - throw new JobSubmissionException(jobId, "Job has already been submitted and " + - "is currently in state " + jobSchedulingStatus + '.'); + return FutureUtils.completedExceptionally( + new JobSubmissionException(jobId, "Job has already been submitted and " + + "is currently in state " + jobSchedulingStatus + '.')); } } - @RpcMethod - public Collection<JobID> listJobs() { + @Override + public CompletableFuture<Collection<JobID>> listJobs(Time timeout) { // TODO: return proper list of running jobs - return jobManagerRunners.keySet(); + return CompletableFuture.completedFuture(jobManagerRunners.keySet()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 508e54f..de2b3e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; @@ -35,9 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.clock.Clock; @@ -46,15 +45,19 @@ import org.apache.flink.runtime.util.clock.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -73,7 +76,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * TODO : Make pending requests location preference aware * TODO : Make pass location preferences to ResourceManager when sending a slot request */ -public class SlotPool extends RpcEndpoint<SlotPoolGateway> { +public class SlotPool extends RpcEndpoint implements SlotPoolGateway { /** The log for the pool - shared also with the internal classes */ static final Logger LOG = LoggerFactory.getLogger(SlotPool.class); @@ -154,7 +157,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { this.pendingRequests = new HashMap<>(); this.waitingForResourceManager = new HashMap<>(); - this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout); + this.providerAndOwner = new 
ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout); } // ------------------------------------------------------------------------ @@ -187,12 +190,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { /** * Suspends this pool, meaning it has lost its authority to accept and distribute slots. */ - @RpcMethod + @Override public void suspend() { validateRunsInMainThread(); // suspend this RPC endpoint - ((StartStoppable) getSelf()).stop(); + stop(); // do not accept any requests jobManagerLeaderId = null; @@ -236,7 +239,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { // Resource Manager Connection // ------------------------------------------------------------------------ - @RpcMethod + @Override public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) { this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); this.resourceManagerGateway = checkNotNull(resourceManagerGateway); @@ -250,7 +253,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { waitingForResourceManager.clear(); } - @RpcMethod + @Override public void disconnectResourceManager() { this.resourceManagerLeaderId = null; this.resourceManagerGateway = null; @@ -260,16 +263,17 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { // Slot Allocation // ------------------------------------------------------------------------ - @RpcMethod + @Override public CompletableFuture<SimpleSlot> allocateSlot( ScheduledUnit task, ResourceProfile resources, - Iterable<TaskManagerLocation> locationPreferences) { + Iterable<TaskManagerLocation> locationPreferences, + Time timeout) { return internalAllocateSlot(task, resources, locationPreferences); } - @RpcMethod + @Override public void returnAllocatedSlot(Slot slot) { internalReturnAllocatedSlot(slot); } @@ -457,18 +461,39 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { return null; } - @RpcMethod - public 
Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers) { + @Override + public CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers) { validateRunsInMainThread(); - final ArrayList<SlotOffer> result = new ArrayList<>(); - for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) { - if (offerSlot(offer.f0)) { - result.add(offer.f1); + List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map( + offer -> { + CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(offer.f0).thenApply( + (acceptedSlot) -> { + if (acceptedSlot) { + return Optional.of(offer.f1); + } else { + return Optional.empty(); + } + }); + + return acceptedSlotOffer; } - } + ).collect(Collectors.toList()); + + CompletableFuture<Collection<Optional<SlotOffer>>> optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers); + + CompletableFuture<Collection<SlotOffer>> resultingSlotOffers = optionalSlotOffers.thenApply( + collection -> { + Collection<SlotOffer> slotOffers = collection + .stream() + .flatMap( + opt -> opt.map(Stream::of).orElseGet(Stream::empty)) + .collect(Collectors.toList()); - return result.isEmpty() ? 
Collections.<SlotOffer>emptyList() : result; + return slotOffers; + }); + + return resultingSlotOffers; } /** @@ -480,8 +505,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { * @param slot The offered slot * @return True if we accept the offering */ - @RpcMethod - public boolean offerSlot(final AllocatedSlot slot) { + @Override + public CompletableFuture<Boolean> offerSlot(final AllocatedSlot slot) { validateRunsInMainThread(); // check if this TaskManager is valid @@ -491,7 +516,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { if (!registeredTaskManagers.contains(resourceID)) { LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", slot.getSlotAllocationId(), slot); - return false; + return CompletableFuture.completedFuture(false); } // check whether we have already using this slot @@ -500,7 +525,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { // return true here so that the sender will get a positive acknowledgement to the retry // and mark the offering as a success - return true; + return CompletableFuture.completedFuture(true); } // check whether we have request waiting for this slot @@ -520,7 +545,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { // we accepted the request in any case. 
slot will be released after it idled for // too long and timed out - return true; + return CompletableFuture.completedFuture(true); } @@ -541,7 +566,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { * @param allocationID Represents the allocation which should be failed * @param cause The cause of the failure */ - @RpcMethod + @Override public void failAllocation(final AllocationID allocationID, final Exception cause) { final PendingRequest pendingRequest = pendingRequests.remove(allocationID); if (pendingRequest != null) { @@ -576,7 +601,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { * * @param resourceID The id of the TaskManager */ - @RpcMethod + @Override public void registerTaskManager(final ResourceID resourceID) { registeredTaskManagers.add(resourceID); } @@ -587,7 +612,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { * * @param resourceID The id of the TaskManager */ - @RpcMethod + @Override public void releaseTaskManager(final ResourceID resourceID) { if (registeredTaskManagers.remove(resourceID)) { availableSlots.removeAllForTaskManager(resourceID); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index 43f407a..8d4f2a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.util.Collection; import java.util.UUID; import 
java.util.concurrent.CompletableFuture; @@ -77,7 +78,7 @@ public interface SlotPoolGateway extends RpcGateway { CompletableFuture<Boolean> offerSlot(AllocatedSlot slot); - CompletableFuture<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers); + CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers); void failAllocation(AllocationID allocationID, Exception cause); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index d557f6b..5838cf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -433,7 +433,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F log.info("JobManager for job {} ({}) was revoked leadership at {}.", jobGraph.getName(), jobGraph.getJobID(), getAddress()); - jobManager.getSelf().suspendExecution(new Exception("JobManager is no longer the leader.")); + jobManager.getSelfGateway(JobMasterGateway.class).suspendExecution(new Exception("JobManager is no longer the leader.")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index cdae89f..31036f6 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -29,13 +29,13 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.Execution; @@ -82,9 +82,7 @@ import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -93,12 +91,14 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; 
import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -122,7 +122,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * given task</li> * </ul> */ -public class JobMaster extends RpcEndpoint<JobMasterGateway> { +public class JobMaster extends RpcEndpoint implements JobMasterGateway { /** Default names for Flink's distributed components */ public static final String JOB_MANAGER_NAME = "jobmanager"; @@ -133,6 +133,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { // ------------------------------------------------------------------------ + private final JobMasterGateway selfGateway; + private final ResourceID resourceId; /** Logical representation of the job */ @@ -211,6 +213,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME)); + selfGateway = getSelfGateway(JobMasterGateway.class); + this.resourceId = checkNotNull(resourceId); this.jobGraph = checkNotNull(jobGraph); this.configuration = checkNotNull(configuration); @@ -224,7 +228,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender( resourceId, - new TaskManagerHeartbeatListener(), + new TaskManagerHeartbeatListener(selfGateway), rpcService.getScheduledExecutor(), log); @@ -263,7 +267,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); this.slotPool = new SlotPool(rpcService, jobGraph.getJobID()); - this.slotPoolGateway = slotPool.getSelf(); + this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); this.executionGraph = ExecutionGraphBuilder.buildGraph( null, @@ -307,7 +311,7 @@ public class 
JobMaster extends RpcEndpoint<JobMasterGateway> { super.start(); log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID()); - getSelf().startJobExecution(); + selfGateway.startJobExecution(); } else { log.warn("Job already started with leader ID {}, ignoring this start request.", leaderSessionID); @@ -334,7 +338,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { //-- job starting and stopping ----------------------------------------------------------------- - @RpcMethod + @Override public void startJobExecution() { // double check that the leader status did not change if (leaderSessionID == null) { @@ -393,7 +397,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * * @param cause The reason of why this job been suspended. */ - @RpcMethod + @Override public void suspendExecution(final Throwable cause) { if (leaderSessionID == null) { log.debug("Job has already been suspended or shutdown."); @@ -413,7 +417,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { executionGraph.suspend(cause); // receive no more messages until started again, should be called before we clear self leader id - ((StartStoppable) getSelf()).stop(); + stop(); // the slot pool stops receiving messages and clears its pooled slots slotPoolGateway.suspend(); @@ -430,29 +434,39 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * @param taskExecutionState New task execution state for a given task * @return Acknowledge the task execution state update */ - @RpcMethod - public Acknowledge updateTaskExecutionState( + @Override + public CompletableFuture<Acknowledge> updateTaskExecutionState( final UUID leaderSessionID, - final TaskExecutionState taskExecutionState) throws Exception + final TaskExecutionState taskExecutionState) { checkNotNull(taskExecutionState, "taskExecutionState"); - validateLeaderSessionId(leaderSessionID); + + try { + validateLeaderSessionId(leaderSessionID); + } catch 
(LeaderIdMismatchException e) { + return FutureUtils.completedExceptionally(e); + } if (executionGraph.updateState(taskExecutionState)) { - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } else { - throw new ExecutionGraphException("The execution attempt " + - taskExecutionState.getID() + " was not found."); + return FutureUtils.completedExceptionally( + new ExecutionGraphException("The execution attempt " + + taskExecutionState.getID() + " was not found.")); } } - @RpcMethod - public SerializedInputSplit requestNextInputSplit( + @Override + public CompletableFuture<SerializedInputSplit> requestNextInputSplit( final UUID leaderSessionID, final JobVertexID vertexID, - final ExecutionAttemptID executionAttempt) throws Exception - { - validateLeaderSessionId(leaderSessionID); + final ExecutionAttemptID executionAttempt) { + + try { + validateLeaderSessionId(leaderSessionID); + } catch (LeaderIdMismatchException e) { + return FutureUtils.completedExceptionally(e); + } final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); if (execution == null) { @@ -462,19 +476,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { log.debug("Can not find Execution for attempt {}.", executionAttempt); } // but we should TaskManager be aware of this - throw new Exception("Can not find Execution for attempt " + executionAttempt); + return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt)); } final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); if (vertex == null) { log.error("Cannot find execution vertex for vertex ID {}.", vertexID); - throw new Exception("Cannot find execution vertex for vertex ID " + vertexID); + return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID)); } final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); if (splitAssigner == 
null) { log.error("No InputSplitAssigner for vertex ID {}.", vertexID); - throw new Exception("No InputSplitAssigner for vertex ID " + vertexID); + return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID)); } final Slot slot = execution.getAssignedResource(); @@ -488,27 +502,31 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { try { final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); - return new SerializedInputSplit(serializedInputSplit); + return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit)); } catch (Exception ex) { log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); IOException reason = new IOException("Could not serialize the next input split of class " + nextInputSplit.getClass() + ".", ex); vertex.fail(reason); - throw reason; + return FutureUtils.completedExceptionally(reason); } } - @RpcMethod - public ExecutionState requestPartitionState( + @Override + public CompletableFuture<ExecutionState> requestPartitionState( final UUID leaderSessionID, final IntermediateDataSetID intermediateResultId, - final ResultPartitionID resultPartitionId) throws Exception { + final ResultPartitionID resultPartitionId) { - validateLeaderSessionId(leaderSessionID); + try { + validateLeaderSessionId(leaderSessionID); + } catch (LeaderIdMismatchException e) { + return FutureUtils.completedExceptionally(e); + } final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId()); if (execution != null) { - return execution.getState(); + return CompletableFuture.completedFuture(execution.getState()); } else { final IntermediateResult intermediateResult = @@ -522,29 +540,33 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { .getCurrentExecutionAttempt(); if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) { - return 
producerExecution.getState(); + return CompletableFuture.completedFuture(producerExecution.getState()); } else { - throw new PartitionProducerDisposedException(resultPartitionId); + return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId)); } } else { - throw new IllegalArgumentException("Intermediate data set with ID " - + intermediateResultId + " not found."); + return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID " + + intermediateResultId + " not found.")); } } } - @RpcMethod - public Acknowledge scheduleOrUpdateConsumers( + @Override + public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers( final UUID leaderSessionID, - final ResultPartitionID partitionID) throws Exception - { - validateLeaderSessionId(leaderSessionID); + final ResultPartitionID partitionID, + final Time timeout) { + try { + validateLeaderSessionId(leaderSessionID); - executionGraph.scheduleOrUpdateConsumers(partitionID); - return Acknowledge.get(); + executionGraph.scheduleOrUpdateConsumers(partitionID); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } } - @RpcMethod + @Override public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) { taskManagerHeartbeatManager.unmonitorTarget(resourceID); slotPoolGateway.releaseTaskManager(resourceID); @@ -558,13 +580,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } // TODO: This method needs a leader session ID - @RpcMethod + @Override public void acknowledgeCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, - final SubtaskState checkpointState) throws CheckpointException { + final SubtaskState checkpointState) { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); final 
AcknowledgeCheckpoint ackMessage = @@ -588,7 +610,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } // TODO: This method needs a leader session ID - @RpcMethod + @Override public void declineCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, @@ -616,8 +638,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - @RpcMethod - public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception { + @Override + public CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName) { if (log.isDebugEnabled()) { log.debug("Lookup key-value state for job {} with registration " + "name {}.", jobGraph.getJobID(), registrationName); @@ -626,13 +648,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); final KvStateLocation location = registry.getKvStateLocation(registrationName); if (location != null) { - return location; + return CompletableFuture.completedFuture(location); } else { - throw new UnknownKvStateLocation(registrationName); + return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName)); } } - @RpcMethod + @Override public void notifyKvStateRegistered( final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, @@ -653,7 +675,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - @RpcMethod + @Override public void notifyKvStateUnregistered( JobVertexID jobVertexId, KeyGroupRange keyGroupRange, @@ -672,25 +694,31 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - @RpcMethod - public ClassloadingProps requestClassloadingProps() throws Exception { - return new ClassloadingProps(libraryCacheManager.getBlobServerPort(), + @Override + public CompletableFuture<ClassloadingProps> requestClassloadingProps() { + return CompletableFuture.completedFuture( + new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(), executionGraph.getRequiredJarFiles(), - executionGraph.getRequiredClasspaths()); + executionGraph.getRequiredClasspaths())); } - @RpcMethod - public CompletableFuture<Iterable<SlotOffer>> offerSlots( + @Override + public CompletableFuture<Collection<SlotOffer>> offerSlots( final ResourceID taskManagerId, final Iterable<SlotOffer> slots, - final UUID leaderId) throws Exception { + final UUID leaderId, + final Time timeout) { - validateLeaderSessionId(leaderId); + try { + validateLeaderSessionId(leaderId); + } catch (LeaderIdMismatchException e) { + return FutureUtils.completedExceptionally(e); + } Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId); if (taskManager == null) { - throw new Exception("Unknown TaskManager " + taskManagerId); + return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId)); } final JobID jid = jobGraph.getJobID(); @@ -716,34 +744,40 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return slotPoolGateway.offerSlots(slotsAndOffers); } - @RpcMethod - public void failSlot(final ResourceID taskManagerId, + @Override + public void failSlot( + final ResourceID taskManagerId, final AllocationID allocationId, final UUID leaderId, - final Exception cause) throws Exception - { - validateLeaderSessionId(leaderSessionID); + final Exception cause) { - if (!registeredTaskManagers.containsKey(taskManagerId)) { - throw new Exception("Unknown TaskManager " + taskManagerId); + try { + validateLeaderSessionId(leaderSessionID); + } catch (LeaderIdMismatchException e) { + log.warn("Cannot fail slot " + allocationId + '.', e); } - slotPoolGateway.failAllocation(allocationId, cause); + if (registeredTaskManagers.containsKey(taskManagerId)) { + slotPoolGateway.failAllocation(allocationId, cause); + } else { + log.warn("Cannot fail slot " + allocationId + " because the TaskManager " + + taskManagerId + 
" is unknown."); + } } - @RpcMethod + @Override public CompletableFuture<RegistrationResponse> registerTaskManager( final String taskManagerRpcAddress, final TaskManagerLocation taskManagerLocation, - final UUID leaderId) throws Exception - { - if (!JobMaster.this.leaderSessionID.equals(leaderId)) { + final UUID leaderId, + final Time timeout) { + if (!Objects.equals(leaderSessionID, leaderId)) { log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + "leader session ID {} did not equal the received leader session ID {}.", - taskManagerLocation.getResourceID(), taskManagerRpcAddress, - JobMaster.this.leaderSessionID, leaderId); - throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID - + ", actual: " + leaderId); + taskManagerLocation.getResourceID(), taskManagerRpcAddress, leaderSessionID, leaderId); + return FutureUtils.completedExceptionally( + new Exception("Leader id not match, expected: " + + leaderSessionID + ", actual: " + leaderId)); } final ResourceID taskManagerId = taskManagerLocation.getResourceID(); @@ -790,13 +824,17 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - @RpcMethod + @Override public void disconnectResourceManager( final UUID jobManagerLeaderId, final UUID resourceManagerLeaderId, - final Exception cause) throws Exception { + final Exception cause) { - validateLeaderSessionId(jobManagerLeaderId); + try { + validateLeaderSessionId(jobManagerLeaderId); + } catch (LeaderIdMismatchException e) { + log.warn("Cannot disconnect resource manager " + resourceManagerLeaderId + '.', e); + } if (resourceManagerConnection != null && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) { @@ -804,12 +842,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - @RpcMethod + @Override public void heartbeatFromTaskManager(final ResourceID resourceID) { taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null); } - @RpcMethod + 
@Override public void heartbeatFromResourceManager(final ResourceID resourceID) { resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } @@ -1100,11 +1138,17 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + private final JobMasterGateway jobMasterGateway; + + private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) { + this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); + } + @Override public void notifyHeartbeatTimeout(ResourceID resourceID) { log.info("Heartbeat of TaskManager with id {} timed out.", resourceID); - getSelf().disconnectTaskManager( + jobMasterGateway.disconnectTaskManager( resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.")); } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index e3611a3..b396cd6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -186,7 +187,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param timeout for the rpc call * @return Future set of accepted slots. 
*/ - CompletableFuture<Iterable<SlotOffer>> offerSlots( + CompletableFuture<Collection<SlotOffer>> offerSlots( final ResourceID taskManagerId, final Iterable<SlotOffer> slots, final UUID leaderId, http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 8318c09..c2b0590 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -44,9 +45,9 @@ import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistrat import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -77,13 
+78,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * It offers the following methods as part of its rpc interface to interact with him remotely: * <ul> - * <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> - * <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li> + * <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li> + * <li>{@link #requestSlot(UUID, UUID, SlotRequest, Time)} requests a slot from the resource manager</li> * </ul> */ public abstract class ResourceManager<WorkerType extends Serializable> - extends RpcEndpoint<ResourceManagerGateway> - implements LeaderContender { + extends RpcEndpoint + implements ResourceManagerGateway, LeaderContender { public static final String RESOURCE_MANAGER_NAME = "resourcemanager"; @@ -244,13 +245,14 @@ public abstract class ResourceManager<WorkerType extends Serializable> // RPC methods // ------------------------------------------------------------------------ - @RpcMethod + @Override public CompletableFuture<RegistrationResponse> registerJobManager( final UUID resourceManagerLeaderId, final UUID jobManagerLeaderId, final ResourceID jobManagerResourceId, final String jobManagerAddress, - final JobID jobId) { + final JobID jobId, + final Time timeout) { checkNotNull(resourceManagerLeaderId); checkNotNull(jobManagerLeaderId); @@ -352,12 +354,13 @@ public abstract class ResourceManager<WorkerType extends Serializable> * * @return The response by the ResourceManager. 
*/ - @RpcMethod + @Override public CompletableFuture<RegistrationResponse> registerTaskExecutor( final UUID resourceManagerLeaderId, final String taskExecutorAddress, final ResourceID taskExecutorResourceId, - final SlotReport slotReport) { + final SlotReport slotReport, + final Time timeout) { if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) { CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); @@ -387,22 +390,22 @@ public abstract class ResourceManager<WorkerType extends Serializable> } } - @RpcMethod + @Override public void heartbeatFromTaskManager(final ResourceID resourceID, final SlotReport slotReport) { taskManagerHeartbeatManager.receiveHeartbeat(resourceID, slotReport); } - @RpcMethod + @Override public void heartbeatFromJobManager(final ResourceID resourceID) { jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null); } - @RpcMethod + @Override public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) { closeTaskManagerConnection(resourceId, cause); } - @RpcMethod + @Override public void disconnectJobManager(final JobID jobId, final Exception cause) { closeJobManagerConnection(jobId, cause); } @@ -413,14 +416,15 @@ public abstract class ResourceManager<WorkerType extends Serializable> * @param slotRequest Slot request * @return Slot assignment */ - @RpcMethod - public Acknowledge requestSlot( + @Override + public CompletableFuture<Acknowledge> requestSlot( UUID jobMasterLeaderID, UUID resourceManagerLeaderID, - SlotRequest slotRequest) throws ResourceManagerException, LeaderSessionIDException { + SlotRequest slotRequest, + final Time timeout) { if (!Objects.equals(resourceManagerLeaderID, leaderSessionId)) { - throw new LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId); + return FutureUtils.completedExceptionally(new LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId)); } JobID jobId = 
slotRequest.getJobId(); @@ -433,15 +437,19 @@ public abstract class ResourceManager<WorkerType extends Serializable> slotRequest.getJobId(), slotRequest.getAllocationId()); - slotManager.registerSlotRequest(slotRequest); + try { + slotManager.registerSlotRequest(slotRequest); + } catch (SlotManagerException e) { + return FutureUtils.completedExceptionally(e); + } - return Acknowledge.get(); + return CompletableFuture.completedFuture(Acknowledge.get()); } else { - throw new LeaderSessionIDException(jobMasterLeaderID, jobManagerRegistration.getLeaderID()); + return FutureUtils.completedExceptionally(new LeaderSessionIDException(jobMasterLeaderID, jobManagerRegistration.getLeaderID())); } } else { - throw new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'); + return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.')); } } @@ -451,7 +459,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> * @param instanceID TaskExecutor's instance id * @param slotId The slot id of the available slot */ - @RpcMethod + @Override public void notifySlotAvailable( final UUID resourceManagerLeaderId, final InstanceID instanceID, @@ -487,7 +495,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> * * @param address address of infoMessage listener to register to this resource manager */ - @RpcMethod + @Override public void registerInfoMessageListener(final String address) { if(infoMessageListeners.containsKey(address)) { log.warn("Receive a duplicate registration from info message listener on ({})", address); @@ -514,7 +522,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> * @param address of the info message listener to unregister from this resource manager * */ - @RpcMethod + @Override public void unRegisterInfoMessageListener(final String address) { infoMessageListeners.remove(address); } @@ -525,7 
+533,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> * @param finalStatus of the Flink application * @param optionalDiagnostics for the Flink application */ - @RpcMethod + @Override public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) { log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics); @@ -536,13 +544,13 @@ public abstract class ResourceManager<WorkerType extends Serializable> } } - @RpcMethod - public Integer getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) throws LeaderIdMismatchException { + @Override + public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) { if (Objects.equals(leaderSessionId, requestLeaderSessionId)) { - return taskExecutors.size(); + return CompletableFuture.completedFuture(taskExecutors.size()); } else { - throw new LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId); + return FutureUtils.completedExceptionally(new LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java index b3fea77..8a0e5f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java @@ -29,9 +29,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public final class MainThreadValidatorUtil { - private final RpcEndpoint<?> endpoint; + private final RpcEndpoint endpoint; - public 
MainThreadValidatorUtil(RpcEndpoint<?> endpoint) { + public MainThreadValidatorUtil(RpcEndpoint endpoint) { this.endpoint = checkNotNull(endpoint); } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index b5bbc2b..980ae48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -51,11 +51,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * of Erlang or Akka. * * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)} - * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread. - * - * @param <C> The RPC gateway counterpart for the implementing RPC endpoint + * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread. 
*/ -public abstract class RpcEndpoint<C extends RpcGateway> { +public abstract class RpcEndpoint implements RpcGateway { protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -67,11 +65,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> { /** Unique identifier for this rpc endpoint */ private final String endpointId; - /** Class of the self gateway */ - private final Class<C> selfGatewayType; - - /** Self gateway which can be used to schedule asynchronous calls on yourself */ - private final C self; + /** Interface to access the underlying rpc server */ + private final RpcServer rpcServer; /** The main thread executor to be used to execute future callbacks in the main thread * of the executing rpc server. */ @@ -90,12 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> { this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); - // IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer - // requires that selfGatewayType has been initialized - this.selfGatewayType = determineSelfGatewayType(); - this.self = rpcService.startServer(this); + this.rpcServer = rpcService.startServer(this); - this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self); + this.mainThreadExecutor = new MainThreadExecutor(rpcServer); } /** @@ -108,15 +100,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> { } /** - * Returns the class of the self gateway type. - * - * @return Class of the self gateway type - */ - public final Class<C> getSelfGatewayType() { - return selfGatewayType; - } - - /** * Returns the rpc endpoint's identifier. * * @return Rpc endpoint's identifier. 
@@ -139,7 +122,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @throws Exception indicating that something went wrong while starting the RPC endpoint */ public void start() throws Exception { - ((StartStoppable) self).start(); + rpcServer.start(); + } + + /** + * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is + * no longer ready to process remote procedure calls. + */ + protected final void stop() { + rpcServer.stop(); } /** @@ -161,7 +152,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * via {@link #getTerminationFuture()}} and wait on its completion. */ public final void shutDown() { - rpcService.stopServer(self); + rpcService.stopServer(rpcServer); } // ------------------------------------------------------------------------ @@ -169,15 +160,25 @@ public abstract class RpcEndpoint<C extends RpcGateway> { // ------------------------------------------------------------------------ /** - * Get self-gateway which should be used to run asynchronous RPC calls on this endpoint. + * Returns a self gateway of the specified type which can be used to issue asynchronous + * calls against the RpcEndpoint. * - * <p><b>IMPORTANT</b>: Always issue local method calls via the self-gateway if the current thread - * is not the main thread of the underlying rpc server, e.g. from within a future callback. + * <p>IMPORTANT: The self gateway type must be implemented by the RpcEndpoint. Otherwise + * the method will fail. 
* - * @return The self gateway + * @param selfGatewayType class of the self gateway type + * @param <C> type of the self gateway to create + * @return Self gateway of the specified type which can be used to issue asynchronous rpcs */ - public C getSelf() { - return self; + public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType) { + if (selfGatewayType.isInstance(rpcServer)) { + @SuppressWarnings("unchecked") + C selfGateway = ((C) rpcServer); + + return selfGateway; + } else { + throw new RuntimeException("RpcEndpoint does not implement the RpcGateway interface of type " + selfGatewayType + '.'); + } } /** @@ -186,8 +187,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * * @return Fully qualified address of the underlying RPC endpoint */ + @Override public String getAddress() { - return self.getAddress(); + return rpcServer.getAddress(); + } + + /** + * Gets the hostname of the underlying RPC endpoint. + * + * @return Hostname on which the RPC endpoint is running + */ + @Override + public String getHostname() { + return rpcServer.getHostname(); } /** @@ -215,7 +227,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @return Future which is completed when the rpc endpoint has been terminated. 
*/ public CompletableFuture<Void> getTerminationFuture() { - return ((SelfGateway)self).getTerminationFuture(); + return rpcServer.getTerminationFuture(); } // ------------------------------------------------------------------------ @@ -229,7 +241,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ protected void runAsync(Runnable runnable) { - ((MainThreadExecutable) self).runAsync(runnable); + rpcServer.runAsync(runnable); } /** @@ -251,13 +263,13 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { - ((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay)); + rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay)); } /** * Execute the callable in the main thread of the underlying RPC service, returning a future for * the result of the callable. If the callable is not completed within the given timeout, then - * the future will be failed with a {@link java.util.concurrent.TimeoutException}. + * the future will be failed with a {@link TimeoutException}. * * @param callable Callable to be executed in the main thread of the underlying rpc server * @param timeout Timeout for the callable to be completed @@ -265,7 +277,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @return Future for the result of the callable. 
*/ protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) { - return ((MainThreadExecutable) self).callAsync(callable, timeout); + return rpcServer.callAsync(callable, timeout); } // ------------------------------------------------------------------------ @@ -311,23 +323,4 @@ public abstract class RpcEndpoint<C extends RpcGateway> { gateway.runAsync(runnable); } } - - /** - * Determines the self gateway type specified in one of the subclasses which extend this class. - * May traverse multiple class hierarchies until a Gateway type is found as a first type argument. - * @return Class<C> The determined self gateway type - */ - private Class<C> determineSelfGatewayType() { - - // determine self gateway type - Class<?> c = getClass(); - Class<C> determinedSelfGatewayType; - do { - determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c); - // check if super class contains self gateway type in next loop - c = c.getSuperclass(); - } while (!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType)); - - return determinedSelfGatewayType; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java deleted file mode 100644 index e4b0e94..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Annotation for rpc method in a {@link RpcEndpoint} implementation. Every rpc method must have a - * respective counterpart in the {@link RpcGateway} implementation for this rpc server. The - * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of - * gateway methods in the corresponding gateway implementation are identical. - */ -@Inherited -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -public @interface RpcMethod { -} http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java new file mode 100644 index 0000000..9c645cc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import java.util.concurrent.CompletableFuture; + +/** + * Interface for self gateways + */ +public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGateway { + + /** + * Return a future which is completed when the rpc endpoint has been terminated. + * + * @return Future indicating when the rpc endpoint has been terminated + */ + CompletableFuture<Void> getTerminationFuture(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index a92f3e2..3b5a5e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -66,20 +66,18 @@ public interface RpcService { /** * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint. 
* - * @param rpcEndpoint Rpc protocl to dispath the rpcs to - * @param <S> Type of the rpc endpoint - * @param <C> Type of the self rpc gateway associated with the rpc server + * @param rpcEndpoint Rpc protocol to dispatch the rpcs to + * @param <C> Type of the rpc endpoint * @return Self gateway to dispatch remote procedure calls to oneself */ - <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint); + <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint); /** * Stop the underlying rpc server of the provided self gateway. * * @param selfGateway Self gateway describing the underlying rpc server - * @param <C> Type of the rpc gateway */ - <C extends RpcGateway> void stopServer(C selfGateway); + void stopServer(RpcServer selfGateway); /** * Stop the rpc service shutting down all started rpc servers. http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java new file mode 100644 index 0000000..9738970 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import java.util.HashSet; +import java.util.Set; + +/** + * Utility functions for Flink's RPC implementation + */ +public class RpcUtils { + /** + * Extracts all {@link RpcGateway} interfaces implemented by the given clazz. + * + * @param clazz from which to extract the implemented RpcGateway interfaces + * @return A set of all implemented RpcGateway interfaces + */ + public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Class<?> clazz) { + HashSet<Class<? extends RpcGateway>> interfaces = new HashSet<>(); + + while (clazz != null) { + for (Class<?> interfaze : clazz.getInterfaces()) { + if (RpcGateway.class.isAssignableFrom(interfaze)) { + interfaces.add((Class<? extends RpcGateway>)interfaze); + } + } + + clazz = clazz.getSuperclass(); + } + + return interfaces; + } + + // We don't want this class to be instantiable + private RpcUtils() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java deleted file mode 100644 index d39b1ef..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc; - -import java.util.concurrent.CompletableFuture; - -/** - * Interface for self gateways - */ -public interface SelfGateway { - - /** - * Return a future which is completed when the rpc endpoint has been terminated. - * - * @return Future indicating when the rpc endpoint has been terminated - */ - CompletableFuture<Void> getTerminationFuture(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index ae6b832..0521f2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -21,10 +21,9 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorRef; import akka.pattern.Patterns; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; 
import org.apache.flink.runtime.rpc.MainThreadExecutable; -import org.apache.flink.runtime.rpc.SelfGateway; +import org.apache.flink.runtime.rpc.RpcServer; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.StartStoppable; @@ -38,11 +37,12 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; -import java.util.BitSet; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -55,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ -class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway { +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer { private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class); /** @@ -88,7 +88,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea ActorRef rpcEndpoint, Time timeout, long maximumFramesize, - CompletableFuture<Void> terminationFuture) { + @Nullable CompletableFuture<Void> terminationFuture) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); @@ -105,9 +105,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea Object result; - if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) || - declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || - declaringClass.equals(RpcGateway.class) || 
declaringClass.equals(SelfGateway.class)) { + if (declaringClass.equals(AkkaGateway.class) || + declaringClass.equals(Object.class) || + declaringClass.equals(RpcGateway.class) || + declaringClass.equals(StartStoppable.class) || + declaringClass.equals(MainThreadExecutable.class) || + declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else { String methodName = method.getName(); @@ -115,24 +118,19 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); - Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments( - parameterTypes, - parameterAnnotations, - args); - RpcInvocation rpcInvocation; if (isLocal) { rpcInvocation = new LocalRpcInvocation( methodName, - filteredArguments.f0, - filteredArguments.f1); + parameterTypes, + args); } else { try { RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation( methodName, - filteredArguments.f0, - filteredArguments.f1); + parameterTypes, + args); if (remoteRpcInvocation.getSize() > maximumFramesize) { throw new IOException("The rpc invocation size exceeds the maximum akka framesize."); @@ -249,62 +247,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea } /** - * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument - * list. 
- * - * @param parameterTypes Array of parameter types - * @param parameterAnnotations Array of parameter annotations - * @param args Arary of arguments - * @return Tuple of filtered parameter types and arguments which no longer contain the - * {@link RpcTimeout} annotated parameter types and arguments - */ - private static Tuple2<Class<?>[], Object[]> filterArguments( - Class<?>[] parameterTypes, - Annotation[][] parameterAnnotations, - Object[] args) { - - Class<?>[] filteredParameterTypes; - Object[] filteredArgs; - - if (args == null) { - filteredParameterTypes = parameterTypes; - filteredArgs = null; - } else { - Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length); - Preconditions.checkArgument(parameterAnnotations.length == args.length); - - BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length); - int numberRpcParameters = parameterTypes.length; - - for (int i = 0; i < parameterTypes.length; i++) { - if (isRpcTimeout(parameterAnnotations[i])) { - isRpcTimeoutParameter.set(i); - numberRpcParameters--; - } - } - - if (numberRpcParameters == parameterTypes.length) { - filteredParameterTypes = parameterTypes; - filteredArgs = args; - } else { - filteredParameterTypes = new Class<?>[numberRpcParameters]; - filteredArgs = new Object[numberRpcParameters]; - int counter = 0; - - for (int i = 0; i < parameterTypes.length; i++) { - if (!isRpcTimeoutParameter.get(i)) { - filteredParameterTypes[counter] = parameterTypes[i]; - filteredArgs[counter] = args[i]; - counter++; - } - } - } - } - - return Tuple2.of(filteredParameterTypes, filteredArgs); - } - - /** * Checks whether any of the annotations is of type {@link RpcTimeout} * * @param annotations Array of annotations http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 5845473..f557447 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -66,10 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * stops processing messages. All messages which arrive when the processing is stopped, will be * discarded. * - * @param <C> Type of the {@link RpcGateway} associated with the {@link RpcEndpoint} * @param <T> Type of the {@link RpcEndpoint} */ -class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor { +class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class); http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 80267f9..ab851f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -33,18 +33,18 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; -import 
org.apache.flink.runtime.rpc.SelfGateway; -import org.apache.flink.runtime.rpc.StartStoppable; +import org.apache.flink.runtime.rpc.RpcServer; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.akka.messages.Shutdown; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import javax.annotation.Nonnull; @@ -188,7 +188,7 @@ public class AkkaRpcService implements RpcService { } @Override - public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) { + public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); @@ -225,22 +225,22 @@ public class AkkaRpcService implements RpcService { // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); + Set<Class<? 
extends RpcGateway>> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()); + + implementedRpcGateways.add(RpcServer.class); + implementedRpcGateways.add(AkkaGateway.class); + @SuppressWarnings("unchecked") - C self = (C) Proxy.newProxyInstance( + RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, - new Class<?>[]{ - rpcEndpoint.getSelfGatewayType(), - SelfGateway.class, - MainThreadExecutable.class, - StartStoppable.class, - AkkaGateway.class}, + implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler); - return self; + return server; } @Override - public void stopServer(RpcGateway selfGateway) { + public void stopServer(RpcServer selfGateway) { if (selfGateway instanceof AkkaGateway) { AkkaGateway akkaClient = (AkkaGateway) selfGateway; @@ -312,7 +312,7 @@ public class AkkaRpcService implements RpcService { @Override public <T> CompletableFuture<T> execute(Callable<T> callable) { - scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher()); + Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher()); return FutureUtils.toJava(scalaFuture); }
