This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f932b8d47a [INLONG-11725][SDK] Replace the Sender used in the
agent-core module with TcpMsgSender (#11726)
f932b8d47a is described below
commit f932b8d47a7abb30b9e8ec728be8b68bbf8611f1
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Feb 8 10:12:04 2025 +0800
[INLONG-11725][SDK] Replace the Sender used in the agent-core module with
TcpMsgSender (#11726)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/agent/core/AgentStatusManager.java | 25 ++++++++++++----------
.../inlong/agent/core/FileStaticManager.java | 25 ++++++++++++----------
.../apache/inlong/agent/core/HeartbeatManager.java | 18 +++++++++-------
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index 14395412bb..50085fa1bd 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -23,8 +23,9 @@ import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ExcuteLinux;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
@@ -158,23 +159,25 @@ public class AgentStatusManager {
return manager;
}
- private void doSendStatusMsg(DefaultMessageSender sender) {
+ private void doSendStatusMsg(TcpMsgSender sender) {
AgentStatus data = AgentStatusManager.getInstance().getStatus();
LOGGER.info("status detail: {}", data);
if (sender == null) {
return;
}
- SendResult ret =
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
- INLONG_AGENT_SYSTEM,
- INLONG_AGENT_STATUS,
- AgentUtils.getCurrentTime(),
- "");
- if (ret != SendResult.OK) {
- LOGGER.error("send status failed: ret {}", ret);
+ try {
+ ProcessResult procResult = new ProcessResult();
+ if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
+ INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), null,
+ data.getFieldsString().getBytes(StandardCharsets.UTF_8)),
procResult)) {
+ LOGGER.error("send status failed: ret = {}", procResult);
+ }
+ } catch (Throwable ex) {
+ LOGGER.error("send status throw exception", ex);
}
}
- public static void sendStatusMsg(DefaultMessageSender sender) {
+ public static void sendStatusMsg(TcpMsgSender sender) {
if (AgentStatusManager.getInstance() != null) {
AgentStatusManager.getInstance().doSendStatusMsg(sender);
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
index 388e2ef279..b3a47f2c6a 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -19,8 +19,9 @@ package org.apache.inlong.agent.core;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
@@ -124,25 +125,27 @@ public class FileStaticManager {
}
}
- private void doSendStaticMsg(DefaultMessageSender sender) {
+ private void doSendStaticMsg(TcpMsgSender sender) {
while (!queue.isEmpty()) {
FileStatic data = queue.poll();
LOGGER.info("file static detail: {}", data);
if (sender == null) {
continue;
}
- SendResult ret =
sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
- INLONG_AGENT_SYSTEM,
- INLONG_FILE_STATIC,
- AgentUtils.getCurrentTime(),
- "");
- if (ret != SendResult.OK) {
- LOGGER.error("send static failed: ret {}", ret);
+ try {
+ ProcessResult procResult = new ProcessResult();
+ if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
+ INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), null,
+
data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) {
+ LOGGER.error("send static failed: ret = {}", procResult);
+ }
+ } catch (Throwable ex) {
+ LOGGER.error("send static throw exception", ex);
}
}
}
- public static void sendStaticMsg(DefaultMessageSender sender) {
+ public static void sendStaticMsg(TcpMsgSender sender) {
if (FileStaticManager.getInstance() != null) {
FileStaticManager.getInstance().doSendStaticMsg(sender);
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 870993a6f2..1599de55f7 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -28,7 +28,8 @@ import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -64,7 +65,7 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
private final HttpManager httpManager;
private final String baseManagerUrl;
private final String reportHeartbeatUrl;
- private DefaultMessageSender sender;
+ private TcpMsgSender sender;
/**
* Init heartbeat manager.
@@ -194,18 +195,19 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
String managerAddr = conf.get(AGENT_MANAGER_ADDR);
String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
- TcpMsgSenderConfig proxyClientConfig = null;
+ TcpMsgSenderConfig proxyClientConfig;
try {
- proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
-
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+ proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
+ INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
+
proxyClientConfig.setSendBufferSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
- sender = new DefaultMessageSender(proxyClientConfig,
SHARED_FACTORY);
- } catch (Exception e) {
- LOGGER.error("heartbeat manager create sdk failed: ", e);
+ sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
+ } catch (Throwable ex) {
+ LOGGER.error("heartbeat manager create sdk failed: ", ex);
}
}
}
\ No newline at end of file