This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new adbef7b44 [CELEBORN-1499] Bump Ratis version from 3.0.1 to 3.1.0
adbef7b44 is described below
commit adbef7b4412db9eecbe1a039a1cc7d4a8ea9f4d6
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jul 11 16:29:58 2024 +0800
[CELEBORN-1499] Bump Ratis version from 3.0.1 to 3.1.0
### What changes were proposed in this pull request?
Bump Ratis version from 3.0.1 to 3.1.0. Meanwhile, remove
`CelebornStateMachineStorage`, which is no longer needed now that
https://github.com/apache/ratis/pull/1111 has been released.
### Why are the changes needed?
Bump Ratis version from 3.0.1 to 3.1.0. Ratis has released v3.1.0; see the
[3.1.0 release notes](https://ratis.apache.org/post/3.1.0.html). 3.1.0 is a
minor release with multiple improvements and bug fixes, including
[[RATIS-2111] Reinitialize should load the latest
snapshot](https://issues.apache.org/jira/browse/RATIS-2111). See the [changes
between 3.0.1 and
3.1.0](https://github.com/apache/ratis/compare/ratis-3.0.1...ratis-3.1.0).
Follow-up to #2547.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`MasterStateMachineSuiteJ#testInstallSnapshot`
Closes #2610 from SteNicholas/CELEBORN-1499.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
LICENSE | 3 -
dev/deps/dependencies-server | 18 +-
.../ha/CelebornStateMachineStorage.java | 212 ---------------------
.../deploy/master/clustermeta/ha/HAHelper.java | 4 +-
.../deploy/master/clustermeta/ha/StateMachine.java | 18 +-
pom.xml | 2 +-
project/CelebornBuild.scala | 2 +-
7 files changed, 30 insertions(+), 229 deletions(-)
diff --git a/LICENSE b/LICENSE
index 44365ce18..3c8858ec9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -256,6 +256,3 @@ Remote Shuffle Service for Flink
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
-
-Apache Ratis
-./master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 33b965865..2cdab3abf 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -118,15 +118,15 @@
netty-transport/4.1.109.Final//netty-transport-4.1.109.Final.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
protobuf-java/3.21.7//protobuf-java-3.21.7.jar
-ratis-client/3.0.1//ratis-client-3.0.1.jar
-ratis-common/3.0.1//ratis-common-3.0.1.jar
-ratis-grpc/3.0.1//ratis-grpc-3.0.1.jar
-ratis-metrics-default/3.0.1/ratis-metrics-default-3.0.1.jar
-ratis-netty/3.0.1//ratis-netty-3.0.1.jar
-ratis-proto/3.0.1//ratis-proto-3.0.1.jar
-ratis-server-api/3.0.1//ratis-server-api-3.0.1.jar
-ratis-server/3.0.1//ratis-server-3.0.1.jar
-ratis-shell/3.0.1//ratis-shell-3.0.1.jar
+ratis-client/3.1.0//ratis-client-3.1.0.jar
+ratis-common/3.1.0//ratis-common-3.1.0.jar
+ratis-grpc/3.1.0//ratis-grpc-3.1.0.jar
+ratis-metrics-default/3.1.0/ratis-metrics-default-3.1.0.jar
+ratis-netty/3.1.0//ratis-netty-3.1.0.jar
+ratis-proto/3.1.0//ratis-proto-3.1.0.jar
+ratis-server-api/3.1.0//ratis-server-api-3.1.0.jar
+ratis-server/3.1.0//ratis-server-3.1.0.jar
+ratis-shell/3.1.0//ratis-shell-3.1.0.jar
ratis-thirdparty-misc/1.0.5//ratis-thirdparty-misc-1.0.5.jar
reflections/0.10.2//reflections-0.10.2.jar
rocksdbjni/8.11.3//rocksdbjni-8.11.3.jar
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
deleted file mode 100644
index 5035a8380..000000000
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.service.deploy.master.clustermeta.ha;
-
-import static org.apache.ratis.util.MD5FileUtil.MD5_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.ratis.io.MD5Hash;
-import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
-import org.apache.ratis.statemachine.StateMachineStorage;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.MD5FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Copied from Apache Ratis {@link SimpleStateMachineStorage}, We need to
refresh latest snapshot
- * after installing snapshot from leader which makes StateMachine load latest
snapshot correctly.
- */
-public class CelebornStateMachineStorage implements StateMachineStorage {
-
- private static final Logger LOG =
LoggerFactory.getLogger(CelebornStateMachineStorage.class);
-
- static final String SNAPSHOT_FILE_PREFIX = "snapshot";
- /** snapshot.term_index */
- public static final Pattern SNAPSHOT_REGEX =
- Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
-
- public static final Pattern SNAPSHOT_MD5_REGEX =
- Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)" + MD5_SUFFIX);
- private static final DirectoryStream.Filter<Path> SNAPSHOT_MD5_FILTER =
- entry ->
- Optional.ofNullable(entry.getFileName())
- .map(Path::toString)
- .map(SNAPSHOT_MD5_REGEX::matcher)
- .filter(Matcher::matches)
- .isPresent();
-
- private volatile File stateMachineDir = null;
- private final AtomicReference<SingleFileSnapshotInfo> latestSnapshot = new
AtomicReference<>();
-
- File tmpDir = null;
-
- @Override
- public void init(RaftStorage storage) throws IOException {
- this.stateMachineDir = storage.getStorageDir().getStateMachineDir();
- loadLatestSnapshot();
- tmpDir = storage.getStorageDir().getTmpDir();
- }
-
- @Override
- public void format() throws IOException {
- // TODO
- }
-
- static List<SingleFileSnapshotInfo> getSingleFileSnapshotInfos(Path dir)
throws IOException {
- final List<SingleFileSnapshotInfo> infos = new ArrayList<>();
- try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
- for (Path path : stream) {
- final Path filename = path.getFileName();
- if (filename != null) {
- final Matcher matcher = SNAPSHOT_REGEX.matcher(filename.toString());
- if (matcher.matches()) {
- final long term = Long.parseLong(matcher.group(1));
- final long index = Long.parseLong(matcher.group(2));
- final FileInfo fileInfo = new FileInfo(path, null); // No
FileDigest here.
- infos.add(new SingleFileSnapshotInfo(fileInfo, term, index));
- }
- }
- }
- }
- return infos;
- }
-
- @Override
- public void cleanupOldSnapshots(SnapshotRetentionPolicy
snapshotRetentionPolicy)
- throws IOException {
- if (stateMachineDir == null) {
- return;
- }
-
- final int numSnapshotsRetained =
- Optional.ofNullable(snapshotRetentionPolicy)
- .map(SnapshotRetentionPolicy::getNumSnapshotsRetained)
- .orElse(SnapshotRetentionPolicy.DEFAULT_ALL_SNAPSHOTS_RETAINED);
- if (numSnapshotsRetained <= 0) {
- return;
- }
-
- final List<SingleFileSnapshotInfo> allSnapshotFiles =
- getSingleFileSnapshotInfos(stateMachineDir.toPath());
-
- if (allSnapshotFiles.size() >
snapshotRetentionPolicy.getNumSnapshotsRetained()) {
-
allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed());
- allSnapshotFiles.subList(numSnapshotsRetained,
allSnapshotFiles.size()).stream()
- .map(SingleFileSnapshotInfo::getFile)
- .map(FileInfo::getPath)
- .forEach(
- snapshotPath -> {
- LOG.info("Deleting old snapshot at {}",
snapshotPath.toAbsolutePath());
- FileUtils.deletePathQuietly(snapshotPath);
- });
- // clean up the md5 files if the corresponding snapshot file does not
exist
- try (DirectoryStream<Path> stream =
- Files.newDirectoryStream(stateMachineDir.toPath(),
SNAPSHOT_MD5_FILTER)) {
- for (Path md5path : stream) {
- Path md5FileNamePath = md5path.getFileName();
- if (md5FileNamePath == null) {
- continue;
- }
- final String md5FileName = md5FileNamePath.toString();
- final File snapshotFile =
- new File(
- stateMachineDir,
- md5FileName.substring(0, md5FileName.length() -
MD5_SUFFIX.length()));
- if (!snapshotFile.exists()) {
- FileUtils.deletePathQuietly(md5path);
- }
- }
- }
- }
- }
-
- public File getSnapshotFile(long term, long endIndex) {
- final File dir = Objects.requireNonNull(stateMachineDir, "stateMachineDir
== null");
- return new File(dir, getSnapshotFileName(term, endIndex));
- }
-
- static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws
IOException {
- final Iterator<SingleFileSnapshotInfo> i =
getSingleFileSnapshotInfos(dir).iterator();
- if (!i.hasNext()) {
- return null;
- }
-
- SingleFileSnapshotInfo latest = i.next();
- for (; i.hasNext(); ) {
- final SingleFileSnapshotInfo info = i.next();
- if (info.getIndex() > latest.getIndex()) {
- latest = info;
- }
- }
-
- // read md5
- final Path path = latest.getFile().getPath();
- final MD5Hash md5 = MD5FileUtil.readStoredMd5ForFile(path.toFile());
- final FileInfo info = new FileInfo(path, md5);
- return new SingleFileSnapshotInfo(info, latest.getTerm(),
latest.getIndex());
- }
-
- public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo
info) {
- return latestSnapshot.updateAndGet(
- previous -> previous == null || info.getIndex() > previous.getIndex()
? info : previous);
- }
-
- public static String getSnapshotFileName(long term, long endIndex) {
- return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex;
- }
-
- @Override
- public SingleFileSnapshotInfo getLatestSnapshot() {
- return latestSnapshot.get();
- }
-
- @Override
- public File getTmpDir() {
- return tmpDir;
- }
-
- public void loadLatestSnapshot() {
- final File dir = stateMachineDir;
- if (dir == null) {
- return;
- }
- try {
- updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
- } catch (IOException ignored) {
- }
- }
-}
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 8307d6057..71bcaf064 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -23,6 +23,7 @@ import java.util.Optional;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.celeborn.common.client.MasterNotLeaderException;
@@ -123,8 +124,7 @@ public class HAHelper {
* @return the temporary snapshot file
* @throws IOException if error occurred while creating the snapshot file
*/
- public static File createTempSnapshotFile(CelebornStateMachineStorage
storage)
- throws IOException {
+ public static File createTempSnapshotFile(SimpleStateMachineStorage storage)
throws IOException {
File tempDir = storage.getTmpDir();
if (!tempDir.isDirectory() && !tempDir.mkdir()) {
throw new IOException(
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index ff43e59c2..40d23c59f 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -45,6 +45,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.ExitUtils;
@@ -60,7 +61,22 @@ import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Reso
public class StateMachine extends BaseStateMachine {
private static final Logger LOG =
LoggerFactory.getLogger(StateMachine.class);
- private final CelebornStateMachineStorage storage = new
CelebornStateMachineStorage();
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage() {
+
+ File tmpDir = null;
+
+ @Override
+ public void init(RaftStorage storage) throws IOException {
+ super.init(storage);
+ tmpDir = storage.getStorageDir().getTmpDir();
+ }
+
+ @Override
+ public File getTmpDir() {
+ return tmpDir;
+ }
+ };
private final HARaftServer masterRatisServer;
private RaftGroupId raftGroupId;
diff --git a/pom.xml b/pom.xml
index e93b7d442..8ada82b3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
<netty.version>4.1.109.Final</netty.version>
<bouncycastle.version>1.77</bouncycastle.version>
<protobuf.version>3.21.7</protobuf.version>
- <ratis.version>3.0.1</ratis.version>
+ <ratis.version>3.1.0</ratis.version>
<scalatest.version>3.2.16</scalatest.version>
<slf4j.version>1.7.36</slf4j.version>
<roaringbitmap.version>1.0.6</roaringbitmap.version>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 00809d8fa..15d45f09c 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -58,7 +58,7 @@ object Dependencies {
val metricsVersion = "4.2.25"
val mockitoVersion = "4.11.0"
val nettyVersion = "4.1.109.Final"
- val ratisVersion = "3.0.1"
+ val ratisVersion = "3.1.0"
val roaringBitmapVersion = "1.0.6"
val rocksdbJniVersion = "8.11.3"
val jacksonVersion = "2.15.3"