szetszwo commented on a change in pull request #597:
URL: https://github.com/apache/ratis/pull/597#discussion_r804540181
##########
File path:
ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
##########
@@ -30,7 +30,8 @@
public interface LeaderState {
/** The reasons that this leader steps down and becomes a follower. */
enum StepDownReason {
- HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE;
+ HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE,
+ NO_LEADER_MODE;
Review comment:
Let's call it "FORCE" instead of "NO_LEADER_MODE".
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
##########
@@ -536,6 +538,9 @@ void submitStepDownEvent(long term, StepDownReason reason) {
private void stepDown(long term, StepDownReason reason) {
try {
server.changeToFollowerAndPersistMetadata(term, reason);
+ if (reason.equals(StepDownReason.NO_LEADER_MODE)) {
+ stepDownLeader.completeStepDownLeader();
+ }
Review comment:
Even if the reason is not NO_LEADER_MODE, we should complete the
request, i.e.
```
 private void stepDown(long term, StepDownReason reason) {
   try {
     server.changeToFollowerAndPersistMetadata(term, reason);
+    pendingStepDown.complete();
   } catch(IOException e) {
```
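To illustrate why the unconditional call is safe, here is a minimal sketch, not the Ratis implementation. `PendingStepDownSketch`, `LeaderSketch`, the `Reason` enum, and the `String` reply are all hypothetical stand-ins. The idea is that `complete()` is a no-op when nothing is pending, so every step-down path can call it without inspecting the reason.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the pending step-down holder; not the Ratis class. */
class PendingStepDownSketch {
  private final AtomicReference<CompletableFuture<String>> pending = new AtomicReference<>();

  /** Registers a request, or returns the future of the request that is already pending. */
  CompletableFuture<String> submit() {
    final CompletableFuture<String> created = new CompletableFuture<>();
    final CompletableFuture<String> previous = pending.getAndUpdate(p -> p != null ? p : created);
    return previous != null ? previous : created;
  }

  /** Completes and clears the pending request if there is one; otherwise does nothing. */
  void complete() {
    Optional.ofNullable(pending.getAndSet(null)).ifPresent(f -> f.complete("SUCCESS"));
  }
}

/** Hypothetical leader: every step-down path completes the pending request. */
class LeaderSketch {
  enum Reason { HIGHER_TERM, FORCE }

  final PendingStepDownSketch pendingStepDown = new PendingStepDownSketch();

  void stepDown(Reason reason) {
    // Changing to follower and persisting metadata is omitted in this sketch.
    pendingStepDown.complete(); // unconditional: the reason is not inspected
  }
}
```

With this shape, a client that asked for a step-down still gets its reply if the leader happens to step down for another reason, such as a higher term, before the forced step-down is processed.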
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
##########
@@ -547,6 +552,10 @@ private void stepDown(long term, StepDownReason reason) {
}
}
+ StepDownLeader getStepDownLeader() {
+ return stepDownLeader;
+ }
+
Review comment:
Let's add a submitStepDownRequestAsync method instead of exposing the StepDownLeader through a getter.
```
  CompletableFuture<RaftClientReply> submitStepDownRequestAsync(TransferLeadershipRequest request) {
    return pendingStepDown.stepDownLeaderAsync(request);
  }
```
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/StepDownLeader.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public class StepDownLeader {
+ public static final Logger LOG = LoggerFactory.getLogger(StepDownLeader.class);
+
+ class PendingRequest {
+ private final TransferLeadershipRequest request;
+ private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+ PendingRequest(TransferLeadershipRequest request) {
+ this.request = request;
+ }
+
+ TransferLeadershipRequest getRequest() {
+ return request;
+ }
+
+ CompletableFuture<RaftClientReply> getReplyFuture() {
+ return replyFuture;
+ }
+
+ void complete() {
+ LOG.info("Successfully step down leader at {} for request {}", server.getMemberId(), request);
+ replyFuture.complete(server.newSuccessReply(request));
+ }
+
+ void timeout() {
+ replyFuture.completeExceptionally(new TimeoutIOException(
+ ": Failed to step down leader on " + server.getMemberId() + "request " + request.getTimeoutMs() + "ms"));
+ }
+
+ @Override
+ public String toString() {
+ return request.toString();
+ }
+ }
+
+
+ static class PendingRequestReference {
+ private final AtomicReference<PendingRequest> ref = new AtomicReference<>();
+
+ Optional<PendingRequest> get() {
+ return Optional.ofNullable(ref.get());
+ }
+
+ Optional<PendingRequest> getAndSetNull() {
+ return Optional.ofNullable(ref.getAndSet(null));
+ }
+
+ PendingRequest getAndUpdate(Supplier<PendingRequest> supplier) {
+ return ref.getAndUpdate(p -> p != null? p: supplier.get());
+ }
+ }
+
+ private final RaftServerImpl server;
Review comment:
We should hold a LeaderStateImpl here instead of a RaftServerImpl.
##########
File path:
ratis-server/src/test/java/org/apache/ratis/server/impl/PreAppendLeaderStepDownTest.java
##########
@@ -108,4 +112,24 @@ private void runTestLeaderStepDown(CLUSTER cluster) throws Exception {
cluster.shutdown();
}
}
+
+ @Test
+ public void testLeaderStepDownAsync() throws Exception {
+ runWithNewCluster(3, this::runTestLeaderStepDownAsync);
+ }
+
+ void runTestLeaderStepDownAsync(CLUSTER cluster) throws IOException, InterruptedException {
Review comment:
There is an extra space before "void".
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1059,6 +1059,25 @@ SnapshotManagementRequestHandler getSnapshotRequestHandler() {
getId() + ": Request not supported " + request));
}
+ CompletableFuture<RaftClientReply> stepDownLeaderAsync(TransferLeadershipRequest request) throws IOException {
+ LOG.info("{} receive stepDown leader request {}", getMemberId(), request);
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+ assertGroup(request.getRequestorId(), request.getRaftGroupId());
+
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, true);
+ if (reply != null) {
+ return CompletableFuture.completedFuture(newSuccessReply(request));
+ }
+ synchronized (this) {
+ reply = checkLeaderState(request, null, false);
+ if (reply != null) {
+ return CompletableFuture.completedFuture(newSuccessReply(request));
+ }
+ final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
+ return leaderState.getStepDownLeader().stepDownLeaderAsync(request);
+ }
+ }
Review comment:
The code can be simplified as below.
```
  CompletableFuture<RaftClientReply> stepDownLeaderAsync(TransferLeadershipRequest request) throws IOException {
    LOG.info("{} receive stepDown leader request {}", getMemberId(), request);
    assertLifeCycleState(LifeCycle.States.RUNNING);
    assertGroup(request.getRequestorId(), request.getRaftGroupId());
    return role.getLeaderState()
        .map(leader -> leader.submitStepDownRequestAsync(request))
        .orElseGet(() -> CompletableFuture.completedFuture(newSuccessReply(request)));
  }
```
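For readers less familiar with the `Optional` idiom in the suggestion above, the following is a self-contained sketch of the same control flow. `RoleSketch`, its nested `Leader` interface, and the `String` request and reply types are hypothetical and only mirror the shape of the suggested code. When no leader state is present the method replies with success immediately; otherwise it delegates to the leader.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical role holder; only mirrors the shape of the suggested code. */
class RoleSketch {
  /** Hypothetical leader handle with a single async entry point. */
  interface Leader {
    CompletableFuture<String> submitStepDownRequestAsync(String request);
  }

  private volatile Leader leader; // null while this server is not the leader

  void setLeader(Leader newLeader) {
    this.leader = newLeader;
  }

  Optional<Leader> getLeaderState() {
    return Optional.ofNullable(leader);
  }

  CompletableFuture<String> stepDownLeaderAsync(String request) {
    return getLeaderState()
        // Leader present: let it track the pending step-down request.
        .map(l -> l.submitStepDownRequestAsync(request))
        // Not a leader: nothing to step down from, so reply with success right away.
        .orElseGet(() -> CompletableFuture.completedFuture("SUCCESS"));
  }
}
```

Reading the leader state once and branching on that single `Optional` avoids the repeated checks and the explicit synchronized block of the original version, under the assumption that the leader side handles its own concurrency.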
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/StepDownLeader.java
##########
@@ -0,0 +1,117 @@
+public class StepDownLeader {
Review comment:
Let's rename it to "PendingStepDown".
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/impl/StepDownLeader.java
##########
@@ -0,0 +1,117 @@
+ private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+ private final PendingRequestReference pending = new PendingRequestReference();
+
+ StepDownLeader(RaftServerImpl server) {
+ this.server = server;
+ }
+
+ CompletableFuture<RaftClientReply> stepDownLeaderAsync(TransferLeadershipRequest request) {
Review comment:
Let's simply call it "submitAsync". Also, we should check that the request does not specify a new leader.
```
  CompletableFuture<RaftClientReply> submitAsync(TransferLeadershipRequest request) {
    Preconditions.assertNull(request.getNewLeader(), "request.getNewLeader()");
```
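As a rough illustration of the check, here is a sketch in plain Java. The `Request` class, the local `assertNull` helper, and the exception type are assumptions made for the example; they are not the Ratis `Preconditions` utility or `TransferLeadershipRequest`. A step-down request must not name a new leader, so the check rejects such a request before any pending state is created.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of validating a step-down request before accepting it. */
class SubmitAsyncSketch {
  /** Hypothetical request: newLeader is null for a plain step-down. */
  static final class Request {
    private final String newLeader;

    Request(String newLeader) {
      this.newLeader = newLeader;
    }

    String getNewLeader() {
      return newLeader;
    }
  }

  /** Local helper for this sketch; throws if the value is not null. */
  static void assertNull(Object value, String name) {
    if (value != null) {
      throw new IllegalStateException(name + " is expected to be null but is " + value);
    }
  }

  static CompletableFuture<String> submitAsync(Request request) {
    assertNull(request.getNewLeader(), "request.getNewLeader()");
    // The future would be completed later, when the leader actually steps down.
    return new CompletableFuture<>();
  }
}
```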
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.