This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 18e5acf93 RATIS-1913. Assert that the primary peers in 
DataStreamClient and RoutingTable are equal (#945)
18e5acf93 is described below

commit 18e5acf93765689ba1cbd316561c208f0f5e157b
Author: Ivan Andika <[email protected]>
AuthorDate: Thu Oct 26 02:10:11 2023 +0800

    RATIS-1913. Assert that the primary peers in DataStreamClient and 
RoutingTable are equal (#945)
---
 .../ratis/client/impl/DataStreamClientImpl.java    |  7 +++++
 .../org/apache/ratis/protocol/RoutingTable.java    | 30 +++++++++++++++-------
 .../ratis/datastream/DataStreamClusterTests.java   | 30 ++++++++++++++++++++++
 ...amSslWithRpcTypeGrpcAndDataStreamTypeNetty.java |  5 ++++
 4 files changed, 63 insertions(+), 9 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 7a25a8e4b..353f532c6 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -44,6 +44,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SlidingWindow;
 
 import java.io.IOException;
@@ -230,6 +231,12 @@ public class DataStreamClientImpl implements 
DataStreamClient {
 
   @Override
   public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable 
routingTable) {
+    if (routingTable != null) {
+      // Validate that the primary peer is equal to the primary peer passed by 
the RoutingTable
+      
Preconditions.assertTrue(dataStreamServer.getId().equals(routingTable.getPrimary()),
+          () -> "Primary peer mismatched: the routing table has " + 
routingTable.getPrimary()
+              + " but the client has " + dataStreamServer.getId());
+    }
     final Message message =
         
Optional.ofNullable(headerMessage).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
     RaftClientRequest request = RaftClientRequest.newBuilder()
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
index 0157fe49a..56181c482 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -43,6 +43,9 @@ public interface RoutingTable {
   /** @return the successor peers of the given peer. */
   Set<RaftPeerId> getSuccessors(RaftPeerId peerId);
 
+  /** @return the primary peer. */
+  RaftPeerId getPrimary();
+
   /** @return the proto of this {@link RoutingTable}. */
   RoutingTableProto toProto();
 
@@ -78,15 +81,15 @@ public interface RoutingTable {
     }
 
     public RoutingTable build() {
-      return Optional.ofNullable(ref.getAndSet(null))
-          .map(RoutingTable::newRoutingTable)
-          .orElseThrow(() -> new IllegalStateException("RoutingTable Already 
built"));
+      final Map<RaftPeerId, Set<RaftPeerId>> map = ref.getAndSet(null);
+      if (map == null) {
+        throw new IllegalStateException("RoutingTable is already built.");
+      }
+      return RoutingTable.newRoutingTable(map);
     }
 
-    static void validate(Map<RaftPeerId, Set<RaftPeerId>> map) {
-      if (map != null && !map.isEmpty()) {
-        new Builder.Validation(map).run();
-      }
+    static RaftPeerId validate(Map<RaftPeerId, Set<RaftPeerId>> map) {
+      return new Builder.Validation(map).run();
     }
 
     /** Validate if a map represents a valid routing table. */
@@ -131,10 +134,11 @@ public interface RoutingTable {
         this.unreachablePeers = allPeers;
       }
 
-      private void run() {
+      private RaftPeerId run() {
         depthFirstSearch(primary);
         Preconditions.assertTrue(unreachablePeers.isEmpty() ,
             () -> "Invalid routing table: peer(s) " + unreachablePeers +  " 
are unreachable, " + this);
+        return primary;
       }
 
       private void depthFirstSearch(RaftPeerId current) {
@@ -159,7 +163,10 @@ public interface RoutingTable {
 
   /** @return a new {@link RoutingTable} represented by the given map. */
   static RoutingTable newRoutingTable(Map<RaftPeerId, Set<RaftPeerId>> map){
-    Builder.validate(map);
+    if (map == null || map.isEmpty()) {
+      return null;
+    }
+    final RaftPeerId primary = Builder.validate(map);
 
     final Supplier<RoutingTableProto> proto = JavaUtils.memoize(
         () -> 
RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build());
@@ -169,6 +176,11 @@ public interface RoutingTable {
         return 
Optional.ofNullable(map.get(peerId)).orElseGet(Collections::emptySet);
       }
 
+      @Override
+      public RaftPeerId getPrimary() {
+        return primary;
+      }
+
       @Override
       public RoutingTableProto toProto() {
         return proto.get();
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index fd47045a4..dcb54be3d 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -62,6 +62,11 @@ public abstract class DataStreamClusterTests<CLUSTER extends 
MiniRaftCluster> ex
     runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
   }
 
+  @Test
+  public void testStreamWithInvalidRoutingTable() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestInvalidPrimaryInRoutingTable);
+  }
+
   void testStreamWrites(CLUSTER cluster) throws Exception {
     waitForLeader(cluster);
     runTestDataStreamOutput(cluster);
@@ -96,6 +101,31 @@ public abstract class DataStreamClusterTests<CLUSTER 
extends MiniRaftCluster> ex
     assertLogEntry(cluster, request);
   }
 
+  void runTestInvalidPrimaryInRoutingTable(CLUSTER cluster) throws Exception {
+    final RaftPeer primaryServer = 
CollectionUtils.random(cluster.getGroup().getPeers());
+
+    RaftPeer notPrimary = null;
+    for (RaftPeer peer: cluster.getGroup().getPeers()) {
+      if (!peer.equals(primaryServer)) {
+        notPrimary = peer;
+        break;
+      }
+    }
+
+    Assert.assertNotNull(
+        "Cannot find peer other than the primary", notPrimary);
+    Assert.assertNotEquals(primaryServer, notPrimary);
+
+    try (RaftClient client = cluster.createClient(primaryServer)) {
+      RoutingTable routingTableWithWrongPrimary =
+          getRoutingTable(cluster.getGroup().getPeers(), notPrimary);
+      testFailureCase("",
+          () -> client.getDataStreamApi().stream(null,
+              routingTableWithWrongPrimary),
+          IllegalStateException.class);
+    }
+  }
+
   void runTestWriteFile(CLUSTER cluster, int i,
       CheckedConsumer<DataStreamOutputImpl, Exception> testCase) throws 
Exception {
     final RaftClientRequest request;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 06702f91a..8e423ab29 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -60,6 +60,11 @@ public class 
TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty
   public void testStreamWrites() {
   }
 
+  @Ignore
+  @Override
+  public void testStreamWithInvalidRoutingTable() {
+  }
+
   @Ignore
   @Override
   public void testMultipleStreamsMultipleServers() {

Reply via email to