This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f932b8d47a [INLONG-11725][SDK] Replace the Sender used in the 
agent-core module with TcpMsgSender (#11726)
f932b8d47a is described below

commit f932b8d47a7abb30b9e8ec728be8b68bbf8611f1
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Feb 8 10:12:04 2025 +0800

    [INLONG-11725][SDK] Replace the Sender used in the agent-core module with 
TcpMsgSender (#11726)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/agent/core/AgentStatusManager.java      | 25 ++++++++++++----------
 .../inlong/agent/core/FileStaticManager.java       | 25 ++++++++++++----------
 .../apache/inlong/agent/core/HeartbeatManager.java | 18 +++++++++-------
 3 files changed, 38 insertions(+), 30 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index 14395412bb..50085fa1bd 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -23,8 +23,9 @@ import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ExcuteLinux;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 
 import com.google.common.collect.Lists;
 import lombok.AllArgsConstructor;
@@ -158,23 +159,25 @@ public class AgentStatusManager {
         return manager;
     }
 
-    private void doSendStatusMsg(DefaultMessageSender sender) {
+    private void doSendStatusMsg(TcpMsgSender sender) {
         AgentStatus data = AgentStatusManager.getInstance().getStatus();
         LOGGER.info("status detail: {}", data);
         if (sender == null) {
             return;
         }
-        SendResult ret = 
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
-                INLONG_AGENT_SYSTEM,
-                INLONG_AGENT_STATUS,
-                AgentUtils.getCurrentTime(),
-                "");
-        if (ret != SendResult.OK) {
-            LOGGER.error("send status failed: ret {}", ret);
+        try {
+            ProcessResult procResult = new ProcessResult();
+            if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
+                    INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), null,
+                    data.getFieldsString().getBytes(StandardCharsets.UTF_8)), 
procResult)) {
+                LOGGER.error("send status failed: ret = {}", procResult);
+            }
+        } catch (Throwable ex) {
+            LOGGER.error("send status throw exception", ex);
         }
     }
 
-    public static void sendStatusMsg(DefaultMessageSender sender) {
+    public static void sendStatusMsg(TcpMsgSender sender) {
         if (AgentStatusManager.getInstance() != null) {
             AgentStatusManager.getInstance().doSendStatusMsg(sender);
         }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
index 388e2ef279..b3a47f2c6a 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -19,8 +19,9 @@ package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 
 import com.google.common.collect.Lists;
 import lombok.AllArgsConstructor;
@@ -124,25 +125,27 @@ public class FileStaticManager {
         }
     }
 
-    private void doSendStaticMsg(DefaultMessageSender sender) {
+    private void doSendStaticMsg(TcpMsgSender sender) {
         while (!queue.isEmpty()) {
             FileStatic data = queue.poll();
             LOGGER.info("file static detail: {}", data);
             if (sender == null) {
                 continue;
             }
-            SendResult ret = 
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
-                    INLONG_AGENT_SYSTEM,
-                    INLONG_FILE_STATIC,
-                    AgentUtils.getCurrentTime(),
-                    "");
-            if (ret != SendResult.OK) {
-                LOGGER.error("send static failed: ret {}", ret);
+            try {
+                ProcessResult procResult = new ProcessResult();
+                if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
+                        INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), null,
+                        
data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) {
+                    LOGGER.error("send static failed: ret = {}", procResult);
+                }
+            } catch (Throwable ex) {
+                LOGGER.error("send static throw exception", ex);
             }
         }
     }
 
-    public static void sendStaticMsg(DefaultMessageSender sender) {
+    public static void sendStaticMsg(TcpMsgSender sender) {
         if (FileStaticManager.getInstance() != null) {
             FileStaticManager.getInstance().doSendStaticMsg(sender);
         }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 870993a6f2..1599de55f7 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -28,7 +28,8 @@ import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -64,7 +65,7 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     private final HttpManager httpManager;
     private final String baseManagerUrl;
     private final String reportHeartbeatUrl;
-    private DefaultMessageSender sender;
+    private TcpMsgSender sender;
 
     /**
      * Init heartbeat manager.
@@ -194,18 +195,19 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         String managerAddr = conf.get(AGENT_MANAGER_ADDR);
         String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
         String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
-        TcpMsgSenderConfig proxyClientConfig = null;
+        TcpMsgSenderConfig proxyClientConfig;
         try {
-            proxyClientConfig = new TcpMsgSenderConfig(managerAddr, 
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
-            
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+            proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
+                    INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
+            
proxyClientConfig.setSendBufferSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
             
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
             
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
             proxyClientConfig.setRequestTimeoutMs(30000L);
             ThreadFactory SHARED_FACTORY = new 
DefaultThreadFactory("agent-sender-manager-heartbeat",
                     Thread.currentThread().isDaemon());
-            sender = new DefaultMessageSender(proxyClientConfig, 
SHARED_FACTORY);
-        } catch (Exception e) {
-            LOGGER.error("heartbeat manager create sdk failed: ", e);
+            sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
+        } catch (Throwable ex) {
+            LOGGER.error("heartbeat manager create sdk failed: ", ex);
         }
     }
 }
\ No newline at end of file

Reply via email to