Repository: incubator-rocketmq
Updated Branches:
  refs/heads/rocketmq5 0b88e66fa -> 6593294f0
Minor polish Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/114b6ae0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/114b6ae0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/114b6ae0 Branch: refs/heads/rocketmq5 Commit: 114b6ae083ed5338e2b59a501ec08ec23c3e2ece Parents: 0b88e66 Author: yukon <yu...@apache.org> Authored: Wed Sep 20 17:06:04 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Wed Sep 20 17:06:04 2017 +0800 ---------------------------------------------------------------------- .../remoting/api/buffer/ByteBufferWrapper.java | 28 ++++---- .../impl/buffer/NettyByteBufferWrapper.java | 73 ++++++++++---------- .../impl/netty/NettyRemotingAbstract.java | 16 +++-- .../remoting/impl/netty/handler/Decoder.java | 2 +- 4 files changed, 64 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java index 7cae3ac..7360c88 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java @@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer; import java.nio.ByteBuffer; public interface ByteBufferWrapper { - void writeByte(int index, byte data); - void writeByte(byte data); - byte readByte(); - - void writeInt(int data); + void writeByte(int index, 
byte data); void writeBytes(byte[] data); void writeBytes(ByteBuffer data); - int readableBytes(); + void writeInt(int data); - int readInt(); + void writeShort(short value); + + void writeLong(long id); + + byte readByte(); void readBytes(byte[] dst); void readBytes(ByteBuffer dst); - int readerIndex(); - - void setReaderIndex(int readerIndex); + short readShort(); - void writeLong(long id); + int readInt(); long readLong(); - void ensureCapacity(int capacity); + int readableBytes(); - short readShort(); + int readerIndex(); - void writeShort(short value); + void setReaderIndex(int readerIndex); + + void ensureCapacity(int capacity); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java index e17bcfd..5a71452 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java @@ -18,39 +18,27 @@ package org.apache.rocketmq.remoting.impl.buffer; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import java.nio.ByteBuffer; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; public class NettyByteBufferWrapper implements ByteBufferWrapper { private final ByteBuf buffer; - private final Channel channel; public NettyByteBufferWrapper(ByteBuf buffer) { - this(buffer, null); - } - - public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) { - this.channel = channel; this.buffer = buffer; } - public void writeByte(int index, byte data) 
{ - buffer.writeByte(data); - } - + @Override public void writeByte(byte data) { buffer.writeByte(data); } - public byte readByte() { - return buffer.readByte(); - } - - public void writeInt(int data) { - buffer.writeInt(data); + @Override + public void writeByte(int index, byte data) { + buffer.writeByte(data); } + @Override public void writeBytes(byte[] data) { buffer.writeBytes(data); } @@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { buffer.writeBytes(data); } - public int readableBytes() { - return buffer.readableBytes(); + @Override + public void writeShort(final short value) { + buffer.writeShort(value); } - public int readInt() { - return buffer.readInt(); + @Override + public void writeInt(int data) { + buffer.writeInt(data); } - public void readBytes(byte[] dst) { - buffer.readBytes(dst); + @Override + public void writeLong(long value) { + buffer.writeLong(value); + } + + @Override + public byte readByte() { + return buffer.readByte(); } @Override @@ -77,17 +73,19 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { buffer.readBytes(dst); } - public int readerIndex() { - return buffer.readerIndex(); + @Override + public void readBytes(byte[] dst) { + buffer.readBytes(dst); } - public void setReaderIndex(int index) { - buffer.setIndex(index, buffer.writerIndex()); + @Override + public short readShort() { + return buffer.readShort(); } @Override - public void writeLong(long value) { - buffer.writeLong(value); + public int readInt() { + return buffer.readInt(); } @Override @@ -96,18 +94,23 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { } @Override - public void ensureCapacity(int capacity) { - buffer.capacity(capacity); + public int readableBytes() { + return buffer.readableBytes(); } @Override - public short readShort() { - return buffer.readShort(); + public int readerIndex() { + return buffer.readerIndex(); } @Override - public void writeShort(final short value) { - 
buffer.writeShort(value); + public void setReaderIndex(int index) { + buffer.setIndex(index, buffer.writerIndex()); + } + + @Override + public void ensureCapacity(int capacity) { + buffer.capacity(capacity); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 1af62cb..4c22e7c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -77,7 +77,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { private final Semaphore semaphoreAsync; private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256); private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>(); - private final AtomicLong count = new AtomicLong(0); + private final AtomicLong responseCounter = new AtomicLong(0); private final RemotingCommandFactory remotingCommandFactory; private final String remotingInstanceId = UIDGenerator.instance().createUID(); @@ -93,8 +93,13 @@ public abstract class NettyRemotingAbstract implements RemotingService { NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); - 
this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60, - TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000), "PublicExecutor", true); + this.publicExecutor = ThreadUtils.newThreadPoolExecutor( + clientConfig.getClientAsyncCallbackExecutorThreads(), + clientConfig.getClientAsyncCallbackExecutorThreads(), + 60, + TimeUnit.SECONDS, + new ArrayBlockingQueue<Runnable>(10000), + "PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } @@ -237,9 +242,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { long time = System.currentTimeMillis(); ackTables.remove(cmd.requestID()); - if (count.incrementAndGet() % 5000 == 0) - LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), + if (responseCounter.incrementAndGet() % 5000 == 0) { + LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), ackTables.size()); + } if (responseResult.getAsyncHandler() != null) { boolean sameThread = false; ExecutorService executor = this.getCallbackExecutor(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java index 87a0912..ec1d69d 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java @@ -44,7 
+44,7 @@ public class Decoder extends ByteToMessageDecoder { return; } - NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in, ctx.channel()); + NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in); Object msg = this.decode(ctx, wrapper); if (msg != null) {