[hotfix] Treat taskManager's rpc address and location separately
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7ed9a5e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7ed9a5e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7ed9a5e Branch: refs/heads/master Commit: a7ed9a5e3876c538deae147217c5443a287e98d5 Parents: bc68236 Author: Kurt Young <[email protected]> Authored: Mon Oct 17 09:38:46 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:23 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 7 ++-- .../runtime/jobmaster/JobMasterGateway.java | 8 +++-- .../runtime/taskexecutor/JobLeaderService.java | 37 +++++++++++++------- .../runtime/taskexecutor/TaskExecutor.java | 2 +- .../runtime/taskexecutor/TaskExecutorTest.java | 14 ++++---- 5 files changed, 43 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1fb5474..7bcfb3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -674,13 +674,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public Future<RegistrationResponse> registerTaskManager( + final String taskManagerRpcAddress, final TaskManagerLocation taskManagerLocation, final UUID leaderId) throws Exception { if (!JobMaster.this.leaderSessionID.equals(leaderId)) { log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + "leader session ID {} did not equal the received leader session ID {}.", - taskManagerLocation.getResourceID(), taskManagerLocation.addressString(), + taskManagerLocation.getResourceID(), taskManagerRpcAddress, JobMaster.this.leaderSessionID, leaderId); throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID + ", actual: " + leaderId); @@ -696,7 +697,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return getRpcService().execute(new Callable<TaskExecutorGateway>() { @Override public TaskExecutorGateway call() throws Exception { - return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class) + return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class) .get(rpcTimeout.getSize(), rpcTimeout.getUnit()); } }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { @@ -709,7 +710,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (!JobMaster.this.leaderSessionID.equals(leaderId)) { log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + "leader session ID {} did not equal the received leader session ID {}.", - taskManagerLocation.getResourceID(), taskManagerLocation.addressString(), + taskManagerId, taskManagerRpcAddress, JobMaster.this.leaderSessionID, leaderId); return new RegistrationResponse.Decline("Invalid leader session id"); } http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 508e70a..8925d94 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -185,12 +185,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { /** * Register the task manager at the job manager. * - * @param taskManagerLocation location of the task manager - * @param leaderId identifying the job leader - * @param timeout for the rpc call + * @param taskManagerRpcAddress the rpc address of the task manager + * @param taskManagerLocation location of the task manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call * @return Future registration response indicating whether the registration was successful or not */ Future<RegistrationResponse> registerTaskManager( + final String taskManagerRpcAddress, final TaskManagerLocation taskManagerLocation, final UUID leaderId, @RpcTimeout final Time timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 14d36ab..93c7bb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -62,6 +62,9 @@ public class JobLeaderService { /** Internal state of the service */ private volatile JobLeaderService.State state; + /** Address of the owner of this service. This address is used for the job manager connection */ + private String ownerAddress; + /** Rpc service to use for establishing connections */ private RpcService rpcService; @@ -78,6 +81,7 @@ public class JobLeaderService { state = JobLeaderService.State.CREATED; + ownerAddress = null; rpcService = null; highAvailabilityServices = null; jobLeaderListener = null; @@ -90,20 +94,23 @@ public class JobLeaderService { /** * Start the job leader service with the given services. * + * @param initialOwnerAddress to be used for establishing connections (source address) * @param initialRpcService to be used to create rpc connections * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs * @param initialJobLeaderListener listening for job leader changes */ public void start( - final RpcService initialRpcService, - final HighAvailabilityServices initialHighAvailabilityServices, - final JobLeaderListener initialJobLeaderListener) { + final String initialOwnerAddress, + final RpcService initialRpcService, + final HighAvailabilityServices initialHighAvailabilityServices, + final JobLeaderListener initialJobLeaderListener) { if (JobLeaderService.State.CREATED != state) { throw new IllegalStateException("The service has already been started."); } else { LOG.info("Start job leader service."); + this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress); this.rpcService = Preconditions.checkNotNull(initialRpcService); this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices); this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener); @@ -310,6 +317,7 @@ public class JobLeaderService { JobMasterGateway.class, getTargetAddress(), getTargetLeaderId(), + ownerAddress, ownLocation); } @@ -345,19 +353,23 @@ public class JobLeaderService { extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> { + private final String taskManagerRpcAddress; + private final TaskManagerLocation taskManagerLocation; JobManagerRetryingRegistration( - Logger log, - RpcService rpcService, - String targetName, - Class<JobMasterGateway> targetType, - String targetAddress, - UUID leaderId, - TaskManagerLocation taskManagerLocation) { - + Logger log, + RpcService rpcService, + String targetName, + Class<JobMasterGateway> targetType, + String targetAddress, + UUID leaderId, + String taskManagerRpcAddress, + TaskManagerLocation taskManagerLocation) + { super(log, rpcService, targetName, targetType, targetAddress, leaderId); + this.taskManagerRpcAddress = taskManagerRpcAddress; this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); } @@ -365,7 +377,8 @@ public class JobLeaderService { protected Future<RegistrationResponse> invokeRegistration( JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { - return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis)); + return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, + leaderId, Time.milliseconds(timeoutMillis)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 1b1c02b..5146e5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { taskSlotTable.start(new SlotActionsImpl()); // start the job leader service - jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl()); + jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index a8da4fd..55cc142 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -394,9 +394,10 @@ public class TaskExecutorTest extends TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( - eq(taskManagerLocation), - eq(jobManagerLeaderId), - any(Time.class) + any(String.class), + eq(taskManagerLocation), + eq(jobManagerLeaderId), + any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); @@ -498,9 +499,10 @@ public class TaskExecutorTest extends TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( - eq(taskManagerLocation), - eq(jobManagerLeaderId), - any(Time.class) + any(String.class), + eq(taskManagerLocation), + eq(jobManagerLeaderId), + any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
