This is an automated email from the ASF dual-hosted git repository.

pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e72b3c8c [ISSUE #3399] replace anonymous inner class with lambda 
(#4575)
4e72b3c8c is described below

commit 4e72b3c8c5f0434ffa144a2a364361d93582fcf0
Author: wizardzhang <[email protected]>
AuthorDate: Fri Nov 24 08:01:38 2023 +0800

    [ISSUE #3399] replace anonymous inner class with lambda (#4575)
---
 .../protocol/tcp/client/EventMeshTcp2Client.java   | 65 ++++++++--------------
 1 file changed, 22 insertions(+), 43 deletions(-)

diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
index ecca43476..fdde4aeb6 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java
@@ -35,7 +35,6 @@ import org.apache.eventmesh.runtime.util.Utils;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 
@@ -54,13 +53,9 @@ public class EventMeshTcp2Client {
             msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, 
OPStatus.SUCCESS.getCode(),
                 "graceful normal quit from eventmesh", null));
 
-            tcpThreadPoolGroup.getScheduler().submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    long taskExecuteTime = System.currentTimeMillis();
-                    Utils.writeAndFlush(msg, startTime, taskExecuteTime, 
session.getContext(), session);
-                }
+            tcpThreadPoolGroup.getScheduler().submit(() -> {
+                long taskExecuteTime = System.currentTimeMillis();
+                Utils.writeAndFlush(msg, startTime, taskExecuteTime, 
session.getContext(), session);
             });
             InetSocketAddress address = (InetSocketAddress) 
session.getContext().channel().remoteAddress();
 
@@ -79,13 +74,9 @@ public class EventMeshTcp2Client {
             long startTime = System.currentTimeMillis();
             Package msg = new Package();
             msg.setHeader(new Header(SERVER_GOODBYE_REQUEST, eventMeshStatus, 
errMsg, null));
-            tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
-
-                @Override
-                public void run() {
-                    long taskExecuteTime = System.currentTimeMillis();
-                    Utils.writeAndFlush(msg, startTime, taskExecuteTime, 
session.getContext(), session);
-                }
+            tcpThreadPoolGroup.getScheduler().schedule(() -> {
+                long taskExecuteTime = System.currentTimeMillis();
+                Utils.writeAndFlush(msg, startTime, taskExecuteTime, 
session.getContext(), session);
             }, 1 * 1000, TimeUnit.MILLISECONDS);
 
             closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);
@@ -103,16 +94,12 @@ public class EventMeshTcp2Client {
         Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, 
OPStatus.FAIL.getCode(), errMsg, null));
         
eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2clientMsgNum().incrementAndGet();
         log.info("goodBye2Client client[{}]", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-        ctx.writeAndFlush(pkg).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws 
Exception {
-                Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
-                try {
-                    mapping.closeSession(ctx);
-                } catch (Exception e) {
-                    log.warn("close session failed!", e);
-                }
+        ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
+            Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
+            try {
+                mapping.closeSession(ctx);
+            } catch (Exception e) {
+                log.warn("close session failed!", e);
             }
         });
     }
@@ -127,13 +114,9 @@ public class EventMeshTcp2Client {
             Package pkg = new Package();
             pkg.setHeader(new Header(REDIRECT_TO_CLIENT, 
OPStatus.SUCCESS.getCode(), null, null));
             pkg.setBody(new RedirectInfo(newEventMeshIp, port));
-            tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
-
-                @Override
-                public void run() {
-                    long taskExecuteTime = System.currentTimeMillis();
-                    Utils.writeAndFlush(pkg, startTime, taskExecuteTime, 
session.getContext(), session);
-                }
+            tcpThreadPoolGroup.getScheduler().schedule(() -> {
+                long taskExecuteTime = System.currentTimeMillis();
+                Utils.writeAndFlush(pkg, startTime, taskExecuteTime, 
session.getContext(), session);
             }, 5 * 1000, TimeUnit.MILLISECONDS);
             closeSessionIfTimeout(tcpThreadPoolGroup, session, mapping);
             return session.getRemoteAddress() + "--->" + newEventMeshIp + ":" 
+ port;
@@ -145,18 +128,14 @@ public class EventMeshTcp2Client {
 
     public static void closeSessionIfTimeout(TCPThreadPoolGroup 
tcpThreadPoolGroup, Session session,
         ClientSessionGroupMapping mapping) {
-        tcpThreadPoolGroup.getScheduler().schedule(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    if (session.getSessionState() != SessionState.CLOSED) {
-                        mapping.closeSession(session.getContext());
-                        log.info("closeSessionIfTimeout success, session[{}]", 
session.getClient());
-                    }
-                } catch (Exception e) {
-                    log.error("close session failed", e);
+        tcpThreadPoolGroup.getScheduler().schedule(() -> {
+            try {
+                if (session.getSessionState() != SessionState.CLOSED) {
+                    mapping.closeSession(session.getContext());
+                    log.info("closeSessionIfTimeout success, session[{}]", 
session.getClient());
                 }
+            } catch (Exception e) {
+                log.error("close session failed", e);
             }
         }, 30 * 1000, TimeUnit.MILLISECONDS);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to