[FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be 
reusable

This closes #2520


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8c6b998
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8c6b998
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8c6b998

Branch: refs/heads/flip-6
Commit: b8c6b9986522aea96c79c774eeea5dc57a7bfc64
Parents: 07512e0
Author: zhuhaifengleon <zhu.haifeng.l...@gmail.com>
Authored: Mon Sep 26 17:43:44 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:41 2016 +0200

----------------------------------------------------------------------
 .../JobMasterToResourceManagerConnection.java   | 117 +++++++++++
 .../registration/RegisteredRpcConnection.java   | 192 +++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutor.java      |   4 +-
 ...TaskExecutorToResourceManagerConnection.java | 127 +++---------
 .../RegisteredRpcConnectionTest.java            | 183 ++++++++++++++++++
 .../registration/RetryingRegistrationTest.java  |   6 +-
 6 files changed, 519 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8c6b998/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
new file mode 100644
index 0000000..71fce8c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The connection between a JobMaster and the ResourceManager.
+ */
+public class JobMasterToResourceManagerConnection 
+               extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess> {
+
+       /** the JobMaster whose connection to the ResourceManager this 
represents */
+       private final JobMaster jobMaster;
+
+       private final JobID jobID;
+
+       private final UUID jobMasterLeaderId;
+
+       public JobMasterToResourceManagerConnection(
+                       Logger log,
+                       JobID jobID,
+                       JobMaster jobMaster,
+                       UUID jobMasterLeaderId,
+                       String resourceManagerAddress,
+                       UUID resourceManagerLeaderId,
+                       Executor executor) {
+
+               super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
+               this.jobMaster = checkNotNull(jobMaster);
+               this.jobID = checkNotNull(jobID);
+               this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
+       }
+
+       @Override
+       protected RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> generateRegistration() {
+               return new 
JobMasterToResourceManagerConnection.ResourceManagerRegistration(
+                       log, jobMaster.getRpcService(),
+                       getTargetAddress(), getTargetLeaderId(),
+                       jobMaster.getAddress(),jobID, jobMasterLeaderId);
+       }
+
+       @Override
+       protected void onRegistrationSuccess(JobMasterRegistrationSuccess 
success) {
+       }
+
+       @Override
+       protected void onRegistrationFailure(Throwable failure) {
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private static class ResourceManagerRegistration
+               extends RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> {
+
+               private final String jobMasterAddress;
+
+               private final JobID jobID;
+
+               private final UUID jobMasterLeaderId;
+
+               ResourceManagerRegistration(
+                       Logger log,
+                       RpcService rpcService,
+                       String targetAddress,
+                       UUID leaderId,
+                       String jobMasterAddress,
+                       JobID jobID,
+                       UUID jobMasterLeaderId) {
+
+                       super(log, rpcService, "ResourceManager", 
ResourceManagerGateway.class, targetAddress, leaderId);
+                       this.jobMasterAddress = checkNotNull(jobMasterAddress);
+                       this.jobID = checkNotNull(jobID);
+                       this.jobMasterLeaderId = 
checkNotNull(jobMasterLeaderId);
+               }
+
+               @Override
+               protected Future<RegistrationResponse> invokeRegistration(
+                       ResourceManagerGateway gateway, UUID leaderId, long 
timeoutMillis) throws Exception {
+
+                       Time timeout = Time.milliseconds(timeoutMillis);
+                       return gateway.registerJobMaster(leaderId, 
jobMasterLeaderId,jobMasterAddress, jobID, timeout);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c6b998/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
new file mode 100644
index 0000000..76093b0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This utility class implements the basis of RPC connecting from one 
component to another component,
+ * for example the RPC connection from TaskExecutor to ResourceManager.
+ * This {@code RegisteredRpcConnection} implements registration and get target 
gateway .
+ *
+ * <p>The registration gives access to a future that is completed upon 
successful registration.
+ * The RPC connection can be closed, for example when the target where it 
tries to register
+ * at looses leader status.
+ *
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, 
Success extends RegistrationResponse.Success> {
+
+       /** the logger for all log messages of this class */
+       protected final Logger log;
+
+       /** the target component leaderID, for example the ResourceManager 
leaderID */
+       private final UUID targetLeaderId;
+
+       /** the target component Address, for example the ResourceManager 
Address */
+       private final String targetAddress;
+
+       /** Execution context to be used to execute the on complete action of 
the ResourceManagerRegistration */
+       private final Executor executor;
+
+       /** the Registration of this RPC connection */
+       private RetryingRegistration<Gateway, Success> pendingRegistration;
+
+       /** the gateway to register, it's null until the registration is 
completed */
+       private volatile Gateway targetGateway;
+
+       /** flag indicating that the RPC connection is closed */
+       private volatile boolean closed;
+
+       // 
------------------------------------------------------------------------
+
+       public RegisteredRpcConnection(
+               Logger log,
+               String targetAddress,
+               UUID targetLeaderId,
+               Executor executor)
+       {
+               this.log = checkNotNull(log);
+               this.targetAddress = checkNotNull(targetAddress);
+               this.targetLeaderId = checkNotNull(targetLeaderId);
+               this.executor = checkNotNull(executor);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Life cycle
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("unchecked")
+       public void start() {
+               checkState(!closed, "The RPC connection is already closed");
+               checkState(!isConnected() && pendingRegistration == null, "The 
RPC connection is already started");
+
+               pendingRegistration = checkNotNull(generateRegistration());
+               pendingRegistration.startRegistration();
+
+               Future<Tuple2<Gateway, Success>> future = 
pendingRegistration.getFuture();
+
+               future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, 
Success>>() {
+                       @Override
+                       public void accept(Tuple2<Gateway, Success> result) {
+                               targetGateway = result.f0;
+                               onRegistrationSuccess(result.f1);
+                       }
+               }, executor);
+
+               // this future should only ever fail if there is a bug, not if 
the registration is declined
+               future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+                       @Override
+                       public Void apply(Throwable failure) {
+                               onRegistrationFailure(failure);
+                               return null;
+                       }
+               }, executor);
+       }
+
+       /**
+        * This method generate a specific Registration, for example 
TaskExecutor Registration at the ResourceManager
+        */
+       protected abstract RetryingRegistration<Gateway, Success> 
generateRegistration();
+
+       /**
+        * This method handle the Registration Response
+        */
+       protected abstract void onRegistrationSuccess(Success success);
+
+       /**
+        * This method handle the Registration failure
+        */
+       protected abstract void onRegistrationFailure(Throwable failure);
+
+       /**
+        * close connection
+        */
+       public void close() {
+               closed = true;
+
+               // make sure we do not keep re-trying forever
+               if (pendingRegistration != null) {
+                       pendingRegistration.cancel();
+               }
+       }
+
+       public boolean isClosed() {
+               return closed;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       public UUID getTargetLeaderId() {
+               return targetLeaderId;
+       }
+
+       public String getTargetAddress() {
+               return targetAddress;
+       }
+
+       /**
+        * Gets the RegisteredGateway. This returns null until the registration 
is completed.
+        */
+       public Gateway getTargetGateway() {
+               return targetGateway;
+       }
+
+       public boolean isConnected() {
+               return targetGateway != null;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               String connectionInfo = "(ADDRESS: " + targetAddress + " 
LEADERID: " + targetLeaderId + ")";
+
+               if (isConnected()) {
+                       connectionInfo = "RPC connection to " + 
targetGateway.getClass().getSimpleName() + " " + connectionInfo;
+               } else {
+                       connectionInfo = "RPC connection to " + connectionInfo;
+               }
+
+               if (isClosed()) {
+                       connectionInfo = connectionInfo + " is closed";
+               } else if (isConnected()){
+                       connectionInfo = connectionInfo + " is established";
+               } else {
+                       connectionInfo = connectionInfo + " is connecting";
+               }
+
+               return connectionInfo;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c6b998/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9e3c3b9..9d9ad2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -178,12 +178,12 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        if (newLeaderAddress != null) {
                                // the resource manager switched to a new leader
                                log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
-                                       
resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+                                       
resourceManagerConnection.getTargetAddress(), newLeaderAddress);
                        }
                        else {
                                // address null means that the current leader 
is lost without a new leader being there, yet
                                log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
-                                       
resourceManagerConnection.getResourceManagerAddress());
+                                       
resourceManagerConnection.getTargetAddress());
                        }
 
                        // drop the current connection or connection attempt

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c6b998/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 647359d..b4b3bae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -19,16 +19,14 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.concurrent.Future;
 
 import org.slf4j.Logger;
 
@@ -36,115 +34,46 @@ import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The connection between a TaskExecutor and the ResourceManager.
  */
-public class TaskExecutorToResourceManagerConnection {
-
-       /** the logger for all log messages of this class */
-       private final Logger log;
+public class TaskExecutorToResourceManagerConnection
+               extends RegisteredRpcConnection<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> {
 
        /** the TaskExecutor whose connection to the ResourceManager this 
represents */
        private final TaskExecutor taskExecutor;
 
-       private final UUID resourceManagerLeaderId;
-
-       private final String resourceManagerAddress;
-
-       /** Execution context to be used to execute the on complete action of 
the ResourceManagerRegistration */
-       private final Executor executor;
-
-       private 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration 
pendingRegistration;
-
-       private volatile ResourceManagerGateway registeredResourceManager;
-
        private InstanceID registrationId;
 
-       /** flag indicating that the connection is closed */
-       private volatile boolean closed;
-
-
        public TaskExecutorToResourceManagerConnection(
-               Logger log,
-               TaskExecutor taskExecutor,
-               String resourceManagerAddress,
-               UUID resourceManagerLeaderId,
-               Executor executor) {
+                       Logger log,
+                       TaskExecutor taskExecutor,
+                       String resourceManagerAddress,
+                       UUID resourceManagerLeaderId,
+                       Executor executor) {
 
-               this.log = checkNotNull(log);
+               super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
                this.taskExecutor = checkNotNull(taskExecutor);
-               this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
-               this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
-               this.executor = checkNotNull(executor);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Life cycle
-       // 
------------------------------------------------------------------------
-
-       @SuppressWarnings("unchecked")
-       public void start() {
-               checkState(!closed, "The connection is already closed");
-               checkState(!isRegistered() && pendingRegistration == null, "The 
connection is already started");
-
-               pendingRegistration = new 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
-                               log, taskExecutor.getRpcService(),
-                               resourceManagerAddress, resourceManagerLeaderId,
-                               taskExecutor.getAddress(), 
taskExecutor.getResourceID());
-               pendingRegistration.startRegistration();
-
-               Future<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
-
-               future.thenAcceptAsync(new 
AcceptFunction<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>>() {
-                       @Override
-                       public void accept(Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> result) {
-                               registrationId = result.f1.getRegistrationId();
-                               registeredResourceManager = result.f0;
-                       }
-               }, executor);
-               
-               // this future should only ever fail if there is a bug, not if 
the registration is declined
-               future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-                       @Override
-                       public Void apply(Throwable failure) {
-                               taskExecutor.onFatalErrorAsync(failure);
-                               return null;
-                       }
-               }, executor);
-       }
-
-       public void close() {
-               closed = true;
-
-               // make sure we do not keep re-trying forever
-               if (pendingRegistration != null) {
-                       pendingRegistration.cancel();
-               }
        }
 
-       public boolean isClosed() {
-               return closed;
-       }
 
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       public UUID getResourceManagerLeaderId() {
-               return resourceManagerLeaderId;
+       @Override
+       protected RetryingRegistration<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> generateRegistration() {
+               return new 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
+                       log, taskExecutor.getRpcService(),
+                       getTargetAddress(), getTargetLeaderId(),
+                       taskExecutor.getAddress(),taskExecutor.getResourceID());
        }
 
-       public String getResourceManagerAddress() {
-               return resourceManagerAddress;
+       @Override
+       protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess 
success) {
+               registrationId = success.getRegistrationId();
        }
 
-       /**
-        * Gets the ResourceManagerGateway. This returns null until the 
registration is completed.
-        */
-       public ResourceManagerGateway getResourceManager() {
-               return registeredResourceManager;
+       @Override
+       protected void onRegistrationFailure(Throwable failure) {
+               taskExecutor.onFatalErrorAsync(failure);
        }
 
        /**
@@ -155,18 +84,6 @@ public class TaskExecutorToResourceManagerConnection {
                return registrationId;
        }
 
-       public boolean isRegistered() {
-               return registeredResourceManager != null;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return String.format("Connection to ResourceManager %s 
(leaderId=%s)",
-                               resourceManagerAddress, 
resourceManagerLeaderId); 
-       }
-
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c6b998/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
new file mode 100644
index 0000000..8558205
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import 
org.apache.flink.runtime.registration.RetryingRegistrationTest.TestRegistrationSuccess;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RegisteredRpcConnection, validating the successful, failure and 
close behavior.
+ */
+public class RegisteredRpcConnectionTest extends TestLogger {
+
+       @Test
+       public void testSuccessfulRpcConnection() throws Exception {
+               final String testRpcConnectionEndpointAddress = 
"<TestRpcConnectionEndpointAddress>";
+               final UUID leaderId = UUID.randomUUID();
+               final String connectionID = "Test RPC Connection ID";
+
+               // an endpoint that immediately returns success
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new 
RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
+               TestingRpcService rpcService = new TestingRpcService();
+
+               try {
+                       
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+                       TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
+                       connection.start();
+
+                       //wait for connection established
+                       
Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+
+                       // validate correct invocation and result
+                       assertTrue(connection.isConnected());
+                       assertEquals(testRpcConnectionEndpointAddress, 
connection.getTargetAddress());
+                       assertEquals(leaderId, connection.getTargetLeaderId());
+                       assertEquals(testGateway, 
connection.getTargetGateway());
+                       assertEquals(connectionID, 
connection.getConnectionId());
+               }
+               finally {
+                       testGateway.stop();
+                       rpcService.stopService();
+               }
+       }
+
+       @Test
+       public void testRpcConnectionFailures() throws Exception {
+               final String connectionFailureMessage = "Test RPC Connection 
failure";
+               final String testRpcConnectionEndpointAddress = 
"<TestRpcConnectionEndpointAddress>";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpcService = new TestingRpcService();
+
+               try {
+                       // gateway that upon calls Throw an exception
+                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenThrow(
+                               new RuntimeException(connectionFailureMessage));
+
+                       
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+                       TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
+                       connection.start();
+
+                       //wait for connection failure
+                       
Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+
+                       // validate correct invocation and result
+                       assertFalse(connection.isConnected());
+                       assertEquals(testRpcConnectionEndpointAddress, 
connection.getTargetAddress());
+                       assertEquals(leaderId, connection.getTargetLeaderId());
+                       assertNull(connection.getTargetGateway());
+                       assertEquals(connectionFailureMessage, 
connection.getFailareMessage());
+               }
+               finally {
+                       rpcService.stopService();
+               }
+       }
+
+       @Test
+       public void testRpcConnectionClose() throws Exception {
+               final String testRpcConnectionEndpointAddress = 
"<TestRpcConnectionEndpointAddress>";
+               final UUID leaderId = UUID.randomUUID();
+               final String connectionID = "Test RPC Connection ID";
+
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new 
RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
+               TestingRpcService rpcService = new TestingRpcService();
+
+               try{
+                       
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+                       TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
+                       connection.start();
+                       //close the connection
+                       connection.close();
+
+                       // validate connection is closed
+                       assertEquals(testRpcConnectionEndpointAddress, 
connection.getTargetAddress());
+                       assertEquals(leaderId, connection.getTargetLeaderId());
+                       assertTrue(connection.isClosed());
+               }
+               finally {
+                       testGateway.stop();
+                       rpcService.stopService();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test RegisteredRpcConnection
+       // 
------------------------------------------------------------------------
+
+       private static class TestRpcConnection extends 
RegisteredRpcConnection<TestRegistrationGateway, TestRegistrationSuccess> {
+
+               private final RpcService rpcService;
+
+               private String connectionId;
+
+               private String failureMessage;
+
+               public TestRpcConnection(String targetAddress,
+                                                                UUID 
targetLeaderId,
+                                                                Executor 
executor,
+                                                                RpcService 
rpcService)
+               {
+                       
super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), 
targetAddress, targetLeaderId, executor);
+                       this.rpcService = rpcService;
+               }
+
+               @Override
+               protected RetryingRegistration<TestRegistrationGateway, 
RetryingRegistrationTest.TestRegistrationSuccess> generateRegistration() {
+                       return new 
RetryingRegistrationTest.TestRetryingRegistration(rpcService, 
getTargetAddress(), getTargetLeaderId());
+               }
+
+               @Override
+               protected void 
onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) 
{
+                       connectionId = success.getCorrelationId();
+               }
+
+               @Override
+               protected void onRegistrationFailure(Throwable failure) {
+                       failureMessage = failure.getMessage();
+               }
+
+               public String getConnectionId() {
+                       return connectionId;
+               }
+
+               public String getFailareMessage() {
+                       return failureMessage;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c6b998/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index e56a9ec..6d6bbef 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -298,12 +298,12 @@ public class RetryingRegistrationTest extends TestLogger {
        //  test registration
        // 
------------------------------------------------------------------------
 
-       private static class TestRegistrationSuccess extends 
RegistrationResponse.Success {
+       protected static class TestRegistrationSuccess extends 
RegistrationResponse.Success {
                private static final long serialVersionUID = 
5542698790917150604L;
 
                private final String correlationId;
 
-               private TestRegistrationSuccess(String correlationId) {
+               public TestRegistrationSuccess(String correlationId) {
                        this.correlationId = correlationId;
                }
 
@@ -312,7 +312,7 @@ public class RetryingRegistrationTest extends TestLogger {
                }
        }
 
-       private static class TestRetryingRegistration extends 
RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+       protected static class TestRetryingRegistration extends 
RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
 
                // we use shorter timeouts here to speed up the tests
                static final long INITIAL_TIMEOUT = 20;

Reply via email to