This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ee1e3a5  RATIS-566. OrderedStreamObservers#closeAllExisting should 
close the streams for a certain group id only. Contributed by Tsz Wo Nicholas 
Sze.
ee1e3a5 is described below

commit ee1e3a5e06335721920743bd187c33628357a003
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Wed May 29 14:08:43 2019 +0530

    RATIS-566. OrderedStreamObservers#closeAllExisting should close the streams 
for a certain group id only. Contributed by Tsz Wo Nicholas Sze.
---
 .../grpc/client/GrpcClientProtocolService.java     | 37 ++++++++++++++++++----
 .../org/apache/ratis/grpc/server/GrpcService.java  |  5 +--
 .../org/apache/ratis/server/RaftServerRpc.java     |  3 +-
 .../org/apache/ratis/server/impl/LeaderState.java  |  2 +-
 4 files changed, 37 insertions(+), 10 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 9c2fc0c..772e586 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -108,12 +109,17 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
       CollectionUtils.removeExisting(so.getId(), so, map, () -> 
getClass().getSimpleName());
     }
 
-    void closeAllExisting() {
+    void closeAllExisting(RaftGroupId groupId) {
       // Iteration not synchronized:
       // Okay if an existing object is removed by another mean during the 
iteration since it must be already closed.
       // Also okay if a new object is added during the iteration since this 
method closes only the existing objects.
-      for(OrderedRequestStreamObserver so : map.values()) {
-        so.close(true);
+      for(Iterator<Map.Entry<Integer, OrderedRequestStreamObserver>> i = 
map.entrySet().iterator(); i.hasNext(); ) {
+        final OrderedRequestStreamObserver so = i.next().getValue();
+        final RaftGroupId gid = so.getGroupId();
+        if (gid == null || gid.equals(groupId)) {
+          so.close(true);
+          i.remove();
+        }
       }
     }
   }
@@ -147,9 +153,9 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
     return so;
   }
 
-  public void closeAllOrderedRequestStreamObservers() {
+  public void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
     LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
-    orderedStreamObservers.closeAllExisting();
+    orderedStreamObservers.closeAllExisting(groupId);
   }
 
   @Override
@@ -304,11 +310,17 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
   private class OrderedRequestStreamObserver extends RequestStreamObserver {
     private final SlidingWindow.Server<PendingOrderedRequest, RaftClientReply> 
slidingWindow
         = new SlidingWindow.Server<>(getName(), COMPLETED);
+    /** The {@link RaftGroupId} for this observer. */
+    private final AtomicReference<RaftGroupId> groupId = new 
AtomicReference<>();
 
     OrderedRequestStreamObserver(StreamObserver<RaftClientReplyProto> 
responseObserver) {
       super(responseObserver);
     }
 
+    RaftGroupId getGroupId() {
+      return groupId.get();
+    }
+
     void processClientRequest(PendingOrderedRequest pending) {
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequest(),
@@ -317,7 +329,20 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
 
     @Override
     void processClientRequest(RaftClientRequest r) {
-      slidingWindow.receivedRequest(new PendingOrderedRequest(r), 
this::processClientRequest);
+      final RaftGroupId requestGroupId = r.getRaftGroupId();
+      // use the group id in the first request as the group id of this observer
+      final RaftGroupId updated = groupId.updateAndGet(g -> g != null ? g: 
requestGroupId);
+      final PendingOrderedRequest pending = new PendingOrderedRequest(r);
+
+      if (!requestGroupId.equals(updated)) {
+        final GroupMismatchException exception = new 
GroupMismatchException(getId()
+            + ": The group (" + requestGroupId + ") of " + r.getClientId()
+            + " does not match the group (" + updated + ") of the " + 
getClass().getSimpleName());
+        responseError(exception, () -> "processClientRequest (Group 
mismatched) for " + r);
+        return;
+      }
+
+      slidingWindow.receivedRequest(pending, this::processClientRequest);
     }
 
     private void sendReply(PendingOrderedRequest ready) {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index de65acc..973dff4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
@@ -163,8 +164,8 @@ public class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocolClient
   }
 
   @Override
-  public void notifyNotLeader() {
-    clientProtocolService.closeAllOrderedRequestStreamObservers();
+  public void notifyNotLeader(RaftGroupId groupId) {
+    clientProtocolService.closeAllOrderedRequestStreamObservers(groupId);
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index c37433e..e75c340 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server;
 
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.protocol.RaftPeer;
@@ -64,6 +65,6 @@ public interface RaftServerRpc extends RaftServerProtocol, 
RpcType.Get, Closeabl
   void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
 
   /** The server role changes from leader to a non-leader role. */
-  default void notifyNotLeader() {
+  default void notifyNotLeader(RaftGroupId groupId) {
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 057936b..21fbd10 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -248,7 +248,7 @@ public class LeaderState {
     } catch (IOException e) {
       LOG.warn(server.getId() + ": Caught exception in 
sendNotLeaderResponses", e);
     }
-    server.getServerRpc().notifyNotLeader();
+    server.getServerRpc().notifyNotLeader(server.getGroupId());
   }
 
   void notifySenders() {

Reply via email to