This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f8f6705  RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc. (#278)
f8f6705 is described below

commit f8f67055a9755c32a97e79ab8f307ba3c7f09c7b
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 16 16:57:29 2020 +0800

    RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc. (#278)
---
 .../ratis/client/impl/DataStreamClientImpl.java    |  8 +++--
 .../java/org/apache/ratis/protocol/ClientId.java   |  7 +++-
 .../org/apache/ratis/protocol/RaftGroupId.java     |  4 +--
 .../java/org/apache/ratis/protocol/RaftId.java     | 10 ++++--
 .../java/org/apache/ratis/util/Preconditions.java  | 38 ++++++++++----------
 .../ratis/netty/server/DataStreamManagement.java   | 32 +++++++++++++----
 .../ratis/netty/server/NettyServerStreamRpc.java   | 42 +++++++++++++++++++---
 .../ratis/datastream/DataStreamBaseTest.java       | 12 +++++--
 .../ratis/datastream/TestDataStreamNetty.java      | 25 ++++++++++---
 9 files changed, 133 insertions(+), 45 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 27583b6..c078218 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -71,7 +71,11 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     }
 
     private CompletableFuture<DataStreamReply> send(Type type) {
-      return send(type, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+      return combineHeader(send(type, 
DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER));
+    }
+
+    private CompletableFuture<DataStreamReply> 
combineHeader(CompletableFuture<DataStreamReply> future) {
+      return future.thenCombine(headerFuture, (reply, headerReply) -> 
headerReply.isSuccess()? reply : headerReply);
     }
 
     // send to the attached dataStreamClientRpc
@@ -79,7 +83,7 @@ public class DataStreamClientImpl implements DataStreamClient 
{
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
       final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
       streamOffset += buf.remaining();
-      return f;
+      return combineHeader(f);
     }
 
     @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
index c8a591a..84f7d72 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,6 +26,11 @@ import java.util.UUID;
  * to correctly identify retry requests from the same client.
  */
 public final class ClientId extends RaftId {
+  private static final ClientId EMPTY_CLIENT_ID = new 
ClientId(ZERO_UUID_BYTESTRING);
+
+  public static ClientId emptyClientId() {
+    return EMPTY_CLIENT_ID;
+  }
 
   public static ClientId randomId() {
     return new ClientId(UUID.randomUUID());
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
index 17b48d9..f4820d6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,7 +27,7 @@ import java.util.UUID;
  * This is a value-based class.
  */
 public final class RaftGroupId extends RaftId {
-  private static final RaftGroupId EMPTY_GROUP_ID = new RaftGroupId(new 
UUID(0L, 0L));
+  private static final RaftGroupId EMPTY_GROUP_ID = new 
RaftGroupId(ZERO_UUID_BYTESTRING);
 
   public static RaftGroupId emptyGroupId() {
     return EMPTY_GROUP_ID;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
index 36d29af..b0f6228 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +28,8 @@ import java.util.function.Supplier;
 
 /** Unique identifier implemented using {@link UUID}. */
 public abstract class RaftId {
+  static final UUID ZERO_UUID = new UUID(0L, 0L);
+  static final ByteString ZERO_UUID_BYTESTRING = toByteString(ZERO_UUID);
   private static final int BYTE_LENGTH = 16;
 
   private static void checkLength(int length, String name) {
@@ -55,16 +57,18 @@ public abstract class RaftId {
   private final Supplier<String> uuidString;
 
   private RaftId(UUID uuid, Supplier<ByteString> uuidBytes) {
-    this.uuid = uuid;
+    this.uuid = Preconditions.assertNotNull(uuid, "uuid");
     this.uuidBytes = uuidBytes;
     this.uuidString = JavaUtils.memoize(() -> createUuidString(uuid));
   }
 
   RaftId(UUID uuid) {
     this(uuid, JavaUtils.memoize(() -> toByteString(uuid)));
+    Preconditions.assertTrue(!uuid.equals(ZERO_UUID),
+        () -> "Failed to create " + JavaUtils.getClassSimpleName(getClass()) + 
": UUID " + ZERO_UUID + " is reserved.");
   }
 
-  public RaftId(ByteString uuidBytes) {
+  RaftId(ByteString uuidBytes) {
     this(toUuid(uuidBytes), () -> uuidBytes);
   }
 
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java 
b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index fbad32d..4b57e7f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -1,23 +1,20 @@
 /*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
  *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
-
 package org.apache.ratis.util;
 
 import java.util.Collections;
@@ -90,12 +87,13 @@ public interface Preconditions {
         + name + " = " + object + " != null, class = " + object.getClass());
   }
 
-  static void assertNotNull(Object object, Supplier<String> message) {
+  static <T> T assertNotNull(T object, Supplier<String> message) {
     assertTrue(object != null, message);
+    return object;
   }
 
-  static void assertNotNull(Object object, String name) {
-    assertNotNull(object, () -> name + " is expected to not be null but "
+  static <T> T assertNotNull(T object, String name) {
+    return assertNotNull(object, () -> name + " is expected to not be null but 
"
         + name + " = " + object + " == null, class = " + object.getClass());
   }
 
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 6ea857d..11923b0 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -25,9 +25,11 @@ import 
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -331,7 +333,24 @@ public class DataStreamManagement {
       DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
     DataStreamException dataStreamException = new 
DataStreamException(server.getId(), cause);
     RaftClientReply reply = new RaftClientReply(raftClientRequest, 
dataStreamException, null);
-    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf 
request, ChannelHandlerContext ctx) {
+    DataStreamException dataStreamException = new 
DataStreamException(server.getId(), cause);
+    RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(), 
server.getId(), RaftGroupId.emptyGroupId(),
+        -1, false, null, dataStreamException, 0L, null);
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  static void sendDataStreamException(Throwable throwable, 
DataStreamRequestByteBuf request, RaftClientReply reply,
+      ChannelHandlerContext ctx) {
+    LOG.warn("Failed to process {}",  request, throwable);
+    try {
+      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+    } catch (Throwable t) {
+      LOG.warn("Failed to sendDataStreamException {} for {}", throwable, 
request, t);
+    }
   }
 
   static void forwardStartTransaction(StreamInfo info, 
DataStreamRequestByteBuf request, RaftClientReply localReply,
@@ -368,10 +387,15 @@ public class DataStreamManagement {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), 
request.getStreamId());
+    final StreamInfo info = request.getType() != Type.STREAM_HEADER? 
streams.get(key)
+        : streams.computeIfAbsent(key, id -> newStreamInfo(buf, 
getDataStreamOutput));
+
+    if (info == null) {
+      throw new IllegalStateException("Failed to get StreamInfo for " + 
request);
+    }
 
     if (request.getType() == Type.START_TRANSACTION) {
       // for peers to start transaction
-      final StreamInfo info = streams.get(key);
       composeAsync(info.getPrevious(), executor, v -> startTransaction(info, 
request, ctx))
           .whenComplete((v, exception) -> {
         try {
@@ -385,19 +409,15 @@ public class DataStreamManagement {
       return;
     }
 
-    final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites;
     if (request.getType() == Type.STREAM_HEADER) {
-      info = streams.computeIfAbsent(key, id -> newStreamInfo(buf, 
getDataStreamOutput));
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
-      info = streams.get(key);
       localWrite = info.getLocal().write(buf, executor);
       remoteWrites = info.applyToRemotes(out -> out.write(request));
     } else if (request.getType() == Type.STREAM_CLOSE) {
-      info = streams.get(key);
       localWrite = info.getLocal().close(executor);
       remoteWrites = info.isPrimary()? 
info.applyToRemotes(RemoteStream::close): Collections.emptyList();
     } else {
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index efe671a..9b06e24 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -22,7 +22,6 @@ import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.protocol.ClientId;
@@ -49,6 +48,7 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +61,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = 
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -90,7 +91,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       try {
         getDataStreamOutput(outs, request);
       } catch (IOException e) {
-        outs.forEach(CloseAsync::closeAsync);
+        outs.forEach(DataStreamOutputRpc::closeAsync);
         throw e;
       }
       return outs;
@@ -149,13 +150,46 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     proxies.addPeers(newPeers);
   }
 
+  static class RequestRef {
+    private final AtomicReference<DataStreamRequestByteBuf> ref = new 
AtomicReference<>();
+
+    DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) {
+      Optional.ofNullable(ref.getAndSet(current)).ifPresent(previous -> {
+        throw new IllegalStateException("previous = " + previous + " != null, 
current=" + current);
+      });
+      return current;
+    }
+
+    void reset(DataStreamRequestByteBuf expected) {
+      final DataStreamRequestByteBuf stored = ref.getAndSet(null);
+      Preconditions.assertTrue(stored == expected, () -> "Expected=" + 
expected + " but stored=" + stored);
+    }
+
+    DataStreamRequestByteBuf getAndSetNull() {
+      return ref.getAndSet(null);
+    }
+  }
+
   private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
+      private final RequestRef requestRef = new RequestRef();
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof DataStreamRequestByteBuf) {
-          requests.read((DataStreamRequestByteBuf)msg, ctx, 
proxies::getDataStreamOutput);
+        if (!(msg instanceof DataStreamRequestByteBuf)) {
+          LOG.error("Unexpected message class {}, ignoring ...", 
msg.getClass().getName());
+          return;
         }
+
+        final DataStreamRequestByteBuf request = 
requestRef.set((DataStreamRequestByteBuf)msg);
+        requests.read(request, ctx, proxies::getDataStreamOutput);
+        requestRef.reset(request);
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
throwable) {
+        Optional.ofNullable(requestRef.getAndSetNull())
+            .ifPresent(request -> requests.replyDataStreamException(throwable, 
request, ctx));
       }
     };
   }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 8fc78a1..0cabd07 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,7 +39,6 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
@@ -401,10 +400,19 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
 
-  void runTestMockCluster(int bufferSize, int bufferNum, Exception 
expectedException)
+  void runTestMockCluster(int bufferSize, int bufferNum, Exception 
expectedException, Exception headerException)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream()) {
       final DataStreamOutputImpl out = (DataStreamOutputImpl) 
client.getDataStreamApi().stream();
+      if (headerException != null) {
+        final DataStreamReply headerReply = out.getHeaderFuture().join();
+        Assert.assertFalse(headerReply.isSuccess());
+        final RaftClientReply clientReply = 
ClientProtoUtils.toRaftClientReply(RaftClientReplyProto.parseFrom(
+            ((DataStreamReplyByteBuffer)headerReply).slice()));
+        
Assert.assertTrue(clientReply.getException().getMessage().contains(headerException.getMessage()));
+        return;
+      }
+
       runTestDataStream(out, bufferSize, bufferNum);
       DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer) 
out.closeAsync().join();
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index edfc8da..fdf68ad 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -83,12 +83,17 @@ public class TestDataStreamNetty extends DataStreamBaseTest 
{
 
   private void testMockCluster(int leaderIndex, int numServers, RaftException 
leaderException,
       Exception submitException) throws Exception {
+    testMockCluster(leaderIndex, numServers, leaderException, submitException, 
null);
+  }
+
+  private void testMockCluster(int leaderIndex, int numServers, RaftException 
leaderException,
+      Exception submitException, IOException getStateMachineException) throws 
Exception {
     List<RaftServer> raftServers = new ArrayList<>();
     ClientId clientId = ClientId.randomId();
     RaftGroupId groupId = RaftGroupId.randomId();
     final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" + 
leaderIndex).build();
     RaftClientReply expectedClientReply = new RaftClientReply(clientId, 
suggestedLeader.getId(), groupId, 0,
-        leaderException == null ? true : false, null, leaderException, 0, 
null);
+        leaderException == null, null, leaderException, 0, null);
 
     for (int i = 0; i < numServers; i ++) {
       RaftServer raftServer = mock(RaftServer.class);
@@ -117,23 +122,27 @@ public class TestDataStreamNetty extends 
DataStreamBaseTest {
 
       when(raftServer.getProperties()).thenReturn(properties);
       when(raftServer.getId()).thenReturn(peerId);
-      
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new 
MultiDataStreamStateMachine());
+      if (getStateMachineException == null) {
+        
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new 
MultiDataStreamStateMachine());
+      } else {
+        
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
+      }
 
       raftServers.add(raftServer);
     }
 
     runTestMockCluster(raftServers, 1_000_000, 10,
-        submitException != null ? submitException : leaderException);
+        submitException != null ? submitException : leaderException, 
getStateMachineException);
   }
 
   void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int 
bufferNum,
-      Exception expectedException) throws Exception {
+      Exception expectedException, Exception headerException) throws Exception 
{
     try {
       final List<RaftPeer> peers = raftServers.stream()
           .map(TestDataStreamNetty::newRaftPeer)
           .collect(Collectors.toList());
       setup(peers, raftServers);
-      runTestMockCluster(bufferSize, bufferNum, expectedException);
+      runTestMockCluster(bufferSize, bufferNum, expectedException, 
headerException);
     } finally {
       shutdown();
     }
@@ -184,4 +193,10 @@ public class TestDataStreamNetty extends 
DataStreamBaseTest {
     IOException ioException = new IOException("leader throw IOException");
     testMockCluster(1, 3, null, ioException);
   }
+
+  @Test
+  public void testDataStreamExceptionGetStateMachine() throws Exception {
+    final IOException getStateMachineException = new IOException("Failed to 
get StateMachine");
+    testMockCluster(1, 1, null, null, getStateMachineException);
+  }
 }

Reply via email to