This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 56a8e17 RATIS-1441. Add snapshot manually request and proto (#538)
56a8e17 is described below
commit 56a8e1730059fdc4e50544e85b09318851f1a492
Author: Yaolong Liu <[email protected]>
AuthorDate: Fri Jan 7 19:04:13 2022 +0800
RATIS-1441. Add snapshot manually request and proto (#538)
---
.../ratis/client/api/SnapshotManagementApi.java | 17 +++++--
.../apache/ratis/client/impl/ClientProtoUtils.java | 24 +++++++++
.../ratis/client/impl/SnapshotManagementImpl.java | 33 ++++++------
.../ratis/protocol/AdminAsynchronousProtocol.java | 2 +
.../org/apache/ratis/protocol/AdminProtocol.java | 2 +
.../ratis/protocol/SnapshotManagementRequest.java | 58 ++++++++++++++++++++++
...mbinedClientProtocolClientSideTranslatorPB.java | 7 +++
ratis-proto/src/main/proto/Raft.proto | 13 +++++
.../apache/ratis/server/impl/RaftServerImpl.java | 12 +++--
.../apache/ratis/server/impl/RaftServerProxy.java | 32 ++++++++++--
....java => SnapshotManagementRequestHandler.java} | 12 ++---
.../ratis/server/impl/RaftServerTestUtil.java | 4 +-
.../server/simulation/SimulatedServerRpc.java | 3 ++
.../ratis/statemachine/SnapshotManagementTest.java | 18 ++++---
14 files changed, 194 insertions(+), 43 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotRequest.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java
similarity index 70%
rename from
ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotRequest.java
rename to
ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java
index 5d4c9aa..edd0475 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotRequest.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/SnapshotManagementApi.java
@@ -15,11 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.protocol;
+package org.apache.ratis.client.api;
-public final class SnapshotRequest extends RaftClientRequest {
+import org.apache.ratis.protocol.RaftClientReply;
- public SnapshotRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,long callId, long timeoutMs) {
- super(clientId, serverId, groupId, callId, readRequestType(), timeoutMs);
- }
+import java.io.IOException;
+
+/**
+ * An API for managing snapshots,
+ * such as creating and listing snapshot files.
+ */
+public interface SnapshotManagementApi {
+
+ /** Trigger the creation of a snapshot file. */
+ RaftClientReply create(long timeoutMs) throws IOException;
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 9262186..9478c89 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -595,6 +595,30 @@ public interface ClientProtoUtils {
return b.build();
}
+ static SnapshotManagementRequest toSnapshotManagementRequest(SnapshotManagementRequestProto p) {
+ final RaftRpcRequestProto m = p.getRpcRequest();
+ final ClientId clientId = ClientId.valueOf(m.getRequestorId());
+ final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
+ switch(p.getOpCase()) {
+ case CREATE:
+ return SnapshotManagementRequest.newCreate(clientId, serverId,
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId(), m.getTimeoutMs());
+ default:
+ throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p);
+ }
+ }
+
+ static SnapshotManagementRequestProto toSnapshotManagementRequestProto(
+ SnapshotManagementRequest request) {
+ final SnapshotManagementRequestProto.Builder b = SnapshotManagementRequestProto.newBuilder()
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
+ final SnapshotManagementRequest.Create create = request.getCreate();
+ if (create != null) {
+ b.setCreate(SnapshotCreateRequestProto.newBuilder().build());
+ }
+ return b.build();
+ }
+
static GroupInfoRequestProto toGroupInfoRequestProto(
GroupInfoRequest request) {
return GroupInfoRequestProto.newBuilder()
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
similarity index 51%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
copy to
ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
index c9b2f7e..cd72905 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/SnapshotManagementImpl.java
@@ -15,22 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.protocol;
+package org.apache.ratis.client.impl;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-/** Asynchronous version of {@link AdminProtocol}. */
-public interface AdminAsynchronousProtocol {
- CompletableFuture<GroupListReply> getGroupListAsync(GroupListRequest request);
+import org.apache.ratis.client.api.SnapshotManagementApi;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.rpc.CallId;
- CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request);
+import java.io.IOException;
+import java.util.Objects;
- CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request);
+class SnapshotManagementImpl implements SnapshotManagementApi {
+ private final RaftClientImpl client;
- CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException;
+ SnapshotManagementImpl(RaftClientImpl client) {
+ this.client = Objects.requireNonNull(client, "client == null");
+ }
- CompletableFuture<RaftClientReply> transferLeadershipAsync(
- TransferLeadershipRequest request) throws IOException;
-}
\ No newline at end of file
+ @Override
+ public RaftClientReply create(long timeoutMs) throws IOException {
+ final long callId = CallId.getAndIncrement();
+ return client.io().sendRequestWithRetry(() -> SnapshotManagementRequest.newCreate(
+ client.getId(), client.getLeaderId(), client.getGroupId(), callId, timeoutMs));
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
index c9b2f7e..9c7d62c 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -28,6 +28,8 @@ public interface AdminAsynchronousProtocol {
CompletableFuture<RaftClientReply>
groupManagementAsync(GroupManagementRequest request);
+ CompletableFuture<RaftClientReply> snapshotManagementAsync(SnapshotManagementRequest request);
+
CompletableFuture<RaftClientReply> setConfigurationAsync(
SetConfigurationRequest request) throws IOException;
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
index b10642c..849466a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -27,6 +27,8 @@ public interface AdminProtocol {
RaftClientReply groupManagement(GroupManagementRequest request) throws
IOException;
+ RaftClientReply snapshotManagement(SnapshotManagementRequest request) throws IOException;
+
RaftClientReply setConfiguration(SetConfigurationRequest request) throws
IOException;
RaftClientReply transferLeadership(TransferLeadershipRequest request) throws
IOException;
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
new file mode 100644
index 0000000..2ea2059
--- /dev/null
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.util.JavaUtils;
+
+public final class SnapshotManagementRequest extends RaftClientRequest {
+
+ public abstract static class Op {
+
+ }
+ public static class Create extends Op {
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass()) + ":" ;
+ }
+
+ }
+
+ public static SnapshotManagementRequest newCreate(ClientId clientId,
+ RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs) {
+ return new SnapshotManagementRequest(clientId,
+ serverId, groupId, callId, timeoutMs,new SnapshotManagementRequest.Create());
+ }
+
+ private final Op op;
+
+ public SnapshotManagementRequest(ClientId clientId,
+ RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs, Op op) {
+ super(clientId, serverId, groupId, callId, readRequestType(), timeoutMs);
+ this.op = op;
+ }
+
+ public SnapshotManagementRequest.Create getCreate() {
+ return op instanceof Create ? (Create)op: null;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", " + op;
+ }
+}
diff --git
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
index dba6046..4f10d26 100644
---
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
+++
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.thirdparty.com.google.protobuf
.GeneratedMessageV3;
@@ -98,6 +99,12 @@ public class CombinedClientProtocolClientSideTranslatorPB
}
@Override
+ public RaftClientReply snapshotManagement(SnapshotManagementRequest request) throws IOException {
+ //todo(codings-dan): add proto related to hadoop
+ return null;
+ }
+
+ @Override
public GroupListReply getGroupList(GroupListRequest request) throws
IOException {
return handleRequest(request,
ClientProtoUtils::toGroupListRequestProto,
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 109841f..17e7002 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -413,6 +413,19 @@ message TransferLeadershipRequestProto {
RaftPeerProto newLeader = 2;
}
+// snapshot request
+message SnapshotManagementRequestProto {
+ RaftRpcRequestProto rpcRequest = 1;
+
+ oneof Op {
+ SnapshotCreateRequestProto create = 2;
+ }
+}
+
+message SnapshotCreateRequestProto {
+
+}
+
message StartLeaderElectionRequestProto {
RaftRpcRequestProto serverRequest = 1;
TermIndexProto leaderLastEntry = 2;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fdba1d7..d7d45b5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -179,7 +179,7 @@ class RaftServerImpl implements RaftServer.Division,
private final AtomicBoolean startComplete;
private final TransferLeadership transferLeadership;
- private final SnapshotRequestHandler snapshotRequestHandler;
+ private final SnapshotManagementRequestHandler snapshotRequestHandler;
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy
proxy) throws IOException {
final RaftPeerId id = proxy.getId();
@@ -220,7 +220,7 @@ class RaftServerImpl implements RaftServer.Division,
});
this.transferLeadership = new TransferLeadership(this);
- this.snapshotRequestHandler = new SnapshotRequestHandler(this);
+ this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
}
@Override
@@ -974,7 +974,11 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotRequest request) throws IOException {
+ public RaftClientReply takeSnapshot(SnapshotManagementRequest request) throws IOException {
+ return waitForReply(request, takeSnapshotAsync(request));
+ }
+
+ CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
@@ -1001,7 +1005,7 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- SnapshotRequestHandler getSnapshotRequestHandler() {
+ SnapshotManagementRequestHandler getSnapshotRequestHandler() {
return snapshotRequestHandler;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f9dc2bb..7c81d58 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -510,6 +510,34 @@ class RaftServerProxy implements RaftServer {
}
@Override
+ public RaftClientReply snapshotManagement(SnapshotManagementRequest request) throws IOException {
+ return RaftServerImpl.waitForReply(getId(), request, snapshotManagementAsync(request),
+ e -> RaftClientReply.newBuilder()
+ .setRequest(request)
+ .setException(e)
+ .build());
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> snapshotManagementAsync(SnapshotManagementRequest request) {
+ final RaftGroupId groupId = request.getRaftGroupId();
+ if (groupId == null) {
+ return JavaUtils.completeExceptionally(new GroupMismatchException(
+ getId() + ": Request group id == null"));
+ }
+ final SnapshotManagementRequest.Create create = request.getCreate();
+ if (create != null) {
+ return createAsync(request);
+ }
+ return JavaUtils.completeExceptionally(new UnsupportedOperationException(
+ getId() + ": Request not supported " + request));
+ }
+
+ private CompletableFuture<RaftClientReply> createAsync(SnapshotManagementRequest request) {
+ return submitRequest(request.getRaftGroupId(), impl -> impl.takeSnapshotAsync(request));
+ }
+
+ @Override
public GroupListReply getGroupList(GroupListRequest request) {
return new GroupListReply(request, getGroupIds());
}
@@ -530,10 +558,6 @@ class RaftServerProxy implements RaftServer {
server -> server.getGroupInfo(request));
}
- public CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotRequest request) {
- return submitRequest(request.getRaftGroupId(), impl -> impl.takeSnapshotAsync(request));
- }
-
/**
* Handle a raft configuration change request from client.
*/
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotRequestHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
similarity index 93%
rename from
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotRequestHandler.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
index 9e2f355..416e501 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotRequestHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
@@ -18,7 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
@@ -34,15 +34,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-class SnapshotRequestHandler {
+class SnapshotManagementRequestHandler {
public static final Logger LOG =
LoggerFactory.getLogger(TransferLeadership.class);
class PendingRequest {
- private final SnapshotRequest request;
+ private final SnapshotManagementRequest request;
private final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
private final AtomicBoolean triggerTakingSnapshot = new
AtomicBoolean(true);
- PendingRequest(SnapshotRequest request) {
+ PendingRequest(SnapshotManagementRequest request) {
LOG.info("new PendingRequest " + request);
this.request = request;
}
@@ -92,11 +92,11 @@ class SnapshotRequestHandler {
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final PendingRequestReference pending = new
PendingRequestReference();
- SnapshotRequestHandler(RaftServerImpl server) {
+ SnapshotManagementRequestHandler(RaftServerImpl server) {
this.server = server;
}
- CompletableFuture<RaftClientReply> takingSnapshotAsync(SnapshotRequest request) {
+ CompletableFuture<RaftClientReply> takingSnapshotAsync(SnapshotManagementRequest request) {
final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() ->
new PendingRequest(request));
final PendingRequest previous = pending.getAndUpdate(supplier);
if (previous != null) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index de0474a..f3cc38c 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -25,7 +25,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DivisionInfo;
@@ -181,7 +181,7 @@ public class RaftServerTestUtil {
return ((RaftConfigurationImpl)config).isHighestPriority(peerId);
}
- public static CompletableFuture<RaftClientReply> takeSnapshotAsync(RaftServer.Division leader, SnapshotRequest r)
+ public static CompletableFuture<RaftClientReply> takeSnapshotAsync(RaftServer.Division leader, SnapshotManagementRequest r)
throws IOException {
return ((RaftServerImpl)leader).takeSnapshotAsync(r);
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 081253f..0a58826 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
@@ -192,6 +193,8 @@ class SimulatedServerRpc implements RaftServerRpc {
future = server.setConfigurationAsync((SetConfigurationRequest)
request);
} else if (request instanceof TransferLeadershipRequest) {
future = server.transferLeadershipAsync((TransferLeadershipRequest)
request);
+ } else if (request instanceof SnapshotManagementRequest) {
+ future = server.snapshotManagementAsync((SnapshotManagementRequest) request);
} else {
future = server.submitClientRequestAsync(request);
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
index f941458..385b77e 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
@@ -24,7 +24,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -75,8 +75,8 @@ public abstract class SnapshotManagementTest<CLUSTER extends
MiniRaftCluster>
RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
- final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
- CallId.getAndIncrement(), 3000);
+ final SnapshotManagementRequest r = SnapshotManagementRequest.newCreate(client.getId(),
+ leaderId, cluster.getGroupId(), CallId.getAndIncrement(), 3000);
snapshotReply = RaftServerTestUtil.takeSnapshotAsync(leader, r).join();
}
@@ -100,8 +100,9 @@ public abstract class SnapshotManagementTest<CLUSTER
extends MiniRaftCluster>
}
Assert.assertTrue(leader.getStateMachine().getLastAppliedTermIndex().getIndex()
< RaftServerConfigKeys.Snapshot.creationGap(getProperties()));
- final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
- CallId.getAndIncrement(), 3000);
+ final SnapshotManagementRequest r =
+ SnapshotManagementRequest.newCreate(client.getId(), leaderId, cluster.getGroupId(),
+ CallId.getAndIncrement(), 3000);
snapshotReply = RaftServerTestUtil.takeSnapshotAsync(leader, r).join();
Assert.assertTrue(snapshotReply.isSuccess());
Assert.assertEquals(0,snapshotReply.getLogIndex());
@@ -109,8 +110,8 @@ public abstract class SnapshotManagementTest<CLUSTER
extends MiniRaftCluster>
RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
- final SnapshotRequest r1 = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
- CallId.getAndIncrement(), 3000);
+ final SnapshotManagementRequest r1 = SnapshotManagementRequest.newCreate(client.getId(),
+ leaderId, cluster.getGroupId(), CallId.getAndIncrement(), 3000);
snapshotReply = RaftServerTestUtil.takeSnapshotAsync(leader, r1).join();
}
Assert.assertTrue(snapshotReply.isSuccess());
@@ -118,7 +119,8 @@ public abstract class SnapshotManagementTest<CLUSTER
extends MiniRaftCluster>
LOG.info("snapshotIndex = {}", snapshotIndex);
final File snapshotFile = SimpleStateMachine4Testing.get(leader)
- .getStateMachineStorage().getSnapshotFile(leader.getInfo().getCurrentTerm(), snapshotIndex);
+ .getStateMachineStorage()
+ .getSnapshotFile(leader.getInfo().getCurrentTerm(), snapshotIndex);
Assert.assertTrue(snapshotFile.exists());
}
}