lrhkobe commented on code in PR #4333: URL: https://github.com/apache/eventmesh/pull/4333#discussion_r1307024781
########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java: ########## @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Header; +import org.apache.eventmesh.common.protocol.tcp.OPStatus; +import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.protocol.tcp.codec.Codec; +import org.apache.eventmesh.common.utils.AssertUtils; +import org.apache.eventmesh.runtime.common.Pair; +import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.TcpProcessor; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState; +import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor; +import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.apache.eventmesh.runtime.util.RemotingHelper; +import org.apache.eventmesh.runtime.util.TraceUtils; +import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.handler.traffic.ChannelTrafficShapingHandler; +import io.netty.handler.traffic.GlobalTrafficShapingHandler; +import io.opentelemetry.api.trace.Span; + +import lombok.extern.slf4j.Slf4j; + +/** + * TCP serves as the runtime module server for the protocol + * + */ +@Slf4j +public class AbstractTCPServer extends AbstractRemotingServer { + + private final EventMeshTCPConfiguration eventMeshTCPConfiguration; + private ClientSessionGroupMapping clientSessionGroupMapping; + + private EventMeshTcpMonitor eventMeshTcpMonitor; + + private transient GlobalTrafficShapingHandler globalTrafficShapingHandler; + private TcpConnectionHandler tcpConnectionHandler; + private TcpDispatcher tcpDispatcher; + + private final Map<Command, Pair<TcpProcessor, ThreadPoolExecutor>> tcpRequestProcessorTable = + new ConcurrentHashMap<>(64); + + private final transient AtomicBoolean started = new AtomicBoolean(false); + + private final TCPThreadPoolGroup tcpThreadPoolGroup; + + public AbstractTCPServer(EventMeshTCPConfiguration eventMeshTCPConfiguration) { + this.eventMeshTCPConfiguration = eventMeshTCPConfiguration; + this.tcpThreadPoolGroup = new TCPThreadPoolGroup(eventMeshTCPConfiguration); + } + + private void initSharableHandlers() { + tcpConnectionHandler = new TcpConnectionHandler(); + tcpDispatcher = new TcpDispatcher(); + } + + public void init() throws Exception { + super.init("eventMesh-tcp"); + tcpThreadPoolGroup.initThreadPool(); + } + + @Override + public void start() throws Exception { + initSharableHandlers(); + + Thread thread = new Thread(() -> { + final ServerBootstrap bootstrap = new ServerBootstrap(); + + bootstrap.group(this.getBossGroup(), this.getIoGroup()) + .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) + .childOption(ChannelOption.SO_KEEPALIVE, false) + .childOption(ChannelOption.SO_LINGER, 0) + .childOption(ChannelOption.SO_TIMEOUT, 600_000) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_SNDBUF, 65_535 * 4) + .childOption(ChannelOption.SO_RCVBUF, 65_535 * 4) + .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(2_048, 4_096, 65_536)) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childHandler(new TcpServerInitializer()); + + try { + int port = eventMeshTCPConfiguration.getEventMeshTcpServerPort(); + ChannelFuture f = bootstrap.bind(port).sync(); + log.info("EventMeshTCPServer[port={}] started.....", port); + f.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("EventMeshTCPServer RemotingServer Start Err!", e); + try { + shutdown(); + } catch (Exception ex) { + log.error("EventMeshTCPServer RemotingServer shutdown Err!", ex); + } + System.exit(-1); + } + }, "eventMesh-tcp-server"); + thread.start(); + + started.compareAndSet(false, true); + + } + + + @Override + public void shutdown() throws Exception { + super.shutdown(); + tcpThreadPoolGroup.shutdownThreadPool(); + globalTrafficShapingHandler.release(); + started.compareAndSet(true, false); + } + + + /** + * Registers the processors required by the runtime module + * + */ + public void registerProcessor(final Command command, final TcpProcessor processor, + final ThreadPoolExecutor executor) { + AssertUtils.notNull(command, "command can't be null"); + AssertUtils.notNull(processor, "processor can't be null"); + AssertUtils.notNull(executor, "executor can't be null"); + this.tcpRequestProcessorTable.put(command, new Pair<>(processor, executor)); + } + + + private class TcpServerInitializer extends ChannelInitializer<SocketChannel> { + + @Override + protected void initChannel(SocketChannel ch) { + globalTrafficShapingHandler = newGTSHandler(tcpThreadPoolGroup.getScheduler(), eventMeshTCPConfiguration.getCtc().getReadLimit()); + ch.pipeline() + .addLast(getWorkerGroup(), new Codec.Encoder()) + .addLast(getWorkerGroup(), new Codec.Decoder()) + .addLast(getWorkerGroup(), "global-traffic-shaping", globalTrafficShapingHandler) + .addLast(getWorkerGroup(), "channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit())) + .addLast(getWorkerGroup(), tcpConnectionHandler) + .addLast(getWorkerGroup(), + new IdleStateHandler( + eventMeshTCPConfiguration.getEventMeshTcpIdleReadSeconds(), + eventMeshTCPConfiguration.getEventMeshTcpIdleWriteSeconds(), + eventMeshTCPConfiguration.getEventMeshTcpIdleAllSeconds()), + new TcpDispatcher()); + } + + private GlobalTrafficShapingHandler newGTSHandler(final ScheduledExecutorService executor, final long readLimit) { + GlobalTrafficShapingHandler handler = new GlobalTrafficShapingHandler(executor, 0, readLimit) { + @Override + protected long calculateSize(final Object msg) { + return 1; + } + }; + handler.setMaxTimeWait(1_000); + return handler; + } + + private ChannelTrafficShapingHandler newCTSHandler(final long readLimit) { + ChannelTrafficShapingHandler handler = new ChannelTrafficShapingHandler(0, readLimit) { + @Override + protected long calculateSize(final Object msg) { + return 1; + } + }; + handler.setMaxTimeWait(3_000); + return handler; + } + + } + + @Sharable + private class TcpDispatcher extends SimpleChannelInboundHandler<Package> { + + private final Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Exception { + long startTime = System.currentTimeMillis(); + validateMsg(pkg); + + eventMeshTcpMonitor.getTcpSummaryMetrics().getClient2eventMeshMsgNum().incrementAndGet(); + + Command cmd = pkg.getHeader().getCmd(); + try { + if (isNeedTrace(cmd)) { + pkg.getHeader().getProperties() + .put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP, + eventMeshTCPConfiguration.getEventMeshServerIp()); + Session session = clientSessionGroupMapping.getSession(ctx); + + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem()); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IP, session.getClient().getHost()); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IDC, session.getClient().getIdc()); + pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup()); + } + + if (Command.HELLO_REQUEST == cmd || Command.RECOMMEND_REQUEST == cmd) { + if (messageLogger.isInfoEnabled()) { + messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg); + } + processHttpCommandRequest(pkg, ctx, startTime, cmd); + return; + } + + if (clientSessionGroupMapping.getSession(ctx) == null) { + if (messageLogger.isInfoEnabled()) { + messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={},no session is found", cmd, pkg); + } + throw new Exception("no session is found"); + } + + logMessageFlow(ctx, pkg, cmd); + + if (clientSessionGroupMapping.getSession(ctx) + .getSessionState() == SessionState.CLOSED) { + throw new Exception( + "this eventMesh tcp session will be closed, may be reboot or version change!"); + } + + processHttpCommandRequest(pkg, ctx, startTime, cmd); Review Comment: The method name should be changed to `processTcpCommandRequest` would be better . @harshithasudhakar -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
