This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new bc6221b32 RATIS-1519. When DataStreamManagement#read an exception
occurs, remove DataStream (#596)
bc6221b32 is described below
commit bc6221b32fff9022cebb02bb243aea9fff35e290
Author: hao guo <[email protected]>
AuthorDate: Tue Apr 9 02:10:22 2024 +0800
RATIS-1519. When DataStreamManagement#read an exception occurs, remove
DataStream (#596)
---
.../ratis/netty/server/DataStreamManagement.java | 42 ++++++++++++----------
.../ratis/datastream/DataStreamTestUtils.java | 7 +++-
.../datastream/TestNettyDataStreamWithMock.java | 10 ++++--
3 files changed, 38 insertions(+), 21 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index a6e9b815e..e265d8b92 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -140,19 +140,19 @@ public class DataStreamManagement {
private final boolean primary;
private final LocalStream local;
private final Set<RemoteStream> remotes;
- private final RaftServer server;
+ private final Division division;
private final AtomicReference<CompletableFuture<Void>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
- StreamInfo(RaftClientRequest request, boolean primary,
CompletableFuture<DataStream> stream, RaftServer server,
+ StreamInfo(RaftClientRequest request, boolean primary,
CompletableFuture<DataStream> stream, Division division,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputImpl>, IOException> getStreams,
Function<RequestType, RequestMetrics> metricsConstructor)
throws IOException {
this.request = request;
this.primary = primary;
this.local = new LocalStream(stream,
metricsConstructor.apply(RequestType.LOCAL_WRITE));
- this.server = server;
- final Set<RaftPeer> successors = getSuccessors(server.getId());
+ this.division = division;
+ final Set<RaftPeer> successors = getSuccessors(division.getId());
final Set<DataStreamOutputImpl> outs = getStreams.apply(request,
successors);
this.remotes = outs.stream()
.map(o -> new RemoteStream(o,
metricsConstructor.apply(RequestType.REMOTE_WRITE)))
@@ -167,16 +167,12 @@ public class DataStreamManagement {
return request;
}
- Division getDivision() throws IOException {
- return server.getDivision(request.getRaftGroupId());
+ Division getDivision() {
+ return division;
}
Collection<CommitInfoProto> getCommitInfos() {
- try {
- return getDivision().getCommitInfos();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
+ return getDivision().getCommitInfos();
}
boolean isPrimary() {
@@ -196,7 +192,7 @@ public class DataStreamManagement {
return JavaUtils.getClassSimpleName(getClass()) + ":" + request;
}
- private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
+ private Set<RaftPeer> getSuccessors(RaftPeerId peerId) {
final RaftConfiguration conf = getDivision().getRaftConf();
final RoutingTable routingTable = request.getRoutingTable();
@@ -208,7 +204,7 @@ public class DataStreamManagement {
// Default start topology
// get the other peers from the current configuration
return conf.getCurrentPeers().stream()
- .filter(p -> !p.getId().equals(server.getId()))
+ .filter(p -> !p.getId().equals(division.getId()))
.collect(Collectors.toSet());
}
@@ -276,7 +272,8 @@ public class DataStreamManagement {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final boolean isPrimary = server.getId().equals(request.getServerId());
- return new StreamInfo(request, isPrimary,
computeDataStreamIfAbsent(request), server, getStreams,
+ final Division division = server.getDivision(request.getRaftGroupId());
+ return new StreamInfo(request, isPrimary,
computeDataStreamIfAbsent(request), division, getStreams,
getMetrics()::newRequestMetrics);
} catch (Throwable e) {
throw new CompletionException(e);
@@ -411,6 +408,18 @@ public class DataStreamManagement {
readImpl(request, ctx, getStreams);
} catch (Throwable t) {
replyDataStreamException(t, request, ctx);
+ removeDataStream(ClientInvocationId.valueOf(request.getClientId(),
request.getStreamId()), null);
+ }
+ }
+
+ private void removeDataStream(ClientInvocationId invocationId, StreamInfo
info) {
+ final StreamInfo removed = streams.remove(invocationId);
+ if (info == null) {
+ info = removed;
+ }
+ if (info != null) {
+ info.getDivision().getDataStreamMap().remove(invocationId);
+ info.getLocal().cleanUp();
}
}
@@ -429,8 +438,6 @@ public class DataStreamManagement {
() -> newStreamInfo(request.slice(), getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
- final StreamInfo removed = streams.remove(key);
- removed.getLocal().cleanUp();
throw new IllegalStateException("Failed to create a new stream for " +
request
+ " since a stream already exists Key: " + key + " StreamInfo:" +
info);
}
@@ -468,9 +475,8 @@ public class DataStreamManagement {
}, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
- final StreamInfo removed = streams.remove(key);
replyDataStreamException(server, exception, info.getRequest(),
request, ctx);
- removed.getLocal().cleanUp();
+ removeDataStream(key, info);
}
} finally {
request.release();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 47138919d..7735c3e30 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -151,6 +151,7 @@ public interface DataStreamTestUtils {
@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
final SingleDataStream s = new SingleDataStream(request);
+ LOG.info("XXX {} put {}, {}", this, ClientInvocationId.valueOf(request),
s);
streams.put(ClientInvocationId.valueOf(request), s);
return CompletableFuture.completedFuture(s);
}
@@ -179,7 +180,9 @@ public interface DataStreamTestUtils {
}
SingleDataStream getSingleDataStream(ClientInvocationId invocationId) {
- return streams.get(invocationId);
+ final SingleDataStream s = streams.get(invocationId);
+ LOG.info("XXX {}: get {} return {}", this, invocationId, s);
+ return s;
}
Collection<SingleDataStream> getStreams() {
@@ -329,6 +332,8 @@ public interface DataStreamTestUtils {
static void assertHeader(RaftServer server, RaftClientRequest header, int
dataSize, boolean stepDownLeader)
throws Exception {
+ LOG.info("XXX {}: dataSize={}, stepDownLeader={}, header={}",
+ server.getId(), dataSize, stepDownLeader, header);
// check header
Assertions.assertEquals(RaftClientRequest.dataStreamRequestType(),
header.getType());
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 503f8cf66..1d8c67a43 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -59,12 +59,18 @@ public class TestNettyDataStreamWithMock extends
DataStreamBaseTest {
RaftConfigKeys.DataStream.setType(properties,
SupportedDataStreamType.NETTY);
}
- RaftServer.Division mockDivision(RaftServer server) {
+
+ RaftServer.Division mockDivision(RaftServer server, RaftGroupId groupId) {
final RaftServer.Division division = mock(RaftServer.Division.class);
when(division.getRaftServer()).thenReturn(server);
when(division.getRaftConf()).thenAnswer(i -> getRaftConf());
final MultiDataStreamStateMachine stateMachine = new
MultiDataStreamStateMachine();
+ try {
+ stateMachine.initialize(server, groupId, null);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
when(division.getStateMachine()).thenReturn(stateMachine);
final DataStreamMap streamMap =
RaftServerTestUtil.newDataStreamMap(server.getId());
@@ -95,7 +101,7 @@ public class TestNettyDataStreamWithMock extends
DataStreamBaseTest {
when(raftServer.getId()).thenReturn(peerId);
when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
if (getStateMachineException == null) {
- final RaftServer.Division myDivision = mockDivision(raftServer);
+ final RaftServer.Division myDivision = mockDivision(raftServer,
groupId);
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
} else {
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);