This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9a9abfe3b [CELEBORN-1245][FOLLOWUP] Fix SendWorkerEvent in HA mode
9a9abfe3b is described below
commit 9a9abfe3bc15954ce3e9594cf21a8164a983f97c
Author: Shuang <[email protected]>
AuthorDate: Tue May 7 15:16:47 2024 +0800
[CELEBORN-1245][FOLLOWUP] Fix SendWorkerEvent in HA mode
### What changes were proposed in this pull request?
Fix how `MetaHandler` handles `WorkerEvent` requests in HA mode, as stated in the title.
### Why are the changes needed?
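The `WorkerEvent` case in `MetaHandler` read the worker addresses from the wrong request (`RemoveWorkersUnavailableInfoRequest` instead of `WorkerEventRequest`), and it was missing a `break`, so execution fell through into the `ApplicationMeta` case.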
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`RatisMasterStatusSystemSuiteJ#testHandleWorkerEvent`
Closes #2493 from RexXiong/CELEBORN-1245-FOLLOW-UP.
Authored-by: Shuang <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
docs/monitoring.md | 2 +-
.../deploy/master/clustermeta/ha/MetaHandler.java | 3 +-
.../ha/RatisMasterStatusSystemSuiteJ.java | 79 ++++++++++++++++++++++
3 files changed, 82 insertions(+), 2 deletions(-)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 2b98e5ea3..2d5b493aa 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -389,5 +389,5 @@ API path listed as below:
| /unavailablePeers | List the unavailable peers of the worker, this
always means the worker connect to the peer failed.
|
| /isShutdown | Show if the worker is during the process of
shutdown.
|
| /isRegistered | Show if the worker is registered to the master
success.
|
-| /exit?type=${TYPE} | Trigger this worker to exit. Legal `type`s are
'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
|
+| /exit?type=${TYPE} | Trigger this worker to exit. Legal `type`s are
'Decommission', 'Graceful' and 'Immediately'.
|
| /help | List the available API providers of the worker.
|
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 948cca6ef..aa5a97827 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -264,11 +264,12 @@ public class MetaHandler {
case WorkerEvent:
List<ResourceProtos.WorkerAddress> workerAddresses =
-
request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList();
+ request.getWorkerEventRequest().getWorkerAddressList();
List<WorkerInfo> workerInfoList =
workerAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
metaSystem.updateWorkerEventMeta(
request.getWorkerEventRequest().getWorkerEventType().getNumber(),
workerInfoList);
+ break;
case ApplicationMeta:
appId = request.getApplicationMetaRequest().getAppId();
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 9dbcff797..75ba1a298 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import scala.Tuple2;
+import com.google.common.collect.Lists;
import org.junit.*;
import org.mockito.Mockito;
@@ -37,6 +38,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
+import org.apache.celeborn.common.protocol.WorkerEventType;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.rpc.RpcEndpointAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
@@ -45,6 +47,7 @@ import
org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.util.Utils$;
import
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
+import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
public class RatisMasterStatusSystemSuiteJ {
protected static HARaftServer RATISSERVER1 = null;
@@ -1177,6 +1180,82 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.minPartitionSizeToEstimate());
}
+ @Test
+ public void testHandleWorkerEvent() throws InterruptedException {
+ AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+ Assert.assertNotNull(statusSystem);
+
+ statusSystem.handleRegisterWorker(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ INTERNALPORT1,
+ NETWORK_LOCATION1,
+ disks1,
+ userResourceConsumption1,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ INTERNALPORT2,
+ NETWORK_LOCATION2,
+ disks2,
+ userResourceConsumption2,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME3,
+ RPCPORT3,
+ PUSHPORT3,
+ FETCHPORT3,
+ REPLICATEPORT3,
+ INTERNALPORT3,
+ NETWORK_LOCATION3,
+ disks3,
+ userResourceConsumption3,
+ getNewReqeustId());
+
+ WorkerInfo workerInfo1 =
+ WorkerInfo.fromUniqueId(
+ HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 +
":" + REPLICATEPORT1);
+ WorkerInfo workerInfo2 =
+ WorkerInfo.fromUniqueId(
+ HOSTNAME2 + ":" + RPCPORT2 + ":" + PUSHPORT2 + ":" + FETCHPORT2 +
":" + REPLICATEPORT2);
+ statusSystem.handleWorkerEvent(
+ ResourceProtos.WorkerEventType.Decommission_VALUE,
+ Lists.newArrayList(workerInfo1, workerInfo2),
+ getNewReqeustId());
+
+ Thread.sleep(3000L);
+ Assert.assertEquals(2, STATUSSYSTEM1.workerEventInfos.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.workerEventInfos.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.workerEventInfos.size());
+
+ Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo1));
+ Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+
+ Assert.assertEquals(
+ WorkerEventType.Decommission,
+ STATUSSYSTEM1.workerEventInfos.get(workerInfo1).getEventType());
+
+ statusSystem.handleWorkerEvent(
+ ResourceProtos.WorkerEventType.None_VALUE,
+ Lists.newArrayList(workerInfo1),
+ getNewReqeustId());
+ Thread.sleep(3000L);
+ Assert.assertEquals(1, STATUSSYSTEM1.workerEventInfos.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.workerEventInfos.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.workerEventInfos.size());
+ Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+ Assert.assertEquals(
+ WorkerEventType.Decommission,
+ STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
+ }
+
@AfterClass
public static void testNotifyLogFailed() {
List<HARaftServer> list = Arrays.asList(RATISSERVER1, RATISSERVER2,
RATISSERVER3);