This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c1120e8b4 [CELEBORN-1139] Master's follower clean state before install 
snapshot
c1120e8b4 is described below

commit c1120e8b44dd9b30676cf2028293d33838ab0077
Author: 宪英 <[email protected]>
AuthorDate: Wed Dec 13 09:55:36 2023 +0800

    [CELEBORN-1139] Master's follower clean state before install snapshot
    
    ### What changes were proposed in this pull request?
    
    The master follower now clears its existing state before installing a
    snapshot, instead of adding the snapshot's contents on top of that state.
    
    ### Why are the changes needed?
    When a master's follower node receives a state snapshot from the leader, it
    updates the state machine directly without first cleaning up its outdated
    state. This can cause problems: for example, the worker list may end up
    containing an extra copy of the registered workers.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit test:
    org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterStateMachineSuiteJ
    
    Closes #2147 from liying919/main.
    
    Authored-by: 宪英 <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/master/clustermeta/AbstractMetaManager.java  | 17 +++++++++++++++--
 .../master/clustermeta/ha/MasterStateMachineSuiteJ.java |  9 +++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index a6526d218..3ddd39232 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -253,6 +253,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public void restoreMetaFromFile(File file) throws IOException {
     try (BufferedInputStream in = new BufferedInputStream(new 
FileInputStream(file))) {
       PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);
+      cleanUpState();
 
       estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
 
@@ -305,9 +306,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
               .map(PbSerDeUtils::fromPbWorkerInfo)
               .collect(Collectors.toSet()));
 
-      partitionTotalWritten.reset();
       partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
-      partitionTotalFileCount.reset();
       
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
       appDiskUsageMetric.restoreFromSnapshot(
           snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
@@ -331,6 +330,20 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}", 
shuffle));
   }
 
+  private void cleanUpState() {
+    registeredShuffle.clear();
+    hostnameSet.clear();
+    workers.clear();
+    lostWorkers.clear();
+    appHeartbeatTime.clear();
+    excludedWorkers.clear();
+    shutdownWorkers.clear();
+    manuallyExcludedWorkers.clear();
+    workerLostEvents.clear();
+    partitionTotalWritten.reset();
+    partitionTotalFileCount.reset();
+  }
+
   public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> 
failedWorkers) {
     synchronized (this.workers) {
       shutdownWorkers.addAll(failedWorkers);
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index a7fe7eb09..b202961da 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -183,14 +183,20 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     AppDiskUsageSnapShot originCurrentSnapshot =
         masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
 
+    masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093, 
9092));
+    masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093, 
9092));
+    masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093, 
9092));
+
     masterStatusSystem.writeMetaInfoToFile(tmpFile);
 
     masterStatusSystem.hostnameSet.clear();
     masterStatusSystem.excludedWorkers.clear();
     masterStatusSystem.manuallyExcludedWorkers.clear();
+    masterStatusSystem.workers.clear();
 
     masterStatusSystem.restoreMetaFromFile(tmpFile);
 
+    Assert.assertEquals(3, masterStatusSystem.workers.size());
     Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
     Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
     Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
@@ -203,5 +209,8 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     Assert.assertEquals(
         originCurrentSnapshot, 
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get());
     Assert.assertArrayEquals(originSnapshots, 
masterStatusSystem.appDiskUsageMetric.snapShots());
+
+    masterStatusSystem.restoreMetaFromFile(tmpFile);
+    Assert.assertEquals(3, masterStatusSystem.workers.size());
   }
 }

Reply via email to