This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 58fcc75a6c1206a33909e3ba76b0d1e4c29a8b21 Author: yukon <[email protected]> AuthorDate: Fri May 17 20:08:47 2019 +0800 Clean code and polish protocol description --- .../api/channel/ChannelHandlerContextWrapper.java | 21 ------------- .../rocketmq/remoting/common/ResponseFuture.java | 9 ------ .../remoting/common/SemaphoreReleaseOnlyOnce.java | 4 --- .../rocketmq/remoting/external/ThreadUtils.java | 3 +- .../channel/ChannelHandlerContextWrapperImpl.java | 33 -------------------- .../impl/command/RemotingSysResponseCode.java | 2 -- .../remoting/impl/netty/NettyRemotingAbstract.java | 10 ++++++- .../org/apache/rocketmq/remoting/package-info.java | 35 +++++++++------------- 8 files changed, 24 insertions(+), 93 deletions(-) diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelHandlerContextWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelHandlerContextWrapper.java deleted file mode 100644 index 05c3b18..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelHandlerContextWrapper.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.remoting.api.channel; - -public interface ChannelHandlerContextWrapper<T> { - T getContext(); -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java index 014dd78..76f3472 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java @@ -42,7 +42,6 @@ public class ResponseFuture { private SemaphoreReleaseOnlyOnce once; private RemotingCommand requestCommand; - private InterceptorGroup interceptorGroup; private String remoteAddr; public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler, @@ -139,14 +138,6 @@ public class ResponseFuture { this.requestCommand = requestCommand; } - public InterceptorGroup getInterceptorGroup() { - return interceptorGroup; - } - - public void setInterceptorGroup(InterceptorGroup interceptorGroup) { - this.interceptorGroup = interceptorGroup; - } - public String getRemoteAddr() { return remoteAddr; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java index 1c5849b..9babace 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java @@ -33,8 +33,4 @@ public class SemaphoreReleaseOnlyOnce { this.semaphore.release(); } } - - public Semaphore getSemaphore() { - return semaphore; - } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java index a4a7487..3f43b62 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java @@ -168,8 +168,7 @@ public final class ThreadUtils { executor.shutdown(); try { // Wait a while for existing tasks to terminate. - if (!executor - .awaitTermination(timeout, timeUnit)) { + if (!executor.awaitTermination(timeout, timeUnit)) { executor.shutdownNow(); // Wait a while for tasks to respond to being cancelled. if (!executor.awaitTermination(timeout, timeUnit)) { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java deleted file mode 100644 index bbd33ea..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.channel; - -import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; - -public class ChannelHandlerContextWrapperImpl<ChannelHandlerContext> implements ChannelHandlerContextWrapper { - - private io.netty.channel.ChannelHandlerContext context; - - public ChannelHandlerContextWrapperImpl(io.netty.channel.ChannelHandlerContext context) { - this.context = context; - } - - public io.netty.channel.ChannelHandlerContext getContext() { - return context; - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java index ae76c6f..f47fd72 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java @@ -26,6 +26,4 @@ public class RemotingSysResponseCode { public static final short SYSTEM_BUSY = 2; public static final short REQUEST_CODE_NOT_SUPPORTED = 3; - - public static final short TRANSACTION_FAILED = 4; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 4c72b78..38059a8 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -147,7 +147,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { processResponseCommand(ctx, command); break; default: - LOG.warn("Not supported The traffic type {} !", command.trafficType()); + LOG.warn("The traffic type {} is NOT supported!", command.trafficType()); break; } } @@ -156,6 +156,14 @@ public abstract class NettyRemotingAbstract implements RemotingService { public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.cmdCode()); + if (processorExecutorPair == null) { + final RemotingCommand response = commandFactory().createResponse(cmd); + response.opCode(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED); + ctx.writeAndFlush(response); + LOG.warn("The command code {} is NOT supported!", cmd.cmdCode()); + return; + } + RemotingChannel channel = new NettyChannelImpl(ctx.channel()); Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java index e64f66b..28de0af 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java @@ -21,28 +21,21 @@ * Remoting wire-format protocol description: * * <pre> - * 2015-04-29 16:07:14 v1.0 - * 2016-04-23 16:18:05 v2.0 - * 2016-05-31 09:33:11 v3.0 - * 2016-11-10 09:33:11 v3.1 remove deprecated tag field - * - * - * 1.Protocol Type 1 byte - * 2.Total Length 4 byte,exclude protocol type size - * 3.RequestID 4 byte,used for repeatable requests,connection reuse.an requestID string + * 1.Protocol Magic 1 byte(0x14) + * 2.Total Length 4 byte,exclude protocol type size + * 3.Command Code 2 byte, command key + * 4.Command Version 2 byte, command version + * 5.RequestID 4 byte,used for repeatable requests,connection reuse.an requestID string * representing a client-generated, globally unique for some time unit, identifier for the request - * 4.Serializer Type 1 byte - * 5.Traffic Type 1 byte,0-sync;1-async;2-oneway;3-response - * 6.OpCode Length 2 byte - * 7.OpCode variant length,utf8 string - * 8.Remark Length 2 byte - * 9.Remark variant length,utf8 string - * 10.Properties Size 2 byte - * Property Length 2 byte - * Property Body variant length,utf8,Key\nValue - * 11.Inbound or OutBound payload length 4 byte - * 12.Inbound or OutBound payload variant length, max size limitation is 16M - * 13.Extra payload variant length + * 6.Traffic Type 1 byte,0-sync;1-async;2-oneway;3-response + * 7.OpCode 2 byte, operation result code(success or error) + * 8.Remark Length 2 byte + * 9.Remark variant length,utf8 string + * 10.Properties Size 2 byte + * 11.Property Length 2 byte + * 12.Property Body variant length,utf8,Key\nValue + * 13.Inbound or OutBound payload length 4 byte + * 14.Inbound or OutBound payload variant length, max size limitation is 16M * * </pre> */
