This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 527ce132d [ISSUE #5185] Enhancement for http server and tcp server 
(#5187)
527ce132d is described below

commit 527ce132da41e7d84506f29113860f46022d00e4
Author: mike_xwm <[email protected]>
AuthorDate: Tue Mar 18 16:01:47 2025 +0800

    [ISSUE #5185] Enhancement for http server and tcp server (#5187)
---
 .../runtime/admin/handler/AbstractHttpHandler.java |  4 +-
 .../eventmesh/runtime/boot/AbstractHTTPServer.java | 17 +++---
 .../runtime/boot/AbstractRemotingServer.java       |  3 +-
 .../eventmesh/runtime/boot/AbstractTCPServer.java  |  2 +-
 .../runtime/boot/EventMeshAdminServer.java         | 11 ++--
 .../protocol/http/processor/HandlerService.java    | 30 ++++++-----
 .../protocol/tcp/client/EventMeshTcp2Client.java   | 16 +++---
 .../tcp/client/processor/HelloProcessor.java       | 22 ++++----
 .../client/processor/MessageTransferProcessor.java |  6 ++-
 .../tcp/client/session/push/SessionPusher.java     | 62 +++++++++++-----------
 .../org/apache/eventmesh/runtime/util/Utils.java   |  5 +-
 11 files changed, 101 insertions(+), 77 deletions(-)

diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
index cdfe4e163..516960e7b 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
@@ -95,7 +95,9 @@ public abstract class AbstractHttpHandler implements 
HttpHandler {
      * Use {@link HttpResponseUtils#buildHttpResponse} to build {@link 
HttpResponse} param.
      */
     protected void write(ChannelHandlerContext ctx, HttpResponse response) {
-        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+        ctx.channel().eventLoop().execute(() -> {
+            
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+        });
     }
 
     @Override
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 3fcca5183..6c0cb4742 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -245,16 +245,19 @@ public abstract class AbstractHTTPServer extends 
AbstractRemotingServer {
             HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; 
charset=%s", EventMeshConstants.DEFAULT_CHARSET));
         responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
         responseHeaders.add(HttpHeaderNames.CONNECTION, 
HttpHeaderValues.KEEP_ALIVE);
-
-        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+        ctx.channel().eventLoop().execute(() -> {
+            
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+        });
     }
 
     public void sendResponse(final ChannelHandlerContext ctx, final 
DefaultFullHttpResponse response) {
-        ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
-            if (!f.isSuccess()) {
-                log.warn("send response to [{}] fail, will close this 
channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
-                f.channel().close();
-            }
+        ctx.channel().eventLoop().execute(() -> {
+            ctx.writeAndFlush(response).addListener((ChannelFutureListener) f 
-> {
+                if (!f.isSuccess()) {
+                    log.warn("send response to [{}] fail, will close this 
channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                    f.channel().close();
+                }
+            });
         });
     }
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index e02637ec3..e31004262 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -24,6 +24,7 @@ import 
org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
 
 import java.util.concurrent.TimeUnit;
 
+import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -76,7 +77,7 @@ public abstract class AbstractRemotingServer implements 
RemotingServer {
     }
 
     private void buildWorkerGroup(final String threadPrefix) {
-        workerGroup = new NioEventLoopGroup(MAX_THREADS, new 
EventMeshThreadFactory(threadPrefix + "-worker"));
+        workerGroup = new DefaultEventLoopGroup(MAX_THREADS, new 
EventMeshThreadFactory(threadPrefix + "-worker"));
     }
 
     protected void initProducerManager() throws Exception {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index c9a464bf8..eb9793be6 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -311,7 +311,7 @@ public class AbstractTCPServer extends 
AbstractRemotingServer {
                 Package res = new Package();
                 res.setHeader(new Header(getReplyCommand(cmd), 
OPStatus.FAIL.getCode(), e.toString(),
                     pkg.getHeader().getSeq()));
-                ctx.writeAndFlush(res);
+                ctx.channel().eventLoop().execute(() -> 
ctx.writeAndFlush(res));
             } catch (Exception ex) {
                 log.warn("writeToClient failed", ex);
             }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
index 5e98fc690..e7f2748cc 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
@@ -170,11 +170,16 @@ public class EventMeshAdminServer extends 
AbstractHTTPServer {
                     httpHandlerOpt.get().handle(httpRequest, ctx);
                 } catch (Exception e) {
                     log.error("admin server channelRead error", e);
-                    
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()),
 ctx,
-                        HttpHeaderValues.APPLICATION_JSON, 
HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
+                    ctx.channel().eventLoop().execute(() -> {
+                        
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()),
 ctx,
+                            HttpHeaderValues.APPLICATION_JSON, 
HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
+                    });
                 }
             } else {
-                
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
+                ctx.channel().eventLoop().execute(() -> {
+                        
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
+                    }
+                );
             }
         }
     }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
index 6ecf745fa..c2e20ba6a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
@@ -163,14 +163,16 @@ public class HandlerService {
      */
     private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HttpResponse response, boolean isClose) {
         ReferenceCountUtil.release(httpRequest);
-        ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
-            if (!f.isSuccess()) {
-                HTTP_LOGGER.warn("send response to [{}] fail, will close this 
channel",
-                    RemotingHelper.parseChannelRemoteAddr(f.channel()));
-                if (isClose) {
-                    f.channel().close();
+        ctx.channel().eventLoop().execute(() -> {
+            ctx.writeAndFlush(response).addListener((ChannelFutureListener) f 
-> {
+                if (!f.isSuccess()) {
+                    HTTP_LOGGER.warn("send response to [{}] fail, will close 
this channel",
+                        RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                    if (isClose) {
+                        f.channel().close();
+                    }
                 }
-            }
+            });
         });
     }
 
@@ -179,12 +181,14 @@ public class HandlerService {
      */
     private void sendShortResponse(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HttpResponse response) {
         ReferenceCountUtil.release(httpRequest);
-        ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
-            if (!f.isSuccess()) {
-                HTTP_LOGGER.warn("send response to [{}] with short-lived 
connection fail, will close this channel",
-                    RemotingHelper.parseChannelRemoteAddr(f.channel()));
-            }
-        }).addListener(ChannelFutureListener.CLOSE);
+        ctx.channel().eventLoop().execute(() -> {
+            ctx.writeAndFlush(response).addListener((ChannelFutureListener) f 
-> {
+                if (!f.isSuccess()) {
+                    HTTP_LOGGER.warn("send response to [{}] with short-lived 
connection fail, will close this channel",
+                        RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                }
+            }).addListener(ChannelFutureListener.CLOSE);
+        });
     }
 
     private HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws 
IOException {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
index 6ef8e2bc1..c4acc8d81 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
@@ -95,13 +95,15 @@ public class EventMeshTcp2Client {
         Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, 
OPStatus.FAIL.getCode(), errMsg, null));
         
eventMeshTcpMetricsManager.eventMesh2clientMsgNumIncrement(IPUtils.parseChannelRemoteAddr(ctx.channel()));
         log.info("goodBye2Client client[{}]", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-        ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
-            Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
-            try {
-                mapping.closeSession(ctx);
-            } catch (Exception e) {
-                log.warn("close session failed!", e);
-            }
+        ctx.channel().eventLoop().execute(() -> {
+            ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future 
-> {
+                Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
+                try {
+                    mapping.closeSession(ctx);
+                } catch (Exception e) {
+                    log.warn("close session failed!", e);
+                }
+            });
         });
     }
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
index a216561c5..2827f9f32 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
@@ -94,16 +94,18 @@ public class HelloProcessor implements TcpProcessor {
             MESSAGE_LOGGER.error("HelloTask failed|address={}", 
ctx.channel().remoteAddress(), e);
             res.setHeader(new Header(HELLO_RESPONSE, OPStatus.FAIL.getCode(), 
Arrays.toString(e.getStackTrace()), pkg
                 .getHeader().getSeq()));
-            ctx.writeAndFlush(res).addListener(
-                (ChannelFutureListener) future -> {
-                    if (!future.isSuccess()) {
-                        Utils.logFailedMessageFlow(future, res, user, 
startTime, taskExecuteTime);
-                    } else {
-                        Utils.logSucceedMessageFlow(res, user, startTime, 
taskExecuteTime);
-                    }
-                    log.warn("HelloTask failed,close session,addr:{}", 
ctx.channel().remoteAddress());
-                    
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
-                });
+            ctx.channel().eventLoop().execute(() -> {
+                ctx.writeAndFlush(res).addListener(
+                    (ChannelFutureListener) future -> {
+                        if (!future.isSuccess()) {
+                            Utils.logFailedMessageFlow(future, res, user, 
startTime, taskExecuteTime);
+                        } else {
+                            Utils.logSucceedMessageFlow(res, user, startTime, 
taskExecuteTime);
+                        }
+                        log.warn("HelloTask failed,close session,addr:{}", 
ctx.channel().remoteAddress());
+                        
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
+                    });
+            });
         }
     }
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
index ccbb98255..fdb7595c6 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
@@ -143,8 +143,10 @@ public class MessageTransferProcessor implements 
TcpProcessor {
                 .tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
 
                 msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), 
"Tps overload, global flow control", pkg.getHeader().getSeq()));
-                ctx.writeAndFlush(msg).addListener(
-                    (ChannelFutureListener) future -> 
Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, 
taskExecuteTime));
+                ctx.channel().eventLoop().execute(() -> {
+                    ctx.writeAndFlush(msg).addListener(
+                        (ChannelFutureListener) future -> 
Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, 
taskExecuteTime));
+                });
 
                 TraceUtils.finishSpanWithException(ctx, event, "Tps overload, 
global flow control", null);
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index 22ebc5768..75a6e62d9 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -120,40 +120,42 @@ public class SessionPusher {
                 
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false);
 
             try {
-                session.getContext().writeAndFlush(pkg).addListener(
-                    (ChannelFutureListener) future -> {
-                        if (!future.isSuccess()) {
-                            log.error("downstreamMsg fail,seq:{}, 
retryTimes:{}, event:{}", downStreamMsgContext.seq,
-                                downStreamMsgContext.retryTimes, 
downStreamMsgContext.event);
-                            deliverFailMsgsCount.incrementAndGet();
-
-                            // how long to isolate client when push fail
-                            long isolateTime = System.currentTimeMillis()
-                                + 
session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
-                            session.setIsolateTime(isolateTime);
-                            log.warn("isolate client:{},isolateTime:{}", 
session.getClient(), isolateTime);
-
-                            // retry
-                            long delayTime = SubscriptionType.SYNC == 
downStreamMsgContext.getSubscriptionItem().getType()
-                                ? 
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
-                                : 
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
-                            
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
-                                .newTimeout(downStreamMsgContext, delayTime, 
TimeUnit.MILLISECONDS);
-                        } else {
-                            deliveredMsgsCount.incrementAndGet();
-                            log.info("downstreamMsg success,seq:{}, 
retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
-                                downStreamMsgContext.retryTimes, 
EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));
-
-                            if (session.isIsolated()) {
-                                log.info("cancel isolated,client:{}", 
session.getClient());
-                                
session.setIsolateTime(System.currentTimeMillis());
+                Package finalPkg = pkg;
+                session.getContext().channel().eventLoop().execute(() -> {
+                    session.getContext().writeAndFlush(finalPkg).addListener(
+                        (ChannelFutureListener) future -> {
+                            if (!future.isSuccess()) {
+                                log.error("downstreamMsg fail,seq:{}, 
retryTimes:{}, event:{}", downStreamMsgContext.seq,
+                                    downStreamMsgContext.retryTimes, 
downStreamMsgContext.event);
+                                deliverFailMsgsCount.incrementAndGet();
+
+                                // how long to isolate client when push fail
+                                long isolateTime = System.currentTimeMillis()
+                                    + 
session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
+                                session.setIsolateTime(isolateTime);
+                                log.warn("isolate client:{},isolateTime:{}", 
session.getClient(), isolateTime);
+
+                                // retry
+                                long delayTime = SubscriptionType.SYNC == 
downStreamMsgContext.getSubscriptionItem().getType()
+                                    ? 
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
+                                    : 
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
+                                
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
+                                    .newTimeout(downStreamMsgContext, 
delayTime, TimeUnit.MILLISECONDS);
+                            } else {
+                                deliveredMsgsCount.incrementAndGet();
+                                log.info("downstreamMsg success,seq:{}, 
retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
+                                    downStreamMsgContext.retryTimes, 
EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));
+
+                                if (session.isIsolated()) {
+                                    log.info("cancel isolated,client:{}", 
session.getClient());
+                                    
session.setIsolateTime(System.currentTimeMillis());
+                                }
                             }
-                        }
-                    });
+                        });
+                });
             } finally {
                 TraceUtils.finishSpan(span, downStreamMsgContext.event);
             }
-
         }
     }
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
index 63f5d232c..8f35c7de2 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
@@ -64,8 +64,8 @@ public class Utils {
                     new Exception("the session has been closed"));
                 return;
             }
-            ctx.writeAndFlush(pkg).addListener(
-                (ChannelFutureListener) future -> {
+            ctx.channel().eventLoop().execute(() -> {
+                ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) 
future -> {
                     if (!future.isSuccess()) {
                         logFailedMessageFlow(future, pkg, user, startTime, 
taskExecuteTime);
                     } else {
@@ -77,6 +77,7 @@ public class Utils {
                         }
                     }
                 });
+            });
         } catch (Exception e) {
             log.error("exception while sending message to client", e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to