This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new bc6221b32 RATIS-1519. When DataStreamManagement#read an exception occurs, remove DataStream (#596)
bc6221b32 is described below

commit bc6221b32fff9022cebb02bb243aea9fff35e290
Author: hao guo <[email protected]>
AuthorDate: Tue Apr 9 02:10:22 2024 +0800

    RATIS-1519. When DataStreamManagement#read an exception occurs, remove DataStream (#596)
---
 .../ratis/netty/server/DataStreamManagement.java   | 42 ++++++++++++----------
 .../ratis/datastream/DataStreamTestUtils.java      |  7 +++-
 .../datastream/TestNettyDataStreamWithMock.java    | 10 ++++--
 3 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index a6e9b815e..e265d8b92 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -140,19 +140,19 @@ public class DataStreamManagement {
     private final boolean primary;
     private final LocalStream local;
     private final Set<RemoteStream> remotes;
-    private final RaftServer server;
+    private final Division division;
     private final AtomicReference<CompletableFuture<Void>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
-    StreamInfo(RaftClientRequest request, boolean primary, 
CompletableFuture<DataStream> stream, RaftServer server,
+    StreamInfo(RaftClientRequest request, boolean primary, 
CompletableFuture<DataStream> stream, Division division,
         CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputImpl>, IOException> getStreams,
         Function<RequestType, RequestMetrics> metricsConstructor)
         throws IOException {
       this.request = request;
       this.primary = primary;
       this.local = new LocalStream(stream, 
metricsConstructor.apply(RequestType.LOCAL_WRITE));
-      this.server = server;
-      final Set<RaftPeer> successors = getSuccessors(server.getId());
+      this.division = division;
+      final Set<RaftPeer> successors = getSuccessors(division.getId());
       final Set<DataStreamOutputImpl> outs = getStreams.apply(request, 
successors);
       this.remotes = outs.stream()
           .map(o -> new RemoteStream(o, 
metricsConstructor.apply(RequestType.REMOTE_WRITE)))
@@ -167,16 +167,12 @@ public class DataStreamManagement {
       return request;
     }
 
-    Division getDivision() throws IOException {
-      return server.getDivision(request.getRaftGroupId());
+    Division getDivision() {
+      return division;
     }
 
     Collection<CommitInfoProto> getCommitInfos() {
-      try {
-        return getDivision().getCommitInfos();
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
+      return getDivision().getCommitInfos();
     }
 
     boolean isPrimary() {
@@ -196,7 +192,7 @@ public class DataStreamManagement {
       return JavaUtils.getClassSimpleName(getClass()) + ":" + request;
     }
 
-    private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
+    private Set<RaftPeer> getSuccessors(RaftPeerId peerId) {
       final RaftConfiguration conf = getDivision().getRaftConf();
       final RoutingTable routingTable = request.getRoutingTable();
 
@@ -208,7 +204,7 @@ public class DataStreamManagement {
         // Default start topology
         // get the other peers from the current configuration
         return conf.getCurrentPeers().stream()
-            .filter(p -> !p.getId().equals(server.getId()))
+            .filter(p -> !p.getId().equals(division.getId()))
             .collect(Collectors.toSet());
       }
 
@@ -276,7 +272,8 @@ public class DataStreamManagement {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final boolean isPrimary = server.getId().equals(request.getServerId());
-      return new StreamInfo(request, isPrimary, 
computeDataStreamIfAbsent(request), server, getStreams,
+      final Division division = server.getDivision(request.getRaftGroupId());
+      return new StreamInfo(request, isPrimary, 
computeDataStreamIfAbsent(request), division, getStreams,
           getMetrics()::newRequestMetrics);
     } catch (Throwable e) {
       throw new CompletionException(e);
@@ -411,6 +408,18 @@ public class DataStreamManagement {
       readImpl(request, ctx, getStreams);
     } catch (Throwable t) {
       replyDataStreamException(t, request, ctx);
+      removeDataStream(ClientInvocationId.valueOf(request.getClientId(), 
request.getStreamId()), null);
+    }
+  }
+
+  private void removeDataStream(ClientInvocationId invocationId, StreamInfo 
info) {
+    final StreamInfo removed = streams.remove(invocationId);
+    if (info == null) {
+      info = removed;
+    }
+    if (info != null) {
+      info.getDivision().getDataStreamMap().remove(invocationId);
+      info.getLocal().cleanUp();
     }
   }
 
@@ -429,8 +438,6 @@ public class DataStreamManagement {
           () -> newStreamInfo(request.slice(), getStreams));
       info = streams.computeIfAbsent(key, id -> supplier.get());
       if (!supplier.isInitialized()) {
-        final StreamInfo removed = streams.remove(key);
-        removed.getLocal().cleanUp();
         throw new IllegalStateException("Failed to create a new stream for " + 
request
             + " since a stream already exists Key: " + key + " StreamInfo:" + 
info);
       }
@@ -468,9 +475,8 @@ public class DataStreamManagement {
         }, requestExecutor)).whenComplete((v, exception) -> {
       try {
         if (exception != null) {
-          final StreamInfo removed = streams.remove(key);
           replyDataStreamException(server, exception, info.getRequest(), 
request, ctx);
-          removed.getLocal().cleanUp();
+          removeDataStream(key, info);
         }
       } finally {
         request.release();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 47138919d..7735c3e30 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -151,6 +151,7 @@ public interface DataStreamTestUtils {
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
       final SingleDataStream s = new SingleDataStream(request);
+      LOG.info("XXX {} put {}, {}", this, ClientInvocationId.valueOf(request), 
s);
       streams.put(ClientInvocationId.valueOf(request), s);
       return CompletableFuture.completedFuture(s);
     }
@@ -179,7 +180,9 @@ public interface DataStreamTestUtils {
     }
 
     SingleDataStream getSingleDataStream(ClientInvocationId invocationId) {
-      return streams.get(invocationId);
+      final SingleDataStream s = streams.get(invocationId);
+      LOG.info("XXX {}: get {} return {}", this, invocationId, s);
+      return s;
     }
 
     Collection<SingleDataStream> getStreams() {
@@ -329,6 +332,8 @@ public interface DataStreamTestUtils {
 
   static void assertHeader(RaftServer server, RaftClientRequest header, int 
dataSize, boolean stepDownLeader)
       throws Exception {
+    LOG.info("XXX {}: dataSize={}, stepDownLeader={}, header={}",
+        server.getId(), dataSize, stepDownLeader, header);
     // check header
     Assertions.assertEquals(RaftClientRequest.dataStreamRequestType(), 
header.getType());
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 503f8cf66..1d8c67a43 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -59,12 +59,18 @@ public class TestNettyDataStreamWithMock extends 
DataStreamBaseTest {
     RaftConfigKeys.DataStream.setType(properties, 
SupportedDataStreamType.NETTY);
   }
 
-  RaftServer.Division mockDivision(RaftServer server) {
+
+  RaftServer.Division mockDivision(RaftServer server, RaftGroupId groupId) {
     final RaftServer.Division division = mock(RaftServer.Division.class);
     when(division.getRaftServer()).thenReturn(server);
     when(division.getRaftConf()).thenAnswer(i -> getRaftConf());
 
     final MultiDataStreamStateMachine stateMachine = new 
MultiDataStreamStateMachine();
+    try {
+      stateMachine.initialize(server, groupId, null);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
     when(division.getStateMachine()).thenReturn(stateMachine);
 
     final DataStreamMap streamMap = 
RaftServerTestUtil.newDataStreamMap(server.getId());
@@ -95,7 +101,7 @@ public class TestNettyDataStreamWithMock extends 
DataStreamBaseTest {
       when(raftServer.getId()).thenReturn(peerId);
       
when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
       if (getStateMachineException == null) {
-        final RaftServer.Division myDivision = mockDivision(raftServer);
+        final RaftServer.Division myDivision = mockDivision(raftServer, 
groupId);
         
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
       } else {
         
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);

Reply via email to