This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc6493 RATIS-1498. Take snapshot on specific ratis server (#586)
8dc6493 is described below
commit 8dc6493b0bd320834b87af4c7b362590a63ebdc0
Author: Yaolong Liu <[email protected]>
AuthorDate: Sun Jan 23 21:19:15 2022 +0800
RATIS-1498. Take snapshot on specific ratis server (#586)
---
.../java/org/apache/ratis/client/RaftClient.java | 5 +++-
.../apache/ratis/client/impl/RaftClientImpl.java | 10 +++++---
.../ratis/client/impl/SnapshotManagementImpl.java | 10 +++++---
.../ratis/statemachine/SnapshotManagementTest.java | 29 ++++++++++++++++++++--
.../shell/cli/sh/snapshot/TakeSnapshotCommand.java | 28 +++++++++++++++------
5 files changed, 66 insertions(+), 16 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index f80a0d9..30e2879 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -58,9 +58,12 @@ public interface RaftClient extends Closeable {
/** Get the {@link GroupManagementApi} for the given server. */
GroupManagementApi getGroupManagementApi(RaftPeerId server);
- /** Get the {@link SnapshotManagementApi}. */
+ /** Get the {@link SnapshotManagementApi} for the current leader. */
SnapshotManagementApi getSnapshotManagementApi();
+ /** Get the {@link SnapshotManagementApi} for the given server. */
+ SnapshotManagementApi getSnapshotManagementApi(RaftPeerId server);
+
/** @return the {@link BlockingApi}. */
BlockingApi io();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index eae0606..9ccb256 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -141,7 +141,7 @@ public final class RaftClientImpl implements RaftClient {
private final Supplier<AdminImpl> adminApi;
private final ConcurrentMap<RaftPeerId, GroupManagementImpl>
groupManagmenets = new ConcurrentHashMap<>();
- private final Supplier<SnapshotManagementApi> snapshotManagemenet;
+ private final ConcurrentMap<RaftPeerId, SnapshotManagementApi>
snapshotManagemenet = new ConcurrentHashMap<>();
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
RaftPeer primaryDataStreamServer,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy
retryPolicy) {
@@ -172,7 +172,6 @@ public final class RaftClientImpl implements RaftClient {
.setProperties(properties)
.build());
this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this));
- this.snapshotManagemenet= JavaUtils.memoize(() -> new
SnapshotManagementImpl(this));
}
public RaftPeerId getLeaderId() {
@@ -248,7 +247,12 @@ public final class RaftClientImpl implements RaftClient {
@Override
public SnapshotManagementApi getSnapshotManagementApi() {
- return snapshotManagemenet.get();
+ return JavaUtils.memoize(() -> new SnapshotManagementImpl(null,
this)).get();
+ }
+
+ @Override
+ public SnapshotManagementApi getSnapshotManagementApi(RaftPeerId server) {
+ return snapshotManagemenet.computeIfAbsent(server, id -> new
SnapshotManagementImpl(id, this));
}
@Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
index cd72905..1762dc0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
@@ -19,23 +19,27 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.api.SnapshotManagementApi;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.rpc.CallId;
import java.io.IOException;
import java.util.Objects;
+import java.util.Optional;
class SnapshotManagementImpl implements SnapshotManagementApi {
private final RaftClientImpl client;
+ private final RaftPeerId server;
- SnapshotManagementImpl(RaftClientImpl client) {
+ SnapshotManagementImpl(RaftPeerId server, RaftClientImpl client) {
+ this.server = server;
this.client = Objects.requireNonNull(client, "client == null");
}
@Override
public RaftClientReply create(long timeoutMs) throws IOException {
final long callId = CallId.getAndIncrement();
- return client.io().sendRequestWithRetry(() ->
SnapshotManagementRequest.newCreate(
- client.getId(), client.getLeaderId(), client.getGroupId(), callId,
timeoutMs));
+ return client.io().sendRequestWithRetry(() ->
SnapshotManagementRequest.newCreate(client.getId(),
+ Optional.ofNullable(server).orElseGet(client::getLeaderId),
client.getGroupId(), callId, timeoutMs));
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
index d75ba40..6084077 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
@@ -64,6 +64,7 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
public void testTakeSnapshot() throws Exception {
runWithNewCluster(1, this::runTestTakeSnapshot);
runWithNewCluster(1,this::runTestTakeSnapshotWithConfigurableGap);
+ runWithNewCluster(3,this::runTestTakeSnapshotOnSpecificServer);
}
void runTestTakeSnapshot(CLUSTER cluster) throws Exception {
@@ -98,7 +99,7 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
}
Assert.assertTrue(leader.getStateMachine().getLastAppliedTermIndex().getIndex()
< RaftServerConfigKeys.Snapshot.creationGap(getProperties()));
- snapshotReply = client.getSnapshotManagementApi().create(3000);
+ snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000);
Assert.assertTrue(snapshotReply.isSuccess());
Assert.assertEquals(0,snapshotReply.getLogIndex());
for (int i = 0; i <
RaftServerConfigKeys.Snapshot.creationGap(getProperties())/2-1; i++) {
@@ -107,7 +108,7 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
}
final SnapshotManagementRequest r1 =
SnapshotManagementRequest.newCreate(client.getId(),
leaderId, cluster.getGroupId(), CallId.getAndIncrement(), 3000);
- snapshotReply = client.getSnapshotManagementApi().create(3000);
+ snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000);
}
Assert.assertTrue(snapshotReply.isSuccess());
final long snapshotIndex = snapshotReply.getLogIndex();
@@ -118,4 +119,28 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
.getSnapshotFile(leader.getInfo().getCurrentTerm(), snapshotIndex);
Assert.assertTrue(snapshotFile.exists());
}
+
+ void runTestTakeSnapshotOnSpecificServer(CLUSTER cluster) throws Exception {
+ final RaftClientReply snapshotReply;
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division follower = cluster.getFollowers().get(0);
+ final RaftPeerId followerId = follower.getId();
+ Assert.assertTrue(follower.getInfo().isFollower());
+ try (final RaftClient client = cluster.createClient(followerId)) {
+ for (int i = 0; i <
RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) {
+ RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+ snapshotReply = client.getSnapshotManagementApi(followerId).create(3000);
+ }
+
+ Assert.assertTrue(snapshotReply.isSuccess());
+ final long snapshotIndex = snapshotReply.getLogIndex();
+ LOG.info("snapshotIndex = {} on {} server {}",
+ snapshotIndex, follower.getInfo().getCurrentRole(), follower.getId());
+
+ final File snapshotFile = SimpleStateMachine4Testing.get(follower)
+
.getStateMachineStorage().getSnapshotFile(follower.getInfo().getCurrentTerm(),
snapshotIndex);
+ Assert.assertTrue(snapshotFile.exists());
+ }
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
index cc54291..e3c6740 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.shell.cli.RaftUtils;
import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;
@@ -33,6 +34,7 @@ import java.io.IOException;
*/
public class TakeSnapshotCommand extends AbstractRatisCommand {
public static final String TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME =
"snapshotTimeout";
+ public static final String PEER_ID_OPTION_NAME = "peerId";
/**
* @param context command context
@@ -50,16 +52,22 @@ public class TakeSnapshotCommand extends AbstractRatisCommand {
public int run(CommandLine cl) throws IOException {
super.run(cl);
long timeout;
+ final RaftPeerId peerId;
if (cl.hasOption(TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME)) {
timeout =
Long.parseLong(cl.getOptionValue(TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME));
} else {
timeout = 3000;
}
try(final RaftClient raftClient = RaftUtils.createClient(getRaftGroup())) {
- RaftClientReply reply =
raftClient.getSnapshotManagementApi().create(timeout);
- processReply(reply, () -> String.format("Failed to take snapshot of
peerId %s", raftClient.getLeaderId()));
- printf(String.format("Successful take snapshot, the latest snapshot
index is %d",
- reply.getLogIndex()));
+ if (cl.hasOption(PEER_ID_OPTION_NAME)) {
+ peerId =
RaftPeerId.getRaftPeerId(cl.getOptionValue(PEER_ID_OPTION_NAME));
+ } else {
+ peerId = null;
+ }
+ RaftClientReply reply =
raftClient.getSnapshotManagementApi(peerId).create(timeout);
+ processReply(reply, () -> String.format("Failed to take snapshot of
peerId %s", peerId));
+ printf(String.format("Successful take snapshot on peerId %s, the latest
snapshot index is %d",
+ peerId, reply.getLogIndex()));
}
return 0;
}
@@ -69,7 +77,8 @@ public class TakeSnapshotCommand extends AbstractRatisCommand {
return String.format("%s"
+ " -%s
<PEER0_HOST:PEER0_PORT,PEER1_HOST:PEER1_PORT,PEER2_HOST:PEER2_PORT>"
+ " [-%s <RAFT_GROUP_ID>]"
- + " [-%s <timeoutInMs>]",
+ + " [-%s <timeoutInMs>]"
+ + " [-%s <raftPeerId>]",
getCommandName(), PEER_OPTION_NAME, GROUPID_OPTION_NAME,
TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME);
}
@@ -80,11 +89,16 @@ public class TakeSnapshotCommand extends AbstractRatisCommand {
@Override
public Options getOptions() {
- return super.getOptions().addOption(
- Option.builder()
+ return super.getOptions()
+ .addOption(Option.builder()
.option(TAKE_SNAPSHOT_TIMEOUT_OPTION_NAME)
.hasArg()
.desc("timeout to wait taking snapshot in ms")
+ .build())
+ .addOption(Option.builder()
+ .option(PEER_ID_OPTION_NAME)
+ .hasArg()
+ .desc("the id of server takeing snapshot")
.build());
}