This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 9a0497c0febf978a0c6309ebf6a21b4f4459bfb1 Author: tison <[email protected]> AuthorDate: Mon Jul 11 05:34:15 2022 +0800 RATIS-1619. Validate server id in raft group if the group is not empty (#677) --- .../java/org/apache/ratis/util/Preconditions.java | 5 + .../apache/ratis/server/impl/ServerImplUtils.java | 5 + .../apache/ratis/server/impl/MiniRaftCluster.java | 30 +++--- .../server/impl/RaftReconfigurationBaseTest.java | 2 +- .../datastream/DataStreamAsyncClusterTests.java | 7 +- .../org/apache/ratis/server/ServerBuilderTest.java | 102 +++++++++++++++++++++ .../shell/cli/sh/PeerCommandIntegrationTest.java | 7 +- 7 files changed, 139 insertions(+), 19 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java index 196bd992..e9a00261 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java @@ -102,6 +102,11 @@ public interface Preconditions { + name + " = " + object + " == null, class = " + object.getClass()); } + static <T> T assertNotNull(T object, String format, Object... args) { + assertTrue(object != null, format, args); + return object; + } + static <T> T assertInstanceOf(Object object, Class<T> clazz) { assertTrue(clazz.isInstance(object), () -> "Required instance of " + clazz + " but object.getClass() is " + object.getClass()); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 8bbdfa68..6777b909 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -29,6 +29,7 @@ import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import java.io.IOException; @@ -47,6 +48,10 @@ public final class ServerImplUtils { RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) throws IOException { RaftServer.LOG.debug("newRaftServer: {}, {}", id, group); + if (group != null && !group.getPeers().isEmpty()) { + Preconditions.assertNotNull(id, "RaftPeerId %s is not in RaftGroup %s", id, group); + Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group); + } final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters); proxy.initGroups(group); return proxy; diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index d479ae6e..652f26dc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -283,8 +283,7 @@ public abstract class MiniRaftCluster implements Closeable { public MiniRaftCluster initServers() { LOG.info("servers = " + servers); if (servers.isEmpty()) { - putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), - true, false); + putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true, group); } return this; } @@ -295,18 +294,10 @@ public abstract class MiniRaftCluster implements Closeable { return s; } - private Collection<RaftServer> putNewServers( - Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) { - if (emptyPeer) { - final RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList()); + private Collection<RaftServer> putNewServers(Iterable<RaftPeerId> peers, boolean format, RaftGroup raftGroup) { return StreamSupport.stream(peers.spliterator(), false) .map(id -> putNewServer(id, raftGroup, format)) .collect(Collectors.toList()); - } else { - return StreamSupport.stream(peers.spliterator(), false) - .map(id -> putNewServer(id, group, format)) - .collect(Collectors.toList()); - } } public void start() throws IOException { @@ -345,7 +336,7 @@ public abstract class MiniRaftCluster implements Closeable { List<RaftPeerId> idList = new ArrayList<>(servers.keySet()); servers.clear(); - putNewServers(idList, format, false); + putNewServers(idList, format, group); start(); } @@ -426,9 +417,20 @@ public abstract class MiniRaftCluster implements Closeable { boolean emptyPeer) throws IOException { LOG.info("Add new peers {}", Arrays.asList(ids)); + final Iterable<RaftPeerId> peerIds = CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf); + final RaftGroup raftGroup; + if (emptyPeer) { + raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList()); + } else { + final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false) + .map(id -> RaftPeer.newBuilder().setId(id).build()) + .collect(Collectors.toSet()); + newPeers.addAll(group.getPeers()); + raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers); + } + // create and add new RaftServers - final Collection<RaftServer> newServers = putNewServers( - CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true, emptyPeer); + final Collection<RaftServer> newServers = putNewServers(peerIds, true, raftGroup); startServers(newServers); if (!startNewPeer) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index d764988e..abf0292b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -240,7 +240,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste CountDownLatch latch = new CountDownLatch(1); Thread clientThread = new Thread(() -> { try { - PeerChanges c1 = cluster.addNewPeers(2, true); + PeerChanges c1 = cluster.addNewPeers(2, true, true); LOG.info("Start changing the configuration: {}", asList(c1.allPeersInNewConf)); diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java index b453e7b1..a9b691db 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java @@ -145,7 +145,12 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) { final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s); - final RaftPeerId leader = cluster.getLeader().getId(); + final RaftPeerId leader; + try { + leader = RaftTestUtil.waitForLeader(cluster).getId(); + } catch (InterruptedException e) { + throw new CompletionException(e); + } final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(); final RaftPeer primaryServer = CollectionUtils.random(cluster.getGroup().getPeers()); try(RaftClient client = cluster.createClient(primaryServer)) { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerBuilderTest.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerBuilderTest.java new file mode 100644 index 00000000..dd76a2ec --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerBuilderTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server; + +import java.io.IOException; +import org.apache.ratis.BaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.util.Preconditions; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test {@link RaftServer.Builder}. + */ +public class ServerBuilderTest extends BaseTest { + + @Test + public void testPeerIdInRaftGroup() throws Exception { + RaftPeer peer = RaftPeer.newBuilder().setId("n0").build(); + RaftGroup group = RaftGroup.valueOf(RaftGroupId.randomId(), peer); + RaftServer server = RaftServer.newBuilder() + .setServerId(peer.getId()) + .setGroup(group) + .setStateMachine(new BaseStateMachine()) + .setProperties(new RaftProperties()) + .build(); + server.close(); + } + + @Test + public void testPeerIdNotInRaftGroup() { + RaftPeer peer = RaftPeer.newBuilder().setId("n0").build(); + RaftGroup group = RaftGroup.valueOf(RaftGroupId.randomId(), peer); + try { + RaftServer.newBuilder() + .setServerId(RaftPeerId.valueOf("n1")) + .setGroup(group) + .setStateMachine(new BaseStateMachine()) + .setProperties(new RaftProperties()) + .build(); + Assert.fail("did not get expected exception"); + } catch (IOException e) { + Preconditions.assertInstanceOf(e.getCause(), IllegalStateException.class); + } + } + + @Test + public void testNullPeerIdWithRaftGroup() { + RaftPeer peer = RaftPeer.newBuilder().setId("n0").build(); + RaftGroup group = RaftGroup.valueOf(RaftGroupId.randomId(), peer); + try { + RaftServer.newBuilder() + .setGroup(group) + .setStateMachine(new BaseStateMachine()) + .setProperties(new RaftProperties()) + .build(); + Assert.fail("did not get expected exception"); + } catch (IOException e) { + Preconditions.assertInstanceOf(e.getCause(), IllegalStateException.class); + } + } + + @Test + public void testPeerIdWithNullRaftGroup() throws Exception { + RaftPeer peer = RaftPeer.newBuilder().setId("n0").build(); + RaftServer server = RaftServer.newBuilder() + .setServerId(peer.getId()) + .setStateMachine(new BaseStateMachine()) + .setProperties(new RaftProperties()) + .build(); + server.close(); + } + + @Test + public void testNullPeerIdWithNullRaftGroup() throws Exception { + RaftServer server = RaftServer.newBuilder() + .setStateMachine(new BaseStateMachine()) + .setProperties(new RaftProperties()) + .build(); + server.close(); + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/PeerCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/PeerCommandIntegrationTest.java index 285d431e..99d95553 100644 --- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/PeerCommandIntegrationTest.java +++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/PeerCommandIntegrationTest.java @@ -79,11 +79,12 @@ public abstract class PeerCommandIntegrationTest <CLUSTER extends MiniRaftCluste void runTestPeerAddCommand(MiniRaftCluster cluster) throws Exception { LOG.info("Start testMultiGroup" + cluster.printServers()); - final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + RaftTestUtil.waitForLeader(cluster); RaftPeer[] peers = cluster.getPeers().toArray(new RaftPeer[0]); - RaftPeer[] newPeers = cluster.addNewPeers(1, true).newPeers; + RaftPeer[] newPeers = cluster.addNewPeers(1, true, true).newPeers; + RaftServerTestUtil.waitAndCheckNewConf(cluster, peers, 0, null); - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); for (RaftPeer peer : peers) { sb.append(peer.getAdminAddress()); sb.append(",");
