This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 6c2f52b57852d1988ecf179d552052f3c0cfd80b Author: yukon <[email protected]> AuthorDate: Mon May 27 20:10:43 2019 +0800 Add unit tests for netty handler --- .../remoting/impl/netty/handler/Decoder.java | 4 +- .../remoting/impl/netty/handler/Encoder.java | 11 +-- .../impl/netty/handler/ExceptionHandler.java | 11 ++- .../remoting/impl/netty/handler/DecoderTest.java | 84 ++++++++++++++++++++++ .../remoting/impl/netty/handler/EncoderTest.java | 70 ++++++++++++++++++ .../impl/netty/handler/ExceptionHandlerTest.java | 61 ++++++++++++++++ 6 files changed, 233 insertions(+), 8 deletions(-) diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java index f239ee9..4a906cf 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java @@ -82,11 +82,11 @@ public class Decoder extends ByteToMessageDecoder { int totalLength = wrapper.readInt(); if (totalLength <= 0) { - throw new IllegalArgumentException("Illegal total length " + totalLength); + throw new RemoteCodecException("Illegal total length " + totalLength); } if (totalLength > CodecHelper.PACKET_MAX_LEN) { - throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN)); + throw new RemoteCodecException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN)); } if (wrapper.readableBytes() < totalLength - 1 /*MagicCode*/ - 4 /*TotalLen*/) { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java index 78e5e5c..329f343 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.MessageToByteEncoder; import java.net.InetSocketAddress; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.exception.RemoteCodecException; import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper; import org.apache.rocketmq.remoting.impl.command.CodecHelper; import org.slf4j.Logger; @@ -42,11 +43,13 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> { ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out); encode(remotingCommand, wrapper); - } catch (final Exception e) { - LOG.error("Error occurred when encoding response for channel " + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(), e); - if (remotingCommand != null) { - LOG.error(remotingCommand.toString()); + } catch (final RemoteCodecException e) { + String remoteAddress = "UnKnown"; + if (ctx.channel().remoteAddress() instanceof InetSocketAddress) { + remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); } + LOG.error(String.format("Error occurred when encoding command for channel %s", remoteAddress), e); + ctx.channel().close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java index 52563f4..14baeb9 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.remoting.impl.netty.handler; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; @@ -28,10 +30,15 @@ public class ExceptionHandler extends ChannelDuplexHandler { private final static Logger LOG = LoggerFactory.getLogger(ExceptionHandler.class); @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { // Uncaught exceptions from inbound handlers will propagate up to this handler LOG.error(String.format("channel exception %s occurred ! ", ctx.channel()), cause); - ctx.close(); + ctx.channel().close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), cause, future.isSuccess()); + } + }); } } diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java new file mode 100644 index 0000000..3f81286 --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.embedded.EmbeddedChannel; +import java.nio.channels.ClosedChannelException; +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.impl.command.CodecHelper; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.junit.Assert.assertEquals; + +public class DecoderTest extends BaseTest { + + @Test + public void decode() { + EmbeddedChannel channel = new EmbeddedChannel(new Encoder(), new Decoder()); + + RemotingCommand request = randomRemotingCommand(); + channel.writeOutbound(request); + channel.flushOutbound(); + + ByteBuf buffer = channel.readOutbound(); + channel.writeInbound(buffer); + channel.flushInbound(); + + RemotingCommand decodedRequest = channel.readInbound(); + assertEquals(request, decodedRequest); + } + + @Test + public void decode_WithException() { + // Magic Code doesn't match + EmbeddedChannel channel = new EmbeddedChannel(new Decoder()); + + ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + buf.writeByte(0x15); + buf.retain(); + + flushChannelWithException(channel, buf); + + channel = new EmbeddedChannel(new Decoder()); + + buf.resetReaderIndex(); + buf.resetWriterIndex(); + + buf.writeByte(CodecHelper.PROTOCOL_MAGIC); + buf.writeInt(CodecHelper.PACKET_MAX_LEN + 1); + buf.writeBytes(new byte[CodecHelper.MIN_PROTOCOL_LEN - 1]); + + flushChannelWithException(channel, buf); + } + + private void flushChannelWithException(final EmbeddedChannel channel, final ByteBuf buf) { + try { + channel.writeInbound(buf); + channel.flushInbound(); + failBecauseExceptionWasNotThrown(ClosedChannelException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(ClosedChannelException.class); + } + } + +} \ No newline at end of file diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java new file mode 100644 index 0000000..15f89be --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.embedded.EmbeddedChannel; +import java.nio.channels.ClosedChannelException; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper; +import org.apache.rocketmq.remoting.impl.command.CodecHelper; +import org.junit.Test; + +import static org.apache.rocketmq.remoting.impl.command.CodecHelper.PROTOCOL_MAGIC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.junit.Assert.assertEquals; + +public class EncoderTest extends BaseTest { + + @Test + public void encode() { + EmbeddedChannel channel = new EmbeddedChannel(new Encoder()); + + RemotingCommand request = randomRemotingCommand(); + channel.writeOutbound(request); + + ByteBuf buffer = channel.readOutbound(); + + // Skip magic code and total length + assertEquals(PROTOCOL_MAGIC, buffer.readByte()); + buffer.readInt(); + + RemotingCommand decodedRequest = CodecHelper.decode(new NettyByteBufferWrapper(buffer)); + + assertEquals(request, decodedRequest); + } + + + @Test + public void encode_WithException() { + EmbeddedChannel channel = new EmbeddedChannel(new Encoder()); + + RemotingCommand request = randomRemotingCommand(); + request.remark(RandomStringUtils.randomAlphabetic(Short.MAX_VALUE + 1)); + + try { + channel.writeOutbound(request); + failBecauseExceptionWasNotThrown(ClosedChannelException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(ClosedChannelException.class); + } + } +} \ No newline at end of file diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandlerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandlerTest.java new file mode 100644 index 0000000..9687d38 --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandlerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.nio.channels.ClosedChannelException; +import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper; +import org.apache.rocketmq.remoting.impl.command.CodecHelper; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class ExceptionHandlerTest extends BaseTest { + + @Test + public void exceptionCaught_WithException() { + EmbeddedChannel channel = new EmbeddedChannel(); + ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(); + channel.pipeline().addLast(new ByteToMessageDecoder() { + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, + final List<Object> out) throws Exception { + throw new NotImplementedException("Emtpy encode method"); + } + }, new ExceptionHandler()); + + CodecHelper.encodeCommand(randomRemotingCommand(), new NettyByteBufferWrapper(buffer)); + channel.writeInbound(buffer); + + try { + channel.flushInbound(); + failBecauseExceptionWasNotThrown(ClosedChannelException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(ClosedChannelException.class); + } + } +} \ No newline at end of file
