133tosakarin commented on code in PR #1168:
URL: https://github.com/apache/ratis/pull/1168#discussion_r1806766378
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -210,25 +214,27 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}",
getMemberId(), lastIncludedIndex);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS);
+ return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS));
}
- private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
+ private CompletableFuture<InstallSnapshotReplyProto>
notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws
IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+ final CompletableFuture<Void> future;
+ final InstallSnapshotReplyProto replyProto;
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -156,21 +157,23 @@ private InstallSnapshotReplyProto
installSnapshotImpl(InstallSnapshotRequestProt
return failedReply;
}
- private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
+ private CompletableFuture<InstallSnapshotReplyProto>
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest
= request.getSnapshotChunk();
final TermIndex lastIncluded =
TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
+ final CompletableFuture<Void> future;
+ final InstallSnapshotReplyProto reply;
Review Comment:
ok, thanks for your comment
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -186,8 +189,9 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -156,21 +157,23 @@ private InstallSnapshotReplyProto
installSnapshotImpl(InstallSnapshotRequestProt
return failedReply;
}
- private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
+ private CompletableFuture<InstallSnapshotReplyProto>
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest
= request.getSnapshotChunk();
final TermIndex lastIncluded =
TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
+ final CompletableFuture<Void> future;
+ final InstallSnapshotReplyProto reply;
Review Comment:
ok, thanks for your comment
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -210,25 +214,27 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}",
getMemberId(), lastIncludedIndex);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS);
+ return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS));
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -156,21 +157,23 @@ private InstallSnapshotReplyProto
installSnapshotImpl(InstallSnapshotRequestProt
return failedReply;
}
- private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
+ private CompletableFuture<InstallSnapshotReplyProto>
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest
= request.getSnapshotChunk();
final TermIndex lastIncluded =
TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
+ final CompletableFuture<Void> future;
+ final InstallSnapshotReplyProto reply;
Review Comment:
ok, thanks for your comment
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -186,8 +189,9 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -341,21 +349,23 @@ private InstallSnapshotReplyProto
notifyStateMachineToInstallSnapshot(
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex,
server.getPeer());
installedIndex.set(latestInstalledIndex);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex.getIndex());
+ return future.thenApply(dummy -> replyProto);
}
// Otherwise, Snapshot installation is in progress.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ replyProto = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS);
+ return future.thenApply(dummy -> replyProto);
}
}
- private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
+ private RoleInfoProto getRoleInfoProto (RaftPeer leader){
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -210,25 +214,27 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}",
getMemberId(), lastIncludedIndex);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS);
+ return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS));
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -186,8 +189,9 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
Review Comment:
Ok
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java:
##########
@@ -210,25 +214,27 @@ private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequest
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}",
getMemberId(), lastIncludedIndex);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS);
+ return future.thenApply(dummy -> toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS));
Review Comment:
Ok
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]