This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ee1e3a5 RATIS-566. OrderedStreamObservers#closeAllExisting should
close the streams for a certain group id only. Contributed by Tsz Wo Nicholas
Sze.
ee1e3a5 is described below
commit ee1e3a5e06335721920743bd187c33628357a003
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Wed May 29 14:08:43 2019 +0530
RATIS-566. OrderedStreamObservers#closeAllExisting should close the streams
for a certain group id only. Contributed by Tsz Wo Nicholas Sze.
---
.../grpc/client/GrpcClientProtocolService.java | 37 ++++++++++++++++++----
.../org/apache/ratis/grpc/server/GrpcService.java | 5 +--
.../org/apache/ratis/server/RaftServerRpc.java | 3 +-
.../org/apache/ratis/server/impl/LeaderState.java | 2 +-
4 files changed, 37 insertions(+), 10 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 9c2fc0c..772e586 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -108,12 +109,17 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
CollectionUtils.removeExisting(so.getId(), so, map, () ->
getClass().getSimpleName());
}
- void closeAllExisting() {
+ void closeAllExisting(RaftGroupId groupId) {
// Iteration not synchronized:
// Okay if an existing object is removed by another mean during the
iteration since it must be already closed.
// Also okay if a new object is added during the iteration since this
method closes only the existing objects.
- for(OrderedRequestStreamObserver so : map.values()) {
- so.close(true);
+ for(Iterator<Map.Entry<Integer, OrderedRequestStreamObserver>> i =
map.entrySet().iterator(); i.hasNext(); ) {
+ final OrderedRequestStreamObserver so = i.next().getValue();
+ final RaftGroupId gid = so.getGroupId();
+ if (gid == null || gid.equals(groupId)) {
+ so.close(true);
+ i.remove();
+ }
}
}
}
@@ -147,9 +153,9 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
return so;
}
- public void closeAllOrderedRequestStreamObservers() {
+ public void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
- orderedStreamObservers.closeAllExisting();
+ orderedStreamObservers.closeAllExisting(groupId);
}
@Override
@@ -304,11 +310,17 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
private class OrderedRequestStreamObserver extends RequestStreamObserver {
private final SlidingWindow.Server<PendingOrderedRequest, RaftClientReply>
slidingWindow
= new SlidingWindow.Server<>(getName(), COMPLETED);
+ /** The {@link RaftGroupId} for this observer. */
+ private final AtomicReference<RaftGroupId> groupId = new
AtomicReference<>();
OrderedRequestStreamObserver(StreamObserver<RaftClientReplyProto>
responseObserver) {
super(responseObserver);
}
+ RaftGroupId getGroupId() {
+ return groupId.get();
+ }
+
void processClientRequest(PendingOrderedRequest pending) {
final long seq = pending.getSeqNum();
processClientRequest(pending.getRequest(),
@@ -317,7 +329,20 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
@Override
void processClientRequest(RaftClientRequest r) {
- slidingWindow.receivedRequest(new PendingOrderedRequest(r),
this::processClientRequest);
+ final RaftGroupId requestGroupId = r.getRaftGroupId();
+ // use the group id in the first request as the group id of this observer
+ final RaftGroupId updated = groupId.updateAndGet(g -> g != null ? g:
requestGroupId);
+ final PendingOrderedRequest pending = new PendingOrderedRequest(r);
+
+ if (!requestGroupId.equals(updated)) {
+ final GroupMismatchException exception = new
GroupMismatchException(getId()
+ + ": The group (" + requestGroupId + ") of " + r.getClientId()
+ + " does not match the group (" + updated + ") of the " +
getClass().getSimpleName());
+ responseError(exception, () -> "processClientRequest (Group
mismatched) for " + r);
+ return;
+ }
+
+ slidingWindow.receivedRequest(pending, this::processClientRequest);
}
private void sendReply(PendingOrderedRequest ready) {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index de65acc..973dff4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
@@ -163,8 +164,8 @@ public class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocolClient
}
@Override
- public void notifyNotLeader() {
- clientProtocolService.closeAllOrderedRequestStreamObservers();
+ public void notifyNotLeader(RaftGroupId groupId) {
+ clientProtocolService.closeAllOrderedRequestStreamObservers(groupId);
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index c37433e..e75c340 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.protocol.RaftPeer;
@@ -64,6 +65,6 @@ public interface RaftServerRpc extends RaftServerProtocol,
RpcType.Get, Closeabl
void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
/** The server role changes from leader to a non-leader role. */
- default void notifyNotLeader() {
+ default void notifyNotLeader(RaftGroupId groupId) {
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 057936b..21fbd10 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -248,7 +248,7 @@ public class LeaderState {
} catch (IOException e) {
LOG.warn(server.getId() + ": Caught exception in
sendNotLeaderResponses", e);
}
- server.getServerRpc().notifyNotLeader();
+ server.getServerRpc().notifyNotLeader(server.getGroupId());
}
void notifySenders() {