This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e51e9edfc9 [INLONG-9265][Agent] Add audit of agent send success (#9266)
e51e9edfc9 is described below
commit e51e9edfc913c6c6bc55df111531a1c8254b82ff
Author: justinwwhuang <[email protected]>
AuthorDate: Sun Nov 12 14:57:39 2023 +0800
[INLONG-9265][Agent] Add audit of agent send success (#9266)
---
.../apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index abb21fe8ba..d13ef19963 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -28,6 +28,7 @@ import
org.apache.inlong.agent.message.filecollect.PackageAckInfo;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -122,7 +122,6 @@ public class SenderManager {
private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
private final ReentrantReadWriteLock packageAckInfoLock = new
ReentrantReadWriteLock(true);
protected InstanceProfile profile;
- private Random testRandom = new Random();
private volatile boolean offsetRunning = false;
private volatile boolean resendRunning = false;
private volatile boolean started = false;
@@ -175,7 +174,6 @@ public class SenderManager {
this.metricItemSet = new AgentMetricItemSet(metricName);
MetricRegister.register(metricItemSet);
resendQueue = new LinkedBlockingQueue<>();
-
}
public void Start() throws Exception {
@@ -429,6 +427,8 @@ public class SenderManager {
if (result != null && result.equals(SendResult.OK)) {
message.getAckInfo().setHasAck(true);
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
+ System.currentTimeMillis(), message.getMsgCnt(),
message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);