[ https://issues.apache.org/jira/browse/FLINK-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458419#comment-15458419 ]

ASF GitHub Bot commented on FLINK-4535:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2451#discussion_r77339204
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerTest.java ---
    @@ -0,0 +1,85 @@
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.runtime.rpc.UnmatchedLeaderSessionIDException;
    +import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Test;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.*;
    +
    +public class ResourceManagerTest {
    +
    +   private TestingSerialRpcService rpcService;
    +
    +   @Before
    +   public void setup() throws Exception {
    +           rpcService = new TestingSerialRpcService();
    +   }
    +
    +   @After
    +   public void teardown() throws Exception {
    +           rpcService.stopService();
    +   }
    +
    +   /**
    +    * Tests registerTaskExecutor, including normal registration, registration with an unmatched leader session ID, registration with an invalid address, and duplicate registration.
    +    * @throws Exception
    +    */
    +   @Test
    +   public void testRegisterTaskExecutor() throws Exception {
    +           String taskExecutorAddress = "/taskExecutor1";
    +           TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +           ResourceID taskExecutorResourceID = ResourceID.generate();
    +           rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
    +
    +           TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
    +           TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
    +           highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
    +
    +           final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
    +           resourceManager.start();
    +           final UUID leaderSessionId = UUID.randomUUID();
    +           leaderElectionService.isLeader(leaderSessionId);
    +
    +           // test that the registration fails when a taskExecutor registers with an unmatched leaderSessionId
    +           UUID differentLeaderSessionID = UUID.randomUUID();
    +           Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
    +           assertTrue(unMatchedLeaderFuture.isCompleted());
    +           assertTrue(unMatchedLeaderFuture.failed().isCompleted());
    +           assertTrue(unMatchedLeaderFuture.failed().value().get().get() instanceof UnmatchedLeaderSessionIDException);
    --- End diff --
    
    Why not simply do `Await.result` and catch the `UnmatchedLeaderSessionIDException`?
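The suggestion above amounts to blocking on the registration future and letting the expected exception surface, instead of digging through `failed().value().get().get()`. The following is a minimal, self-contained sketch of that pattern. To avoid depending on Scala or Flink, it swaps Scala's `Future`/`Await.result` for `java.util.concurrent.CompletableFuture`/`get(timeout, unit)`; `UnmatchedLeaderSessionIDException` here is a local stand-in class, and `registerWithLeaderCheck` is a hypothetical helper, not Flink's `registerTaskExecutor`.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwaitFailureSketch {

    // Hypothetical stand-in for Flink's UnmatchedLeaderSessionIDException.
    static class UnmatchedLeaderSessionIDException extends Exception {
        UnmatchedLeaderSessionIDException(String message) {
            super(message);
        }
    }

    // Hypothetical registration call: fails the future when the leader session IDs differ.
    static CompletableFuture<String> registerWithLeaderCheck(UUID currentLeaderId, UUID claimedLeaderId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (!currentLeaderId.equals(claimedLeaderId)) {
            future.completeExceptionally(new UnmatchedLeaderSessionIDException(
                    "expected " + currentLeaderId + " but got " + claimedLeaderId));
        } else {
            future.complete("registered");
        }
        return future;
    }

    /** Blocks on the future and reports whether it failed with the expected exception. */
    static boolean failsWithUnmatchedLeader(CompletableFuture<?> future) {
        try {
            future.get(5, TimeUnit.SECONDS);
            return false; // completed normally: the registration was accepted
        } catch (ExecutionException e) {
            // CompletableFuture.get wraps the failure; Scala's Await.result rethrows it unwrapped.
            return e.getCause() instanceof UnmatchedLeaderSessionIDException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for registration", e);
        } catch (TimeoutException e) {
            throw new IllegalStateException("registration future did not complete in time", e);
        }
    }

    public static void main(String[] args) {
        UUID leaderId = UUID.randomUUID();
        System.out.println(failsWithUnmatchedLeader(registerWithLeaderCheck(leaderId, UUID.randomUUID())));
        System.out.println(failsWithUnmatchedLeader(registerWithLeaderCheck(leaderId, leaderId)));
    }
}
```

In the test under review, the Scala equivalent would presumably be `Await.result(unMatchedLeaderFuture, new FiniteDuration(...))` inside a `try` block that calls `fail()` if no exception is thrown, catching `UnmatchedLeaderSessionIDException` directly since `Await.result` does not wrap it.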


> ResourceManager registration with TaskExecutor
> ----------------------------------------------
>
>                 Key: FLINK-4535
>                 URL: https://issues.apache.org/jira/browse/FLINK-4535
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> When a TaskExecutor registers at the ResourceManager, it passes the following 3 input parameters:
> 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader, as held by the TaskExecutor that sends the registration
> 2. taskExecutorAddress: the address of the TaskExecutor
> 3. resourceID: the resource ID of the TaskExecutor that registers
> The ResourceManager needs to process the registration event in the following steps:
> 1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the ResourceManager. If not, two or more ResourceManagers may exist at the same time and the current one is not the proper leader, so it rejects or ignores the registration.
> 2. Check whether a valid TaskExecutor exists at the given address by connecting to that address. Reject registrations from invalid addresses.
> 3. Check by the input resourceID whether this is a duplicate registration; if it is, reject the registration.
> 4. Keep the mapping from resourceID to taskExecutorGateway and, in YARN mode, optionally the mapping from resourceID to container.
> 5. Create the connection between the ResourceManager and the TaskExecutor, and ensure it stays healthy based on heartbeat RPC calls between RM and TM (?)
> 6. Send a registration-successful acknowledgement to the TaskExecutor.
> Discussion:
> Maybe we need to introduce an errorCode or several registration-decline subclasses to distinguish the different causes of a declined registration.
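The registration steps and the decline-subclass idea from the discussion can be sketched together as follows. This is a minimal, hypothetical model, not Flink's `ResourceManager`: the class and response names are invented for illustration, `resourceID` is a plain `String` instead of Flink's `ResourceID`, "connecting to the address" is simulated by a lookup in a map of reachable gateways, step 1 always rejects rather than ignores, and the heartbeat monitoring of step 5 is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

public class RegistrationSketch {

    // Hypothetical response hierarchy: one Decline subclass per cause, as floated in the discussion.
    abstract static class Response {}

    static final class Success extends Response {}

    abstract static class Decline extends Response {}

    static final class UnmatchedLeaderId extends Decline {}

    static final class InvalidAddress extends Decline {}

    static final class DuplicateRegistration extends Decline {}

    // Hypothetical stand-in for a TaskExecutorGateway obtained by connecting to an address.
    interface Gateway {}

    private final UUID leaderSessionId;
    // Simulates "connecting to the address": only addresses in this map resolve to a gateway.
    private final Map<String, Gateway> reachable;
    // Step 4: resourceID -> gateway for every registered TaskExecutor.
    private final Map<String, Gateway> registered = new HashMap<>();

    RegistrationSketch(UUID leaderSessionId, Map<String, Gateway> reachable) {
        this.leaderSessionId = leaderSessionId;
        this.reachable = reachable;
    }

    Response registerTaskExecutor(UUID resourceManagerLeaderId, String taskExecutorAddress, String resourceID) {
        // Step 1: reject registrations carrying a fencing token other than the current leader session ID.
        if (!Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
            return new UnmatchedLeaderId();
        }
        // Step 2: reject if no TaskExecutor is reachable at the given address.
        Gateway gateway = reachable.get(taskExecutorAddress);
        if (gateway == null) {
            return new InvalidAddress();
        }
        // Step 3: reject a second registration with the same resource ID.
        if (registered.containsKey(resourceID)) {
            return new DuplicateRegistration();
        }
        // Step 4: remember the resourceID -> gateway mapping.
        registered.put(resourceID, gateway);
        // Step 5 (heartbeat monitoring) is not modelled here.
        // Step 6: acknowledge the registration.
        return new Success();
    }

    public static void main(String[] args) {
        UUID leader = UUID.randomUUID();
        Map<String, Gateway> reachable = new HashMap<>();
        reachable.put("/taskExecutor1", new Gateway() {});
        RegistrationSketch rm = new RegistrationSketch(leader, reachable);
        System.out.println(rm.registerTaskExecutor(leader, "/taskExecutor1", "r1").getClass().getSimpleName());
        System.out.println(rm.registerTaskExecutor(leader, "/taskExecutor1", "r1").getClass().getSimpleName());
    }
}
```

Compared with a single decline type plus an errorCode, the subclass approach lets callers branch with `instanceof` and lets each cause carry its own fields later; an errorCode enum would be the lighter-weight alternative if the causes never need extra data.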



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
