This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 18e5acf93 RATIS-1913. Assert that the primary peers in DataStreamClient and RoutingTable are equal (#945)
18e5acf93 is described below
commit 18e5acf93765689ba1cbd316561c208f0f5e157b
Author: Ivan Andika <[email protected]>
AuthorDate: Thu Oct 26 02:10:11 2023 +0800
RATIS-1913. Assert that the primary peers in DataStreamClient and RoutingTable are equal (#945)
---
.../ratis/client/impl/DataStreamClientImpl.java | 7 +++++
.../org/apache/ratis/protocol/RoutingTable.java | 30 +++++++++++++++-------
.../ratis/datastream/DataStreamClusterTests.java | 30 ++++++++++++++++++++++
...amSslWithRpcTypeGrpcAndDataStreamTypeNetty.java | 5 ++++
4 files changed, 63 insertions(+), 9 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 7a25a8e4b..353f532c6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -44,6 +44,7 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
import java.io.IOException;
@@ -230,6 +231,12 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routingTable) {
+ if (routingTable != null) {
+ // Validate that the primary peer is equal to the primary peer passed by the RoutingTable
+ Preconditions.assertTrue(dataStreamServer.getId().equals(routingTable.getPrimary()),
+ () -> "Primary peer mismatched: the routing table has " + routingTable.getPrimary()
+ + " but the client has " + dataStreamServer.getId());
+ }
final Message message = Optional.ofNullable(headerMessage).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
RaftClientRequest request = RaftClientRequest.newBuilder()
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
index 0157fe49a..56181c482 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -43,6 +43,9 @@ public interface RoutingTable {
/** @return the successor peers of the given peer. */
Set<RaftPeerId> getSuccessors(RaftPeerId peerId);
+ /** @return the primary peer. */
+ RaftPeerId getPrimary();
+
/** @return the proto of this {@link RoutingTable}. */
RoutingTableProto toProto();
@@ -78,15 +81,15 @@ public interface RoutingTable {
}
public RoutingTable build() {
- return Optional.ofNullable(ref.getAndSet(null))
- .map(RoutingTable::newRoutingTable)
- .orElseThrow(() -> new IllegalStateException("RoutingTable Already built"));
+ final Map<RaftPeerId, Set<RaftPeerId>> map = ref.getAndSet(null);
+ if (map == null) {
+ throw new IllegalStateException("RoutingTable is already built.");
+ }
+ return RoutingTable.newRoutingTable(map);
}
- static void validate(Map<RaftPeerId, Set<RaftPeerId>> map) {
- if (map != null && !map.isEmpty()) {
- new Builder.Validation(map).run();
- }
+ static RaftPeerId validate(Map<RaftPeerId, Set<RaftPeerId>> map) {
+ return new Builder.Validation(map).run();
}
/** Validate if a map represents a valid routing table. */
@@ -131,10 +134,11 @@ public interface RoutingTable {
this.unreachablePeers = allPeers;
}
- private void run() {
+ private RaftPeerId run() {
depthFirstSearch(primary);
Preconditions.assertTrue(unreachablePeers.isEmpty() ,
() -> "Invalid routing table: peer(s) " + unreachablePeers + " are unreachable, " + this);
+ return primary;
}
private void depthFirstSearch(RaftPeerId current) {
@@ -159,7 +163,10 @@ public interface RoutingTable {
/** @return a new {@link RoutingTable} represented by the given map. */
static RoutingTable newRoutingTable(Map<RaftPeerId, Set<RaftPeerId>> map){
- Builder.validate(map);
+ if (map == null || map.isEmpty()) {
+ return null;
+ }
+ final RaftPeerId primary = Builder.validate(map);
final Supplier<RoutingTableProto> proto = JavaUtils.memoize(
() -> RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build());
@@ -169,6 +176,11 @@ public interface RoutingTable {
return Optional.ofNullable(map.get(peerId)).orElseGet(Collections::emptySet);
}
+ @Override
+ public RaftPeerId getPrimary() {
+ return primary;
+ }
+
@Override
public RoutingTableProto toProto() {
return proto.get();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index fd47045a4..dcb54be3d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -62,6 +62,11 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
}
+ @Test
+ public void testStreamWithInvalidRoutingTable() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestInvalidPrimaryInRoutingTable);
+ }
+
void testStreamWrites(CLUSTER cluster) throws Exception {
waitForLeader(cluster);
runTestDataStreamOutput(cluster);
@@ -96,6 +101,31 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
assertLogEntry(cluster, request);
}
+ void runTestInvalidPrimaryInRoutingTable(CLUSTER cluster) throws Exception {
+ final RaftPeer primaryServer = CollectionUtils.random(cluster.getGroup().getPeers());
+
+ RaftPeer notPrimary = null;
+ for (RaftPeer peer: cluster.getGroup().getPeers()) {
+ if (!peer.equals(primaryServer)) {
+ notPrimary = peer;
+ break;
+ }
+ }
+
+ Assert.assertNotNull(
+ "Cannot find peer other than the primary", notPrimary);
+ Assert.assertNotEquals(primaryServer, notPrimary);
+
+ try (RaftClient client = cluster.createClient(primaryServer)) {
+ RoutingTable routingTableWithWrongPrimary =
+ getRoutingTable(cluster.getGroup().getPeers(), notPrimary);
+ testFailureCase("",
+ () -> client.getDataStreamApi().stream(null,
+ routingTableWithWrongPrimary),
+ IllegalStateException.class);
+ }
+ }
+
void runTestWriteFile(CLUSTER cluster, int i,
CheckedConsumer<DataStreamOutputImpl, Exception> testCase) throws Exception {
final RaftClientRequest request;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 06702f91a..8e423ab29 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -60,6 +60,11 @@ public class TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty
public void testStreamWrites() {
}
+ @Ignore
+ @Override
+ public void testStreamWithInvalidRoutingTable() {
+ }
+
@Ignore
@Override
public void testMultipleStreamsMultipleServers() {