This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f8f6705 RATIS-1153. Implement ChannelInboundHandler.exceptionCaught
in NettyServerStreamRpc. (#278)
f8f6705 is described below
commit f8f67055a9755c32a97e79ab8f307ba3c7f09c7b
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 16 16:57:29 2020 +0800
RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in
NettyServerStreamRpc. (#278)
---
.../ratis/client/impl/DataStreamClientImpl.java | 8 +++--
.../java/org/apache/ratis/protocol/ClientId.java | 7 +++-
.../org/apache/ratis/protocol/RaftGroupId.java | 4 +--
.../java/org/apache/ratis/protocol/RaftId.java | 10 ++++--
.../java/org/apache/ratis/util/Preconditions.java | 38 ++++++++++----------
.../ratis/netty/server/DataStreamManagement.java | 32 +++++++++++++----
.../ratis/netty/server/NettyServerStreamRpc.java | 42 +++++++++++++++++++---
.../ratis/datastream/DataStreamBaseTest.java | 12 +++++--
.../ratis/datastream/TestDataStreamNetty.java | 25 ++++++++++---
9 files changed, 133 insertions(+), 45 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 27583b6..c078218 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -71,7 +71,11 @@ public class DataStreamClientImpl implements
DataStreamClient {
}
private CompletableFuture<DataStreamReply> send(Type type) {
- return send(type, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+ return combineHeader(send(type,
DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER));
+ }
+
+ private CompletableFuture<DataStreamReply>
combineHeader(CompletableFuture<DataStreamReply> future) {
+ return future.thenCombine(headerFuture, (reply, headerReply) ->
headerReply.isSuccess()? reply : headerReply);
}
// send to the attached dataStreamClientRpc
@@ -79,7 +83,7 @@ public class DataStreamClientImpl implements DataStreamClient
{
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
streamOffset += buf.remaining();
- return f;
+ return combineHeader(f);
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
index c8a591a..84f7d72 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,6 +26,11 @@ import java.util.UUID;
* to correctly identify retry requests from the same client.
*/
public final class ClientId extends RaftId {
+ private static final ClientId EMPTY_CLIENT_ID = new
ClientId(ZERO_UUID_BYTESTRING);
+
+ public static ClientId emptyClientId() {
+ return EMPTY_CLIENT_ID;
+ }
public static ClientId randomId() {
return new ClientId(UUID.randomUUID());
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
index 17b48d9..f4820d6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,7 +27,7 @@ import java.util.UUID;
* This is a value-based class.
*/
public final class RaftGroupId extends RaftId {
- private static final RaftGroupId EMPTY_GROUP_ID = new RaftGroupId(new
UUID(0L, 0L));
+ private static final RaftGroupId EMPTY_GROUP_ID = new
RaftGroupId(ZERO_UUID_BYTESTRING);
public static RaftGroupId emptyGroupId() {
return EMPTY_GROUP_ID;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
index 36d29af..b0f6228 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +28,8 @@ import java.util.function.Supplier;
/** Unique identifier implemented using {@link UUID}. */
public abstract class RaftId {
+ static final UUID ZERO_UUID = new UUID(0L, 0L);
+ static final ByteString ZERO_UUID_BYTESTRING = toByteString(ZERO_UUID);
private static final int BYTE_LENGTH = 16;
private static void checkLength(int length, String name) {
@@ -55,16 +57,18 @@ public abstract class RaftId {
private final Supplier<String> uuidString;
private RaftId(UUID uuid, Supplier<ByteString> uuidBytes) {
- this.uuid = uuid;
+ this.uuid = Preconditions.assertNotNull(uuid, "uuid");
this.uuidBytes = uuidBytes;
this.uuidString = JavaUtils.memoize(() -> createUuidString(uuid));
}
RaftId(UUID uuid) {
this(uuid, JavaUtils.memoize(() -> toByteString(uuid)));
+ Preconditions.assertTrue(!uuid.equals(ZERO_UUID),
+ () -> "Failed to create " + JavaUtils.getClassSimpleName(getClass()) +
": UUID " + ZERO_UUID + " is reserved.");
}
- public RaftId(ByteString uuidBytes) {
+ RaftId(ByteString uuidBytes) {
this(toUuid(uuidBytes), () -> uuidBytes);
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index fbad32d..4b57e7f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -1,23 +1,20 @@
/*
- * *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.ratis.util;
import java.util.Collections;
@@ -90,12 +87,13 @@ public interface Preconditions {
+ name + " = " + object + " != null, class = " + object.getClass());
}
- static void assertNotNull(Object object, Supplier<String> message) {
+ static <T> T assertNotNull(T object, Supplier<String> message) {
assertTrue(object != null, message);
+ return object;
}
- static void assertNotNull(Object object, String name) {
- assertNotNull(object, () -> name + " is expected to not be null but "
+ static <T> T assertNotNull(T object, String name) {
+ return assertNotNull(object, () -> name + " is expected to not be null but
"
+ name + " = " + object + " == null, class = " + object.getClass());
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 6ea857d..11923b0 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -25,9 +25,11 @@ import
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -331,7 +333,24 @@ public class DataStreamManagement {
DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
DataStreamException dataStreamException = new
DataStreamException(server.getId(), cause);
RaftClientReply reply = new RaftClientReply(raftClientRequest,
dataStreamException, null);
- ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+ sendDataStreamException(cause, request, reply, ctx);
+ }
+
+ void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ DataStreamException dataStreamException = new
DataStreamException(server.getId(), cause);
+ RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(),
server.getId(), RaftGroupId.emptyGroupId(),
+ -1, false, null, dataStreamException, 0L, null);
+ sendDataStreamException(cause, request, reply, ctx);
+ }
+
+ static void sendDataStreamException(Throwable throwable,
DataStreamRequestByteBuf request, RaftClientReply reply,
+ ChannelHandlerContext ctx) {
+ LOG.warn("Failed to process {}", request, throwable);
+ try {
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+ } catch (Throwable t) {
+ LOG.warn("Failed to sendDataStreamException {} for {}", throwable,
request, t);
+ }
}
static void forwardStartTransaction(StreamInfo info,
DataStreamRequestByteBuf request, RaftClientReply localReply,
@@ -368,10 +387,15 @@ public class DataStreamManagement {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
+ final StreamInfo info = request.getType() != Type.STREAM_HEADER?
streams.get(key)
+ : streams.computeIfAbsent(key, id -> newStreamInfo(buf,
getDataStreamOutput));
+
+ if (info == null) {
+ throw new IllegalStateException("Failed to get StreamInfo for " +
request);
+ }
if (request.getType() == Type.START_TRANSACTION) {
// for peers to start transaction
- final StreamInfo info = streams.get(key);
composeAsync(info.getPrevious(), executor, v -> startTransaction(info,
request, ctx))
.whenComplete((v, exception) -> {
try {
@@ -385,19 +409,15 @@ public class DataStreamManagement {
return;
}
- final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites;
if (request.getType() == Type.STREAM_HEADER) {
- info = streams.computeIfAbsent(key, id -> newStreamInfo(buf,
getDataStreamOutput));
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
- info = streams.get(key);
localWrite = info.getLocal().write(buf, executor);
remoteWrites = info.applyToRemotes(out -> out.write(request));
} else if (request.getType() == Type.STREAM_CLOSE) {
- info = streams.get(key);
localWrite = info.getLocal().close(executor);
remoteWrites = info.isPrimary()?
info.applyToRemotes(RemoteStream::close): Collections.emptyList();
} else {
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index efe671a..9b06e24 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -22,7 +22,6 @@ import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.protocol.ClientId;
@@ -49,6 +48,7 @@ import
org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +61,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -90,7 +91,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
try {
getDataStreamOutput(outs, request);
} catch (IOException e) {
- outs.forEach(CloseAsync::closeAsync);
+ outs.forEach(DataStreamOutputRpc::closeAsync);
throw e;
}
return outs;
@@ -149,13 +150,46 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
proxies.addPeers(newPeers);
}
+ static class RequestRef {
+ private final AtomicReference<DataStreamRequestByteBuf> ref = new
AtomicReference<>();
+
+ DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) {
+ Optional.ofNullable(ref.getAndSet(current)).ifPresent(previous -> {
+ throw new IllegalStateException("previous = " + previous + " != null,
current=" + current);
+ });
+ return current;
+ }
+
+ void reset(DataStreamRequestByteBuf expected) {
+ final DataStreamRequestByteBuf stored = ref.getAndSet(null);
+ Preconditions.assertTrue(stored == expected, () -> "Expected=" +
expected + " but stored=" + stored);
+ }
+
+ DataStreamRequestByteBuf getAndSetNull() {
+ return ref.getAndSet(null);
+ }
+ }
+
private ChannelInboundHandler newChannelInboundHandlerAdapter(){
return new ChannelInboundHandlerAdapter(){
+ private final RequestRef requestRef = new RequestRef();
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof DataStreamRequestByteBuf) {
- requests.read((DataStreamRequestByteBuf)msg, ctx,
proxies::getDataStreamOutput);
+ if (!(msg instanceof DataStreamRequestByteBuf)) {
+ LOG.error("Unexpected message class {}, ignoring ...",
msg.getClass().getName());
+ return;
}
+
+ final DataStreamRequestByteBuf request =
requestRef.set((DataStreamRequestByteBuf)msg);
+ requests.read(request, ctx, proxies::getDataStreamOutput);
+ requestRef.reset(request);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable
throwable) {
+ Optional.ofNullable(requestRef.getAndSetNull())
+ .ifPresent(request -> requests.replyDataStreamException(throwable,
request, ctx));
}
};
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 8fc78a1..0cabd07 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,7 +39,6 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
@@ -401,10 +400,19 @@ abstract class DataStreamBaseTest extends BaseTest {
}
- void runTestMockCluster(int bufferSize, int bufferNum, Exception
expectedException)
+ void runTestMockCluster(int bufferSize, int bufferNum, Exception
expectedException, Exception headerException)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream()) {
final DataStreamOutputImpl out = (DataStreamOutputImpl)
client.getDataStreamApi().stream();
+ if (headerException != null) {
+ final DataStreamReply headerReply = out.getHeaderFuture().join();
+ Assert.assertFalse(headerReply.isSuccess());
+ final RaftClientReply clientReply =
ClientProtoUtils.toRaftClientReply(RaftClientReplyProto.parseFrom(
+ ((DataStreamReplyByteBuffer)headerReply).slice()));
+
Assert.assertTrue(clientReply.getException().getMessage().contains(headerException.getMessage()));
+ return;
+ }
+
runTestDataStream(out, bufferSize, bufferNum);
DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer)
out.closeAsync().join();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index edfc8da..fdf68ad 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -83,12 +83,17 @@ public class TestDataStreamNetty extends DataStreamBaseTest
{
private void testMockCluster(int leaderIndex, int numServers, RaftException
leaderException,
Exception submitException) throws Exception {
+ testMockCluster(leaderIndex, numServers, leaderException, submitException,
null);
+ }
+
+ private void testMockCluster(int leaderIndex, int numServers, RaftException
leaderException,
+ Exception submitException, IOException getStateMachineException) throws
Exception {
List<RaftServer> raftServers = new ArrayList<>();
ClientId clientId = ClientId.randomId();
RaftGroupId groupId = RaftGroupId.randomId();
final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" +
leaderIndex).build();
RaftClientReply expectedClientReply = new RaftClientReply(clientId,
suggestedLeader.getId(), groupId, 0,
- leaderException == null ? true : false, null, leaderException, 0,
null);
+ leaderException == null, null, leaderException, 0, null);
for (int i = 0; i < numServers; i ++) {
RaftServer raftServer = mock(RaftServer.class);
@@ -117,23 +122,27 @@ public class TestDataStreamNetty extends
DataStreamBaseTest {
when(raftServer.getProperties()).thenReturn(properties);
when(raftServer.getId()).thenReturn(peerId);
-
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new
MultiDataStreamStateMachine());
+ if (getStateMachineException == null) {
+
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new
MultiDataStreamStateMachine());
+ } else {
+
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
+ }
raftServers.add(raftServer);
}
runTestMockCluster(raftServers, 1_000_000, 10,
- submitException != null ? submitException : leaderException);
+ submitException != null ? submitException : leaderException,
getStateMachineException);
}
void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int
bufferNum,
- Exception expectedException) throws Exception {
+ Exception expectedException, Exception headerException) throws Exception
{
try {
final List<RaftPeer> peers = raftServers.stream()
.map(TestDataStreamNetty::newRaftPeer)
.collect(Collectors.toList());
setup(peers, raftServers);
- runTestMockCluster(bufferSize, bufferNum, expectedException);
+ runTestMockCluster(bufferSize, bufferNum, expectedException,
headerException);
} finally {
shutdown();
}
@@ -184,4 +193,10 @@ public class TestDataStreamNetty extends
DataStreamBaseTest {
IOException ioException = new IOException("leader throw IOException");
testMockCluster(1, 3, null, ioException);
}
+
+ @Test
+ public void testDataStreamExceptionGetStateMachine() throws Exception {
+ final IOException getStateMachineException = new IOException("Failed to
get StateMachine");
+ testMockCluster(1, 1, null, null, getStateMachineException);
+ }
}