This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fd6cfba554 [INLONG-11727][SDK] Replace the Sender used in the
agent-plugins module with TcpMsgSender (#11728)
fd6cfba554 is described below
commit fd6cfba554b99eb0f424f4e97550ffee23ce8990
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Feb 8 16:12:20 2025 +0800
[INLONG-11727][SDK] Replace the Sender used in the agent-plugins module
with TcpMsgSender (#11728)
* [INLONG-11727][SDK] Replace the Sender used in the agent-plugins module
with TcpMsgSender
* fix ut test fails
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../apache/inlong/agent/core/HeartbeatManager.java | 7 +++
.../plugin/sinks/filecollect/SenderManager.java | 55 ++++++++++++++--------
.../sinks/filecollect/TestSenderManager.java | 13 ++---
.../dataproxy/network/tcp/codec/DecodeObject.java | 1 +
.../sdk/dataproxy/ProxyClientConfigTest.java | 1 +
5 files changed, 51 insertions(+), 26 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 1599de55f7..50282be586 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -28,6 +28,8 @@ import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
@@ -206,6 +208,11 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
+ // start sender object
+ ProcessResult procResult = new ProcessResult();
+ if (!sender.start(procResult)) {
+ throw new ProxySdkException("Sender start failure, " +
procResult);
+ }
} catch (Throwable ex) {
LOGGER.error("heartbeat manager create sdk failed: ", ex);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index b2303feafe..34c1b67f70 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -30,10 +30,13 @@ import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -71,7 +74,7 @@ public class SenderManager {
private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
- private DefaultMessageSender sender;
+ private TcpMsgSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
@@ -200,20 +203,22 @@ public class SenderManager {
private void createMessageSender() throws Exception {
TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
managerAddr, inlongGroupId, authSecretId, authSecretKey);
- proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
+ proxyClientConfig.setSendBufferSize(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
-
proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
proxyClientConfig.setEnableEpollBusyWait(enableBusyWait);
-
+ proxyClientConfig.setSdkMsgType(MsgType.valueOf(msgType));
+ proxyClientConfig.setEnableDataCompress(isCompress);
SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" +
sourcePath,
Thread.currentThread().isDaemon());
-
- DefaultMessageSender sender = new
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
- sender.setMsgtype(msgType);
- sender.setCompress(isCompress);
- this.sender = sender;
+ // build sender object
+ this.sender = new InLongTcpMsgSender(proxyClientConfig,
SHARED_FACTORY);
+ ProcessResult procResult = new ProcessResult();
+ // start sender object
+ if (!sender.start(procResult)) {
+ throw new ProxySdkException("Start sender failure, " + procResult);
+ }
}
public void sendBatch(SenderMessage message) {
@@ -230,7 +235,7 @@ public class SenderManager {
*/
private void sendBatchWithRetryCount(SenderMessage message, int retry) {
boolean suc = false;
- while (!suc) {
+ while (!suc && !shutdown) {
try {
AgentSenderCallback cb = new AgentSenderCallback(message,
retry);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND,
message.getGroupId(),
@@ -267,11 +272,21 @@ public class SenderManager {
}
}
- private void asyncSendByMessageSender(SendMessageCallback cb,
+ private void asyncSendByMessageSender(MsgSendCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long
dataTime, String msgUUID,
- Map<String, String> extraAttrMap, boolean isProxySend) throws
ProxySdkException {
- sender.asyncSendMessage(cb, bodyList, groupId,
- streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
+ Map<String, String> extraAttrMap, boolean isProxySend) throws
Exception {
+ boolean isSuccess;
+ ProcessResult procResult = new ProcessResult();
+ if (isProxySend) {
+ isSuccess = sender.asyncSendMsgWithSinkAck(new TcpEventInfo(
+ groupId, streamId, dataTime, msgUUID, extraAttrMap,
bodyList), cb, procResult);
+ } else {
+ isSuccess = sender.asyncSendMessage(new TcpEventInfo(
+ groupId, streamId, dataTime, msgUUID, extraAttrMap,
bodyList), cb, procResult);
+ }
+ if (!isSuccess) {
+ throw new ProxySdkException("Send message failure, " + procResult);
+ }
}
/**
@@ -330,7 +345,7 @@ public class SenderManager {
/**
* sender callback
*/
- private class AgentSenderCallback implements SendMessageCallback {
+ private class AgentSenderCallback implements MsgSendCallback {
private final int retry;
private final SenderMessage message;
@@ -343,13 +358,13 @@ public class SenderManager {
}
@Override
- public void onMessageAck(SendResult result) {
+ public void onMessageAck(ProcessResult result) {
String groupId = message.getGroupId();
String streamId = message.getStreamId();
String taskId = message.getTaskId();
String instanceId = message.getInstanceId();
long dataTime = message.getDataTime();
- if (result != null && result.equals(SendResult.OK)) {
+ if (result.isSuccess()) {
message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 508e21588f..4e068f5930 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -27,8 +27,9 @@ import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -84,14 +85,14 @@ public class TestSenderManager {
@Test
public void testNormalAck() {
- List<SendMessageCallback> cbList = new ArrayList<>();
+ List<MsgSendCallback> cbList = new ArrayList<>();
try {
profile.set(TaskConstants.INODE_INFO,
FileDataUtils.getInodeInfo(profile.getInstanceId()));
SenderManager senderManager = PowerMockito.spy(new
SenderManager(profile, "inlongGroupId", "sourceName"));
PowerMockito.doNothing().when(senderManager,
"createMessageSender");
PowerMockito.doAnswer(invocation -> {
- SendMessageCallback cb = invocation.getArgument(0);
+ MsgSendCallback cb = invocation.getArgument(0);
cbList.add(cb);
return null;
}).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
@@ -115,11 +116,11 @@ public class TestSenderManager {
}
Assert.assertTrue(cbList.size() == 10);
for (int i = 0; i < 5; i++) {
- cbList.get(4 - i).onMessageAck(SendResult.OK);
+ cbList.get(4 - i).onMessageAck(new
ProcessResult(ErrorCode.OK));
}
Assert.assertTrue(calHasAckCount(ackInfoListTotal) == 5);
for (int i = 5; i < 10; i++) {
- cbList.get(i).onMessageAck(SendResult.OK);
+ cbList.get(i).onMessageAck(new ProcessResult(ErrorCode.OK));
AgentUtils.silenceSleepInMs(10);
}
Assert.assertTrue(String.valueOf(calHasAckCount(ackInfoListTotal)),
calHasAckCount(ackInfoListTotal) == 10);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
index 669f8330de..d9b24b0aae 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
@@ -84,6 +84,7 @@ public class DecodeObject {
private void handleAttr(String attributes) {
if (StringUtils.isBlank(attributes)) {
+ this.procResult = new ProcessResult(ErrorCode.OK);
return;
}
retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
index 0173651169..68a08a00b9 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
@@ -29,6 +29,7 @@ public class ProxyClientConfigTest {
public void testManagerConfig() throws Exception {
HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig(
"http://127.0.0.1:800", "test_id", "secretId", "secretKey");
+ httpConfig.setHttpAsyncRptPoolConfig(30, 20);
HttpMsgSenderConfig httpConfig1 = httpConfig.clone();
Assert.assertEquals(httpConfig, httpConfig1);
httpConfig1.setRegionName("sz");