This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 6ad779afb [CELEBORN-1139] Master's follower clean state before install snapshot
6ad779afb is described below

commit 6ad779afbfb6b312ea8582accdef4debfde4984c
Author: 宪英 <[email protected]>
AuthorDate: Wed Dec 13 09:55:36 2023 +0800

    [CELEBORN-1139] Master's follower clean state before install snapshot
    
    The master's follower now clears its state before installing a snapshot, instead of adding the snapshot's contents on top of its existing state.
    
    When a master's follower node receives a state snapshot from the leader, it
    updates the state machine directly without first clearing the outdated state.
    This can cause problems; for example, the worker list may end up with an extra
    copy of the registered workers.
    
    No.
    
    Tested with unit tests (UT):
    
org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterStateMachineSuiteJ
    
    Closes #2147 from liying919/main.
    
    Authored-by: 宪英 <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit c1120e8b44dd9b30676cf2028293d33838ab0077)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/master/clustermeta/AbstractMetaManager.java   | 16 ++++++++++++++--
 .../master/clustermeta/ha/MasterStateMachineSuiteJ.java  | 11 ++++++++++-
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 5e8f52b6f..f20fd6c26 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -245,6 +245,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public void restoreMetaFromFile(File file) throws IOException {
     try (BufferedInputStream in = new BufferedInputStream(new 
FileInputStream(file))) {
       PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);
+      cleanUpState();
 
       estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
 
@@ -293,9 +294,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
               .map(PbSerDeUtils::fromPbWorkerInfo)
               .collect(Collectors.toSet()));
 
-      partitionTotalWritten.reset();
       partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
-      partitionTotalFileCount.reset();
       
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
       appDiskUsageMetric.restoreFromSnapshot(
           snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
@@ -318,6 +317,19 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}", 
shuffle));
   }
 
+  private void cleanUpState() {
+    registeredShuffle.clear();
+    hostnameSet.clear();
+    workers.clear();
+    lostWorkers.clear();
+    appHeartbeatTime.clear();
+    excludedWorkers.clear();
+    shutdownWorkers.clear();
+    workerLostEvents.clear();
+    partitionTotalWritten.reset();
+    partitionTotalFileCount.reset();
+  }
+
   public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> 
failedWorkers) {
     synchronized (this.workers) {
       shutdownWorkers.addAll(failedWorkers);
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 05361ddb5..b3c6d7ab2 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -180,13 +180,19 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     AppDiskUsageSnapShot originCurrentSnapshot =
         masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
 
+    masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093, 
9092));
+    masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093, 
9092));
+    masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093, 
9092));
+
     masterStatusSystem.writeMetaInfoToFile(tmpFile);
 
     masterStatusSystem.hostnameSet.clear();
     masterStatusSystem.excludedWorkers.clear();
+    masterStatusSystem.workers.clear();
 
     masterStatusSystem.restoreMetaFromFile(tmpFile);
 
+    Assert.assertEquals(3, masterStatusSystem.workers.size());
     Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
     Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
     Assert.assertEquals(
@@ -197,6 +203,9 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
         
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get().topNItems().length);
     Assert.assertEquals(
         originCurrentSnapshot, 
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get());
-    Assert.assertEquals(originSnapshots, 
masterStatusSystem.appDiskUsageMetric.snapShots());
+    Assert.assertArrayEquals(originSnapshots, 
masterStatusSystem.appDiskUsageMetric.snapShots());
+
+    masterStatusSystem.restoreMetaFromFile(tmpFile);
+    Assert.assertEquals(3, masterStatusSystem.workers.size());
   }
 }

Reply via email to