http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java new file mode 100644 index 0000000..b8c1bb0 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.remoting.exception; + +/** + * @author shijia.wxr + */ +public class RemotingTimeoutException extends RemotingException { + + private static final long serialVersionUID = 4106899185095245979L; + + + public RemotingTimeoutException(String message) { + super(message); + } + + + public RemotingTimeoutException(String addr, long timeoutMillis) { + this(addr, timeoutMillis, null); + } + + + public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) { + super("wait response on the channel <" + addr + "> timeout, " + timeoutMillis + "(ms)", cause); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java new file mode 100644 index 0000000..41be8b3 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.remoting.exception;

/**
 * Remoting exception carrying a caller-supplied message. Within this patch it
 * is thrown by {@code NettyRemotingAbstract} when a send permit cannot be
 * acquired from one of its semaphores.
 *
 * @author shijia.wxr
 */
public class RemotingTooMuchRequestException extends RemotingException {

    private static final long serialVersionUID = 4326919581254519654L;

    /**
     * @param message the detail message, passed to the superclass unchanged
     */
    public RemotingTooMuchRequestException(final String message) {
        super(message);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java new file mode 100644 index 0000000..4665b28 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package com.alibaba.rocketmq.remoting.netty;

/**
 * Mutable settings holder for the Netty-based remoting client. Every property
 * has a default and a conventional getter/setter pair; no validation is done.
 *
 * @author shijia.wxr
 */
public class NettyClientConfig {
    /**
     * Worker thread number
     */
    private int clientWorkerThreads = 4;

    /** Defaults to the number of processors reported by the runtime. */
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

    /** Default taken from {@code NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE}. */
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;

    /** Default taken from {@code NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE}. */
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;

    /** Defaults to 3000. */
    private int connectTimeoutMillis = 3000;

    /** Defaults to 60000 (1000 * 60). */
    private long channelNotActiveInterval = 1000 * 60;

    /**
     * IdleStateEvent will be triggered when neither read nor write was performed for
     * the specified period of this time. Specify {@code 0} to disable
     */
    private int clientChannelMaxIdleTimeSeconds = 120;

    /** Default taken from {@code NettySystemConfig.socketSndbufSize}. */
    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;

    /** Default taken from {@code NettySystemConfig.socketRcvbufSize}. */
    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;

    /** Disabled by default. */
    private boolean clientPooledByteBufAllocatorEnable = false;

    /** Disabled by default. */
    private boolean clientCloseSocketIfTimeout = false;

    // ---- accessors, listed in field declaration order ----

    public int getClientWorkerThreads() {
        return this.clientWorkerThreads;
    }

    public void setClientWorkerThreads(int clientWorkerThreads) {
        this.clientWorkerThreads = clientWorkerThreads;
    }

    public int getClientCallbackExecutorThreads() {
        return this.clientCallbackExecutorThreads;
    }

    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
    }

    public int getClientOnewaySemaphoreValue() {
        return this.clientOnewaySemaphoreValue;
    }

    public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
        this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
    }

    public int getClientAsyncSemaphoreValue() {
        return this.clientAsyncSemaphoreValue;
    }

    public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
        this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
    }

    public int getConnectTimeoutMillis() {
        return this.connectTimeoutMillis;
    }

    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
        this.connectTimeoutMillis = connectTimeoutMillis;
    }

    public long getChannelNotActiveInterval() {
        return this.channelNotActiveInterval;
    }

    public void setChannelNotActiveInterval(long channelNotActiveInterval) {
        this.channelNotActiveInterval = channelNotActiveInterval;
    }

    public int getClientChannelMaxIdleTimeSeconds() {
        return this.clientChannelMaxIdleTimeSeconds;
    }

    public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
        this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
    }

    public int getClientSocketSndBufSize() {
        return this.clientSocketSndBufSize;
    }

    public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
        this.clientSocketSndBufSize = clientSocketSndBufSize;
    }

    public int getClientSocketRcvBufSize() {
        return this.clientSocketRcvBufSize;
    }

    public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
        this.clientSocketRcvBufSize = clientSocketRcvBufSize;
    }

    public boolean isClientPooledByteBufAllocatorEnable() {
        return this.clientPooledByteBufAllocatorEnable;
    }

    public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
        this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
    }

    public boolean isClientCloseSocketIfTimeout() {
        return this.clientCloseSocketIfTimeout;
    }

    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java ---------------------------------------------------------------------- diff --git
a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java new file mode 100644 index 0000000..9e68533 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.remoting.netty;

import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;


/**
 * Frame decoder that lets {@link LengthFieldBasedFrameDecoder} cut a
 * length-prefixed frame out of the inbound buffer and then hands the frame to
 * {@link RemotingCommand#decode(ByteBuffer)}.
 *
 * @author shijia.wxr
 */
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    /**
     * Maximum frame length, read once from the system property
     * {@code com.rocketmq.remoting.frameMaxLength}; 16777216 when the property is unset.
     */
    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));


    /**
     * Configures the parent with a 4-byte length field at offset 0, no length
     * adjustment, and the 4 length bytes stripped from the emitted frame.
     */
    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }


    /**
     * Decodes one command from {@code in}.
     *
     * @return the decoded command, or {@code null} when the parent decoder
     *         produced no frame or when decoding failed
     */
    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (frame != null) {
                final ByteBuffer payload = frame.nioBuffer();
                return RemotingCommand.decode(payload);
            }
        } catch (Exception e) {
            // A frame that cannot be decoded is logged and the channel is closed.
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            // The frame obtained from the parent is released on every path.
            if (frame != null) {
                frame.release();
            }
        }

        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java new file mode 100644 index 0000000..c6c901c --- /dev/null +++
b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java @@ -0,0 +1,56 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.remoting.netty;

import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;


/**
 * Writes a {@link RemotingCommand} to the outbound buffer: first the bytes
 * returned by {@code encodeHeader()}, then the body when one is present.
 *
 * @author shijia.wxr
 */
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    /**
     * Serializes {@code remotingCommand} into {@code out}. Any exception is
     * logged together with the command and the channel is then closed; the
     * exception is not rethrown.
     */
    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            final ByteBuffer headerBytes = remotingCommand.encodeHeader();
            out.writeBytes(headerBytes);

            final byte[] payload = remotingCommand.getBody();
            if (null != payload) {
                out.writeBytes(payload);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (null != remotingCommand) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java new file mode 100644 index 0000000..14a2071 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package com.alibaba.rocketmq.remoting.netty;

import io.netty.channel.Channel;


/**
 * Immutable value describing one channel event: its type, the remote address
 * text, and the channel it concerns.
 *
 * @author shijia.wxr
 */
public class NettyEvent {
    private final NettyEventType type;
    private final String remoteAddr;
    private final Channel channel;


    /**
     * @param type       kind of event
     * @param remoteAddr remote address text associated with the event
     * @param channel    channel the event refers to
     */
    public NettyEvent(NettyEventType type, String remoteAddr, Channel channel) {
        this.type = type;
        this.remoteAddr = remoteAddr;
        this.channel = channel;
    }


    /** @return the kind of event */
    public NettyEventType getType() {
        return this.type;
    }


    /** @return the remote address text given at construction */
    public String getRemoteAddr() {
        return this.remoteAddr;
    }


    /** @return the channel given at construction */
    public Channel getChannel() {
        return this.channel;
    }


    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("NettyEvent [type=");
        text.append(this.type);
        text.append(", remoteAddr=").append(this.remoteAddr);
        text.append(", channel=").append(this.channel);
        text.append("]");
        return text.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java new file mode 100644 index 0000000..3113147 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.remoting.netty;

/**
 * Kinds of channel event carried by {@code NettyEvent}. The event executor in
 * {@code NettyRemotingAbstract} dispatches each constant to the matching
 * {@code ChannelEventListener} callback.
 *
 * @author shijia.wxr
 */
public enum NettyEventType {
    /** Dispatched to {@code onChannelConnect}. */
    CONNECT,

    /** Dispatched to {@code onChannelClose}. */
    CLOSE,

    /** Dispatched to {@code onChannelIdle}. */
    IDLE,

    /** Dispatched to {@code onChannelException}. */
    EXCEPTION
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java new file mode 100644 index 0000000..70ae5b5 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -0,0 +1,452 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.remoting.netty;

import com.alibaba.rocketmq.remoting.ChannelEventListener;
import com.alibaba.rocketmq.remoting.InvokeCallback;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.common.Pair;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import com.alibaba.rocketmq.remoting.common.ServiceThread;
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;


/**
 * Shared request/response plumbing for the Netty remoting endpoints: inbound
 * command dispatch, the table of in-flight requests keyed by opaque id, the
 * sync / async / oneway send paths, and a background dispatcher for channel
 * events.
 *
 * @author shijia.wxr
 */
public abstract class NettyRemotingAbstract {
    private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    /** Permits bounding concurrent oneway sends (fair semaphore). */
    protected final Semaphore semaphoreOneway;

    /** Permits bounding concurrent async sends (fair semaphore). */
    protected final Semaphore semaphoreAsync;

    /** In-flight requests awaiting a response, keyed by the request's opaque id. */
    protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

    /** Processor and executor registered per request code. */
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

    /** Background thread that forwards queued channel events to the listener. */
    protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();

    /** Fallback used when no processor is registered for a request code; may be {@code null}. */
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;


    /**
     * @param permitsOneway number of permits for the oneway semaphore
     * @param permitsAsync  number of permits for the async semaphore
     */
    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreOneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }

    /** @return the listener that receives channel events; {@code null} disables dispatch */
    public abstract ChannelEventListener getChannelEventListener();

    /** @return the hook invoked around request processing, or {@code null} for none */
    public abstract RPCHook getRPCHook();

    /** @return the executor used for async callbacks, or {@code null} to run them on the calling thread */
    public abstract ExecutorService getCallbackExecutor();

    /** Queues a channel event for the background dispatcher. */
    public void putNettyEvent(final NettyEvent event) {
        this.nettyEventExecuter.putNettyEvent(event);
    }

    /**
     * Entry point for an inbound command: routes it to request or response
     * handling according to its type. A {@code null} command is ignored.
     */
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Handles an inbound request: looks up the processor for the request code
     * (falling back to the default processor), then submits the processing
     * task to that processor's executor. Replies with an error response when
     * no processor exists, when the processor rejects the request, or when the
     * executor refuses the task.
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair == null) {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            return;
        }

        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }

                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    // Oneway requests never get a reply; a null response means nothing to send.
                    if (!cmd.isOnewayRPC() && response != null) {
                        response.setOpaque(opaque);
                        response.markResponseType();
                        try {
                            ctx.writeAndFlush(response);
                        } catch (Throwable e) {
                            PLOG.error("process request over, but response failed", e);
                            PLOG.error(cmd.toString());
                            PLOG.error(response.toString());
                        }
                    }
                } catch (Throwable e) {
                    // This one exception type, matched by name, is deliberately not logged.
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            // Crude log sampling: only warn when the millisecond clock is a multiple of 10000.
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    }

    /**
     * Handles an inbound response: matches it to the in-flight request with
     * the same opaque id, removes that request from the table, and either runs
     * the async callback or hands the response to the waiting sync caller.
     * Unmatched responses are logged and dropped.
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture == null) {
            PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            PLOG.warn(cmd.toString());
            return;
        }

        responseFuture.setResponseCommand(cmd);

        responseFuture.release();

        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() == null) {
            // No callback registered: a synchronous caller is waiting on the future.
            responseFuture.putResponse(cmd);
            return;
        }

        boolean runInThisThread = false;
        ExecutorService executor = this.getCallbackExecutor();
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            PLOG.warn("execute callback in executor exception, and callback throw", e);
                        }
                    }
                });
            } catch (Exception e) {
                // Executor refused the task; fall back to running the callback here.
                runInThisThread = true;
                PLOG.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }

        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                PLOG.warn("executeInvokeCallback Exception", e);
            }
        }
    }

    /**
     * Removes in-flight requests whose begin time plus timeout plus a
     * 1000 ms grace period has passed, then runs their callbacks after the
     * scan so callback code never executes while iterating the table.
     */
    public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                PLOG.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                rf.executeInvokeCallback();
            } catch (Throwable e) {
                PLOG.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

    /**
     * Sends a request and blocks until the response arrives or the timeout
     * elapses.
     *
     * @return the response command, never {@code null}
     * @throws RemotingTimeoutException     the request was written but no response arrived in time
     * @throws RemotingSendRequestException the request could not be written to the channel
     * @throws InterruptedException         interrupted while waiting for the response
     */
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    // Write failed: drop the pending entry and wake the waiter with a null response.
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

    /**
     * Sends a request without blocking for the response; {@code invokeCallback}
     * is run when the response arrives, the write fails, or the request is
     * expired by {@link #scanResponseTable()}.
     *
     * @throws RemotingTooMuchRequestException no async permit could be acquired within {@code timeoutMillis}
     * @throws RemotingSendRequestException    writing to the channel threw; the callback is not invoked in this case
     * @throws InterruptedException            interrupted while waiting for a permit
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                // The caller is told about the failure through the exception below, so the
                // pending entry must go too; otherwise scanResponseTable() would later run
                // the callback for a request that was never sent.
                this.responseTable.remove(opaque);
                responseFuture.release();
                PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            PLOG.warn(info);
            throw new RemotingTooMuchRequestException(info);
        }
    }

    /**
     * Sends a request for which no response is expected. The request is marked
     * oneway, and the permit is returned as soon as the write completes.
     *
     * @throws RemotingTooMuchRequestException no permit available and {@code timeoutMillis <= 0}
     * @throws RemotingTimeoutException        no permit became available within a positive {@code timeoutMillis}
     * @throws RemotingSendRequestException    writing to the channel threw
     * @throws InterruptedException            interrupted while waiting for a permit
     */
    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                // Reports the oneway semaphore, so the label names it (was mislabelled as async).
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                PLOG.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

    /**
     * Service thread that drains queued {@link NettyEvent}s and forwards each
     * one to the matching {@link ChannelEventListener} callback.
     */
    class NettyEventExecuter extends ServiceThread {
        private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
        private final int maxSize = 10000;


        /** Enqueues the event, or logs and drops it once the queue has grown past {@code maxSize}. */
        public void putNettyEvent(final NettyEvent event) {
            if (this.eventQueue.size() <= maxSize) {
                this.eventQueue.add(event);
            } else {
                PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
            }
        }


        @Override
        public void run() {
            PLOG.info(this.getServiceName() + " service started");

            // The listener is looked up once, when the thread starts.
            final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

            while (!this.isStopped()) {
                try {
                    // Bounded wait so the stop flag is re-checked at least every 3 seconds.
                    NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (event != null && listener != null) {
                        switch (event.getType()) {
                            case IDLE:
                                listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                                break;
                            case CLOSE:
                                listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                                break;
                            case CONNECT:
                                listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                                break;
                            case EXCEPTION:
                                listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                                break;
                            default:
                                break;

                        }
                    }
                } catch (Exception e) {
                    PLOG.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            PLOG.info(this.getServiceName() + " service end");
        }


        @Override
        public String getServiceName() {
            return NettyEventExecuter.class.getSimpleName();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java new file mode 100644 index 0000000..68555c5 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java @@ -0,0 +1,682 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.alibaba.rocketmq.remoting.netty; + +import com.alibaba.rocketmq.remoting.ChannelEventListener; +import com.alibaba.rocketmq.remoting.InvokeCallback; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.RemotingClient; +import com.alibaba.rocketmq.remoting.common.Pair; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * @author shijia.wxr + */ +public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + + private static final long LOCK_TIMEOUT_MILLIS = 3000; + + private final NettyClientConfig nettyClientConfig; + private final Bootstrap bootstrap = new Bootstrap(); + private final EventLoopGroup eventLoopGroupWorker; + private final Lock lockChannelTables = new ReentrantLock(); + private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); + + private final Timer timer = new Timer("ClientHouseKeepingService", true); + + private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>(); + private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>(); + private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); + private final Lock lockNamesrvChannel = new ReentrantLock(); + + private final ExecutorService publicExecutor; + private final ChannelEventListener channelEventListener; + private DefaultEventExecutorGroup defaultEventExecutorGroup; + private RPCHook rpcHook; + + public NettyRemotingClient(final NettyClientConfig nettyClientConfig) { + this(nettyClientConfig, null); + } + + public NettyRemotingClient(final NettyClientConfig nettyClientConfig, // + final ChannelEventListener channelEventListener) { + super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); + this.nettyClientConfig = nettyClientConfig; + this.channelEventListener = channelEventListener; + + int 
publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads(); + if (publicThreadNums <= 0) { + publicThreadNums = 4; + } + + this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet()); + } + }); + + this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); + } + }); + } + + private static int initValueIndex() { + Random r = new Random(); + + return Math.abs(r.nextInt() % 999) % 999; + } + + @Override + public void start() { + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// + nettyClientConfig.getClientWorkerThreads(), // + new ThreadFactory() { + + private AtomicInteger threadIndex = new AtomicInteger(0); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); + } + }); + + Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) + .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) + .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, 
nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), + new NettyConnetManageHandler(), + new NettyClientHandler()); + } + }); + + this.timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + NettyRemotingClient.this.scanResponseTable(); + } catch (Exception e) { + log.error("scanResponseTable exception", e); + } + } + }, 1000 * 3, 1000); + + if (this.channelEventListener != null) { + this.nettyEventExecuter.start(); + } + } + + @Override + public void shutdown() { + try { + this.timer.cancel(); + + for (ChannelWrapper cw : this.channelTables.values()) { + this.closeChannel(null, cw.getChannel()); + } + + this.channelTables.clear(); + + this.eventLoopGroupWorker.shutdownGracefully(); + + if (this.nettyEventExecuter != null) { + this.nettyEventExecuter.shutdown(); + } + + if (this.defaultEventExecutorGroup != null) { + this.defaultEventExecutorGroup.shutdownGracefully(); + } + } catch (Exception e) { + log.error("NettyRemotingClient shutdown exception, ", e); + } + + if (this.publicExecutor != null) { + try { + this.publicExecutor.shutdown(); + } catch (Exception e) { + log.error("NettyRemotingServer shutdown exception, ", e); + } + } + } + + public void closeChannel(final String addr, final Channel channel) { + if (null == channel) + return; + + final String addrRemote = null == addr ? 
RemotingHelper.parseChannelRemoteAddr(channel) : addr; + + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean removeItemFromTable = true; + final ChannelWrapper prevCW = this.channelTables.get(addrRemote); + + log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null); + + if (null == prevCW) { + log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote); + removeItemFromTable = false; + } else if (prevCW.getChannel() != channel) { + log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.", + addrRemote); + removeItemFromTable = false; + } + + if (removeItemFromTable) { + this.channelTables.remove(addrRemote); + log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); + } + + RemotingUtil.closeChannel(channel); + } catch (Exception e) { + log.error("closeChannel: close the channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + log.error("closeChannel exception", e); + } + } + + @Override + public void registerRPCHook(RPCHook rpcHook) { + this.rpcHook = rpcHook; + } + + public void closeChannel(final Channel channel) { + if (null == channel) + return; + + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean removeItemFromTable = true; + ChannelWrapper prevCW = null; + String addrRemote = null; + for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) { + String key = entry.getKey(); + ChannelWrapper prev = entry.getValue(); + if (prev.getChannel() != null) { + if (prev.getChannel() == channel) { + prevCW = prev; + addrRemote = key; + break; + } + } + } + + if (null == prevCW) { + log.info("eventCloseChannel: the 
channel[{}] has been removed from the channel table before", addrRemote); + removeItemFromTable = false; + } + + if (removeItemFromTable) { + this.channelTables.remove(addrRemote); + log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); + RemotingUtil.closeChannel(channel); + } + } catch (Exception e) { + log.error("closeChannel: close the channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + log.error("closeChannel exception", e); + } + } + + @Override + public void updateNameServerAddressList(List<String> addrs) { + List<String> old = this.namesrvAddrList.get(); + boolean update = false; + + if (!addrs.isEmpty()) { + if (null == old) { + update = true; + } else if (addrs.size() != old.size()) { + update = true; + } else { + for (int i = 0; i < addrs.size() && !update; i++) { + if (!old.contains(addrs.get(i))) { + update = true; + } + } + } + + if (update) { + Collections.shuffle(addrs); + this.namesrvAddrList.set(addrs); + } + } + } + + @Override + public List<String> getNameServerAddressList() { + return this.namesrvAddrList.get(); + } + + @Override + public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) + throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { + final Channel channel = this.getAndCreateChannel(addr); + if (channel != null && channel.isActive()) { + try { + if (this.rpcHook != null) { + this.rpcHook.doBeforeRequest(addr, request); + } + RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis); + if (this.rpcHook != null) { + this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); + } + return response; + } catch (RemotingSendRequestException e) { + log.warn("invokeSync: send request 
exception, so close the channel[{}]", addr); + this.closeChannel(addr, channel); + throw e; + } catch (RemotingTimeoutException e) { + if (nettyClientConfig.isClientCloseSocketIfTimeout()) { + this.closeChannel(addr, channel); + log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); + } + log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); + throw e; + } + } else { + this.closeChannel(addr, channel); + throw new RemotingConnectException(addr); + } + } + + private Channel getAndCreateChannel(final String addr) throws InterruptedException { + if (null == addr) + return getAndCreateNameserverChannel(); + + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isOK()) { + return cw.getChannel(); + } + + return this.createChannel(addr); + } + + private Channel getAndCreateNameserverChannel() throws InterruptedException { + String addr = this.namesrvAddrChoosed.get(); + if (addr != null) { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isOK()) { + return cw.getChannel(); + } + } + + final List<String> addrList = this.namesrvAddrList.get(); + if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + addr = this.namesrvAddrChoosed.get(); + if (addr != null) { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isOK()) { + return cw.getChannel(); + } + } + + if (addrList != null && !addrList.isEmpty()) { + for (int i = 0; i < addrList.size(); i++) { + int index = this.namesrvIndex.incrementAndGet(); + index = Math.abs(index); + index = index % addrList.size(); + String newAddr = addrList.get(index); + + this.namesrvAddrChoosed.set(newAddr); + Channel channelNew = this.createChannel(newAddr); + if (channelNew != null) + return channelNew; + } + } + } catch (Exception e) { + log.error("getAndCreateNameserverChannel: create name server channel exception", e); + } finally { + this.lockNamesrvChannel.unlock(); + } + 
} else { + log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + + return null; + } + + private Channel createChannel(final String addr) throws InterruptedException { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isOK()) { + return cw.getChannel(); + } + + + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean createNewConnection = false; + cw = this.channelTables.get(addr); + if (cw != null) { + + if (cw.isOK()) { + return cw.getChannel(); + } else if (!cw.getChannelFuture().isDone()) { + createNewConnection = false; + } else { + this.channelTables.remove(addr); + createNewConnection = true; + } + } else { + createNewConnection = true; + } + + if (createNewConnection) { + ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); + log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); + cw = new ChannelWrapper(channelFuture); + this.channelTables.put(addr, cw); + } + } catch (Exception e) { + log.error("createChannel: create channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + + if (cw != null) { + ChannelFuture channelFuture = cw.getChannelFuture(); + if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { + if (cw.isOK()) { + log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); + return cw.getChannel(); + } else { + log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause()); + } + } else { + log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), + channelFuture.toString()); + } + } + + return null; + } + + @Override + public 
void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) + throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, + RemotingSendRequestException { + final Channel channel = this.getAndCreateChannel(addr); + if (channel != null && channel.isActive()) { + try { + if (this.rpcHook != null) { + this.rpcHook.doBeforeRequest(addr, request); + } + this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); + } catch (RemotingSendRequestException e) { + log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); + this.closeChannel(addr, channel); + throw e; + } + } else { + this.closeChannel(addr, channel); + throw new RemotingConnectException(addr); + } + } + + @Override + public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, + RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + final Channel channel = this.getAndCreateChannel(addr); + if (channel != null && channel.isActive()) { + try { + if (this.rpcHook != null) { + this.rpcHook.doBeforeRequest(addr, request); + } + this.invokeOnewayImpl(channel, request, timeoutMillis); + } catch (RemotingSendRequestException e) { + log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); + this.closeChannel(addr, channel); + throw e; + } + } else { + this.closeChannel(addr, channel); + throw new RemotingConnectException(addr); + } + } + + @Override + public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { + ExecutorService executorThis = executor; + if (null == executor) { + executorThis = this.publicExecutor; + } + + Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis); + this.processorTable.put(requestCode, pair); + } + + 
@Override + public boolean isChannelWriteable(String addr) { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isOK()) { + return cw.isWriteable(); + } + return true; + } + + @Override + public ChannelEventListener getChannelEventListener() { + return channelEventListener; + } + + @Override + public RPCHook getRPCHook() { + return this.rpcHook; + } + + @Override + public ExecutorService getCallbackExecutor() { + return this.publicExecutor; + } + + public List<String> getNamesrvAddrList() { + return namesrvAddrList.get(); + } + + public RPCHook getRpcHook() { + return rpcHook; + } + + static class ChannelWrapper { + private final ChannelFuture channelFuture; + + + public ChannelWrapper(ChannelFuture channelFuture) { + this.channelFuture = channelFuture; + } + + + public boolean isOK() { + return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); + } + + + public boolean isWriteable() { + return this.channelFuture.channel().isWritable(); + } + + + private Channel getChannel() { + return this.channelFuture.channel(); + } + + + public ChannelFuture getChannelFuture() { + return channelFuture; + } + } + + class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { + processMessageReceived(ctx, msg); + + } + } + + class NettyConnetManageHandler extends ChannelDuplexHandler { + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) + throws Exception { + final String local = localAddress == null ? "UNKNOW" : localAddress.toString(); + final String remote = remoteAddress == null ? 
"UNKNOW" : remoteAddress.toString(); + log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); + super.connect(ctx, remoteAddress, localAddress, promise); + + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); + } + } + + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress); + closeChannel(ctx.channel()); + super.disconnect(ctx, promise); + + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + } + } + + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); + closeChannel(ctx.channel()); + super.close(ctx, promise); + + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent evnet = (IdleStateEvent) evt; + if (evnet.state().equals(IdleState.ALL_IDLE)) { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress); + closeChannel(ctx.channel()); + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this + .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), 
ctx.channel())); + } + } + } + + ctx.fireUserEventTriggered(evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress); + log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause); + closeChannel(ctx.channel()); + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java new file mode 100644 index 0000000..a14947e --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.netty; + +import com.alibaba.rocketmq.remoting.ChannelEventListener; +import com.alibaba.rocketmq.remoting.InvokeCallback; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.RemotingServer; +import com.alibaba.rocketmq.remoting.common.Pair; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import 
java.util.concurrent.atomic.AtomicInteger; + + +public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private final ServerBootstrap serverBootstrap; + private final EventLoopGroup eventLoopGroupSelector; + private final EventLoopGroup eventLoopGroupBoss; + private final NettyServerConfig nettyServerConfig; + + private final ExecutorService publicExecutor; + private final ChannelEventListener channelEventListener; + + private final Timer timer = new Timer("ServerHouseKeepingService", true); + private DefaultEventExecutorGroup defaultEventExecutorGroup; + + private RPCHook rpcHook; + + + private int port = 0; + + + public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { + this(nettyServerConfig, null); + } + + + public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { + super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); + this.serverBootstrap = new ServerBootstrap(); + this.nettyServerConfig = nettyServerConfig; + this.channelEventListener = channelEventListener; + + int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); + if (publicThreadNums <= 0) { + publicThreadNums = 4; + } + + this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); + } + }); + + this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); + } + }); + + if 
(RemotingUtil.isLinuxPlatform() // + && nettyServerConfig.isUseEpollNativeSelector()) { + this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + private int threadTotal = nettyServerConfig.getServerSelectorThreads(); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); + } + }); + } else { + this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + private int threadTotal = nettyServerConfig.getServerSelectorThreads(); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); + } + }); + } + } + + + @Override + public void start() { + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// + nettyServerConfig.getServerWorkerThreads(), // + new ThreadFactory() { + + private AtomicInteger threadIndex = new AtomicInteger(0); + + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); + } + }); + + ServerBootstrap childHandler = + this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .childOption(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) + .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) + .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) + .childHandler(new 
ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new NettyConnetManageHandler(), + new NettyServerHandler()); + } + }); + + if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { + childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + } + + try { + ChannelFuture sync = this.serverBootstrap.bind().sync(); + InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); + this.port = addr.getPort(); + } catch (InterruptedException e1) { + throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); + } + + if (this.channelEventListener != null) { + this.nettyEventExecuter.start(); + } + + this.timer.scheduleAtFixedRate(new TimerTask() { + + @Override + public void run() { + try { + NettyRemotingServer.this.scanResponseTable(); + } catch (Exception e) { + log.error("scanResponseTable exception", e); + } + } + }, 1000 * 3, 1000); + } + + @Override + public void shutdown() { + try { + if (this.timer != null) { + this.timer.cancel(); + } + + this.eventLoopGroupBoss.shutdownGracefully(); + + this.eventLoopGroupSelector.shutdownGracefully(); + + if (this.nettyEventExecuter != null) { + this.nettyEventExecuter.shutdown(); + } + + if (this.defaultEventExecutorGroup != null) { + this.defaultEventExecutorGroup.shutdownGracefully(); + } + } catch (Exception e) { + log.error("NettyRemotingServer shutdown exception, ", e); + } + + if (this.publicExecutor != null) { + try { + this.publicExecutor.shutdown(); + } catch (Exception e) { + log.error("NettyRemotingServer shutdown exception, ", e); + } + } + } + + @Override + public void registerRPCHook(RPCHook rpcHook) { + this.rpcHook = rpcHook; + } + + @Override + public void registerProcessor(int 
requestCode, NettyRequestProcessor processor, ExecutorService executor) { + ExecutorService executorThis = executor; + if (null == executor) { + executorThis = this.publicExecutor; + } + + Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis); + this.processorTable.put(requestCode, pair); + } + + @Override + public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) { + this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor); + } + + @Override + public int localListenPort() { + return this.port; + } + + @Override + public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) { + return processorTable.get(requestCode); + } + + @Override + public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) + throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { + return this.invokeSyncImpl(channel, request, timeoutMillis); + } + + @Override + public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); + } + + @Override + public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException, + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + this.invokeOnewayImpl(channel, request, timeoutMillis); + } + + @Override + public ChannelEventListener getChannelEventListener() { + return channelEventListener; + } + + @Override + public RPCHook getRPCHook() { + return this.rpcHook; + } + + @Override + public ExecutorService getCallbackExecutor() { + return this.publicExecutor; + } + + class 
NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { + processMessageReceived(ctx, msg); + } + } + + class NettyConnetManageHandler extends ChannelDuplexHandler { + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress); + super.channelRegistered(ctx); + } + + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress); + super.channelUnregistered(ctx); + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress); + super.channelActive(ctx); + + if (NettyRemotingServer.this.channelEventListener != null) { + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); + } + } + + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress); + super.channelInactive(ctx); + + if (NettyRemotingServer.this.channelEventListener != null) { + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + } + } + + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof 
IdleStateEvent) { + IdleStateEvent evnet = (IdleStateEvent) evt; + if (evnet.state().equals(IdleState.ALL_IDLE)) { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); + RemotingUtil.closeChannel(ctx.channel()); + if (NettyRemotingServer.this.channelEventListener != null) { + NettyRemotingServer.this + .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); + } + } + } + + ctx.fireUserEventTriggered(evt); + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress); + log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause); + + if (NettyRemotingServer.this.channelEventListener != null) { + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel())); + } + + RemotingUtil.closeChannel(ctx.channel()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java new file mode 100644 index 0000000..dae7f9e --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.netty; + +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; + + +/** + * Common remoting command processor + * + * @author shijia.wxr + * + */ +public interface NettyRequestProcessor { + RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) + throws Exception; + boolean rejectRequest(); +}
