This is an automated email from the ASF dual-hosted git repository.
pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 4e72b3c8c [ISSUE #3399] replace anonymous inner class with lambda
(#4575)
4e72b3c8c is described below
commit 4e72b3c8c5f0434ffa144a2a364361d93582fcf0
Author: wizardzhang <[email protected]>
AuthorDate: Fri Nov 24 08:01:38 2023 +0800
[ISSUE #3399] replace anonymous inner class with lambda (#4575)
---
.../protocol/tcp/client/EventMeshTcp2Client.java | 65 ++++++++--------------
1 file changed, 22 insertions(+), 43 deletions(-)
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
index ecca43476..fdde4aeb6 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
@@ -35,7 +35,6 @@ import org.apache.eventmesh.runtime.util.Utils;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@@ -54,13 +53,9 @@ public class EventMeshTcp2Client {
msg.setHeader(new Header(SERVER_GOODBYE_REQUEST,
OPStatus.SUCCESS.getCode(),
"graceful normal quit from eventmesh", null));
- tcpThreadPoolGroup.getScheduler().submit(new Runnable() {
-
- @Override
- public void run() {
- long taskExecuteTime = System.currentTimeMillis();
- Utils.writeAndFlush(msg, startTime, taskExecuteTime,
session.getContext(), session);
- }
+ tcpThreadPoolGroup.getScheduler().submit(() -> {
+ long taskExecuteTime = System.currentTimeMillis();
+ Utils.writeAndFlush(msg, startTime, taskExecuteTime,
session.getContext(), session);
});
InetSocketAddress address = (InetSocketAddress)
session.getContext().channel().remoteAddress();
@@ -79,13 +74,9 @@ public class EventMeshTcp2Client {
long startTime = System.currentTimeMillis();
Package msg = new Package();
msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, eventMeshStatus,
errMsg, null));
- tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
-
- @Override
- public void run() {
- long taskExecuteTime = System.currentTimeMillis();
- Utils.writeAndFlush(msg, startTime, taskExecuteTime,
session.getContext(), session);
- }
+ tcpThreadPoolGroup.getScheduler().schedule(() -> {
+ long taskExecuteTime = System.currentTimeMillis();
+ Utils.writeAndFlush(msg, startTime, taskExecuteTime,
session.getContext(), session);
}, 1 * 1000, TimeUnit.MILLISECONDS);
closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);
@@ -103,16 +94,12 @@ public class EventMeshTcp2Client {
Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST,
OPStatus.FAIL.getCode(), errMsg, null));
eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2clientMsgNum().incrementAndGet();
log.info("goodBye2Client client[{}]",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- ctx.writeAndFlush(pkg).addListener(new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture future) throws
Exception {
- Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
- try {
- mapping.closeSession(ctx);
- } catch (Exception e) {
- log.warn("close session failed!", e);
- }
+ ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
+ Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
+ try {
+ mapping.closeSession(ctx);
+ } catch (Exception e) {
+ log.warn("close session failed!", e);
}
});
}
@@ -127,13 +114,9 @@ public class EventMeshTcp2Client {
Package pkg = new Package();
pkg.setHeader(new Header(REDIRECT_TO_CLIENT,
OPStatus.SUCCESS.getCode(), null, null));
pkg.setBody(new RedirectInfo(newEventMeshIp, port));
- tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
-
- @Override
- public void run() {
- long taskExecuteTime = System.currentTimeMillis();
- Utils.writeAndFlush(pkg, startTime, taskExecuteTime,
session.getContext(), session);
- }
+ tcpThreadPoolGroup.getScheduler().schedule(() -> {
+ long taskExecuteTime = System.currentTimeMillis();
+ Utils.writeAndFlush(pkg, startTime, taskExecuteTime,
session.getContext(), session);
}, 5 * 1000, TimeUnit.MILLISECONDS);
closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);
return session.getRemoteAddress() + "--->" + newEventMeshIp + ":"
+ port;
@@ -145,18 +128,14 @@ public class EventMeshTcp2Client {
public static void closeSessionIfTimeout(TCPThreadPoolGroup
tcpThreadPoolGroup, Session session,
ClientSessionGroupMapping mapping) {
- tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
-
- @Override
- public void run() {
- try {
- if (session.getSessionState() != SessionState.CLOSED) {
- mapping.closeSession(session.getContext());
- log.info("closeSessionIfTimeout success, session[{}]",
session.getClient());
- }
- } catch (Exception e) {
- log.error("close session failed", e);
+ tcpThreadPoolGroup.getScheduler().schedule(() -> {
+ try {
+ if (session.getSessionState() != SessionState.CLOSED) {
+ mapping.closeSession(session.getContext());
+ log.info("closeSessionIfTimeout success, session[{}]",
session.getClient());
}
+ } catch (Exception e) {
+ log.error("close session failed", e);
}
}, 30 * 1000, TimeUnit.MILLISECONDS);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]