This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 381cbe5285 [INLONG-10728][Audit] Add global memory control for the Audit SDK (#10733)
381cbe5285 is described below
commit 381cbe5285deaa71ef4eaea5b5a2abed9eaa6bd6
Author: doleyzi <[email protected]>
AuthorDate: Tue Jul 30 09:44:28 2024 +0800
[INLONG-10728][Audit] Add global memory control for the Audit SDK (#10733)
---
.../org/apache/inlong/audit/AuditReporterImpl.java | 72 +++++++++++++++-------
.../apache/inlong/audit/entity/AuditMetric.java | 6 ++
.../apache/inlong/audit/send/SenderManager.java | 35 +++++++++--
3 files changed, 85 insertions(+), 28 deletions(-)
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index b4cc8028f0..e8a38da28f 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -310,33 +310,40 @@ public class AuditReporterImpl implements Serializable {
}
long startTime = System.currentTimeMillis();
LOGGER.info("Audit flush isolate key {} ", isolateKey);
- manager.checkFailedData();
- resetStat();
- summaryExpiredStatMap(isolateKey);
-
- Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>>
iterator = this.preStatMap.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry =
iterator.next();
- if (entry.getValue().isEmpty()) {
- LOGGER.info("Remove the key of pre stat map: {},isolate key:
{} ", entry.getKey(), isolateKey);
- iterator.remove();
- continue;
- }
- if (entry.getKey() > isolateKey) {
- continue;
+ try {
+ manager.checkFailedData();
+ resetStat();
+
+ summaryExpiredStatMap(isolateKey);
+
+ Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>>
iterator =
+ this.preStatMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry =
iterator.next();
+ if (entry.getValue().isEmpty()) {
+ LOGGER.info("Remove the key of pre stat map: {},isolate
key: {} ", entry.getKey(), isolateKey);
+ iterator.remove();
+ continue;
+ }
+ if (entry.getKey() > isolateKey) {
+ continue;
+ }
+ summaryPreStatMap(entry.getKey(), entry.getValue());
+ send(entry.getKey());
}
- summaryPreStatMap(entry.getKey(), entry.getValue());
- send(entry.getKey());
+ clearExpiredKey(isolateKey);
+ } catch (Exception exception) {
+ LOGGER.error("Flush audit has exception!", exception);
+ } finally {
+ manager.closeSocket();
}
- clearExpiredKey(isolateKey);
-
- manager.closeSocket();
-
- LOGGER.info("Success report {} package, Failed report {} package,
total {} message, cost: {} ms",
+ LOGGER.info(
+ "Success report {} package, Failed report {} package, total {}
message, memory size {}, cost: {} ms",
auditMetric.getSuccessPack(), auditMetric.getFailedPack(),
auditMetric.getTotalMsg(),
+ auditMetric.getMemorySize(),
System.currentTimeMillis() - startTime);
auditMetric.reset();
@@ -475,12 +482,25 @@ public class AuditReporterImpl implements Serializable {
for (Map.Entry<String, StatInfo> entry :
summaryStatMap.get(isolateKey).entrySet()) {
// Entry key order: logTime inlongGroupID inlongStreamID auditID
auditTag auditVersion
String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
- long logTime = Long.parseLong(keyArray[0]) * PERIOD;
+ if (keyArray.length < 6) {
+ LOGGER.error("Number of keys {} <6", keyArray.length);
+ continue;
+ }
+
+ long logTime;
+ long auditVersion;
+ try {
+ logTime = Long.parseLong(keyArray[0]) * PERIOD;
+ auditVersion = Long.parseLong(keyArray[5]);
+ } catch (NumberFormatException numberFormatException) {
+ LOGGER.error("Failed to parse long from string",
numberFormatException);
+ continue;
+ }
+
String inlongGroupID = keyArray[1];
String inlongStreamID = keyArray[2];
String auditID = keyArray[3];
String auditTag = keyArray[4];
- long auditVersion = Long.parseLong(keyArray[5]);
StatInfo value = entry.getValue();
AuditApi.AuditMessageBody msgBody =
AuditApi.AuditMessageBody.newBuilder()
.setLogTs(logTime)
@@ -495,6 +515,8 @@ public class AuditReporterImpl implements Serializable {
.build();
requestBuild.addMsgBody(msgBody);
+ auditMetric.addMemorySize(msgBody.toByteArray().length);
+
if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
@@ -615,4 +637,8 @@ public class AuditReporterImpl implements Serializable {
public void setUpdateInterval(int updateInterval) {
ProxyManager.getInstance().setUpdateInterval(updateInterval);
}
+
+ public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
+ SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
+ }
}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
index 000642bd64..05ac91a002 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
@@ -29,6 +29,7 @@ public class AuditMetric {
private Long successPack = 0L;
private Long failedPack = 0L;
private Long totalMsg = 0L;
+ private Long memorySize = 0L;
public void addSuccessPack(long successPack) {
this.successPack += successPack;
@@ -42,9 +43,14 @@ public class AuditMetric {
this.totalMsg += totalMsg;
}
+ public void addMemorySize(long memorySize) {
+ this.memorySize += memorySize;
+ }
+
public void reset() {
successPack = 0L;
failedPack = 0L;
totalMsg = 0L;
+ memorySize = 0L;
}
}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index 4da90f9f0d..f941cbae6d 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -37,6 +37,7 @@ import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Audit sender manager
@@ -50,6 +51,12 @@ public class SenderManager {
private Socket socket = new Socket();
private static final int PACKAGE_HEADER_LEN = 4;
private static final int MAX_RESPONSE_LENGTH = 32 * 1024;
+ private static final AtomicLong globalAuditMemory = new AtomicLong(0);
+ private static long maxGlobalAuditMemory = 200 * 1024 * 1024;
+
+ public static void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
+ SenderManager.maxGlobalAuditMemory = maxGlobalAuditMemory;
+ }
public SenderManager(AuditConfig config) {
auditConfig = config;
@@ -179,11 +186,22 @@ public class SenderManager {
if (failedDataMap.isEmpty()) {
checkAuditFile();
}
- if (failedDataMap.size() > auditConfig.getMaxCacheRow()) {
- LOGGER.info("Failed cache size: {} > {}", failedDataMap.size(),
auditConfig.getMaxCacheRow());
+
+ long failedDataSize = getFailedDataSize();
+ globalAuditMemory.addAndGet(failedDataSize);
+
+ if (failedDataMap.size() > auditConfig.getMaxCacheRow()
+ || globalAuditMemory.get() > maxGlobalAuditMemory) {
+ LOGGER.warn("Failed cache [size: {}, threshold {}], [count {},
threshold: {}]",
+ globalAuditMemory.get(), maxGlobalAuditMemory,
+ failedDataMap.size(), auditConfig.getMaxCacheRow());
+
writeLocalFile();
+
failedDataMap.clear();
}
+
+ globalAuditMemory.addAndGet(-failedDataSize);
}
/**
@@ -255,10 +273,9 @@ public class SenderManager {
.readObject();
for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
- if (failedDataMap.size() < (auditConfig.getMaxCacheRow() / 2))
{
- failedDataMap.putIfAbsent(entry.getKey(),
entry.getValue());
+ if (!sendData(entry.getValue().getDataByte())) {
+ LOGGER.error("Local file recovery failed: {}",
entry.getValue());
}
- sendData(entry.getValue().getDataByte());
sleep();
}
} catch (IOException | ClassNotFoundException e) {
@@ -294,4 +311,12 @@ public class SenderManager {
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
}
+
+ private long getFailedDataSize() {
+ long dataSize = 0;
+ for (AuditData auditData : failedDataMap.values()) {
+ dataSize += auditData.getDataByte().length;
+ }
+ return dataSize;
+ }
}