This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d711152 RATIS-495. Leader Election thread might crash if two servers
have the same id. Contributed by Tsz Wo Nicholas Sze.
d711152 is described below
commit d7111524924623650a01e6bd1011eea79058b9eb
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Apr 23 18:16:32 2019 +0530
RATIS-495. Leader Election thread might crash if two servers have the
same id. Contributed by Tsz Wo Nicholas Sze.
---
.../java/org/apache/ratis/util/ProtoUtils.java | 4 ---
.../apache/ratis/server/impl/LeaderElection.java | 19 +++++++----
.../ratis/server/impl/PeerConfiguration.java | 17 +++++++---
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 4 +++
.../ratis/server/impl/TestRaftConfiguration.java | 39 ++++++++++++++++++++++
6 files changed, 70 insertions(+), 15 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 08a4562..60800fb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -23,7 +23,6 @@ import org.apache.ratis.proto.RaftProtos.RaftGroupProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
@@ -164,7 +163,4 @@ public interface ProtoUtils {
+ "#" + proto.getCallId() + ":"
+ (proto.getSuccess()? "OK": "FAIL");
}
- static String toString(RequestVoteReplyProto proto) {
- return toString(proto.getServerReply()) + "-t" + proto.getTerm();
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 796f5e2..b0b3113 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -27,7 +27,6 @@ import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
@@ -36,7 +35,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@@ -50,10 +51,10 @@ class LeaderElection implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(LeaderElection.class);
private ResultAndTerm logAndReturn(Result result,
- List<RequestVoteReplyProto> responses,
+ Map<RaftPeerId, RequestVoteReplyProto> responses,
List<Exception> exceptions, long newTerm) {
LOG.info(this + ": Election " + result + "; received " + responses.size()
+ " response(s) "
- +
responses.stream().map(ProtoUtils::toString).collect(Collectors.toList())
+ +
responses.values().stream().map(ServerProtoUtils::toString).collect(Collectors.toList())
+ " and " + exceptions.size() + " exception(s); " + server.getState());
int i = 0;
for(Exception e : exceptions) {
@@ -242,7 +243,7 @@ class LeaderElection implements Runnable {
private ResultAndTerm waitForResults(final long electionTerm, final int
submitted,
RaftConfiguration conf, Executor voteExecutor) throws
InterruptedException {
final Timestamp timeout =
Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
- final List<RequestVoteReplyProto> responses = new ArrayList<>();
+ final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted;
Collection<RaftPeerId> votedPeers = new ArrayList<>();
@@ -259,7 +260,13 @@ class LeaderElection implements Runnable {
}
final RequestVoteReplyProto r = future.get();
- responses.add(r);
+ final RaftPeerId replierId =
RaftPeerId.valueOf(r.getServerReply().getReplyId());
+ final RequestVoteReplyProto previous =
responses.putIfAbsent(replierId, r);
+ if (previous != null) {
+ LOG.warn("{} received duplicated replies from {}, the 2nd reply is
ignored: 1st = {}, 2nd = {}",
+ server.getId(), replierId, ServerProtoUtils.toString(previous),
ServerProtoUtils.toString(r));
+ continue;
+ }
if (r.getShouldShutdown()) {
return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
}
@@ -268,7 +275,7 @@ class LeaderElection implements Runnable {
exceptions, r.getTerm());
}
if (r.getServerReply().getSuccess()) {
- votedPeers.add(RaftPeerId.valueOf(r.getServerReply().getReplyId()));
+ votedPeers.add(replierId);
if (conf.hasMajority(votedPeers, server.getId())) {
return logAndReturn(Result.PASSED, responses, exceptions, -1);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 6d92b92..8b4ec86 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,12 +17,18 @@
*/
package org.apache.ratis.server.impl;
-import java.util.*;
-
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
/**
* The peer configuration of a raft cluster.
*
@@ -35,7 +41,10 @@ class PeerConfiguration {
Objects.requireNonNull(peers);
Map<RaftPeerId, RaftPeer> map = new HashMap<>();
for(RaftPeer p : peers) {
- map.put(p.getId(), p);
+ final RaftPeer previous = map.putIfAbsent(p.getId(), p);
+ if (previous != null) {
+ throw new IllegalArgumentException("Found duplicated ids " + p.getId()
+ " in peers " + peers);
+ }
}
this.peers = Collections.unmodifiableMap(map);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b187108..2ccc5a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -757,7 +757,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
groupId, voteGranted, state.getCurrentTerm(), shouldShutdown);
if (LOG.isDebugEnabled()) {
LOG.debug("{} replies to vote request: {}. Peer's state: {}",
- getId(), ProtoUtils.toString(reply), state);
+ getId(), ServerProtoUtils.toString(reply), state);
}
}
return reply;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index adc593e..c7c6355 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -106,6 +106,10 @@ public interface ServerProtoUtils {
+ ",followerCommit:" + reply.getFollowerCommit();
}
+ static String toString(RequestVoteReplyProto proto) {
+ return toString(proto.getServerReply()) + "-t" + proto.getTerm();
+ }
+
static String toString(RaftRpcReplyProto reply) {
return reply.getRequestorId().toStringUtf8() + "->"
+ reply.getReplyId().toStringUtf8() + "," + reply.getSuccess();
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java
new file mode 100644
index 0000000..0f5d771
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class TestRaftConfiguration extends BaseTest {
+ @Test
+ public void testPeerConfiguration() {
+ final RaftPeer[] peers = {
+ new RaftPeer(RaftPeerId.valueOf("s0")),
+ new RaftPeer(RaftPeerId.valueOf("s1")),
+ new RaftPeer(RaftPeerId.valueOf("s0")),
+ };
+ testFailureCase("Duplicated peers", () -> {
+ new PeerConfiguration(Arrays.asList(peers));
+ }, IllegalArgumentException.class);
+ }
+}
\ No newline at end of file