This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c6cbed8f [INLONG-9772][Agent] Increase auditing for sending 
exceptions and resending (#9775)
73c6cbed8f is described below

commit 73c6cbed8fbbacd323bc00380280988906a7c325
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Mar 5 17:35:36 2024 +0800

    [INLONG-9772][Agent] Increase auditing for sending exceptions and resending 
(#9775)
---
 .../inlong/agent/metrics/audit/AuditUtils.java     |  8 +++++++-
 .../plugin/sinks/filecollect/SenderManager.java    | 23 ++++++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index c2d946b923..fc6f4d90c3 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -54,8 +54,14 @@ public class AuditUtils {
     public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
     public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
     public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
+    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
     public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
-    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015;
+    public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
+    public static final int AUDIT_ID_AGENT_TRY_SEND_REAL_TIME = 30021;
+    public static final int AUDIT_ID_AGENT_SEND_EXCEPTION = 30022;
+    public static final int AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME = 30023;
+    public static final int AUDIT_ID_AGENT_RESEND = 30024;
+    public static final int AUDIT_ID_AGENT_RESEND_REAL_TIME = 30025;
 
     private static boolean IS_AUDIT = true;
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 1c6790df73..51056a1975 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -127,14 +127,14 @@ public class SenderManager {
                 CommonConstants.PROXY_SENDER_MAX_TIMEOUT, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
         maxSenderRetry = profile.getInt(
                 CommonConstants.PROXY_SENDER_MAX_RETRY, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
-        retrySleepTime = profile.getLong(
+        retrySleepTime = agentConf.getLong(
                 CommonConstants.PROXY_RETRY_SLEEP, 
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
         isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
         ioThreadNum = 
profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
                 CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
         enableBusyWait = 
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
                 CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
-        batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, 
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+        batchFlushInterval = agentConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, 
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
         authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID);
         authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
 
@@ -231,6 +231,12 @@ public class SenderManager {
         while (!suc) {
             try {
                 AgentSenderCallback cb = new AgentSenderCallback(message, 
retry);
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, 
message.getGroupId(),
+                        message.getStreamId(), message.getDataTime(), 
message.getMsgCnt(),
+                        message.getTotalSize());
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, 
message.getGroupId(),
+                        message.getStreamId(), AgentUtils.getCurrentTime(), 
message.getMsgCnt(),
+                        message.getTotalSize());
                 asyncSendByMessageSender(cb, message.getDataList(), 
message.getGroupId(),
                         message.getStreamId(), message.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(),
                         maxSenderTimeout, TimeUnit.SECONDS, 
message.getExtraMap(), proxySend);
@@ -238,6 +244,12 @@ public class SenderManager {
                         message.getMsgCnt());
                 suc = true;
             } catch (Exception exception) {
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, 
message.getGroupId(),
+                        message.getStreamId(), message.getDataTime(), 
message.getMsgCnt(),
+                        message.getTotalSize());
+                
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, 
message.getGroupId(),
+                        message.getStreamId(), AgentUtils.getCurrentTime(), 
message.getMsgCnt(),
+                        message.getTotalSize());
                 suc = false;
                 if (retry > maxSenderRetry) {
                     if (retry % 10 == 0) {
@@ -276,6 +288,13 @@ public class SenderManager {
                 try {
                     AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
                     if (callback != null) {
+                        SenderMessage message = callback.message;
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, 
message.getGroupId(),
+                                message.getStreamId(), message.getDataTime(), 
message.getMsgCnt(),
+                                message.getTotalSize());
+                        
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(),
+                                message.getStreamId(), 
AgentUtils.getCurrentTime(), message.getMsgCnt(),
+                                message.getTotalSize());
                         sendBatchWithRetryCount(callback.message, 
callback.retry + 1);
                     }
                 } catch (Exception ex) {

Reply via email to