Repository: flink
Updated Branches:
  refs/heads/flip-6 a19cae3b0 -> 4f891a6c2


[hotfix] Treat taskManager's rpc address and location separately


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e30eab9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e30eab9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e30eab9

Branch: refs/heads/flip-6
Commit: 7e30eab9174ca179809ade60c47d9af54a3717de
Parents: a19cae3
Author: Kurt Young <ykt...@gmail.com>
Authored: Mon Oct 17 09:38:46 2016 +0800
Committer: Kurt Young <ykt...@gmail.com>
Committed: Mon Oct 17 09:38:46 2016 +0800

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  7 ++--
 .../runtime/jobmaster/JobMasterGateway.java     |  8 +++--
 .../runtime/taskexecutor/JobLeaderService.java  | 37 +++++++++++++-------
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 14 ++++----
 5 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8cb9946..306a28a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -680,13 +680,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public Future<RegistrationResponse> registerTaskManager(
+                       final String taskManagerRpcAddress,
                        final TaskManagerLocation taskManagerLocation,
                        final UUID leaderId) throws Exception
        {
                if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
                        log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected " +
                                                        "leader session ID {} 
did not equal the received leader session ID {}.",
-                                       taskManagerLocation.getResourceID(), 
taskManagerLocation.addressString(),
+                                       taskManagerLocation.getResourceID(), 
taskManagerRpcAddress,
                                        JobMaster.this.leaderSessionID, 
leaderId);
                        throw new Exception("Leader id not match, expected: " + 
JobMaster.this.leaderSessionID
                                        + ", actual: " + leaderId);
@@ -702,7 +703,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                        return getRpcService().execute(new 
Callable<TaskExecutorGateway>() {
                                @Override
                                public TaskExecutorGateway call() throws 
Exception {
-                                       return 
getRpcService().connect(taskManagerLocation.addressString(), 
TaskExecutorGateway.class)
+                                       return 
getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
                                                        
.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
                                }
                        }).handleAsync(new BiFunction<TaskExecutorGateway, 
Throwable, RegistrationResponse>() {
@@ -715,7 +716,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                                        if 
(!JobMaster.this.leaderSessionID.equals(leaderId)) {
                                                log.warn("Discard registration 
from TaskExecutor {} at ({}) because the expected " +
                                                                                
"leader session ID {} did not equal the received leader session ID {}.",
-                                                               
taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+                                                               taskManagerId, 
taskManagerRpcAddress,
                                                                
JobMaster.this.leaderSessionID, leaderId);
                                                return new 
RegistrationResponse.Decline("Invalid leader session id");
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4c85839..4ee9f92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -196,12 +196,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
        /**
         * Register the task manager at the job manager.
         *
-        * @param taskManagerLocation location of the task manager
-        * @param leaderId            identifying the job leader
-        * @param timeout             for the rpc call
+        * @param taskManagerRpcAddress the rpc address of the task manager
+        * @param taskManagerLocation   location of the task manager
+        * @param leaderId              identifying the job leader
+        * @param timeout               for the rpc call
         * @return Future registration response indicating whether the 
registration was successful or not
         */
        Future<RegistrationResponse> registerTaskManager(
+                       final String taskManagerRpcAddress,
                        final TaskManagerLocation taskManagerLocation,
                        final UUID leaderId,
                        @RpcTimeout final Time timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 14d36ab..93c7bb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -62,6 +62,9 @@ public class JobLeaderService {
        /** Internal state of the service */
        private volatile JobLeaderService.State state;
 
+       /** Address of the owner of this service. This address is used for the 
job manager connection */
+       private String ownerAddress;
+
        /** Rpc service to use for establishing connections */
        private RpcService rpcService;
 
@@ -78,6 +81,7 @@ public class JobLeaderService {
 
                state = JobLeaderService.State.CREATED;
 
+               ownerAddress = null;
                rpcService = null;
                highAvailabilityServices = null;
                jobLeaderListener = null;
@@ -90,20 +94,23 @@ public class JobLeaderService {
        /**
         * Start the job leader service with the given services.
         *
+        * @param initialOwnerAddress to be used for establishing connections 
(source address)
         * @param initialRpcService to be used to create rpc connections
         * @param initialHighAvailabilityServices to create leader retrieval 
services for the different jobs
         * @param initialJobLeaderListener listening for job leader changes
         */
        public void start(
-               final RpcService initialRpcService,
-               final HighAvailabilityServices initialHighAvailabilityServices,
-               final JobLeaderListener initialJobLeaderListener) {
+                       final String initialOwnerAddress,
+                       final RpcService initialRpcService,
+                       final HighAvailabilityServices 
initialHighAvailabilityServices,
+                       final JobLeaderListener initialJobLeaderListener) {
 
                if (JobLeaderService.State.CREATED != state) {
                        throw new IllegalStateException("The service has 
already been started.");
                } else {
                        LOG.info("Start job leader service.");
 
+                       this.ownerAddress = 
Preconditions.checkNotNull(initialOwnerAddress);
                        this.rpcService = 
Preconditions.checkNotNull(initialRpcService);
                        this.highAvailabilityServices = 
Preconditions.checkNotNull(initialHighAvailabilityServices);
                        this.jobLeaderListener = 
Preconditions.checkNotNull(initialJobLeaderListener);
@@ -310,6 +317,7 @@ public class JobLeaderService {
                                                JobMasterGateway.class,
                                                getTargetAddress(),
                                                getTargetLeaderId(),
+                                               ownerAddress,
                                                ownLocation);
                        }
 
@@ -345,19 +353,23 @@ public class JobLeaderService {
                        extends RetryingRegistration<JobMasterGateway, 
JMTMRegistrationSuccess>
        {
 
+               private final String taskManagerRpcAddress;
+
                private final TaskManagerLocation taskManagerLocation;
 
                JobManagerRetryingRegistration(
-                       Logger log,
-                       RpcService rpcService,
-                       String targetName,
-                       Class<JobMasterGateway> targetType,
-                       String targetAddress,
-                       UUID leaderId,
-                       TaskManagerLocation taskManagerLocation) {
-
+                               Logger log,
+                               RpcService rpcService,
+                               String targetName,
+                               Class<JobMasterGateway> targetType,
+                               String targetAddress,
+                               UUID leaderId,
+                               String taskManagerRpcAddress,
+                               TaskManagerLocation taskManagerLocation)
+               {
                        super(log, rpcService, targetName, targetType, 
targetAddress, leaderId);
 
+                       this.taskManagerRpcAddress = taskManagerRpcAddress;
                        this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                }
 
@@ -365,7 +377,8 @@ public class JobLeaderService {
                protected Future<RegistrationResponse> invokeRegistration(
                                JobMasterGateway gateway, UUID leaderId, long 
timeoutMillis) throws Exception
                {
-                       return gateway.registerTaskManager(taskManagerLocation, 
leaderId, Time.milliseconds(timeoutMillis));
+                       return 
gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+                                       leaderId, 
Time.milliseconds(timeoutMillis));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 1201281..3e3a544 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                taskSlotTable.start(new SlotActionsImpl());
 
                // start the job leader service
-               jobLeaderService.start(getRpcService(), haServices, new 
JobLeaderListenerImpl());
+               jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2220f12..2b5d2dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -379,9 +379,10 @@ public class TaskExecutorTest extends TestLogger {
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
 
                when(jobMasterGateway.registerTaskManager(
-                       eq(taskManagerLocation),
-                       eq(jobManagerLeaderId),
-                       any(Time.class)
+                               any(String.class),
+                               eq(taskManagerLocation),
+                               eq(jobManagerLeaderId),
+                               any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 
@@ -483,9 +484,10 @@ public class TaskExecutorTest extends TestLogger {
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
 
                when(jobMasterGateway.registerTaskManager(
-                       eq(taskManagerLocation),
-                       eq(jobManagerLeaderId),
-                       any(Time.class)
+                               any(String.class),
+                               eq(taskManagerLocation),
+                               eq(jobManagerLeaderId),
+                               any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 

Reply via email to