This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit 6c2f52b57852d1988ecf179d552052f3c0cfd80b
Author: yukon <[email protected]>
AuthorDate: Mon May 27 20:10:43 2019 +0800

    Add unit tests for netty handler
---
 .../remoting/impl/netty/handler/Decoder.java       |  4 +-
 .../remoting/impl/netty/handler/Encoder.java       | 11 +--
 .../impl/netty/handler/ExceptionHandler.java       | 11 ++-
 .../remoting/impl/netty/handler/DecoderTest.java   | 84 ++++++++++++++++++++++
 .../remoting/impl/netty/handler/EncoderTest.java   | 70 ++++++++++++++++++
 .../impl/netty/handler/ExceptionHandlerTest.java   | 61 ++++++++++++++++
 6 files changed, 233 insertions(+), 8 deletions(-)

diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
index f239ee9..4a906cf 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
@@ -82,11 +82,11 @@ public class Decoder extends ByteToMessageDecoder {
         int totalLength = wrapper.readInt();
 
         if (totalLength <= 0) {
-            throw new IllegalArgumentException("Illegal total length " + totalLength);
+            throw new RemoteCodecException("Illegal total length " + totalLength);
         }
 
         if (totalLength > CodecHelper.PACKET_MAX_LEN) {
-            throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN));
+            throw new RemoteCodecException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN));
         }
 
         if (wrapper.readableBytes() < totalLength - 1 /*MagicCode*/ - 4 /*TotalLen*/) {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
index 78e5e5c..329f343 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.MessageToByteEncoder;
 import java.net.InetSocketAddress;
 import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
 import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
 import org.apache.rocketmq.remoting.impl.command.CodecHelper;
 import org.slf4j.Logger;
@@ -42,11 +43,13 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> {
             ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out);
 
             encode(remotingCommand, wrapper);
-        } catch (final Exception e) {
-            LOG.error("Error occurred when encoding response for channel " + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(), e);
-            if (remotingCommand != null) {
-                LOG.error(remotingCommand.toString());
+        } catch (final RemoteCodecException e) {
+            String remoteAddress = "UnKnown";
+            if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
+                remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
             }
+            LOG.error(String.format("Error occurred when encoding command for channel %s", remoteAddress), e);
+
             ctx.channel().close().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
index 52563f4..14baeb9 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.remoting.impl.netty.handler;
 
 import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
@@ -28,10 +30,15 @@ public class ExceptionHandler extends ChannelDuplexHandler {
     private final static Logger LOG = LoggerFactory.getLogger(ExceptionHandler.class);
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
         // Uncaught exceptions from inbound handlers will propagate up to this handler
         LOG.error(String.format("channel exception %s occurred ! ", ctx.channel()), cause);
-        ctx.close();
+        ctx.channel().close().addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), cause, future.isSuccess());
+            }
+        });
     }
 
 }
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java
new file mode 100644
index 0000000..3f81286
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.nio.channels.ClosedChannelException;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
+
+public class DecoderTest extends BaseTest {
+
+    @Test
+    public void decode() {
+        EmbeddedChannel channel = new EmbeddedChannel(new Encoder(), new Decoder());
+
+        RemotingCommand request = randomRemotingCommand();
+        channel.writeOutbound(request);
+        channel.flushOutbound();
+
+        ByteBuf buffer = channel.readOutbound();
+        channel.writeInbound(buffer);
+        channel.flushInbound();
+
+        RemotingCommand decodedRequest = channel.readInbound();
+        assertEquals(request, decodedRequest);
+    }
+
+    @Test
+    public void decode_WithException() {
+        // Magic Code doesn't match
+        EmbeddedChannel channel = new EmbeddedChannel(new Decoder());
+
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        buf.writeByte(0x15);
+        buf.retain();
+
+        flushChannelWithException(channel, buf);
+
+        channel = new EmbeddedChannel(new Decoder());
+
+        buf.resetReaderIndex();
+        buf.resetWriterIndex();
+
+        buf.writeByte(CodecHelper.PROTOCOL_MAGIC);
+        buf.writeInt(CodecHelper.PACKET_MAX_LEN + 1);
+        buf.writeBytes(new byte[CodecHelper.MIN_PROTOCOL_LEN - 1]);
+
+        flushChannelWithException(channel, buf);
+    }
+
+    private void flushChannelWithException(final EmbeddedChannel channel, final ByteBuf buf) {
+        try {
+            channel.writeInbound(buf);
+            channel.flushInbound();
+            failBecauseExceptionWasNotThrown(ClosedChannelException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(ClosedChannelException.class);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
new file mode 100644
index 0000000..15f89be
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.nio.channels.ClosedChannelException;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.junit.Test;
+
+import static org.apache.rocketmq.remoting.impl.command.CodecHelper.PROTOCOL_MAGIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
+
+public class EncoderTest extends BaseTest {
+
+    @Test
+    public void encode() {
+        EmbeddedChannel channel = new EmbeddedChannel(new Encoder());
+
+        RemotingCommand request = randomRemotingCommand();
+        channel.writeOutbound(request);
+
+        ByteBuf buffer = channel.readOutbound();
+
+        // Skip magic code and total length
+        assertEquals(PROTOCOL_MAGIC, buffer.readByte());
+        buffer.readInt();
+
+        RemotingCommand decodedRequest = CodecHelper.decode(new NettyByteBufferWrapper(buffer));
+
+        assertEquals(request, decodedRequest);
+    }
+
+
+    @Test
+    public void encode_WithException() {
+        EmbeddedChannel channel = new EmbeddedChannel(new Encoder());
+
+        RemotingCommand request = randomRemotingCommand();
+        request.remark(RandomStringUtils.randomAlphabetic(Short.MAX_VALUE + 1));
+
+        try {
+            channel.writeOutbound(request);
+            failBecauseExceptionWasNotThrown(ClosedChannelException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(ClosedChannelException.class);
+        }
+    }
+}
\ No newline at end of file
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandlerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandlerTest.java
new file mode 100644
index 0000000..9687d38
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandlerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.nio.channels.ClosedChannelException;
+import java.util.List;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+
+public class ExceptionHandlerTest extends BaseTest {
+
+    @Test
+    public void exceptionCaught_WithException() {
+        EmbeddedChannel channel = new EmbeddedChannel();
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
+        channel.pipeline().addLast(new ByteToMessageDecoder() {
+
+            @Override
+            protected void decode(final ChannelHandlerContext ctx, final ByteBuf in,
+                final List<Object> out) throws Exception {
+                throw new NotImplementedException("Emtpy encode method");
+            }
+        }, new ExceptionHandler());
+
+        CodecHelper.encodeCommand(randomRemotingCommand(), new NettyByteBufferWrapper(buffer));
+        channel.writeInbound(buffer);
+
+        try {
+            channel.flushInbound();
+            failBecauseExceptionWasNotThrown(ClosedChannelException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(ClosedChannelException.class);
+        }
+    }
+}
\ No newline at end of file

Reply via email to