This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c1120e8b4 [CELEBORN-1139] Master's follower clean state before install
snapshot
c1120e8b4 is described below
commit c1120e8b44dd9b30676cf2028293d33838ab0077
Author: 宪英 <[email protected]>
AuthorDate: Wed Dec 13 09:55:36 2023 +0800
[CELEBORN-1139] Master's follower clean state before install snapshot
### What changes were proposed in this pull request?
The master's follower now clears its in-memory state before installing a snapshot, instead of adding the snapshot's contents on top of the existing state.
### Why are the changes needed?
When a master's follower node receives a state snapshot from the leader, it
updates the state machine directly without first clearing the outdated state.
This can cause problems; for example, the worker list may end up containing an
extra copy of each registered worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test:
org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterStateMachineSuiteJ
Closes #2147 from liying919/main.
Authored-by: 宪英 <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/master/clustermeta/AbstractMetaManager.java | 17 +++++++++++++++--
.../master/clustermeta/ha/MasterStateMachineSuiteJ.java | 9 +++++++++
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index a6526d218..3ddd39232 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -253,6 +253,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void restoreMetaFromFile(File file) throws IOException {
try (BufferedInputStream in = new BufferedInputStream(new
FileInputStream(file))) {
PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);
+ cleanUpState();
estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
@@ -305,9 +306,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
.map(PbSerDeUtils::fromPbWorkerInfo)
.collect(Collectors.toSet()));
- partitionTotalWritten.reset();
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
- partitionTotalFileCount.reset();
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
appDiskUsageMetric.restoreFromSnapshot(
snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
@@ -331,6 +330,20 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}",
shuffle));
}
+ /**
+  * Clears the in-memory cluster metadata (shuffle, host, worker and heartbeat
+  * collections, plus the partition write counters) before restoreMetaFromFile
+  * applies a snapshot. The restore therefore replaces the current state instead of
+  * merging into it; previously a follower could end up with duplicate entries in
+  * the worker list.
+  *
+  * <p>restoreMetaFromFile invokes this only after the snapshot file has been
+  * parsed, so a parse failure leaves the existing state untouched.
+  * estimatedPartitionSize is not reset here because the restore overwrites it
+  * directly.
+  *
+  * <p>NOTE(review): workers is cleared without holding its monitor, whereas
+  * updateMetaByReportWorkerUnavailable synchronizes on this.workers. Confirm that
+  * snapshot installation cannot race with those updates.
+  */
+ private void cleanUpState() {
+ registeredShuffle.clear();
+ hostnameSet.clear();
+ workers.clear();
+ lostWorkers.clear();
+ appHeartbeatTime.clear();
+ excludedWorkers.clear();
+ shutdownWorkers.clear();
+ manuallyExcludedWorkers.clear();
+ workerLostEvents.clear();
+ // Moved here from restoreMetaFromFile: the restore calls add(...) with the
+ // snapshot totals, so the counters must start from zero.
+ partitionTotalWritten.reset();
+ partitionTotalFileCount.reset();
+ }
+
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo>
failedWorkers) {
synchronized (this.workers) {
shutdownWorkers.addAll(failedWorkers);
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index a7fe7eb09..b202961da 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -183,14 +183,20 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
AppDiskUsageSnapShot originCurrentSnapshot =
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
+ masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093,
9092));
+ masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093,
9092));
+ masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093,
9092));
+
masterStatusSystem.writeMetaInfoToFile(tmpFile);
masterStatusSystem.hostnameSet.clear();
masterStatusSystem.excludedWorkers.clear();
masterStatusSystem.manuallyExcludedWorkers.clear();
+ masterStatusSystem.workers.clear();
masterStatusSystem.restoreMetaFromFile(tmpFile);
+ Assert.assertEquals(3, masterStatusSystem.workers.size());
Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
@@ -203,5 +209,8 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
Assert.assertEquals(
originCurrentSnapshot,
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get());
Assert.assertArrayEquals(originSnapshots,
masterStatusSystem.appDiskUsageMetric.snapShots());
+
+ masterStatusSystem.restoreMetaFromFile(tmpFile);
+ Assert.assertEquals(3, masterStatusSystem.workers.size());
}
}