xwm1992 commented on code in PR #4794: URL: https://github.com/apache/eventmesh/pull/4794#discussion_r1538516651
########## eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + + +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.runtime.acl.Acl; +import org.apache.eventmesh.runtime.configuration.EventMeshMQTTConfiguration; +import org.apache.eventmesh.runtime.core.protocol.mqtt.exception.MqttException; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientConnectProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.ClientDisConnectProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.HealthCheckProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.MqttProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.PublishProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.SubscrubeProcessor; +import org.apache.eventmesh.runtime.core.protocol.mqtt.processor.UnSubscrubeProcessor; +import org.apache.eventmesh.runtime.meta.MetaStorage; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageFactory; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EventMeshMQTTServer extends AbstractRemotingServer { + + private final EventMeshMQTTConfiguration eventMeshMQTTConfiguration; + + private final EventMeshServer eventMeshServer; + + private final MetaStorage metaStorage; + + private final Acl acl; + + + protected final transient Map<MqttMessageType, MqttProcessor> processorTable = + new ConcurrentHashMap<>(64); + + private final transient AtomicBoolean started = new AtomicBoolean(false); + + + public EventMeshMQTTServer(final EventMeshServer eventMeshServer, final EventMeshMQTTConfiguration eventMeshMQTTConfiguration) { + this.eventMeshServer = eventMeshServer; + this.eventMeshMQTTConfiguration = eventMeshMQTTConfiguration; + this.metaStorage = eventMeshServer.getMetaStorage(); + this.acl = eventMeshServer.getAcl(); + } + + @Override + public void init() throws Exception { + log.info("==================EventMeshMQTTServer Initialing=================="); + super.init("eventMesh-mqtt"); + registerMQTTProcessor(); + + } + + private void registerMQTTProcessor() { + processorTable.putIfAbsent(MqttMessageType.CONNECT, new ClientConnectProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.DISCONNECT, new ClientDisConnectProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.PINGREQ, new HealthCheckProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.SUBSCRIBE, new SubscrubeProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.UNSUBSCRIBE, new UnSubscrubeProcessor(this, getWorkerGroup())); + processorTable.putIfAbsent(MqttMessageType.PUBLISH, new PublishProcessor(this, getWorkerGroup())); + } + + + @Override + public void start() throws Exception { + Thread thread = new Thread(() -> { + final ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(this.getBossGroup(), this.getIoGroup()) + .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class); + bootstrap.option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_RCVBUF, 10485760); + + bootstrap.childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childHandler(new MQTTServerInitializer()); + + try { + int port = eventMeshMQTTConfiguration.getEventMeshTcpServerPort(); + ChannelFuture f = bootstrap.bind(port).sync(); + log.info("EventMeshMQTTServer[port={}] started.....", port); + f.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("EventMeshMQTTServer RemotingServer Start Err!", e); + try { + shutdown(); + } catch (Exception ex) { + log.error("EventMeshMQTTServer RemotingServer shutdown Err!", ex); + } + System.exit(-1); Review Comment: 我理解这里的必须启动成功实际上是受是否开启MQTT协议的配置控制的吧?这里的退出我认为没有问题,如果MQTT协议加载有问题退出了,那其实可以在配置中移除MQTT协议,保证TCP、HTTP等协议正常启动服务就好。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For additional commands, e-mail: issues-h...@eventmesh.apache.org