Repository: incubator-ratis Updated Branches: refs/heads/master e1620e804 -> 5565425f0
RATIS-18. A new leader should start serving client requests only after it commits the first leader-placeholder entry. Contributed by Jing Zhao. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/5565425f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/5565425f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/5565425f Branch: refs/heads/master Commit: 5565425f0e4ebc450051340522702465b37d97c6 Parents: e1620e8 Author: Jing Zhao <[email protected]> Authored: Fri Feb 17 18:16:38 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Fri Feb 17 18:16:38 2017 -0800 ---------------------------------------------------------------------- .../ratis/protocol/LeaderNotReadyException.java | 34 ++++++++++++ .../client/HadoopClientRequestSender.java | 7 ++- .../apache/ratis/server/impl/LeaderState.java | 13 ++++- .../ratis/server/impl/RaftServerImpl.java | 7 +++ .../java/org/apache/ratis/MiniRaftCluster.java | 3 ++ .../impl/DelayLocalExecutionInjection.java | 6 ++- .../impl/RaftReconfigurationBaseTest.java | 56 +++++++++++++++++++- 7 files changed, 119 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java new file mode 100644 index 0000000..33f6a4d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * This exception is sent from the server to a client. The server has just + * become the current leader, but has not committed its first place-holder + * log entry yet. Thus the leader cannot accept any new client requests since + * it cannot determine whether a request is just a retry. 
+ */ +public class LeaderNotReadyException extends RaftException { + public LeaderNotReadyException() { + this("The leader is not ready yet"); + } + + public LeaderNotReadyException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java index 116be19..918a191 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java @@ -50,9 +50,12 @@ public class HadoopClientRequestSender implements RaftClientRequestSender { return proxy.submitClientRequest(request); } } catch (RemoteException e) { - throw e.unwrapRemoteException(StateMachineException.class, + throw e.unwrapRemoteException( + StateMachineException.class, ReconfigurationTimeoutException.class, - ReconfigurationInProgressException.class, RaftException.class); + ReconfigurationInProgressException.class, + RaftException.class, + LeaderNotReadyException.class); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index d5d6adc..1750a04 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -53,6 +53,7 @@ import 
org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.Timestamp; @@ -70,6 +71,7 @@ import com.google.common.base.Preconditions; */ public class LeaderState { private static final Logger LOG = RaftServerImpl.LOG; + public static final String APPEND_PLACEHOLDER = LeaderState.class.getSimpleName() + ".placeholder"; enum StateUpdateEventType { STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS @@ -113,6 +115,7 @@ public class LeaderState { private final int stagingCatchupGap; private final int snapshotChunkMaxSize; private final int syncInterval; + private final long placeHolderIndex; LeaderState(RaftServerImpl server, RaftProperties properties) { this.server = server; @@ -137,11 +140,11 @@ public class LeaderState { final RaftConfiguration conf = server.getRaftConf(); Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final long nextIndex = raftLog.getNextIndex(); + placeHolderIndex = raftLog.getNextIndex(); senders = new ArrayList<>(others.size()); for (RaftPeer p : others) { - FollowerInfo f = new FollowerInfo(p, t, nextIndex, true); + FollowerInfo f = new FollowerInfo(p, t, placeHolderIndex, true); senders.add(server.getFactory().newLogAppender(server, this, f)); } voterLists = divideFollowers(conf); @@ -156,12 +159,18 @@ public class LeaderState { .setTerm(server.getState().getCurrentTerm()) .setIndex(raftLog.getNextIndex()) .setNoOp(LeaderNoOp.newBuilder()).build(); + CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, + server.getId().toString(), null); raftLog.append(placeHolder); processor.start(); startSenders(); } + boolean isReady() { + return 
server.getState().getLastAppliedIndex() >= placeHolderIndex; + } + private void startSenders() { senders.forEach(Thread::start); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 7d9e049..152d6a5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.LeaderNotReadyException; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.NotLeaderException; import org.apache.ratis.protocol.RaftClientReply; @@ -347,6 +348,12 @@ public class RaftServerImpl implements RaftServer { CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); future.complete(new RaftClientReply(request, exception)); return future; + } else { + if (leaderState == null || !leaderState.isReady()) { + CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); + future.completeExceptionally(new LeaderNotReadyException()); + return future; + } } return null; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 993d861..682849f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.LeaderState; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerImplUtils; @@ -51,6 +52,8 @@ public abstract class MiniRaftCluster { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); public static final DelayLocalExecutionInjection logSyncDelay = new DelayLocalExecutionInjection(RaftLog.LOG_SYNC); + public static final DelayLocalExecutionInjection leaderPlaceHolderDelay = + new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER); public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java index 1818722..6df6176 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java @@ -29,8 +29,10 @@ import java.util.concurrent.atomic.AtomicInteger; public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Code { private final Map<String, AtomicInteger> delays = new ConcurrentHashMap<>(); - public DelayLocalExecutionInjection(String 
method) { - CodeInjectionForTesting.put(method, this); + public DelayLocalExecutionInjection(String... methods) { + for (String method : methods) { + CodeInjectionForTesting.put(method, this); + } } public void clear() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index e07f5cb..5b46af8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; +import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay; import static org.apache.ratis.MiniRaftCluster.logSyncDelay; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; @@ -509,7 +510,6 @@ public abstract class RaftReconfigurationBaseTest { @Test public void testRevertConfigurationChange() throws Exception { LOG.info("Start testRevertConfigurationChange"); - // originally 3 peers final MiniRaftCluster cluster = getCluster(5); try { cluster.start(); @@ -575,4 +575,58 @@ public abstract class RaftReconfigurationBaseTest { cluster.shutdown(); } } + + /** + * Delay the commit of the leader placeholder log entry and see if the client + * can correctly receive and handle the LeaderNotReadyException. 
+ */ + @Test + public void testLeaderNotReadyException() throws Exception { + LOG.info("Start testLeaderNotReadyException"); + final MiniRaftCluster cluster = getCluster(1); + final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId(); + try { + // delay 2s before each peer appends its leader placeholder entry, so a new leader stays not-ready for a while + cluster.getPeers().forEach( + peer -> leaderPlaceHolderDelay.setDelayMs(peer.getId().toString(), 2000)); + cluster.start(); + + AtomicBoolean caughtNotReady = new AtomicBoolean(false); + AtomicBoolean success = new AtomicBoolean(false); + new Thread(() -> { /* client thread: keep resending one request until it succeeds */ + final RaftClient client = cluster.createClient(leaderId); + final RaftClientRequestSender sender = client.getRequestSender(); + + final RaftClientRequest request = new RaftClientRequest(client.getId(), + leaderId, 0, new SimpleMessage("test")); + while (!success.get()) { /* NOTE(review): success is the only exit; if the leader never becomes ready this thread keeps retrying after the test ends */ + try { + RaftClientReply reply = sender.sendRequest(request); + success.set(reply.isSuccess()); + } catch (LeaderNotReadyException e) { + LOG.info("Hit LeaderNotReadyException", e); + caughtNotReady.set(true); + } catch (IOException e) { + LOG.info("Hit other IOException", e); + } + if (!success.get()) { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) { /* NOTE(review): interrupt status is dropped here; consider restoring it and leaving the loop */ + } + } + } + }).start(); + + RaftTestUtil.waitForLeader(cluster); + for (int i = 0; !success.get() && i < 5; i++) { /* wait up to ~5s for the client thread to succeed */ + Thread.sleep(1000); + } + Assert.assertTrue(success.get()); + Assert.assertTrue(caughtNotReady.get()); + } finally { + leaderPlaceHolderDelay.clear(); + cluster.shutdown(); + } + } }
