This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 18b0bf686f [INLONG-9904][Audit] SDK support checkpoint feature for
Flink job (#9905)
18b0bf686f is described below
commit 18b0bf686fe4339f4c9824c3e0beae202ae73e45
Author: doleyzi <[email protected]>
AuthorDate: Mon Apr 1 23:51:11 2024 +0800
[INLONG-9904][Audit] SDK support checkpoint feature for Flink job (#9905)
---
.../inlong/agent/metrics/audit/AuditUtils.java | 2 +-
.../org/apache/inlong/audit/AuditReporterImpl.java | 296 ++++++++++++++++-----
.../apache/inlong/audit/send/SenderManager.java | 24 +-
.../apache/inlong/audit/util/AuditDimensions.java | 99 +++++++
.../org/apache/inlong/audit/util/AuditValues.java | 58 ++++
inlong-audit/sql/apache_inlong_audit.sql | 1 +
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 2 +-
.../sort/standalone/metrics/audit/AuditUtils.java | 2 +-
.../inlong/sort/base/metric/SinkMetricData.java | 2 +-
.../inlong/sort/base/metric/SourceMetricData.java | 2 +-
.../server/broker/stats/audit/AuditUtils.java | 2 +-
11 files changed, 402 insertions(+), 88 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index fc6f4d90c3..e4cc7691c8 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -107,6 +107,6 @@ public class AuditUtils {
if (!IS_AUDIT) {
return;
}
- AuditOperator.getInstance().send();
+ AuditOperator.getInstance().flush();
}
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 10c3913dd2..f0f44207d5 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -21,6 +21,8 @@ import org.apache.inlong.audit.loader.SocketAddressListLoader;
import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.send.SenderManager;
import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.audit.util.AuditDimensions;
+import org.apache.inlong.audit.util.AuditValues;
import org.apache.inlong.audit.util.Config;
import org.apache.inlong.audit.util.StatInfo;
@@ -33,6 +35,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
@@ -40,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
@@ -54,20 +58,49 @@ public class AuditReporterImpl implements Serializable {
private static final int BATCH_NUM = 100;
private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private static final int PERIOD = 1000 * 60;
- private final ConcurrentHashMap<String, StatInfo> countMap = new
ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, StatInfo> threadCountMap = new
ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new
ConcurrentHashMap<>();
- private final List<String> deleteKeyList = new ArrayList<>();
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>>
preStatMap =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>>
summaryStatMap =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>>
expiredStatMap =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, List<String>> expiredKeyList = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, Long> flushTime = new
ConcurrentHashMap<>();
private final Config config = new Config();
private int packageId = 1;
private int dataId = 0;
private boolean initialized = false;
private SenderManager manager;
+ private AtomicInteger flushStat = new AtomicInteger(0);
private final ScheduledExecutorService timeoutExecutor =
Executors.newSingleThreadScheduledExecutor();
private AuditConfig auditConfig = null;
private SocketAddressListLoader loader = null;
+ // Resource isolation key is used in checkpoint and other
scenarios.DEFAULT 0.
+ private static final long DEFAULT_ISOLATE_KEY = 0;
+ private int flushStatThreshold = 100;
+ private boolean autoFlush = true;
+
+ /**
+ * Set stat threshold
+ *
+ * @param flushStatThreshold
+ */
+ public void setFlushStatThreshold(int flushStatThreshold) {
+ this.flushStatThreshold = flushStatThreshold;
+ }
+
+ /**
+ * When the caller needs to isolate resources, please call this method and
pass the parameter true.
+ * For example, in scenarios such as flink checkpoint
+ *
+ * @param autoFlush
+ */
+ public void setAutoFlush(boolean autoFlush) {
+ this.autoFlush = autoFlush;
+ }
+
/**
* Init
*/
@@ -82,7 +115,10 @@ public class AuditReporterImpl implements Serializable {
public void run() {
try {
loadIpPortList();
- send();
+ if (autoFlush) {
+ flush(DEFAULT_ISOLATE_KEY);
+ }
+ checkFlushTime();
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
@@ -198,50 +234,208 @@ public class AuditReporterImpl implements Serializable {
keyJoiner.add(String.valueOf(auditID));
keyJoiner.add(auditTag);
keyJoiner.add(String.valueOf(auditVersion));
- addByKey(keyJoiner.toString(), count, size, delayTime);
+ addByKey(DEFAULT_ISOLATE_KEY, keyJoiner.toString(), count, size,
delayTime);
+ }
+
+ /**
+ * When the caller needs to isolate resources, please call this method.
+ * For example, in scenarios such as flink checkpoint
+ *
+ * @param dimensions
+ * @param values
+ */
+ public void add(AuditDimensions dimensions, AuditValues values) {
+ StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS);
+ keyJoiner.add(String.valueOf(dimensions.getLogTime() / PERIOD));
+ keyJoiner.add(dimensions.getInlongGroupID());
+ keyJoiner.add(dimensions.getInlongStreamID());
+ keyJoiner.add(String.valueOf(dimensions.getAuditID()));
+ keyJoiner.add(dimensions.getAuditTag());
+ keyJoiner.add(String.valueOf(dimensions.getAuditVersion()));
+ addByKey(dimensions.getIsolateKey(), keyJoiner.toString(),
values.getCount(),
+ values.getSize(), values.getDelayTime());
}
/**
* Add audit info by key.
*/
- private void addByKey(String key, long count, long size, long delayTime) {
- if (countMap.get(key) == null) {
- countMap.put(key, new StatInfo(0L, 0L, 0L));
+ private void addByKey(long isolateKey, String statKey, long count, long
size, long delayTime) {
+ if (null == this.preStatMap.get(isolateKey)) {
+ this.preStatMap.putIfAbsent(isolateKey, new ConcurrentHashMap<>());
+ }
+ ConcurrentHashMap<String, StatInfo> statMap =
this.preStatMap.get(isolateKey);
+ if (null == statMap.get(statKey)) {
+ statMap.putIfAbsent(statKey, new StatInfo(0L, 0L, 0L));
}
- countMap.get(key).count.addAndGet(count);
- countMap.get(key).size.addAndGet(size);
- countMap.get(key).delay.addAndGet(delayTime);
+ StatInfo stat = statMap.get(statKey);
+ stat.count.addAndGet(count);
+ stat.size.addAndGet(size);
+ stat.delay.addAndGet(delayTime);
+ }
+
+ /**
+ * Flush audit data by default audit version
+ */
+ public synchronized void flush() {
+ flush(DEFAULT_AUDIT_VERSION);
}
/**
- * Send audit data
+ * Flush audit data
*/
- public synchronized void send() {
+ public synchronized void flush(long isolateKey) {
+ if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) !=
null
+ || flushStat.addAndGet(1) > flushStatThreshold) {
+ return;
+ }
+ LOGGER.info("Audit flush isolate key {} ", isolateKey);
manager.clearBuffer();
resetStat();
- // Retrieve statistics from the list of objects without statistics to
be eliminated
- for (Map.Entry<String, StatInfo> entry :
this.deleteCountMap.entrySet()) {
- this.sumThreadGroup(entry.getKey(), entry.getValue());
+ LOGGER.info("pre stat map size {} {} {} {}", this.preStatMap.size(),
this.expiredStatMap.size(),
+ this.summaryStatMap.size(), this.expiredKeyList.size());
+
+ summaryExpiredStatMap(isolateKey);
+
+ Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>>
iterator = this.preStatMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry =
iterator.next();
+ if (entry.getValue().isEmpty()) {
+ LOGGER.info("Remove the key of pre stat map: {},isolate key:
{} ", entry.getKey(), isolateKey);
+ iterator.remove();
+ continue;
+ }
+ if (entry.getKey() > isolateKey) {
+ continue;
+ }
+ summaryPreStatMap(entry.getKey(), entry.getValue());
+ send(entry.getKey());
+
+ }
+
+ clearExpiredKey(isolateKey);
+
+ LOGGER.info("Finish report audit data");
+ }
+
+ /**
+ * Send base command
+ */
+ private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
+ AuditApi.BaseCommand.Builder baseCommand =
AuditApi.BaseCommand.newBuilder();
+
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
+ manager.send(baseCommand.build(), auditRequest);
+ }
+
+ /**
+ * Summary
+ */
+ private void sumThreadGroup(long isolateKey, String key, StatInfo
statInfo) {
+ long count = statInfo.count.getAndSet(0);
+ if (0 == count) {
+ return;
+ }
+ ConcurrentHashMap<String, StatInfo> sumMap =
+ this.summaryStatMap.computeIfAbsent(isolateKey, k -> new
ConcurrentHashMap<>());
+ StatInfo stat = sumMap.computeIfAbsent(key, k -> new StatInfo(0L, 0L,
0L));
+ stat.count.addAndGet(count);
+ stat.size.addAndGet(statInfo.size.getAndSet(0));
+ stat.delay.addAndGet(statInfo.delay.getAndSet(0));
+ }
+
+ /**
+ * Reset statistics
+ */
+ private void resetStat() {
+ dataId = 0;
+ packageId = 1;
+ }
+
+ /**
+ * Summary expired stat map
+ */
+ private void summaryExpiredStatMap(long isolateKey) {
+ Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>>
iterator =
+ this.expiredStatMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry =
iterator.next();
+ if (entry.getValue().isEmpty()) {
+ LOGGER.info("Remove the key of expired stat map: {},isolate
key: {} ", entry.getKey(), isolateKey);
+ iterator.remove();
+ continue;
+ }
+ if (entry.getKey() > isolateKey) {
+ continue;
+ }
+ for (Map.Entry<String, StatInfo> statInfo :
entry.getValue().entrySet()) {
+ this.sumThreadGroup(isolateKey, statInfo.getKey(),
statInfo.getValue());
+ }
+ entry.getValue().clear();
}
- this.deleteCountMap.clear();
- for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
+ }
+
+ /**
+ * Summary pre stat map
+ */
+ private void summaryPreStatMap(long isolateKey, ConcurrentHashMap<String,
StatInfo> statInfo) {
+ List<String> expiredKeys =
this.expiredKeyList.computeIfAbsent(isolateKey, k -> new ArrayList<>());
+
+ for (Map.Entry<String, StatInfo> entry : statInfo.entrySet()) {
String key = entry.getKey();
StatInfo value = entry.getValue();
// If there is no data, enter the list to be eliminated
if (value.count.get() == 0) {
- this.deleteKeyList.add(key);
+ if (!expiredKeys.contains(key)) {
+ expiredKeys.add(key);
+ }
continue;
}
- this.sumThreadGroup(key, value);
+ sumThreadGroup(isolateKey, key, value);
}
+ }
- // Clean up obsolete statistical data objects
- for (String key : this.deleteKeyList) {
- StatInfo value = this.countMap.remove(key);
- this.deleteCountMap.put(key, value);
+ /**
+ * Clear expired key
+ */
+ private void clearExpiredKey(long isolateKey) {
+ Iterator<Map.Entry<Long, List<String>>> iterator =
+ this.expiredKeyList.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, List<String>> entry = iterator.next();
+ if (entry.getValue().isEmpty()) {
+ LOGGER.info("Remove the key of expired key list: {},isolate
key: {}", entry.getKey(), isolateKey);
+ iterator.remove();
+ continue;
+ }
+ if (entry.getKey() > isolateKey) {
+ continue;
+ }
+
+ ConcurrentHashMap<String, StatInfo> preStatInfo =
this.preStatMap.get(entry.getKey());
+ if (null == preStatInfo) {
+ iterator.remove();
+ continue;
+ }
+ ConcurrentHashMap<String, StatInfo> deleteMap =
+ this.expiredStatMap.computeIfAbsent(entry.getKey(), k ->
new ConcurrentHashMap<>());
+ for (String key : entry.getValue()) {
+ StatInfo value = preStatInfo.remove(key);
+ deleteMap.put(key, value);
+ }
+ entry.getValue().clear();
}
- this.deleteKeyList.clear();
+ }
+ /**
+ * Send Audit data
+ */
+ private void send(long isolateKey) {
+ if (null == summaryStatMap.get(isolateKey)) {
+ return;
+ }
+ if (summaryStatMap.get(isolateKey).isEmpty()) {
+ summaryStatMap.remove(isolateKey);
+ return;
+ }
long sdkTime = Calendar.getInstance().getTimeInMillis();
AuditApi.AuditMessageHeader msgHeader =
AuditApi.AuditMessageHeader.newBuilder()
.setIp(config.getLocalIP()).setDockerId(config.getDockerId())
@@ -250,9 +444,8 @@ public class AuditReporterImpl implements Serializable {
.build();
AuditApi.AuditRequest.Builder requestBuild =
AuditApi.AuditRequest.newBuilder();
requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
-
// Process the stat info for all threads
- for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
+ for (Map.Entry<String, StatInfo> entry :
summaryStatMap.get(isolateKey).entrySet()) {
// Entry key order: logTime inlongGroupID inlongStreamID auditID
auditTag auditVersion
String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
long logTime = Long.parseLong(keyArray[0]) * PERIOD;
@@ -282,49 +475,24 @@ public class AuditReporterImpl implements Serializable {
requestBuild.clearMsgBody();
}
}
+
if (requestBuild.getMsgBodyCount() > 0) {
sendByBaseCommand(requestBuild.build());
requestBuild.clearMsgBody();
}
- threadCountMap.clear();
-
- LOGGER.info("Finish report audit data");
- }
-
- /**
- * Send base command
- */
- private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
- AuditApi.BaseCommand.Builder baseCommand =
AuditApi.BaseCommand.newBuilder();
-
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
- manager.send(baseCommand.build(), auditRequest);
+ summaryStatMap.get(isolateKey).clear();
}
/**
- * Summary
+ * Check flush time
*/
- private void sumThreadGroup(String key, StatInfo statInfo) {
- long count = statInfo.count.getAndSet(0);
- if (0 == count) {
- return;
- }
- if (threadCountMap.get(key) == null) {
- threadCountMap.put(key, new StatInfo(0, 0, 0));
- }
-
- long size = statInfo.size.getAndSet(0);
- long delay = statInfo.delay.getAndSet(0);
-
- threadCountMap.get(key).count.addAndGet(count);
- threadCountMap.get(key).size.addAndGet(size);
- threadCountMap.get(key).delay.addAndGet(delay);
- }
-
- /**
- * Reset statistics
- */
- private void resetStat() {
- dataId = 0;
- packageId = 1;
+ private void checkFlushTime() {
+ flushStat.set(0);
+ long currentTime = Calendar.getInstance().getTimeInMillis();
+ flushTime.forEach((key, value) -> {
+ if ((currentTime - value) > PERIOD) {
+ flushTime.remove(key);
+ }
+ });
}
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index f3a573c4f9..7b886021a1 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -139,24 +139,7 @@ public class SenderManager {
// cache first
Long requestId = baseCommand.getAuditRequest().getRequestId();
this.dataMap.putIfAbsent(requestId, data);
- requestIdQueue.offer(requestId);
this.sendData(data.getDataByte());
- // resend
- long newTime = System.currentTimeMillis() - 10000;
- if (newTime > lastCheckTime) {
- for (int i = 0; i < requestIdQueue.size(); i++) {
- Long current = requestIdQueue.poll();
- AuditData auditData = this.dataMap.get(current);
- if (auditData == null) {
- continue;
- } else {
- requestIdQueue.offer(current);
- if (newTime > auditData.getSendTime()) {
- this.sendData(auditData.getDataByte());
- }
- }
- }
- }
}
/**
@@ -179,7 +162,7 @@ public class SenderManager {
* Clean up the backlog of unsent message packets
*/
public void clearBuffer() {
- LOG.info("audit failed cache size: {}", this.dataMap.size());
+ LOG.info("Audit failed cache size: {}", this.dataMap.size());
for (AuditData data : this.dataMap.values()) {
this.sendData(data.getDataByte());
this.sleep();
@@ -286,6 +269,11 @@ public class SenderManager {
if (data == null) {
LOG.error("Can not find the request id onMessageReceived
{},message: {}",
requestId, baseCommand.getAuditReply().getMessage());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, AuditData> entry :
this.dataMap.entrySet()) {
+ LOG.debug("Data map key:{},request id:{}",
entry.getKey(), requestId);
+ }
+ }
return;
}
// check resp
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java
new file mode 100644
index 0000000000..9a8741a6a9
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+/**
+ * Audit dimensions
+ */
+public class AuditDimensions {
+
+ private int auditID;
+ private long logTime;
+ private long auditVersion;
+ private long isolateKey;
+ private String auditTag;
+ private String inlongGroupID;
+ private String inlongStreamID;
+
+ public AuditDimensions(int auditID, long logTime, long auditVersion, long
isolateKey, String auditTag,
+ String inlongGroupID, String inlongStreamID) {
+ this.auditID = auditID;
+ this.logTime = logTime;
+ this.auditVersion = auditVersion;
+ this.isolateKey = isolateKey;
+ this.auditTag = auditTag;
+ this.inlongGroupID = inlongGroupID;
+ this.inlongStreamID = inlongStreamID;
+ }
+
+ public int getAuditID() {
+ return auditID;
+ }
+
+ public void setAuditID(int auditID) {
+ this.auditID = auditID;
+ }
+
+ public long getLogTime() {
+ return logTime;
+ }
+
+ public void setLogTime(long logTime) {
+ this.logTime = logTime;
+ }
+
+ public long getAuditVersion() {
+ return auditVersion;
+ }
+
+ public void setAuditVersion(long auditVersion) {
+ this.auditVersion = auditVersion;
+ }
+
+ public long getIsolateKey() {
+ return isolateKey;
+ }
+
+ public void setIsolateKey(long isolateKey) {
+ this.isolateKey = isolateKey;
+ }
+
+ public String getAuditTag() {
+ return auditTag;
+ }
+
+ public void setAuditTag(String auditTag) {
+ this.auditTag = auditTag;
+ }
+
+ public String getInlongGroupID() {
+ return inlongGroupID;
+ }
+
+ public void setInlongGroupID(String inlongGroupID) {
+ this.inlongGroupID = inlongGroupID;
+ }
+
+ public String getInlongStreamID() {
+ return inlongStreamID;
+ }
+
+ public void setInlongStreamID(String inlongStreamID) {
+ this.inlongStreamID = inlongStreamID;
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java
new file mode 100644
index 0000000000..d5b74856bd
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+/**
+ * Audit values
+ */
+public class AuditValues {
+
+ private long count;
+ private long size;
+ private long delayTime;
+
+ public AuditValues(long count, long size, long delayTime) {
+ this.count = count;
+ this.size = size;
+ this.delayTime = delayTime;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public void setCount(long count) {
+ this.count = count;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public long getDelayTime() {
+ return delayTime;
+ }
+
+ public void setDelayTime(long delayTime) {
+ this.delayTime = delayTime;
+ }
+}
diff --git a/inlong-audit/sql/apache_inlong_audit.sql
b/inlong-audit/sql/apache_inlong_audit.sql
index 2d10ca697c..a2b007534a 100644
--- a/inlong-audit/sql/apache_inlong_audit.sql
+++ b/inlong-audit/sql/apache_inlong_audit.sql
@@ -43,6 +43,7 @@ CREATE TABLE IF NOT EXISTS `audit_data`
`inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target
inlong stream id',
`audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
`audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag',
+ `audit_version` BIGINT DEFAULT -1 COMMENT 'Audit version',
`count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
count',
`size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
size',
`delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message
delay count',
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 7bd27521c6..371a4de0c9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -123,6 +123,6 @@ public class AuditUtils {
* Send audit data
*/
public static void send() {
- AuditOperator.getInstance().send();
+ AuditOperator.getInstance().flush();
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
index 795d505c1f..f5bd2ebbc2 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
@@ -104,6 +104,6 @@ public class AuditUtils {
* Send audit data
*/
public static void send() {
- AuditOperator.getInstance().send();
+ AuditOperator.getInstance().flush();
}
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 1b09343b5c..7270becdb7 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -285,7 +285,7 @@ public class SinkMetricData implements MetricData,
Serializable {
*/
public void flushAuditData() {
if (auditOperator != null) {
- auditOperator.send();
+ auditOperator.flush();
}
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 91abcf22aa..0568efd7c3 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -341,7 +341,7 @@ public class SourceMetricData implements MetricData,
Serializable {
*/
public void flushAuditData() {
if (auditOperator != null) {
- auditOperator.send();
+ auditOperator.flush();
}
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
index 33b0d9f165..0e0c287739 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
@@ -112,6 +112,6 @@ public class AuditUtils {
if (!auditConfig.isAuditEnable()) {
return;
}
- AuditOperator.getInstance().send();
+ AuditOperator.getInstance().flush();
}
}