This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 6ad779afb [CELEBORN-1139] Master's follower clean state before install
snapshot
6ad779afb is described below
commit 6ad779afbfb6b312ea8582accdef4debfde4984c
Author: 宪英 <[email protected]>
AuthorDate: Wed Dec 13 09:55:36 2023 +0800
[CELEBORN-1139] Master's follower clean state before install snapshot
Master follower will clean its state before installing a snapshot, instead of adding the snapshot's contents on top of the existing state.
When a master's follower node receives a status snapshot from the leader, it
updates the state machine directly without cleaning up the outdated status.
This can cause problems; for example, the worker list may end up with an extra
copy of the registered workers.
No.
UT.
org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterStateMachineSuiteJ
Closes #2147 from liying919/main.
Authored-by: 宪英 <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit c1120e8b44dd9b30676cf2028293d33838ab0077)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/master/clustermeta/AbstractMetaManager.java | 16 ++++++++++++++--
.../master/clustermeta/ha/MasterStateMachineSuiteJ.java | 11 ++++++++++-
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 5e8f52b6f..f20fd6c26 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -245,6 +245,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void restoreMetaFromFile(File file) throws IOException {
try (BufferedInputStream in = new BufferedInputStream(new
FileInputStream(file))) {
PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);
+ cleanUpState();
estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
@@ -293,9 +294,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
.map(PbSerDeUtils::fromPbWorkerInfo)
.collect(Collectors.toSet()));
- partitionTotalWritten.reset();
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
- partitionTotalFileCount.reset();
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
appDiskUsageMetric.restoreFromSnapshot(
snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
@@ -318,6 +317,19 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}",
shuffle));
}
+ private void cleanUpState() {
+ registeredShuffle.clear();
+ hostnameSet.clear();
+ workers.clear();
+ lostWorkers.clear();
+ appHeartbeatTime.clear();
+ excludedWorkers.clear();
+ shutdownWorkers.clear();
+ workerLostEvents.clear();
+ partitionTotalWritten.reset();
+ partitionTotalFileCount.reset();
+ }
+
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo>
failedWorkers) {
synchronized (this.workers) {
shutdownWorkers.addAll(failedWorkers);
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 05361ddb5..b3c6d7ab2 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -180,13 +180,19 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
AppDiskUsageSnapShot originCurrentSnapshot =
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
+ masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093,
9092));
+ masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093,
9092));
+ masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093,
9092));
+
masterStatusSystem.writeMetaInfoToFile(tmpFile);
masterStatusSystem.hostnameSet.clear();
masterStatusSystem.excludedWorkers.clear();
+ masterStatusSystem.workers.clear();
masterStatusSystem.restoreMetaFromFile(tmpFile);
+ Assert.assertEquals(3, masterStatusSystem.workers.size());
Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
Assert.assertEquals(
@@ -197,6 +203,9 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get().topNItems().length);
Assert.assertEquals(
originCurrentSnapshot,
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get());
- Assert.assertEquals(originSnapshots,
masterStatusSystem.appDiskUsageMetric.snapShots());
+ Assert.assertArrayEquals(originSnapshots,
masterStatusSystem.appDiskUsageMetric.snapShots());
+
+ masterStatusSystem.restoreMetaFromFile(tmpFile);
+ Assert.assertEquals(3, masterStatusSystem.workers.size());
}
}