This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 47f33c3ec4 [INLONG-11406][Audit] Provides an interface for
asynchronously flushing Audit data (#11409)
47f33c3ec4 is described below
commit 47f33c3ec4415367feb86b3f2c7cad2e8649616a
Author: doleyzi <[email protected]>
AuthorDate: Fri Oct 25 09:50:07 2024 +0800
[INLONG-11406][Audit] Provides an interface for asynchronously flushing
Audit data (#11409)
---
.../inlong/audit/utils/NamedThreadFactory.java | 36 +++++++++++++++
.../org/apache/inlong/audit/AuditReporterImpl.java | 52 +++++++++++++++++++---
.../org/apache/inlong/audit/send/ProxyManager.java | 7 ++-
.../apache/inlong/audit/send/SenderManager.java | 4 +-
4 files changed, 89 insertions(+), 10 deletions(-)
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java
new file mode 100644
index 0000000000..cd751ec03c
--- /dev/null
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/NamedThreadFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NamedThreadFactory implements ThreadFactory {
+
+ private final String baseName;
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ public NamedThreadFactory(String baseName) {
+ this.baseName = baseName;
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable, baseName + "-Thread-" +
counter.getAndIncrement());
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 7d3f1c5755..bd2c5aae47 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -32,6 +32,7 @@ import org.apache.inlong.audit.util.AuditValues;
import org.apache.inlong.audit.util.Config;
import org.apache.inlong.audit.util.RequestIdUtils;
import org.apache.inlong.audit.util.StatInfo;
+import org.apache.inlong.audit.utils.NamedThreadFactory;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@@ -76,7 +77,8 @@ public class AuditReporterImpl implements Serializable {
private final ConcurrentHashMap<Long, HashSet<String>> expiredKeyList =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Long> flushTime = new
ConcurrentHashMap<>();
private final Config config = new Config();
- private final ScheduledExecutorService timeoutExecutor =
Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService timerExecutor =
+ Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("inlong-audit-flush"));
private int packageId = 1;
private int dataId = 0;
private volatile boolean initialized = false;
@@ -86,6 +88,7 @@ public class AuditReporterImpl implements Serializable {
private SocketAddressListLoader loader = null;
private int flushStatThreshold = 100;
private boolean autoFlush = true;
+ private boolean enableDebug = false;
private AuditMetric auditMetric = new AuditMetric();
@@ -108,6 +111,14 @@ public class AuditReporterImpl implements Serializable {
this.autoFlush = autoFlush;
}
+ /**
+ * Debug mode supports printing audit details in the log
+ * @param enableDebug
+ */
+ public void setEnableDebug(boolean enableDebug) {
+ this.enableDebug = enableDebug;
+ }
+
/**
* Init
*/
@@ -116,7 +127,7 @@ public class AuditReporterImpl implements Serializable {
return;
}
config.init();
- timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
+ timerExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
@@ -301,6 +312,22 @@ public class AuditReporterImpl implements Serializable {
stat.delay.addAndGet(delayTime);
}
+ /**
+ * Asynchronously flush audit data
+ * @param isolateKey
+ */
+ public synchronized void asyncFlush(long isolateKey) {
+ LOGGER.info("Async flush audit by isolate key: {} ", isolateKey);
+ Runnable task = () -> {
+ try {
+ flush(isolateKey);
+ } catch (Exception e) {
+ LOGGER.error("Async flush audit by isolate key: {}, has
exception: ", isolateKey, e);
+ }
+ };
+ timerExecutor.schedule(task, 0, TimeUnit.MILLISECONDS);
+ }
+
/**
* Flush audit data by default audit version
*/
@@ -314,10 +341,12 @@ public class AuditReporterImpl implements Serializable {
public synchronized void flush(long isolateKey) {
if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) !=
null
|| flushStat.addAndGet(1) > flushStatThreshold) {
+ LOGGER.info("Skip audit flush isolate key: {}, last flush time:
{}, count: {}", isolateKey,
+ flushTime.get(isolateKey), flushStat.get());
return;
}
long startTime = System.currentTimeMillis();
- LOGGER.info("Audit flush isolate key {} ", isolateKey);
+ LOGGER.info("Audit flush isolate key: {} ", isolateKey);
try {
manager.checkFailedData();
@@ -444,7 +473,7 @@ public class AuditReporterImpl implements Serializable {
while (iterator.hasNext()) {
Map.Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().isEmpty()) {
- LOGGER.info("Remove the key of expired key list: {},isolate
key: {}", entry.getKey(), isolateKey);
+ LOGGER.info("Remove the key of expired key list: {}, isolate
key: {}", entry.getKey(), isolateKey);
iterator.remove();
continue;
}
@@ -528,17 +557,20 @@ public class AuditReporterImpl implements Serializable {
if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
- sendData(requestBuild);
+ sendData(requestBuild, isolateKey);
}
}
if (requestBuild.getMsgBodyCount() > 0) {
- sendData(requestBuild);
+ sendData(requestBuild, isolateKey);
}
summaryStatMap.get(isolateKey).clear();
}
- private void sendData(AuditApi.AuditRequest.Builder requestBuild) {
+ private void sendData(AuditApi.AuditRequest.Builder requestBuild, long
isolateKey) {
+ if (enableDebug) {
+ LOGGER.info("Send audit data by isolate key: {}, data: {}",
isolateKey, requestBuild);
+ }
requestBuild.setRequestId(RequestIdUtils.nextRequestId());
sendByBaseCommand(requestBuild.build());
auditMetric.addTotalMsg(requestBuild.getMsgBodyCount());
@@ -554,6 +586,7 @@ public class AuditReporterImpl implements Serializable {
flushTime.forEach((key, value) -> {
if ((currentTime - value) > PERIOD) {
flushTime.remove(key);
+ LOGGER.info("Remove audit flush limitation. isolate key: {},
flush time: {}", key, value);
}
});
}
@@ -649,4 +682,9 @@ public class AuditReporterImpl implements Serializable {
public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
}
+
+ public void shutdown() {
+ ProxyManager.getInstance().shutdown();
+ timerExecutor.shutdown();
+ }
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
index 127609d2ab..b87c563da3 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.entity.AuditComponent;
import org.apache.inlong.audit.entity.AuditProxy;
import org.apache.inlong.audit.entity.CommonResponse;
import org.apache.inlong.audit.utils.HttpUtils;
+import org.apache.inlong.audit.utils.NamedThreadFactory;
import org.apache.inlong.audit.utils.ThreadUtils;
import org.slf4j.Logger;
@@ -41,7 +42,8 @@ public class ProxyManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyManager.class);
private static final ProxyManager instance = new ProxyManager();
private final List<String> currentIpPorts = new CopyOnWriteArrayList<>();
- private final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService timer =
+ Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("inlong-audit-proxy-manager"));
private final static String GET_AUDIT_PROXY_API_PATH =
"/inlong/manager/openapi/audit/getAuditProxy";
private int timeoutMs = 10000;
private int updateInterval = 60000;
@@ -167,4 +169,7 @@ public class ProxyManager {
}
return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
}
+ public void shutdown() {
+ timer.shutdown();
+ }
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index f941cbae6d..b795b7aa98 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -80,12 +80,12 @@ public class SenderManager {
try {
InetSocketAddress inetSocketAddress =
ProxyManager.getInstance().getInetSocketAddress();
if (inetSocketAddress == null) {
- LOGGER.error("Audit inet socket address is null!");
+ LOGGER.error("Audit proxy address is null!");
return false;
}
reconnect(inetSocketAddress, auditConfig.getSocketTimeout());
} catch (IOException exception) {
- LOGGER.error("Connect to {} has exception!",
socket.getInetAddress(), exception);
+ LOGGER.error("Connect to audit proxy {} has exception!",
socket.getInetAddress(), exception);
return false;
}
}