[FLINK-4355] [cluster management] Add tests for the TaskManager -> 
ResourceManager registration.

This closes #2395.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ea97a18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ea97a18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ea97a18

Branch: refs/heads/flip-6
Commit: 5ea97a185d266f96570a4ea3c967ecbc384378cd
Parents: e6b0f12
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 19 23:45:54 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:15 2016 +0200

----------------------------------------------------------------------
 .../rpc/registration/RetryingRegistration.java  |   4 +
 .../runtime/rpc/taskexecutor/SlotReport.java    |  38 ---
 .../runtime/rpc/taskexecutor/TaskExecutor.java  |  12 +
 ...TaskExecutorToResourceManagerConnection.java |   4 +
 .../TestingHighAvailabilityServices.java        |  53 +++
 .../flink/runtime/rpc/TestingGatewayBase.java   |  18 +-
 .../registration/RetryingRegistrationTest.java  | 336 +++++++++++++++++++
 .../registration/TestRegistrationGateway.java   |  85 +++++
 .../rpc/taskexecutor/TaskExecutorTest.java      |  92 ++++-
 9 files changed, 602 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
index 4c93684..dcb5011 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
@@ -58,12 +58,16 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
        //  default configuration values
        // 
------------------------------------------------------------------------
 
+       /** default value for the initial registration timeout (milliseconds) */
        private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
 
+       /** default value for the maximum registration timeout, after 
exponential back-off (milliseconds) */
        private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
 
+       /** The pause (milliseconds) made after a registration attempt caused 
an exception (other than timeout) */
        private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
 
+       /** The pause (milliseconds) made after the registration attempt was 
refused */
        private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
deleted file mode 100644
index e42fa4a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import java.io.Serializable;
-
-/**
- * A report about the current status of all slots of the TaskExecutor, 
describing
- * which slots are available and allocated, and what jobs (JobManagers) the 
allocated slots
- * have been allocated to.
- */
-public class SlotReport implements Serializable{
-
-       private static final long serialVersionUID = 1L;
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "SlotReport";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 1a637bb..f201e00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -72,6 +73,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        @Override
        public void start() {
+               super.start();
+
                // start by connecting to the ResourceManager
                try {
                        
haServices.getResourceManagerLeaderRetriever().start(new 
ResourceManagerLeaderListener());
@@ -148,6 +151,15 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        }
 
        // 
------------------------------------------------------------------------
+       //  Access to fields for testing
+       // 
------------------------------------------------------------------------
+
+       @VisibleForTesting
+       TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
+               return resourceManagerConnection;
+       }
+
+       // 
------------------------------------------------------------------------
        //  Utility classes
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
index ef75862..f398b7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -40,6 +40,9 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
+/**
+ * The connection between a TaskExecutor and the ResourceManager.
+ */
 public class TaskExecutorToResourceManagerConnection {
 
        /** the logger for all log messages of this class */
@@ -87,6 +90,7 @@ public class TaskExecutorToResourceManagerConnection {
                                log, taskExecutor.getRpcService(),
                                resourceManagerAddress, resourceManagerLeaderId,
                                taskExecutor.getAddress(), 
taskExecutor.getResourceID());
+               registration.startRegistration();
 
                Future<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>> future = registration.getFuture();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
new file mode 100644
index 0000000..3a9f943
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set
+ * to an arbitrary implementation, such as a mock or default service.
+ */
+public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+       private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
+
+
+       // 
------------------------------------------------------------------------
+       //  Setters for mock / testing implementations
+       // 
------------------------------------------------------------------------
+
+       public void setResourceManagerLeaderRetriever(LeaderRetrievalService 
resourceManagerLeaderRetriever) {
+               this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  HA Services Methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
+               LeaderRetrievalService service = 
this.resourceManagerLeaderRetriever;
+               if (service != null) {
+                       return service;
+               } else {
+                       throw new 
IllegalStateException("ResourceManagerLeaderRetriever has not been set");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 4256135..8133a87 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -34,8 +34,15 @@ public abstract class TestingGatewayBase implements 
RpcGateway {
 
        private final ScheduledExecutorService executor;
 
-       protected TestingGatewayBase() {
+       private final String address;
+
+       protected TestingGatewayBase(final String address) {
                this.executor = Executors.newSingleThreadScheduledExecutor();
+               this.address = address;
+       }
+
+       protected TestingGatewayBase() {
+               this("localhost");
        }
 
        // 
------------------------------------------------------------------------
@@ -53,6 +60,15 @@ public abstract class TestingGatewayBase implements 
RpcGateway {
        }
 
        // 
------------------------------------------------------------------------
+       //  Base class methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String getAddress() {
+               return address;
+       }
+
+       // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
new file mode 100644
index 0000000..9508825
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the generic retrying registration class, validating the failure, 
retry, and back-off behavior.
+ */
+public class RetryingRegistrationTest extends TestLogger {
+
+       @Test
+       public void testSimpleSuccessfulRegistration() throws Exception {
+               final String testId = "laissez les bon temps roulez";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               // an endpoint that immediately returns success
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+                       registration.startRegistration();
+
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       assertNotNull(future);
+
+                       // multiple accesses return the same future
+                       assertEquals(future, registration.getFuture());
+
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success = 
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+               }
+               finally {
+                       testGateway.stop();
+                       rpc.stopService();
+               }
+       }
+       
+       @Test
+       public void testPropagateFailures() throws Exception {
+               final String testExceptionMessage = "testExceptionMessage";
+
+               // RPC service that fails with an exception when connecting
+               RpcService rpc = mock(RpcService.class);
+               when(rpc.connect(anyString(), any(Class.class))).thenThrow(new 
RuntimeException(testExceptionMessage));
+
+               TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
+               registration.startRegistration();
+
+               Future<?> future = registration.getFuture();
+               assertTrue(future.failed().isCompleted());
+
+               assertEquals(testExceptionMessage, 
future.failed().value().get().get().getMessage());
+       }
+
+       @Test
+       public void testRetryConnectOnFailure() throws Exception {
+               final String testId = "laissez les bon temps roulez";
+               final UUID leaderId = UUID.randomUUID();
+
+               ExecutorService executor = Executors.newCachedThreadPool();
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
+
+               try {
+                       // RPC service that fails upon the first connection, 
but succeeds on the second
+                       RpcService rpc = mock(RpcService.class);
+                       when(rpc.connect(anyString(), 
any(Class.class))).thenReturn(
+                                       Futures.failed(new Exception("test 
connect failure")),  // first connection attempt fails
+                                       Futures.successful(testGateway)         
                // second connection attempt succeeds
+                       );
+                       
when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "foobar address", leaderId);
+                       registration.startRegistration();
+
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(registration.getFuture(), 
new FiniteDuration(10, SECONDS));
+
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+               }
+               finally {
+                       testGateway.stop();
+                       executor.shutdown();
+               }
+       }
+
+       @Test
+       public void testRetriesOnTimeouts() throws Exception {
+               final String testId = "rien ne va plus";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               // an endpoint that immediately returns futures with timeouts 
before returning a successful future
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
+                               null, // timeout
+                               null, // timeout
+                               new TestRegistrationSuccess(testId) // success
+               );
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+       
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+       
+                       long started = System.nanoTime();
+                       registration.startRegistration();
+       
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+       
+                       long finished = System.nanoTime();
+                       long elapsedMillis = (finished - started) / 1000000;
+       
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+       
+                       // validate that some retry-delay / back-off behavior 
happened
+                       assertTrue("retries did not properly back off", 
elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+               }
+               finally {
+                       rpc.stopService();
+                       testGateway.stop();
+               }
+       }
+
+       @Test
+       public void testDecline() throws Exception {
+               final String testId = "qui a coupe le fromage";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
+                               null, // timeout
+                               new RegistrationResponse.Decline("no reason "),
+                               null, // timeout
+                               new TestRegistrationSuccess(testId) // success
+               );
+
+               try {
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+                       long started = System.nanoTime();
+                       registration.startRegistration();
+       
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+
+                       long finished = System.nanoTime();
+                       long elapsedMillis = (finished - started) / 1000000;
+
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+
+                       // validate that some retry-delay / back-off behavior 
happened
+                       assertTrue("retries did not properly back off", 
elapsedMillis >= 
+                                       2 * 
TestRetryingRegistration.INITIAL_TIMEOUT + 
TestRetryingRegistration.DELAY_ON_DECLINE);
+               }
+               finally {
+                       testGateway.stop();
+                       rpc.stopService();
+               }
+       }
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testRetryOnError() throws Exception {
+               final String testId = "Petit a petit, l'oiseau fait son nid";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       // gateway that upon calls first responds with a 
failure, then with a success
+                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+
+                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(
+                                       
Futures.<RegistrationResponse>failed(new Exception("test exception")),
+                                       
Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
+                       
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+                       long started = System.nanoTime();
+                       registration.startRegistration();
+
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+
+                       long finished = System.nanoTime();
+                       long elapsedMillis = (finished - started) / 1000000;
+                       
+                       assertEquals(testId, success.f1.getCorrelationId());
+
+                       // validate that some retry-delay / back-off behavior 
happened
+                       assertTrue("retries did not properly back off",
+                                       elapsedMillis >= 
TestRetryingRegistration.DELAY_ON_ERROR);
+               }
+               finally {
+                       rpc.stopService();
+               }
+       }
+
+       @Test
+       public void testCancellation() throws Exception {
+               final String testEndpointAddress = "my-test-address";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       Promise<RegistrationResponse> result = 
Futures.promise();
+
+                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result.future());
+
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+                       registration.startRegistration();
+
+                       // cancel and fail the current registration attempt
+                       registration.cancel();
+                       result.failure(new TimeoutException());
+
+                       // there should not be a second registration attempt
+                       verify(testGateway, 
atMost(1)).registrationCall(any(UUID.class), anyLong());
+               }
+               finally {
+                       rpc.stopService();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test registration
+       // 
------------------------------------------------------------------------
+
+       private static class TestRegistrationSuccess extends 
RegistrationResponse.Success {
+               private static final long serialVersionUID = 
5542698790917150604L;
+
+               private final String correlationId;
+
+               private TestRegistrationSuccess(String correlationId) {
+                       this.correlationId = correlationId;
+               }
+
+               public String getCorrelationId() {
+                       return correlationId;
+               }
+       }
+
+       private static class TestRetryingRegistration extends 
RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+
+               // we use shorter timeouts here to speed up the tests
+               static final long INITIAL_TIMEOUT = 20;
+               static final long MAX_TIMEOUT = 200;
+               static final long DELAY_ON_ERROR = 200;
+               static final long DELAY_ON_DECLINE = 200;
+
+               public TestRetryingRegistration(RpcService rpc, String 
targetAddress, UUID leaderId) {
+                       
super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
+                                       rpc, "TestEndpoint",
+                                       TestRegistrationGateway.class,
+                                       targetAddress, leaderId,
+                                       INITIAL_TIMEOUT, MAX_TIMEOUT, 
DELAY_ON_ERROR, DELAY_ON_DECLINE);
+               }
+
+               @Override
+               protected Future<RegistrationResponse> invokeRegistration(
+                               TestRegistrationGateway gateway, UUID leaderId, 
long timeoutMillis) {
+                       return gateway.registrationCall(leaderId, 
timeoutMillis);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
new file mode 100644
index 0000000..a049e48
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.runtime.rpc.TestingGatewayBase;
+import org.apache.flink.util.Preconditions;
+
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Test gateway that answers registration calls from a pre-defined sequence of responses
+ * and records every call it receives.
+ *
+ * <p>Each call is answered with the next response of the sequence; once the last response
+ * has been reached, it is used for all further calls. A {@code null} entry means that the
+ * call is answered with the future obtained from {@code futureWithTimeout(timeout)}.
+ */
+public class TestRegistrationGateway extends TestingGatewayBase {
+
+       /** All registration calls received so far, in the order in which they were recorded. */
+       private final BlockingQueue<RegistrationCall> invocations;
+
+       /** The scripted answers; individual entries may be {@code null}. */
+       private final RegistrationResponse[] responses;
+
+       /** Index into {@link #responses} of the answer used for the next call. */
+       private int pos;
+
+       /**
+        * Creates a gateway that replies with the given responses.
+        *
+        * @param responses the scripted answers; must be non-null and hold at least one element
+        */
+       public TestRegistrationGateway(RegistrationResponse... responses) {
+               final boolean hasResponses = responses != null && responses.length > 0;
+               Preconditions.checkArgument(hasResponses);
+
+               this.responses = responses;
+               this.invocations = new LinkedBlockingQueue<>();
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Records the call and replies with the current scripted response.
+        *
+        * @param leaderId the leader id of the call, kept in the recorded {@link RegistrationCall}
+        * @param timeout the timeout of the call, kept in the recorded {@link RegistrationCall}
+        *                and passed on to {@code futureWithTimeout} when the response is {@code null}
+        * @return a completed future holding the response if it is non-null, otherwise the
+        *         future returned by {@code futureWithTimeout(timeout)}
+        */
+       public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+               invocations.add(new RegistrationCall(leaderId, timeout));
+
+               final RegistrationResponse current = responses[pos];
+
+               // move on to the next response, but stay on the last one once it has been reached
+               pos = Math.min(pos + 1, responses.length - 1);
+
+               if (current == null) {
+                       // no scripted answer: this future is meant to never complete and to time out instead
+                       return this.<RegistrationResponse>futureWithTimeout(timeout);
+               }
+               return Futures.successful(current);
+       }
+
+       /**
+        * Gives access to the queue of recorded calls (the internal queue itself, not a copy).
+        */
+       public BlockingQueue<RegistrationCall> getInvocations() {
+               return invocations;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Immutable record of the arguments of a single {@code registrationCall} invocation.
+        */
+       public static class RegistrationCall {
+
+               private final UUID leaderId;
+               private final long timeout;
+
+               public RegistrationCall(UUID leaderId, long timeout) {
+                       this.timeout = timeout;
+                       this.leaderId = leaderId;
+               }
+
+               /** The leader id that was passed to the call. */
+               public UUID leaderId() {
+                       return this.leaderId;
+               }
+
+               /** The timeout that was passed to the call. */
+               public long timeout() {
+                       return this.timeout;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index 9f9bab3..b831ead 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -18,8 +18,98 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
 public class TaskExecutorTest extends TestLogger {
-       
+
+       /**
+        * Starts a TaskExecutor whose {@link NonHaServices} point at a registered (mocked)
+        * resource manager gateway and verifies that {@code registerTaskExecutor} is called
+        * on that gateway with the executor's address and resource id.
+        */
+       @Test
+       public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
+               final ResourceID taskExecutorId = ResourceID.generate();
+               final String rmAddress = "/resource/manager/address/one";
+
+               final TestingRpcService rpcService = new TestingRpcService();
+               try {
+                       // make a mocked resource manager reachable under the configured address
+                       final ResourceManagerGateway resourceManager = mock(ResourceManagerGateway.class);
+                       rpcService.registerGateway(rmAddress, resourceManager);
+
+                       final TaskExecutor taskExecutor =
+                                       new TaskExecutor(rpcService, new NonHaServices(rmAddress), taskExecutorId);
+                       final String taskExecutorAddress = taskExecutor.getAddress();
+
+                       taskExecutor.start();
+
+                       // the registration call has to show up within the verification timeout
+                       verify(resourceManager, timeout(5000)).registerTaskExecutor(
+                                       any(UUID.class),
+                                       eq(taskExecutorAddress),
+                                       eq(taskExecutorId),
+                                       any(FiniteDuration.class));
+               }
+               finally {
+                       rpcService.stopService();
+               }
+       }
+
+       /**
+        * Drives a TaskExecutor through leader changes announced via a
+        * {@link TestingLeaderRetrievalService}: no leader, a first leader, a leader
+        * cancellation, and a second leader. Verifies that {@code registerTaskExecutor}
+        * is called on the gateway of each announced leader with that leader's id.
+        */
+       @Test
+       public void testTriggerRegistrationOnLeaderChange() throws Exception {
+               final ResourceID taskExecutorId = ResourceID.generate();
+
+               final String firstAddress = "/resource/manager/address/one";
+               final String secondAddress = "/resource/manager/address/two";
+               final UUID firstLeaderId = UUID.randomUUID();
+               final UUID secondLeaderId = UUID.randomUUID();
+
+               final TestingRpcService rpcService = new TestingRpcService();
+               try {
+                       // two mocked resource managers, each reachable under its own address
+                       final ResourceManagerGateway firstResourceManager = mock(ResourceManagerGateway.class);
+                       final ResourceManagerGateway secondResourceManager = mock(ResourceManagerGateway.class);
+                       rpcService.registerGateway(firstAddress, firstResourceManager);
+                       rpcService.registerGateway(secondAddress, secondResourceManager);
+
+                       // leader retrieval that the test controls by hand
+                       final TestingLeaderRetrievalService leaderRetrieval = new TestingLeaderRetrievalService();
+                       final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+                       haServices.setResourceManagerLeaderRetriever(leaderRetrieval);
+
+                       final TaskExecutor taskExecutor = new TaskExecutor(rpcService, haServices, taskExecutorId);
+                       final String taskExecutorAddress = taskExecutor.getAddress();
+                       taskExecutor.start();
+
+                       // without an announced leader there must not be a connection yet
+                       assertNull(taskExecutor.getResourceManagerConnection());
+
+                       // announce the first leader and expect a registration at its gateway
+                       leaderRetrieval.notifyListener(firstAddress, firstLeaderId);
+
+                       verify(firstResourceManager, timeout(5000)).registerTaskExecutor(
+                                       eq(firstLeaderId),
+                                       eq(taskExecutorAddress),
+                                       eq(taskExecutorId),
+                                       any(FiniteDuration.class));
+                       assertNotNull(taskExecutor.getResourceManagerConnection());
+
+                       // revoke the leader
+                       leaderRetrieval.notifyListener(null, null);
+
+                       // announce the second leader and expect a registration at its gateway
+                       leaderRetrieval.notifyListener(secondAddress, secondLeaderId);
+
+                       verify(secondResourceManager, timeout(5000)).registerTaskExecutor(
+                                       eq(secondLeaderId),
+                                       eq(taskExecutorAddress),
+                                       eq(taskExecutorId),
+                                       any(FiniteDuration.class));
+                       assertNotNull(taskExecutor.getResourceManagerConnection());
+               }
+               finally {
+                       rpcService.stopService();
+               }
+       }
 }

Reply via email to