This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new adbef7b44 [CELEBORN-1499] Bump Ratis version from 3.0.1 to 3.1.0
adbef7b44 is described below

commit adbef7b4412db9eecbe1a039a1cc7d4a8ea9f4d6
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jul 11 16:29:58 2024 +0800

    [CELEBORN-1499] Bump Ratis version from 3.0.1 to 3.1.0
    
    ### What changes were proposed in this pull request?
    
    Bump Ratis version from 3.0.1 to 3.1.0. Also remove 
`CelebornStateMachineStorage`, which is no longer needed now that 
https://github.com/apache/ratis/pull/1111 has been released.
    
    ### Why are the changes needed?
    
    Bump Ratis version from 3.0.1 to 3.1.0. Ratis has released v3.1.0; see the 
[3.1.0 release notes](https://ratis.apache.org/post/3.1.0.html). The 
3.1.0 version is a minor release with multiple improvements and bugfixes, 
including [[RATIS-2111] Reinitialize should load the latest 
snapshot](https://issues.apache.org/jira/browse/RATIS-2111). See the [changes 
between the 3.0.1 and 3.1.0 
releases](https://github.com/apache/ratis/compare/ratis-3.0.1...ratis-3.1.0) 
for the full list.
    
    Follow up #2547.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `MasterStateMachineSuiteJ#testInstallSnapshot`
    
    Closes #2610 from SteNicholas/CELEBORN-1499.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 LICENSE                                            |   3 -
 dev/deps/dependencies-server                       |  18 +-
 .../ha/CelebornStateMachineStorage.java            | 212 ---------------------
 .../deploy/master/clustermeta/ha/HAHelper.java     |   4 +-
 .../deploy/master/clustermeta/ha/StateMachine.java |  18 +-
 pom.xml                                            |   2 +-
 project/CelebornBuild.scala                        |   2 +-
 7 files changed, 30 insertions(+), 229 deletions(-)

diff --git a/LICENSE b/LICENSE
index 44365ce18..3c8858ec9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -256,6 +256,3 @@ Remote Shuffle Service for Flink
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
-
-Apache Ratis
-./master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 33b965865..2cdab3abf 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -118,15 +118,15 @@ 
netty-transport/4.1.109.Final//netty-transport-4.1.109.Final.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
 protobuf-java/3.21.7//protobuf-java-3.21.7.jar
-ratis-client/3.0.1//ratis-client-3.0.1.jar
-ratis-common/3.0.1//ratis-common-3.0.1.jar
-ratis-grpc/3.0.1//ratis-grpc-3.0.1.jar
-ratis-metrics-default/3.0.1/ratis-metrics-default-3.0.1.jar
-ratis-netty/3.0.1//ratis-netty-3.0.1.jar
-ratis-proto/3.0.1//ratis-proto-3.0.1.jar
-ratis-server-api/3.0.1//ratis-server-api-3.0.1.jar
-ratis-server/3.0.1//ratis-server-3.0.1.jar
-ratis-shell/3.0.1//ratis-shell-3.0.1.jar
+ratis-client/3.1.0//ratis-client-3.1.0.jar
+ratis-common/3.1.0//ratis-common-3.1.0.jar
+ratis-grpc/3.1.0//ratis-grpc-3.1.0.jar
+ratis-metrics-default/3.1.0/ratis-metrics-default-3.1.0.jar
+ratis-netty/3.1.0//ratis-netty-3.1.0.jar
+ratis-proto/3.1.0//ratis-proto-3.1.0.jar
+ratis-server-api/3.1.0//ratis-server-api-3.1.0.jar
+ratis-server/3.1.0//ratis-server-3.1.0.jar
+ratis-shell/3.1.0//ratis-shell-3.1.0.jar
 ratis-thirdparty-misc/1.0.5//ratis-thirdparty-misc-1.0.5.jar
 reflections/0.10.2//reflections-0.10.2.jar
 rocksdbjni/8.11.3//rocksdbjni-8.11.3.jar
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
deleted file mode 100644
index 5035a8380..000000000
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.service.deploy.master.clustermeta.ha;
-
-import static org.apache.ratis.util.MD5FileUtil.MD5_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.ratis.io.MD5Hash;
-import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
-import org.apache.ratis.statemachine.StateMachineStorage;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.MD5FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Copied from Apache Ratis {@link SimpleStateMachineStorage}, We need to 
refresh latest snapshot
- * after installing snapshot from leader which makes StateMachine load latest 
snapshot correctly.
- */
-public class CelebornStateMachineStorage implements StateMachineStorage {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(CelebornStateMachineStorage.class);
-
-  static final String SNAPSHOT_FILE_PREFIX = "snapshot";
-  /** snapshot.term_index */
-  public static final Pattern SNAPSHOT_REGEX =
-      Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
-
-  public static final Pattern SNAPSHOT_MD5_REGEX =
-      Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)" + MD5_SUFFIX);
-  private static final DirectoryStream.Filter<Path> SNAPSHOT_MD5_FILTER =
-      entry ->
-          Optional.ofNullable(entry.getFileName())
-              .map(Path::toString)
-              .map(SNAPSHOT_MD5_REGEX::matcher)
-              .filter(Matcher::matches)
-              .isPresent();
-
-  private volatile File stateMachineDir = null;
-  private final AtomicReference<SingleFileSnapshotInfo> latestSnapshot = new 
AtomicReference<>();
-
-  File tmpDir = null;
-
-  @Override
-  public void init(RaftStorage storage) throws IOException {
-    this.stateMachineDir = storage.getStorageDir().getStateMachineDir();
-    loadLatestSnapshot();
-    tmpDir = storage.getStorageDir().getTmpDir();
-  }
-
-  @Override
-  public void format() throws IOException {
-    // TODO
-  }
-
-  static List<SingleFileSnapshotInfo> getSingleFileSnapshotInfos(Path dir) 
throws IOException {
-    final List<SingleFileSnapshotInfo> infos = new ArrayList<>();
-    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
-      for (Path path : stream) {
-        final Path filename = path.getFileName();
-        if (filename != null) {
-          final Matcher matcher = SNAPSHOT_REGEX.matcher(filename.toString());
-          if (matcher.matches()) {
-            final long term = Long.parseLong(matcher.group(1));
-            final long index = Long.parseLong(matcher.group(2));
-            final FileInfo fileInfo = new FileInfo(path, null); // No 
FileDigest here.
-            infos.add(new SingleFileSnapshotInfo(fileInfo, term, index));
-          }
-        }
-      }
-    }
-    return infos;
-  }
-
-  @Override
-  public void cleanupOldSnapshots(SnapshotRetentionPolicy 
snapshotRetentionPolicy)
-      throws IOException {
-    if (stateMachineDir == null) {
-      return;
-    }
-
-    final int numSnapshotsRetained =
-        Optional.ofNullable(snapshotRetentionPolicy)
-            .map(SnapshotRetentionPolicy::getNumSnapshotsRetained)
-            .orElse(SnapshotRetentionPolicy.DEFAULT_ALL_SNAPSHOTS_RETAINED);
-    if (numSnapshotsRetained <= 0) {
-      return;
-    }
-
-    final List<SingleFileSnapshotInfo> allSnapshotFiles =
-        getSingleFileSnapshotInfos(stateMachineDir.toPath());
-
-    if (allSnapshotFiles.size() > 
snapshotRetentionPolicy.getNumSnapshotsRetained()) {
-      
allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed());
-      allSnapshotFiles.subList(numSnapshotsRetained, 
allSnapshotFiles.size()).stream()
-          .map(SingleFileSnapshotInfo::getFile)
-          .map(FileInfo::getPath)
-          .forEach(
-              snapshotPath -> {
-                LOG.info("Deleting old snapshot at {}", 
snapshotPath.toAbsolutePath());
-                FileUtils.deletePathQuietly(snapshotPath);
-              });
-      // clean up the md5 files if the corresponding snapshot file does not 
exist
-      try (DirectoryStream<Path> stream =
-          Files.newDirectoryStream(stateMachineDir.toPath(), 
SNAPSHOT_MD5_FILTER)) {
-        for (Path md5path : stream) {
-          Path md5FileNamePath = md5path.getFileName();
-          if (md5FileNamePath == null) {
-            continue;
-          }
-          final String md5FileName = md5FileNamePath.toString();
-          final File snapshotFile =
-              new File(
-                  stateMachineDir,
-                  md5FileName.substring(0, md5FileName.length() - 
MD5_SUFFIX.length()));
-          if (!snapshotFile.exists()) {
-            FileUtils.deletePathQuietly(md5path);
-          }
-        }
-      }
-    }
-  }
-
-  public File getSnapshotFile(long term, long endIndex) {
-    final File dir = Objects.requireNonNull(stateMachineDir, "stateMachineDir 
== null");
-    return new File(dir, getSnapshotFileName(term, endIndex));
-  }
-
-  static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws 
IOException {
-    final Iterator<SingleFileSnapshotInfo> i = 
getSingleFileSnapshotInfos(dir).iterator();
-    if (!i.hasNext()) {
-      return null;
-    }
-
-    SingleFileSnapshotInfo latest = i.next();
-    for (; i.hasNext(); ) {
-      final SingleFileSnapshotInfo info = i.next();
-      if (info.getIndex() > latest.getIndex()) {
-        latest = info;
-      }
-    }
-
-    // read md5
-    final Path path = latest.getFile().getPath();
-    final MD5Hash md5 = MD5FileUtil.readStoredMd5ForFile(path.toFile());
-    final FileInfo info = new FileInfo(path, md5);
-    return new SingleFileSnapshotInfo(info, latest.getTerm(), 
latest.getIndex());
-  }
-
-  public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo 
info) {
-    return latestSnapshot.updateAndGet(
-        previous -> previous == null || info.getIndex() > previous.getIndex() 
? info : previous);
-  }
-
-  public static String getSnapshotFileName(long term, long endIndex) {
-    return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex;
-  }
-
-  @Override
-  public SingleFileSnapshotInfo getLatestSnapshot() {
-    return latestSnapshot.get();
-  }
-
-  @Override
-  public File getTmpDir() {
-    return tmpDir;
-  }
-
-  public void loadLatestSnapshot() {
-    final File dir = stateMachineDir;
-    if (dir == null) {
-      return;
-    }
-    try {
-      updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
-    } catch (IOException ignored) {
-    }
-  }
-}
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 8307d6057..71bcaf064 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.celeborn.common.client.MasterNotLeaderException;
@@ -123,8 +124,7 @@ public class HAHelper {
    * @return the temporary snapshot file
    * @throws IOException if error occurred while creating the snapshot file
    */
-  public static File createTempSnapshotFile(CelebornStateMachineStorage 
storage)
-      throws IOException {
+  public static File createTempSnapshotFile(SimpleStateMachineStorage storage) 
throws IOException {
     File tempDir = storage.getTmpDir();
     if (!tempDir.isDirectory() && !tempDir.mkdir()) {
       throw new IOException(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index ff43e59c2..40d23c59f 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -45,6 +45,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.ExitUtils;
@@ -60,7 +61,22 @@ import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Reso
 public class StateMachine extends BaseStateMachine {
   private static final Logger LOG = 
LoggerFactory.getLogger(StateMachine.class);
 
-  private final CelebornStateMachineStorage storage = new 
CelebornStateMachineStorage();
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage() {
+
+        File tmpDir = null;
+
+        @Override
+        public void init(RaftStorage storage) throws IOException {
+          super.init(storage);
+          tmpDir = storage.getStorageDir().getTmpDir();
+        }
+
+        @Override
+        public File getTmpDir() {
+          return tmpDir;
+        }
+      };
 
   private final HARaftServer masterRatisServer;
   private RaftGroupId raftGroupId;
diff --git a/pom.xml b/pom.xml
index e93b7d442..8ada82b3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
     <netty.version>4.1.109.Final</netty.version>
     <bouncycastle.version>1.77</bouncycastle.version>
     <protobuf.version>3.21.7</protobuf.version>
-    <ratis.version>3.0.1</ratis.version>
+    <ratis.version>3.1.0</ratis.version>
     <scalatest.version>3.2.16</scalatest.version>
     <slf4j.version>1.7.36</slf4j.version>
     <roaringbitmap.version>1.0.6</roaringbitmap.version>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 00809d8fa..15d45f09c 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -58,7 +58,7 @@ object Dependencies {
   val metricsVersion = "4.2.25"
   val mockitoVersion = "4.11.0"
   val nettyVersion = "4.1.109.Final"
-  val ratisVersion = "3.0.1"
+  val ratisVersion = "3.1.0"
   val roaringBitmapVersion = "1.0.6"
   val rocksdbJniVersion = "8.11.3"
   val jacksonVersion = "2.15.3"

Reply via email to