[ https://issues.apache.org/jira/browse/FLINK-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458419#comment-15458419 ]
ASF GitHub Bot commented on FLINK-4535: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77339204 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerTest.java --- @@ -0,0 +1,85 @@ +package org.apache.flink.runtime.rpc.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.UnmatchedLeaderSessionIDException; +import org.apache.flink.runtime.rpc.registration.RegistrationResponse; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +public class ResourceManagerTest { + + private TestingSerialRpcService rpcService; + + @Before + public void setup() throws Exception { + rpcService = new TestingSerialRpcService(); + } + + @After + public void teardown() throws Exception { + rpcService.stopService(); + } + + /** + * Test registerTaskExecutor, including normal registration, registration with unmatched leadershipId, registration with invalid address, duplicate registration + * @throws Exception + */ + @Test + public void testRegisterTaskExecutor() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + TaskExecutorGateway 
taskExecutorGateway = mock(TaskExecutorGateway.class); + ResourceID taskExecutorResourceID = ResourceID.generate(); + rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway); + + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); + + final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); + resourceManager.start(); + final UUID leaderSessionId = UUID.randomUUID(); + leaderElectionService.isLeader(leaderSessionId); + + // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId + UUID differentLeaderSessionID = UUID.randomUUID(); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID); + assertTrue(unMatchedLeaderFuture.isCompleted()); + assertTrue(unMatchedLeaderFuture.failed().isCompleted()); + assertTrue(unMatchedLeaderFuture.failed().value().get().get() instanceof UnmatchedLeaderSessionIDException); --- End diff -- Why not simply use `Await.result` and catch the `UnmatchedLeaderSessionIDException`? > ResourceManager registration with TaskExecutor > ---------------------------------------------- > > Key: FLINK-4535 > URL: https://issues.apache.org/jira/browse/FLINK-4535 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: zhangjing > Assignee: zhangjing > > When a TaskExecutor registers at the ResourceManager, it takes the following 3 input > parameters: > 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader > which is kept by the taskExecutor that sends the registration > 2. taskExecutorAddress: the address of the taskExecutor > 3. 
resourceID: The resource ID of the TaskExecutor that registers > The ResourceManager needs to process the registration event based on the following > steps: > 1. Check whether the input resourceManagerLeaderId is the same as the current > leadershipSessionId of the resourceManager. If not, it means that maybe two or > more resourceManagers exist at the same time, and the current resourceManager is > not the proper rm. So it rejects or ignores the registration. > 2. Check whether a valid taskExecutor exists at the given address by > connecting to the address. Reject the registration from an invalid address. > 3. Check whether it is a duplicate registration by the input resourceId; if so, reject > the registration. > 4. Keep the resourceID and taskExecutorGateway mapping relationships, and > optionally keep the resourceID and container mapping relationships in yarn mode. > 5. Create the connection between the resourceManager and the taskExecutor, and ensure > it is healthy based on heartbeat rpc calls between rm and tm? > 6. Send a registration-success ack to the taskExecutor. > Discussion: > Maybe we need to introduce an errorCode or several registration-decline subclasses to > distinguish the different causes of a declined registration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)