This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dc6493  RATIS-1498. Take snapshot on specific ratis server (#586)
8dc6493 is described below

commit 8dc6493b0bd320834b87af4c7b362590a63ebdc0
Author: Yaolong Liu <[email protected]>
AuthorDate: Sun Jan 23 21:19:15 2022 +0800

    RATIS-1498. Take snapshot on specific ratis server (#586)
---
 .../java/org/apache/ratis/client/RaftClient.java   |  5 +++-
 .../apache/ratis/client/impl/RaftClientImpl.java   | 10 +++++---
 .../ratis/client/impl/SnapshotManagementImpl.java  | 10 +++++---
 .../ratis/statemachine/SnapshotManagementTest.java | 29 ++++++++++++++++++++--
 .../shell/cli/sh/snapshot/TakeSnapshotCommand.java | 28 +++++++++++++++------
 5 files changed, 66 insertions(+), 16 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index f80a0d9..30e2879 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -58,9 +58,12 @@ public interface RaftClient extends Closeable {
   /** Get the {@link GroupManagementApi} for the given server. */
   GroupManagementApi getGroupManagementApi(RaftPeerId server);
 
-  /** Get the {@link SnapshotManagementApi}. */
+  /** Get the {@link SnapshotManagementApi} for the current leader. */
   SnapshotManagementApi getSnapshotManagementApi();
 
+  /** Get the {@link SnapshotManagementApi} for the given server. */
+  SnapshotManagementApi getSnapshotManagementApi(RaftPeerId server);
+
   /** @return the {@link BlockingApi}. */
   BlockingApi io();
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index eae0606..9ccb256 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -141,7 +141,7 @@ public final class RaftClientImpl implements RaftClient {
 
   private final Supplier<AdminImpl> adminApi;
   private final ConcurrentMap<RaftPeerId, GroupManagementImpl> 
groupManagmenets = new ConcurrentHashMap<>();
-  private final Supplier<SnapshotManagementApi> snapshotManagemenet;
+  private final ConcurrentMap<RaftPeerId, SnapshotManagementApi> 
snapshotManagemenet = new ConcurrentHashMap<>();
 
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, 
RaftPeer primaryDataStreamServer,
       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy 
retryPolicy) {
@@ -172,7 +172,6 @@ public final class RaftClientImpl implements RaftClient {
         .setProperties(properties)
         .build());
     this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this));
-    this.snapshotManagemenet= JavaUtils.memoize(() -> new 
SnapshotManagementImpl(this));
   }
 
   public RaftPeerId getLeaderId() {
@@ -248,7 +247,12 @@ public final class RaftClientImpl implements RaftClient {
 
   @Override
   public SnapshotManagementApi getSnapshotManagementApi() {
-    return snapshotManagemenet.get();
+    return JavaUtils.memoize(() -> new SnapshotManagementImpl(null, 
this)).get();
+  }
+
+  @Override
+  public SnapshotManagementApi getSnapshotManagementApi(RaftPeerId server) {
+    return snapshotManagemenet.computeIfAbsent(server, id -> new 
SnapshotManagementImpl(id, this));
   }
 
   @Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
index cd72905..1762dc0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
@@ -19,23 +19,27 @@ package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.api.SnapshotManagementApi;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SnapshotManagementRequest;
 import org.apache.ratis.rpc.CallId;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Optional;
 
 class SnapshotManagementImpl implements SnapshotManagementApi {
   private final RaftClientImpl client;
+  private final RaftPeerId server;
 
-  SnapshotManagementImpl(RaftClientImpl client) {
+  SnapshotManagementImpl(RaftPeerId server, RaftClientImpl client) {
+    this.server = server;
     this.client = Objects.requireNonNull(client, "client == null");
   }
 
   @Override
   public RaftClientReply create(long timeoutMs) throws IOException {
     final long callId = CallId.getAndIncrement();
-    return client.io().sendRequestWithRetry(() -> 
SnapshotManagementRequest.newCreate(
-        client.getId(), client.getLeaderId(), client.getGroupId(), callId, 
timeoutMs));
+    return client.io().sendRequestWithRetry(() -> 
SnapshotManagementRequest.newCreate(client.getId(),
+        Optional.ofNullable(server).orElseGet(client::getLeaderId), 
client.getGroupId(), callId, timeoutMs));
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
index d75ba40..6084077 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
@@ -64,6 +64,7 @@ public abstract class SnapshotManagementTest<CLUSTER extends 
MiniRaftCluster>
   public void testTakeSnapshot() throws Exception {
     runWithNewCluster(1, this::runTestTakeSnapshot);
     runWithNewCluster(1,this::runTestTakeSnapshotWithConfigurableGap);
+    runWithNewCluster(3,this::runTestTakeSnapshotOnSpecificServer);
   }
 
   void runTestTakeSnapshot(CLUSTER cluster) throws Exception {
@@ -98,7 +99,7 @@ public abstract class SnapshotManagementTest<CLUSTER extends 
MiniRaftCluster>
       }
       
Assert.assertTrue(leader.getStateMachine().getLastAppliedTermIndex().getIndex()
             < RaftServerConfigKeys.Snapshot.creationGap(getProperties()));
-      snapshotReply = client.getSnapshotManagementApi().create(3000);
+      snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000);
       Assert.assertTrue(snapshotReply.isSuccess());
       Assert.assertEquals(0,snapshotReply.getLogIndex());
       for (int i = 0; i < 
RaftServerConfigKeys.Snapshot.creationGap(getProperties())/2-1; i++) {
@@ -107,7 +108,7 @@ public abstract class SnapshotManagementTest<CLUSTER 
extends MiniRaftCluster>
       }
       final SnapshotManagementRequest r1 = 
SnapshotManagementRequest.newCreate(client.getId(),
           leaderId, cluster.getGroupId(), CallId.getAndIncrement(), 3000);
-      snapshotReply = client.getSnapshotManagementApi().create(3000);
+      snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000);
     }
     Assert.assertTrue(snapshotReply.isSuccess());
     final long snapshotIndex = snapshotReply.getLogIndex();
@@ -118,4 +119,28 @@ public abstract class SnapshotManagementTest<CLUSTER 
extends MiniRaftCluster>
         .getSnapshotFile(leader.getInfo().getCurrentTerm(), snapshotIndex);
     Assert.assertTrue(snapshotFile.exists());
   }
+
+  void runTestTakeSnapshotOnSpecificServer(CLUSTER cluster) throws Exception {
+    final RaftClientReply snapshotReply;
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final RaftServer.Division follower = cluster.getFollowers().get(0);
+    final RaftPeerId followerId = follower.getId();
+    Assert.assertTrue(follower.getInfo().isFollower());
+    try (final RaftClient client = cluster.createClient(followerId)) {
+      for (int i = 0; i < 
RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) {
+        RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+      snapshotReply = client.getSnapshotManagementApi(followerId).create(3000);
+    }
+
+    Assert.assertTrue(snapshotReply.isSuccess());
+    final long snapshotIndex = snapshotReply.getLogIndex();
+    LOG.info("snapshotIndex = {} on {} server {}",
+        snapshotIndex, follower.getInfo().getCurrentRole(), follower.getId());
+
+    final File snapshotFile = SimpleStateMachine4Testing.get(follower)
+        
.getStateMachineStorage().getSnapshotFile(follower.getInfo().getCurrentTerm(), 
snapshotIndex);
+    Assert.assertTrue(snapshotFile.exists());
+  }
 }
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
index cc54291..e3c6740 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.shell.cli.RaftUtils;
 import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
 import org.apache.ratis.shell.cli.sh.command.Context;
@@ -33,6 +34,7 @@ import java.io.IOException;
  */
 public class TakeSnapshotCommand extends AbstractRatisCommand {
   public static final String TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME = 
"snapshotTimeout";
+  public static final String PEER_ID_OPTION_NAME = "peerId";
 
   /**
    * @param context command context
@@ -50,16 +52,22 @@ public class TakeSnapshotCommand extends 
AbstractRatisCommand {
   public int run(CommandLine cl) throws IOException {
     super.run(cl);
     long timeout;
+    final RaftPeerId peerId;
     if (cl.hasOption(TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME)) {
       timeout = 
Long.parseLong(cl.getOptionValue(TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME));
     } else {
       timeout = 3000;
     }
     try(final RaftClient raftClient = RaftUtils.createClient(getRaftGroup())) {
-      RaftClientReply reply = 
raftClient.getSnapshotManagementApi().create(timeout);
-      processReply(reply, () -> String.format("Failed to take snapshot of 
peerId %s", raftClient.getLeaderId()));
-      printf(String.format("Successful take snapshot, the latest snapshot 
index is %d",
-          reply.getLogIndex()));
+      if (cl.hasOption(PEER_ID_OPTION_NAME)) {
+        peerId = 
RaftPeerId.getRaftPeerId(cl.getOptionValue(PEER_ID_OPTION_NAME));
+      } else {
+        peerId = null;
+      }
+      RaftClientReply reply = 
raftClient.getSnapshotManagementApi(peerId).create(timeout);
+      processReply(reply, () -> String.format("Failed to take snapshot of 
peerId %s", peerId));
+      printf(String.format("Successful take snapshot on peerId %s, the latest 
snapshot index is %d",
+          peerId, reply.getLogIndex()));
     }
     return 0;
   }
@@ -69,7 +77,8 @@ public class TakeSnapshotCommand extends AbstractRatisCommand 
{
     return String.format("%s"
             + " -%s 
<PEER0_HOST:PEER0_PORT,PEER1_HOST:PEER1_PORT,PEER2_HOST:PEER2_PORT>"
             + " [-%s <RAFT_GROUP_ID>]"
-            + " [-%s <timeoutInMs>]",
+            + " [-%s <timeoutInMs>]"
+            + " [-%s <raftPeerId>]",
         getCommandName(), PEER_OPTION_NAME, GROUPID_OPTION_NAME, 
TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME);
   }
 
@@ -80,11 +89,16 @@ public class TakeSnapshotCommand extends 
AbstractRatisCommand {
 
   @Override
   public Options getOptions() {
-    return super.getOptions().addOption(
-        Option.builder()
+    return super.getOptions()
+        .addOption(Option.builder()
             .option(TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME)
             .hasArg()
             .desc("timeout to wait taking snapshot in ms")
+            .build())
+        .addOption(Option.builder()
+            .option(PEER_ID_OPTION_NAME)
+            .hasArg()
+            .desc("the id of server takeing snapshot")
             .build());
   }
 

Reply via email to