This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73c6cbed8f [INLONG-9772][Agent] Increase auditing for sending exceptions and resending (#9775)
73c6cbed8f is described below
commit 73c6cbed8fbbacd323bc00380280988906a7c325
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Mar 5 17:35:36 2024 +0800
[INLONG-9772][Agent] Increase auditing for sending exceptions and resending (#9775)
---
.../inlong/agent/metrics/audit/AuditUtils.java | 8 +++++++-
.../plugin/sinks/filecollect/SenderManager.java | 23 ++++++++++++++++++++--
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index c2d946b923..fc6f4d90c3 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -54,8 +54,14 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
+ public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
- public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015;
+ public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
+ public static final int AUDIT_ID_AGENT_TRY_SEND_REAL_TIME = 30021;
+ public static final int AUDIT_ID_AGENT_SEND_EXCEPTION = 30022;
+ public static final int AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME = 30023;
+ public static final int AUDIT_ID_AGENT_RESEND = 30024;
+ public static final int AUDIT_ID_AGENT_RESEND_REAL_TIME = 30025;
private static boolean IS_AUDIT = true;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 1c6790df73..51056a1975 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -127,14 +127,14 @@ public class SenderManager {
CommonConstants.PROXY_SENDER_MAX_TIMEOUT,
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
maxSenderRetry = profile.getInt(
CommonConstants.PROXY_SENDER_MAX_RETRY,
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
- retrySleepTime = profile.getLong(
+ retrySleepTime = agentConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP,
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
ioThreadNum =
profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait =
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
- batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+ batchFlushInterval = agentConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID);
authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
@@ -231,6 +231,12 @@ public class SenderManager {
while (!suc) {
try {
AgentSenderCallback cb = new AgentSenderCallback(message,
retry);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND,
message.getGroupId(),
+ message.getStreamId(), message.getDataTime(),
message.getMsgCnt(),
+ message.getTotalSize());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME,
message.getGroupId(),
+ message.getStreamId(), AgentUtils.getCurrentTime(),
message.getMsgCnt(),
+ message.getTotalSize());
asyncSendByMessageSender(cb, message.getDataList(),
message.getGroupId(),
message.getStreamId(), message.getDataTime(),
SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS,
message.getExtraMap(), proxySend);
@@ -238,6 +244,12 @@ public class SenderManager {
message.getMsgCnt());
suc = true;
} catch (Exception exception) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION,
message.getGroupId(),
+ message.getStreamId(), message.getDataTime(),
message.getMsgCnt(),
+ message.getTotalSize());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME,
message.getGroupId(),
+ message.getStreamId(), AgentUtils.getCurrentTime(),
message.getMsgCnt(),
+ message.getTotalSize());
suc = false;
if (retry > maxSenderRetry) {
if (retry % 10 == 0) {
@@ -276,6 +288,13 @@ public class SenderManager {
try {
AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
if (callback != null) {
+ SenderMessage message = callback.message;
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND,
message.getGroupId(),
+ message.getStreamId(), message.getDataTime(),
message.getMsgCnt(),
+ message.getTotalSize());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(),
+ message.getStreamId(),
AgentUtils.getCurrentTime(), message.getMsgCnt(),
+ message.getTotalSize());
sendBatchWithRetryCount(callback.message,
callback.retry + 1);
}
} catch (Exception ex) {