This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ce3bf56281667004421732c8df426b20721a1ef5 Author: baomingyu <[email protected]> AuthorDate: Sat Jul 30 21:32:53 2022 -0500 [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214) --- .../apache/inlong/tubemq/corerpc/RpcConstants.java | 3 - .../inlong/tubemq/corerpc/netty/NettyClient.java | 2 - .../tubemq/corerpc/netty/NettyClientFactory.java | 28 +------ .../tubemq/corerpc/netty/NettyProtocolDecoder.java | 95 +++++++++++++++------- .../tubemq/corerpc/netty/NettyProtocolEncoder.java | 15 ++-- .../tubemq/corerpc/netty/NettyRpcServer.java | 6 -- .../corerpc/netty/NettyProtocolEncoderTest.java | 1 - 7 files changed, 80 insertions(+), 70 deletions(-) diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java index 393863a0d..4bdbd4f36 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java @@ -43,7 +43,6 @@ public final class RpcConstants { public static final String NETTY_WRITE_HIGH_MARK = "rpc.netty.write.highmark"; public static final String NETTY_WRITE_LOW_MARK = "rpc.netty.write.lowmark"; public static final String NETTY_TCP_SENDBUF = "rpc.netty.send.buffer"; - public static final String NETTY_TCP_MAX_MESSAGE_SIZE = "rpc.netty.max.message.size"; public static final String NETTY_TCP_RECEIVEBUF = "rpc.netty.receive.buffer"; public static final String NETTY_TCP_ENABLEBUSYWAIT = "rpc.netty.enable.busy.wait"; @@ -133,6 +132,4 @@ public final class RpcConstants { public static final long CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS = 50000; public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_HIGH_MARK = 50 * 1024 * 1024; public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_LOW_MARK = 5 * 1024 * 1024; - public static final int CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE = 5 * 1024 * 1024; - } diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java index 0faff9a05..30f330e75 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java @@ -285,9 +285,7 @@ public class NettyClient implements Client { */ @Override public void channelRead(ChannelHandlerContext ctx, Object e) { - logger.debug("client message receive!"); if (e instanceof RpcDataPack) { - logger.debug("RpcDataPack client message receive!"); RpcDataPack dataPack = (RpcDataPack) e; Callback callback = requests.remove(dataPack.getSerialNo()); if (callback != null) { diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java index eaa72721b..291a97137 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java @@ -25,15 +25,12 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -61,11 +58,8 @@ public class NettyClientFactory implements ClientFactory { new ConcurrentHashMap<>(); protected AtomicBoolean shutdown = new AtomicBoolean(true); private EventLoopGroup eventLoopGroup; - private ExecutorService bossExecutorService; - private ExecutorService workerExecutorService; private AtomicInteger workerIdCounter = new AtomicInteger(0); // TSL encryption and need Two Way Authentic - private int maxMessageSize; private boolean enableTLS = false; private boolean needTwoWayAuthentic = false; private String keyStorePath; @@ -87,8 +81,6 @@ public class NettyClientFactory implements ClientFactory { if (this.shutdown.compareAndSet(true, false)) { enableTLS = conf.getBoolean(RpcConstants.TLS_OVER_TCP, false); needTwoWayAuthentic = conf.getBoolean(RpcConstants.TLS_TWO_WAY_AUTHENTIC, false); - this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE, - RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE); if (enableTLS) { trustStorePath = conf.getString(RpcConstants.TLS_TRUSTSTORE_PATH); trustStorePassword = conf.getString(RpcConstants.TLS_TRUSTSTORE_PASSWORD); @@ -105,16 +97,9 @@ public class NettyClientFactory implements ClientFactory { trustStorePath = null; trustStorePassword = null; } - final int bossCount = - conf.getInt(RpcConstants.BOSS_COUNT, - RpcConstants.CFG_DEFAULT_BOSS_COUNT); final int workerCount = conf.getInt(RpcConstants.WORKER_COUNT, RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT); - final int callbackCount = - conf.getInt(RpcConstants.CALLBACK_WORKER_COUNT, 3); - bossExecutorService = Executors.newCachedThreadPool(); - workerExecutorService = Executors.newCachedThreadPool(); String threadName = new StringBuilder(256) .append(conf.getString(RpcConstants.WORKER_THREAD_NAME, RpcConstants.CFG_DEFAULT_WORKER_THREAD_NAME)) @@ -202,11 +187,8 @@ public class NettyClientFactory implements ClientFactory { } } } - if (this.bossExecutorService != null) { - this.bossExecutorService.shutdown(); - } - if (this.workerExecutorService != null) { - this.workerExecutorService.shutdown(); + if (this.eventLoopGroup != null && !eventLoopGroup.isShutdown()) { + this.eventLoopGroup.shutdownGracefully(); } } catch (Exception e) { logger.error("has exception ", e); @@ -248,7 +230,8 @@ public class NettyClientFactory implements ClientFactory { try { SSLEngine sslEngine = TSSLEngineUtil.createSSLEngine(keyStorePath, trustStorePath, - keyStorePassword, trustStorePassword, true, needTwoWayAuthentic); + keyStorePassword, trustStorePassword, true, + needTwoWayAuthentic); pipeline.addLast("ssl", new SslHandler(sslEngine)); } catch (Throwable t) { logger.error(new StringBuilder(256) @@ -257,9 +240,6 @@ public class NettyClientFactory implements ClientFactory { throw new Exception(t); } } - socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize, - 0, 4, 0, 4)); - // Encode the data pipeline.addLast("protocolEncoder", new NettyProtocolEncoder()); // Decode the bytes into a Rpc Data Pack diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java index 238677076..9b88870c2 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java @@ -20,9 +20,11 @@ package org.apache.inlong.tubemq.corerpc.netty; import static org.apache.inlong.tubemq.corebase.utils.AddressUtils.getRemoteAddressIP; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.ReferenceCountUtil; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -43,50 +45,83 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { new ConcurrentHashMap<>(); private static AtomicLong lastProtolTime = new AtomicLong(0); private static AtomicLong lastSizeTime = new AtomicLong(0); + private boolean packHeaderRead = false; + private int listSize; + private List<RpcDataPack> rpcDataPackList = new ArrayList<>(); + private RpcDataPack dataPack; + private ByteBuf lastByteBuf; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { - if (buffer.readableBytes() < 12) { - logger.warn("Decode buffer.readableBytes() < 12 !"); - return; - } - int frameToken = buffer.readInt(); - filterIllegalPkgToken(frameToken, - RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel()); - int serialNo = buffer.readInt(); - int tmpListSize = buffer.readInt(); - filterIllegalPackageSize(true, tmpListSize, - RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel()); - RpcDataPack dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>()); - // get PackBody - int i = 0; - while (i < tmpListSize) { - i++; + buffer = convertToNewBuf(buffer); + while (buffer.readableBytes() > 0) { + if (!packHeaderRead) { + if (buffer.readableBytes() < 12) { + saveRemainedByteBuf(buffer); + break; + } + int frameToken = buffer.readInt(); + filterIllegalPkgToken(frameToken, RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel()); + int serialNo = buffer.readInt(); + int tmpListSize = buffer.readInt(); + filterIllegalPackageSize(true, tmpListSize, + RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel()); + this.listSize = tmpListSize; + this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize)); + this.packHeaderRead = true; + } + // get PackBody if (buffer.readableBytes() < 4) { - logger.warn("Decode buffer.readableBytes() < 4 !"); + saveRemainedByteBuf(buffer); break; } buffer.markReaderIndex(); int length = buffer.readInt(); - filterIllegalPackageSize(false, length, - RpcConstants.RPC_MAX_BUFFER_SIZE, ctx.channel()); + if (buffer.readableBytes() < length) { + buffer.resetReaderIndex(); + saveRemainedByteBuf(buffer); + break; + } ByteBuffer bb = ByteBuffer.allocate(length); buffer.readBytes(bb); bb.flip(); dataPack.getDataLst().add(bb); + if (dataPack.getDataLst().size() == listSize) { + packHeaderRead = false; + rpcDataPackList.add(dataPack); + } } + if (rpcDataPackList.size() > 0) { + out.addAll(rpcDataPackList); + rpcDataPackList.clear(); + } + } + + private void saveRemainedByteBuf(ByteBuf byteBuf) { + if (byteBuf != null && byteBuf.readableBytes() > 0) { + lastByteBuf = Unpooled.copiedBuffer(byteBuf); + } + } - if (dataPack.getDataLst().size() == tmpListSize) { - out.add(dataPack); - } else { - logger.warn("Decode dataPack.getDataLst().size()[{}] != tmpListSize [{}] !", - dataPack.getDataLst().size(), tmpListSize); - return; + private ByteBuf convertToNewBuf(ByteBuf byteBuf) { + ByteBuf newByteBuf = byteBuf; + int totalReadBytes = byteBuf.readableBytes(); + if (lastByteBuf != null) { + try { + totalReadBytes += lastByteBuf.readableBytes(); + newByteBuf = Unpooled.buffer(totalReadBytes); + newByteBuf.writeBytes(lastByteBuf); + newByteBuf.writeBytes(byteBuf); + } finally { + ReferenceCountUtil.release(lastByteBuf); + } + lastByteBuf = null; } + return newByteBuf; } - private void filterIllegalPkgToken(int inParamValue, - int allowTokenVal, Channel channel) throws UnknownProtocolException { + private void filterIllegalPkgToken(int inParamValue, int allowTokenVal, + Channel channel) throws UnknownProtocolException { if (inParamValue != allowTokenVal) { String rmtaddrIp = getRemoteAddressIP(channel); if (rmtaddrIp != null) { @@ -103,7 +138,11 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { long curTime = System.currentTimeMillis(); if (curTime - befTime > 180000) { if (lastProtolTime.compareAndSet(befTime, System.currentTimeMillis())) { - logger.warn("[Abnormal Visit] OSS Tube visit list is :" + errProtolAddrMap.toString()); + logger.warn("[Abnormal Visit] OSS Tube [inParamValue = {} vs " + + "allowTokenVal = {}] visit " + + "list is : {}", + inParamValue, allowTokenVal, + errProtolAddrMap.toString()); errProtolAddrMap.clear(); } } diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java index fde7b523a..3f52d35bf 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.MessageToMessageEncoder; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.inlong.tubemq.corerpc.RpcConstants; @@ -38,19 +39,17 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> { @Override protected void encode(ChannelHandlerContext chx, RpcDataPack msg, List<Object> out) { RpcDataPack dataPack = msg; - List<ByteBuffer> origs = dataPack.getDataLst(); - ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); - try { + try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) { byteOut.write(getPackHeader(dataPack).array()); + List<ByteBuffer> origs = dataPack.getDataLst(); Iterator<ByteBuffer> iter = origs.iterator(); while (iter.hasNext()) { ByteBuffer entry = iter.next(); byteOut.write(getLengthHeader(entry).array()); - byteOut.write(entry.array()); + byteOut.write(getLengthBody(entry)); } byte[] body = byteOut.toByteArray(); - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + body.length); - buf.writeInt(body.length); + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(body.length); buf.writeBytes(body); out.add(buf); } catch (IOException e) { @@ -73,4 +72,8 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> { header.flip(); return header; } + + private byte[] getLengthBody(ByteBuffer buf) { + return Arrays.copyOf(buf.array(), buf.limit()); + } } diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java index f21e3ea62..5f19d22e8 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.DataOutputStream; @@ -82,7 +81,6 @@ public class NettyRpcServer implements ServiceRpcServer { private boolean needTwoWayAuthentic = false; private String trustStorePath = ""; private String trustStorePassword = ""; - private int maxMessageSize; /** * create a server with rpc config info @@ -115,8 +113,6 @@ public class NettyRpcServer implements ServiceRpcServer { } } this.enableBusyWait = conf.getBoolean(RpcConstants.NETTY_TCP_ENABLEBUSYWAIT, false); - this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE, - RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE); int bossCount = conf.getInt(RpcConstants.BOSS_COUNT, RpcConstants.CFG_DEFAULT_BOSS_COUNT); @@ -172,8 +168,6 @@ public class NettyRpcServer implements ServiceRpcServer { System.exit(1); } } - socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( - maxMessageSize, 0, 4, 0, 4)); // Encode the data handler socketChannel.pipeline().addLast("protocolEncoder", new NettyProtocolDecoder()); // Decode the bytes into a Rpc Data Pack diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java index 07b44a819..af61305e2 100644 --- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java +++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java @@ -51,7 +51,6 @@ public class NettyProtocolEncoderTest { // read data. int i = buf.readInt(); i = buf.readInt(); - i = buf.readInt(); Assert.assertEquals(123, i); } catch (Exception e) { e.printStackTrace();
