This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 527ce132d [ISSUE #5185] Enhancement for http server and tcp server
(#5187)
527ce132d is described below
commit 527ce132da41e7d84506f29113860f46022d00e4
Author: mike_xwm <[email protected]>
AuthorDate: Tue Mar 18 16:01:47 2025 +0800
[ISSUE #5185] Enhancement for http server and tcp server (#5187)
---
.../runtime/admin/handler/AbstractHttpHandler.java | 4 +-
.../eventmesh/runtime/boot/AbstractHTTPServer.java | 17 +++---
.../runtime/boot/AbstractRemotingServer.java | 3 +-
.../eventmesh/runtime/boot/AbstractTCPServer.java | 2 +-
.../runtime/boot/EventMeshAdminServer.java | 11 ++--
.../protocol/http/processor/HandlerService.java | 30 ++++++-----
.../protocol/tcp/client/EventMeshTcp2Client.java | 16 +++---
.../tcp/client/processor/HelloProcessor.java | 22 ++++----
.../client/processor/MessageTransferProcessor.java | 6 ++-
.../tcp/client/session/push/SessionPusher.java | 62 +++++++++++-----------
.../org/apache/eventmesh/runtime/util/Utils.java | 5 +-
11 files changed, 101 insertions(+), 77 deletions(-)
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
index cdfe4e163..516960e7b 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java
@@ -95,7 +95,9 @@ public abstract class AbstractHttpHandler implements
HttpHandler {
* Use {@link HttpResponseUtils#buildHttpResponse} to build {@link
HttpResponse} param.
*/
protected void write(ChannelHandlerContext ctx, HttpResponse response) {
- ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().eventLoop().execute(() -> {
+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ });
}
@Override
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 3fcca5183..6c0cb4742 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -245,16 +245,19 @@ public abstract class AbstractHTTPServer extends
AbstractRemotingServer {
HttpHeaderNames.CONTENT_TYPE, String.format("text/plain;
charset=%s", EventMeshConstants.DEFAULT_CHARSET));
responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH,
response.content().readableBytes());
responseHeaders.add(HttpHeaderNames.CONNECTION,
HttpHeaderValues.KEEP_ALIVE);
-
- ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().eventLoop().execute(() -> {
+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ });
}
public void sendResponse(final ChannelHandlerContext ctx, final
DefaultFullHttpResponse response) {
- ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
- if (!f.isSuccess()) {
- log.warn("send response to [{}] fail, will close this
channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
- f.channel().close();
- }
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(response).addListener((ChannelFutureListener) f
-> {
+ if (!f.isSuccess()) {
+ log.warn("send response to [{}] fail, will close this
channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
+ f.channel().close();
+ }
+ });
});
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index e02637ec3..e31004262 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -24,6 +24,7 @@ import
org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import java.util.concurrent.TimeUnit;
+import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -76,7 +77,7 @@ public abstract class AbstractRemotingServer implements
RemotingServer {
}
private void buildWorkerGroup(final String threadPrefix) {
- workerGroup = new NioEventLoopGroup(MAX_THREADS, new
EventMeshThreadFactory(threadPrefix + "-worker"));
+ workerGroup = new DefaultEventLoopGroup(MAX_THREADS, new
EventMeshThreadFactory(threadPrefix + "-worker"));
}
protected void initProducerManager() throws Exception {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index c9a464bf8..eb9793be6 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -311,7 +311,7 @@ public class AbstractTCPServer extends
AbstractRemotingServer {
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd),
OPStatus.FAIL.getCode(), e.toString(),
pkg.getHeader().getSeq()));
- ctx.writeAndFlush(res);
+ ctx.channel().eventLoop().execute(() ->
ctx.writeAndFlush(res));
} catch (Exception ex) {
log.warn("writeToClient failed", ex);
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
index 5e98fc690..e7f2748cc 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java
@@ -170,11 +170,16 @@ public class EventMeshAdminServer extends
AbstractHTTPServer {
httpHandlerOpt.get().handle(httpRequest, ctx);
} catch (Exception e) {
log.error("admin server channelRead error", e);
-
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()),
ctx,
- HttpHeaderValues.APPLICATION_JSON,
HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().eventLoop().execute(() -> {
+
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()),
ctx,
+ HttpHeaderValues.APPLICATION_JSON,
HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
+ });
}
} else {
-
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().eventLoop().execute(() -> {
+
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
+ }
+ );
}
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
index 6ecf745fa..c2e20ba6a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java
@@ -163,14 +163,16 @@ public class HandlerService {
*/
private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest
httpRequest, HttpResponse response, boolean isClose) {
ReferenceCountUtil.release(httpRequest);
- ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
- if (!f.isSuccess()) {
- HTTP_LOGGER.warn("send response to [{}] fail, will close this
channel",
- RemotingHelper.parseChannelRemoteAddr(f.channel()));
- if (isClose) {
- f.channel().close();
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(response).addListener((ChannelFutureListener) f
-> {
+ if (!f.isSuccess()) {
+ HTTP_LOGGER.warn("send response to [{}] fail, will close
this channel",
+ RemotingHelper.parseChannelRemoteAddr(f.channel()));
+ if (isClose) {
+ f.channel().close();
+ }
}
- }
+ });
});
}
@@ -179,12 +181,14 @@ public class HandlerService {
*/
private void sendShortResponse(ChannelHandlerContext ctx, HttpRequest
httpRequest, HttpResponse response) {
ReferenceCountUtil.release(httpRequest);
- ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
- if (!f.isSuccess()) {
- HTTP_LOGGER.warn("send response to [{}] with short-lived
connection fail, will close this channel",
- RemotingHelper.parseChannelRemoteAddr(f.channel()));
- }
- }).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(response).addListener((ChannelFutureListener) f
-> {
+ if (!f.isSuccess()) {
+ HTTP_LOGGER.warn("send response to [{}] with short-lived
connection fail, will close this channel",
+ RemotingHelper.parseChannelRemoteAddr(f.channel()));
+ }
+ }).addListener(ChannelFutureListener.CLOSE);
+ });
}
private HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws
IOException {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
index 6ef8e2bc1..c4acc8d81 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
@@ -95,13 +95,15 @@ public class EventMeshTcp2Client {
Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST,
OPStatus.FAIL.getCode(), errMsg, null));
eventMeshTcpMetricsManager.eventMesh2clientMsgNumIncrement(IPUtils.parseChannelRemoteAddr(ctx.channel()));
log.info("goodBye2Client client[{}]",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
- Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
- try {
- mapping.closeSession(ctx);
- } catch (Exception e) {
- log.warn("close session failed!", e);
- }
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future
-> {
+ Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
+ try {
+ mapping.closeSession(ctx);
+ } catch (Exception e) {
+ log.warn("close session failed!", e);
+ }
+ });
});
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
index a216561c5..2827f9f32 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java
@@ -94,16 +94,18 @@ public class HelloProcessor implements TcpProcessor {
MESSAGE_LOGGER.error("HelloTask failed|address={}",
ctx.channel().remoteAddress(), e);
res.setHeader(new Header(HELLO_RESPONSE, OPStatus.FAIL.getCode(),
Arrays.toString(e.getStackTrace()), pkg
.getHeader().getSeq()));
- ctx.writeAndFlush(res).addListener(
- (ChannelFutureListener) future -> {
- if (!future.isSuccess()) {
- Utils.logFailedMessageFlow(future, res, user,
startTime, taskExecuteTime);
- } else {
- Utils.logSucceedMessageFlow(res, user, startTime,
taskExecuteTime);
- }
- log.warn("HelloTask failed,close session,addr:{}",
ctx.channel().remoteAddress());
-
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
- });
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(res).addListener(
+ (ChannelFutureListener) future -> {
+ if (!future.isSuccess()) {
+ Utils.logFailedMessageFlow(future, res, user,
startTime, taskExecuteTime);
+ } else {
+ Utils.logSucceedMessageFlow(res, user, startTime,
taskExecuteTime);
+ }
+ log.warn("HelloTask failed,close session,addr:{}",
ctx.channel().remoteAddress());
+
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
+ });
+ });
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
index ccbb98255..fdb7595c6 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
@@ -143,8 +143,10 @@ public class MessageTransferProcessor implements
TcpProcessor {
.tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(),
"Tps overload, global flow control", pkg.getHeader().getSeq()));
- ctx.writeAndFlush(msg).addListener(
- (ChannelFutureListener) future ->
Utils.logSucceedMessageFlow(msg, session.getClient(), startTime,
taskExecuteTime));
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(msg).addListener(
+ (ChannelFutureListener) future ->
Utils.logSucceedMessageFlow(msg, session.getClient(), startTime,
taskExecuteTime));
+ });
TraceUtils.finishSpanWithException(ctx, event, "Tps overload,
global flow control", null);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index 22ebc5768..75a6e62d9 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -120,40 +120,42 @@ public class SessionPusher {
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false);
try {
- session.getContext().writeAndFlush(pkg).addListener(
- (ChannelFutureListener) future -> {
- if (!future.isSuccess()) {
- log.error("downstreamMsg fail,seq:{},
retryTimes:{}, event:{}", downStreamMsgContext.seq,
- downStreamMsgContext.retryTimes,
downStreamMsgContext.event);
- deliverFailMsgsCount.incrementAndGet();
-
- // how long to isolate client when push fail
- long isolateTime = System.currentTimeMillis()
- +
session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
- session.setIsolateTime(isolateTime);
- log.warn("isolate client:{},isolateTime:{}",
session.getClient(), isolateTime);
-
- // retry
- long delayTime = SubscriptionType.SYNC ==
downStreamMsgContext.getSubscriptionItem().getType()
- ?
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
- :
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
-
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
- .newTimeout(downStreamMsgContext, delayTime,
TimeUnit.MILLISECONDS);
- } else {
- deliveredMsgsCount.incrementAndGet();
- log.info("downstreamMsg success,seq:{},
retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
- downStreamMsgContext.retryTimes,
EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));
-
- if (session.isIsolated()) {
- log.info("cancel isolated,client:{}",
session.getClient());
-
session.setIsolateTime(System.currentTimeMillis());
+ Package finalPkg = pkg;
+ session.getContext().channel().eventLoop().execute(() -> {
+ session.getContext().writeAndFlush(finalPkg).addListener(
+ (ChannelFutureListener) future -> {
+ if (!future.isSuccess()) {
+ log.error("downstreamMsg fail,seq:{},
retryTimes:{}, event:{}", downStreamMsgContext.seq,
+ downStreamMsgContext.retryTimes,
downStreamMsgContext.event);
+ deliverFailMsgsCount.incrementAndGet();
+
+ // how long to isolate client when push fail
+ long isolateTime = System.currentTimeMillis()
+ +
session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
+ session.setIsolateTime(isolateTime);
+ log.warn("isolate client:{},isolateTime:{}",
session.getClient(), isolateTime);
+
+ // retry
+ long delayTime = SubscriptionType.SYNC ==
downStreamMsgContext.getSubscriptionItem().getType()
+ ?
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
+ :
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
+
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
+ .newTimeout(downStreamMsgContext,
delayTime, TimeUnit.MILLISECONDS);
+ } else {
+ deliveredMsgsCount.incrementAndGet();
+ log.info("downstreamMsg success,seq:{},
retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
+ downStreamMsgContext.retryTimes,
EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));
+
+ if (session.isIsolated()) {
+ log.info("cancel isolated,client:{}",
session.getClient());
+
session.setIsolateTime(System.currentTimeMillis());
+ }
}
- }
- });
+ });
+ });
} finally {
TraceUtils.finishSpan(span, downStreamMsgContext.event);
}
-
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
index 63f5d232c..8f35c7de2 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
@@ -64,8 +64,8 @@ public class Utils {
new Exception("the session has been closed"));
return;
}
- ctx.writeAndFlush(pkg).addListener(
- (ChannelFutureListener) future -> {
+ ctx.channel().eventLoop().execute(() -> {
+ ctx.writeAndFlush(pkg).addListener((ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
logFailedMessageFlow(future, pkg, user, startTime,
taskExecuteTime);
} else {
@@ -77,6 +77,7 @@ public class Utils {
}
}
});
+ });
} catch (Exception e) {
log.error("exception while sending message to client", e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]