This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cd2a7a84a0 [INLONG-10035][Agent] Report audit data using the new SDK
interface (#10038)
cd2a7a84a0 is described below
commit cd2a7a84a02dbd84a2ffa6851f5185cda8524461
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Apr 22 20:52:56 2024 +0800
[INLONG-10035][Agent] Report audit data using the new SDK interface (#10038)
---
.../agent/message/file/ProxyMessageCache.java | 2 ++
.../inlong/agent/metrics/audit/AuditUtils.java | 14 ++++++++++++--
.../agent/core/instance/InstanceManager.java | 16 +++++++++-------
.../agent/plugin/instance/CommonInstance.java | 4 +++-
.../plugin/sinks/filecollect/SenderManager.java | 22 ++++++++++++----------
.../inlong/agent/plugin/sources/LogFileSource.java | 12 ++++++++++--
.../agent/plugin/sources/file/AbstractSource.java | 8 +++++---
.../inlong/agent/plugin/task/AbstractTask.java | 4 +++-
8 files changed, 56 insertions(+), 26 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
index 08a7b10d33..9216461579 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
@@ -39,6 +39,7 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
/**
* Handle List of Proxy Message, which belong to the same stream id.
@@ -77,6 +78,7 @@ public class ProxyMessageCache {
dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
+ extraMap.put(AUDIT_VERSION, taskId);
}
public void generateExtraMap(String dataKey) {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index e4cc7691c8..7b5edb5a0b 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -30,6 +30,8 @@ import static
org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS;
import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS;
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static
org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
/**
* AuditUtils
@@ -42,6 +44,7 @@ public class AuditUtils {
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
+ public static final int AUDIT_ID_AGENT_READ_FAILED = 10003;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
@@ -54,6 +57,7 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
+ public static final int AUDIT_ID_AGENT_READ_FAILED_REAL_TIME = 30012;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
@@ -93,11 +97,17 @@ public class AuditUtils {
* Add audit metric
*/
public static void add(int auditID, String inlongGroupId, String
inlongStreamId,
- long logTime, int count, long size) {
+ long logTime, int count, long size, long version) {
if (!IS_AUDIT) {
return;
}
- AuditOperator.getInstance().add(auditID, inlongGroupId,
inlongStreamId, logTime, count, size);
+ AuditOperator.getInstance()
+ .add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId,
inlongStreamId, logTime, count, size, version);
+ }
+
+ public static void add(int auditID, String inlongGroupId, String
inlongStreamId,
+ long logTime, int count, long size) {
+ add(auditID, inlongGroupId, inlongStreamId, logTime, count, size,
DEFAULT_AUDIT_VERSION);
}
/**
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 2c9bd721d8..5545039f5e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -80,6 +80,7 @@ public class InstanceManager extends AbstractDaemon {
private final int instanceLimit;
private final AgentConfiguration agentConf;
private final String taskId;
+ private long auditVersion;
private volatile boolean runAtLeastOneTime = false;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
@@ -122,6 +123,7 @@ public class InstanceManager extends AbstractDaemon {
*/
public InstanceManager(String taskId, int instanceLimit, Db basicDb,
TaskProfileDb taskProfileDb) {
this.taskId = taskId;
+ this.auditVersion = Long.parseLong(taskId);
instanceDb = new InstanceDb(basicDb);
this.taskProfileDb = taskProfileDb;
this.agentConf = AgentConfiguration.getAgentConf();
@@ -171,7 +173,7 @@ public class InstanceManager extends AbstractDaemon {
String inlongGroupId = taskFromDb.getInlongGroupId();
String inlongStreamId = taskFromDb.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId,
inlongStreamId,
- AgentUtils.getCurrentTime(), 1, 1);
+ AgentUtils.getCurrentTime(), 1, 1, auditVersion);
} catch (Throwable ex) {
LOGGER.error("coreThread error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
@@ -387,7 +389,7 @@ public class InstanceManager extends AbstractDaemon {
LOGGER.info("delete instance from db: taskId {} instanceId {} result
{}", taskId,
instanceId, instanceDb.getInstance(taskId, instanceId));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB,
inlongGroupId, inlongStreamId,
- profile.getSinkDataTime(), 1, 1);
+ profile.getSinkDataTime(), 1, 1, auditVersion);
}
private void deleteFromMemory(String instanceId) {
@@ -403,7 +405,7 @@ public class InstanceManager extends AbstractDaemon {
instanceMap.remove(instanceId);
LOGGER.info("delete instance from memory: taskId {} instanceId {}",
taskId, instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM,
inlongGroupId, inlongStreamId,
- instance.getProfile().getSinkDataTime(), 1, 1);
+ instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}
private void addToDb(InstanceProfile profile, boolean addNew) {
@@ -413,7 +415,7 @@ public class InstanceManager extends AbstractDaemon {
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB,
inlongGroupId, inlongStreamId,
- profile.getSinkDataTime(), 1, 1);
+ profile.getSinkDataTime(), 1, 1, auditVersion);
}
}
@@ -430,7 +432,7 @@ public class InstanceManager extends AbstractDaemon {
LOGGER.error("old instance {} should not exist, try stop it first",
instanceProfile.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL,
inlongGroupId, inlongStreamId,
- instanceProfile.getSinkDataTime(), 1, 1);
+ instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
@@ -445,12 +447,12 @@ public class InstanceManager extends AbstractDaemon {
instance.getInstanceId(), instanceMap.size(),
EXECUTOR_SERVICE.getTaskCount(),
EXECUTOR_SERVICE.getActiveCount());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM,
inlongGroupId, inlongStreamId,
- instanceProfile.getSinkDataTime(), 1, 1);
+ instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
} else {
LOGGER.error(
"add instance to memory init failed instanceId {}",
instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED,
inlongGroupId, inlongStreamId,
- instanceProfile.getSinkDataTime(), 1, 1);
+ instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index d381c88bd2..e8d848b36b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -59,12 +59,14 @@ public abstract class CommonInstance extends Instance {
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();
+ protected long auditVersion;
@Override
public boolean init(Object srcManager, InstanceProfile srcProfile) {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
+ auditVersion = Long.parseLong(getTaskId());
setInodeInfo(profile);
LOGGER.info("task id: {} submit new instance {} profile detail
{}.", profile.getTaskId(),
profile.getInstanceId(), profile.toJsonStr());
@@ -153,7 +155,7 @@ public abstract class CommonInstance extends Instance {
private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime >
TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT,
profile.getInlongGroupId(),
- profile.getInlongStreamId(), AgentUtils.getCurrentTime(),
1, 1);
+ profile.getInlongStreamId(), AgentUtils.getCurrentTime(),
1, 1, auditVersion);
heartbeatcheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 4e5c07ca26..0eba81e4d2 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -106,9 +106,11 @@ public class SenderManager {
private volatile boolean resendRunning = false;
private volatile boolean started = false;
private static final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
+ private long auditVersion;
public SenderManager(InstanceProfile profile, String inlongGroupId, String
sourcePath) {
this.profile = profile;
+ auditVersion = Long.parseLong(profile.getTaskId());
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
proxySend = profile.getBoolean(TASK_PROXY_SEND,
DEFAULT_TASK_PROXY_SEND);
totalAsyncBufSize = profile
@@ -233,10 +235,10 @@ public class SenderManager {
AgentSenderCallback cb = new AgentSenderCallback(message,
retry);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND,
message.getGroupId(),
message.getStreamId(), message.getDataTime(),
message.getMsgCnt(),
- message.getTotalSize());
+ message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME,
message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(),
message.getMsgCnt(),
- message.getTotalSize());
+ message.getTotalSize(), auditVersion);
asyncSendByMessageSender(cb, message.getDataList(),
message.getGroupId(),
message.getStreamId(), message.getDataTime(),
SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS,
message.getExtraMap(), proxySend);
@@ -246,10 +248,10 @@ public class SenderManager {
} catch (Exception exception) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION,
message.getGroupId(),
message.getStreamId(), message.getDataTime(),
message.getMsgCnt(),
- message.getTotalSize());
+ message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME,
message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(),
message.getMsgCnt(),
- message.getTotalSize());
+ message.getTotalSize(), auditVersion);
suc = false;
if (retry > maxSenderRetry) {
if (retry % 10 == 0) {
@@ -291,10 +293,10 @@ public class SenderManager {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND,
message.getGroupId(),
message.getStreamId(), message.getDataTime(),
message.getMsgCnt(),
- message.getTotalSize());
+ message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(),
message.getStreamId(),
AgentUtils.getCurrentTime(), message.getMsgCnt(),
- message.getTotalSize());
+ message.getTotalSize(), auditVersion);
sendBatchWithRetryCount(callback.message,
callback.retry + 1);
}
} catch (Exception ex) {
@@ -353,18 +355,18 @@ public class SenderManager {
message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
- dataTime, message.getMsgCnt(), message.getTotalSize());
+ dataTime, message.getMsgCnt(), message.getTotalSize(),
auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId,
streamId,
- AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
+ AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize(), auditVersion);
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
getMetricItem(groupId,
streamId).pluginSendFailCount.addAndGet(msgCnt);
putInResendQueue(new AgentSenderCallback(message, retry));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId,
streamId,
- dataTime, message.getMsgCnt(), message.getTotalSize());
+ dataTime, message.getMsgCnt(), message.getTotalSize(),
auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId,
streamId,
- AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
+ AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize(), auditVersion);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 09d200a742..dd6d9ce146 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -246,8 +246,16 @@ public class LogFileSource extends AbstractSource {
if (overLen) {
LOGGER.warn("readLines over len finally string len
{}",
new String(baos.toByteArray()).length());
-
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
- inlongStreamId,
AgentUtils.getCurrentTime(), 1, maxPackSize);
+ long auditTime = 0;
+ if (isRealTime) {
+ auditTime = AgentUtils.getCurrentTime();
+ } else {
+ auditTime = profile.getSinkDataTime();
+ }
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId,
inlongStreamId,
+ auditTime, 1, maxPackSize, auditVersion);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId,
+ inlongStreamId,
AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion);
}
baos.reset();
overLen = false;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 192756abf8..2449d065e4 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -101,10 +101,11 @@ public abstract class AbstractSource implements Source {
protected volatile boolean runnable = true;
protected volatile boolean running = false;
protected String taskId;
+ protected long auditVersion;
protected String instanceId;
protected InstanceProfile profile;
private ExtendedHandler extendedHandler;
- private boolean isRealTime = false;
+ protected boolean isRealTime = false;
protected volatile long emptyCount = 0;
protected int maxPackSize;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
@@ -118,6 +119,7 @@ public abstract class AbstractSource implements Source {
public void init(InstanceProfile profile) {
this.profile = profile;
taskId = profile.getTaskId();
+ auditVersion = Long.parseLong(taskId);
instanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
@@ -333,9 +335,9 @@ public abstract class AbstractSource implements Source {
auditTime = profile.getSinkDataTime();
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
- auditTime, 1, sourceData.getData().length);
+ auditTime, 1, sourceData.getData().length, auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
- AgentUtils.getCurrentTime(), 1, sourceData.getData().length);
+ AgentUtils.getCurrentTime(), 1, sourceData.getData().length,
auditVersion);
Message finalMsg = new DefaultMessage(sourceData.getData(), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 56a786c6e5..b288ff1946 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -48,12 +48,14 @@ public abstract class AbstractTask extends Task {
protected volatile boolean running = false;
protected boolean initOK = false;
protected long lastPrintTime = 0;
+ protected long auditVersion;
@Override
public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
taskManager = (TaskManager) srcManager;
this.taskProfile = taskProfile;
this.basicDb = basicDb;
+ auditVersion = Long.parseLong(taskProfile.getTaskId());
instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
basicDb, taskManager.getTaskDb());
try {
@@ -132,7 +134,7 @@ public abstract class AbstractTask extends Task {
protected void taskHeartbeat() {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
taskProfile.getInlongGroupId(),
- taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(),
1, 1);
+ taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(),
1, 1, auditVersion);
}