This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a9abfe3b [CELEBORN-1245][FOLLOWUP] Fix SendWorkerEvent in HA mode
9a9abfe3b is described below

commit 9a9abfe3bc15954ce3e9594cf21a8164a983f97c
Author: Shuang <[email protected]>
AuthorDate: Tue May 7 15:16:47 2024 +0800

    [CELEBORN-1245][FOLLOWUP] Fix SendWorkerEvent in HA mode
    
    ### What changes were proposed in this pull request?
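    Fix how `MetaHandler` applies `WorkerEvent` requests in HA mode:
    - Read the worker addresses from `WorkerEventRequest#getWorkerAddressList` instead of `RemoveWorkersUnavailableInfoRequest#getUnavailableList`.
    - Add the missing `break` so the `WorkerEvent` case no longer falls through into the `ApplicationMeta` case.
    
    Also correct the documented legal values of the worker `/exit?type=` parameter, and add a test covering worker-event replication across the Ratis masters.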
    
    ### Why are the changes needed?
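    `MetaHandler` read the worker list for a `WorkerEvent` from the wrong request type (`RemoveWorkersUnavailableInfoRequest`), so the wrong addresses were applied. The case also lacked a `break`, so execution continued into the `ApplicationMeta` branch.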
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    `RatisMasterStatusSystemSuiteJ#testHandleWorkerEvent`
    
    Closes #2493 from RexXiong/CELEBORN-1245-FOLLOW-UP.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 docs/monitoring.md                                 |  2 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  3 +-
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 79 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 2b98e5ea3..2d5b493aa 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -389,5 +389,5 @@ API path listed as below:
 | /unavailablePeers          | List the unavailable peers of the worker, this 
always means the worker connect to the peer failed.                             
                                                                                
                                                                                
                                                              |
 | /isShutdown                | Show if the worker is during the process of 
shutdown.                                                                       
                                                                                
                                                                                
                                                                 |
 | /isRegistered              | Show if the worker is registered to the master 
success.                                                                        
                                                                                
                                                                                
                                                              |
-| /exit?type=${TYPE}         | Trigger this worker to exit. Legal `type`s are 
'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.                                   
                                                                                
                                                                                
                                                              |
+| /exit?type=${TYPE}         | Trigger this worker to exit. Legal `type`s are 
'Decommission', 'Graceful' and 'Immediately'.                                   
                                                                                
                                                                                
                                                              |
 | /help                      | List the available API providers of the worker. 
                                                                                
                                                                                
                                                                                
                                                             |
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 948cca6ef..aa5a97827 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -264,11 +264,12 @@ public class MetaHandler {
 
         case WorkerEvent:
           List<ResourceProtos.WorkerAddress> workerAddresses =
-              
request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList();
+              request.getWorkerEventRequest().getWorkerAddressList();
           List<WorkerInfo> workerInfoList =
               
workerAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
           metaSystem.updateWorkerEventMeta(
               
request.getWorkerEventRequest().getWorkerEventType().getNumber(), 
workerInfoList);
+          break;
 
         case ApplicationMeta:
           appId = request.getApplicationMetaRequest().getAppId();
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 9dbcff797..75ba1a298 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
 import org.junit.*;
 import org.mockito.Mockito;
 
@@ -37,6 +38,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.meta.WorkerStatus;
+import org.apache.celeborn.common.protocol.WorkerEventType;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.rpc.RpcEndpointAddress;
 import org.apache.celeborn.common.rpc.RpcEndpointRef;
@@ -45,6 +47,7 @@ import 
org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.common.util.Utils$;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
+import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 
 public class RatisMasterStatusSystemSuiteJ {
   protected static HARaftServer RATISSERVER1 = null;
@@ -1177,6 +1180,82 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.minPartitionSizeToEstimate());
   }
 
+  @Test
+  public void testHandleWorkerEvent() throws InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    statusSystem.handleRegisterWorker(
+        HOSTNAME1,
+        RPCPORT1,
+        PUSHPORT1,
+        FETCHPORT1,
+        REPLICATEPORT1,
+        INTERNALPORT1,
+        NETWORK_LOCATION1,
+        disks1,
+        userResourceConsumption1,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        INTERNALPORT2,
+        NETWORK_LOCATION2,
+        disks2,
+        userResourceConsumption2,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        INTERNALPORT3,
+        NETWORK_LOCATION3,
+        disks3,
+        userResourceConsumption3,
+        getNewReqeustId());
+
+    WorkerInfo workerInfo1 =
+        WorkerInfo.fromUniqueId(
+            HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 + 
":" + REPLICATEPORT1);
+    WorkerInfo workerInfo2 =
+        WorkerInfo.fromUniqueId(
+            HOSTNAME2 + ":" + RPCPORT2 + ":" + PUSHPORT2 + ":" + FETCHPORT2 + 
":" + REPLICATEPORT2);
+    statusSystem.handleWorkerEvent(
+        ResourceProtos.WorkerEventType.Decommission_VALUE,
+        Lists.newArrayList(workerInfo1, workerInfo2),
+        getNewReqeustId());
+
+    Thread.sleep(3000L);
+    Assert.assertEquals(2, STATUSSYSTEM1.workerEventInfos.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.workerEventInfos.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.workerEventInfos.size());
+
+    Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo1));
+    Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+
+    Assert.assertEquals(
+        WorkerEventType.Decommission,
+        STATUSSYSTEM1.workerEventInfos.get(workerInfo1).getEventType());
+
+    statusSystem.handleWorkerEvent(
+        ResourceProtos.WorkerEventType.None_VALUE,
+        Lists.newArrayList(workerInfo1),
+        getNewReqeustId());
+    Thread.sleep(3000L);
+    Assert.assertEquals(1, STATUSSYSTEM1.workerEventInfos.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.workerEventInfos.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.workerEventInfos.size());
+    Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+    Assert.assertEquals(
+        WorkerEventType.Decommission,
+        STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
+  }
+
   @AfterClass
   public static void testNotifyLogFailed() {
     List<HARaftServer> list = Arrays.asList(RATISSERVER1, RATISSERVER2, 
RATISSERVER3);

Reply via email to