This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new a0e85ae559 [enhancement](AuditLoaderPlugin): add audit queue capacity
configurat… (#12887)
a0e85ae559 is described below
commit a0e85ae559e3a011d1d160ebbc776393cfff711a
Author: wxy <[email protected]>
AuthorDate: Tue Sep 27 08:50:30 2022 +0800
[enhancement](AuditLoaderPlugin): add audit queue capacity configurat…
(#12887)
---
fe_plugins/auditloader/src/main/assembly/plugin.conf | 3 +++
.../apache/doris/plugin/audit/AuditLoaderPlugin.java | 17 ++++++++++++-----
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf b/fe_plugins/auditloader/src/main/assembly/plugin.conf
index 4013920938..14fdd32b98 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.conf
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf
@@ -26,6 +26,9 @@ max_batch_interval_sec=60
# the max stmt length to be loaded in audit table, default is 4096
max_stmt_length=4096
+# the capacity of audit queue, default is 1000
+max_queue_size=1000
+
# Doris FE host for loading the audit, default is 127.0.0.1:8030.
# this should be the host port for stream load
frontend_host_port=127.0.0.1:8030
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index ea5c05ee66..8ebd5ac6dd 100755
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -17,6 +17,7 @@
package org.apache.doris.plugin.audit;
+import com.google.common.collect.Queues;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditPlugin;
import org.apache.doris.plugin.Plugin;
@@ -42,7 +43,6 @@ import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -53,12 +53,13 @@ import java.util.stream.Collectors;
public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
private final static Logger LOG =
LogManager.getLogger(AuditLoaderPlugin.class);
- private static SimpleDateFormat DATETIME_FORMAT = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ private static final ThreadLocal<SimpleDateFormat> dateFormatContainer =
ThreadLocal.withInitial(
+ () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
private StringBuilder auditBuffer = new StringBuilder();
private long lastLoadTime = 0;
- private BlockingQueue<AuditEvent> auditEventQueue = new
LinkedBlockingDeque<AuditEvent>(1);
+ private BlockingQueue<AuditEvent> auditEventQueue;
private DorisStreamLoader streamLoader;
private Thread loadThread;
@@ -78,6 +79,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
loadConfig(ctx, info.getProperties());
+ this.auditEventQueue =
Queues.newLinkedBlockingDeque(conf.maxQueueSize);
this.streamLoader = new DorisStreamLoader(conf);
this.loadThread = new Thread(new LoadWorker(this.streamLoader),
"audit loader thread");
this.loadThread.start();
@@ -206,6 +208,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
public static class AuditLoaderConf {
public static final String PROP_MAX_BATCH_SIZE = "max_batch_size";
public static final String PROP_MAX_BATCH_INTERVAL_SEC =
"max_batch_interval_sec";
+ public static final String PROP_MAX_QUEUE_SIZE = "max_queue_size";
public static final String PROP_FRONTEND_HOST_PORT =
"frontend_host_port";
public static final String PROP_USER = "user";
public static final String PROP_PASSWORD = "password";
@@ -216,6 +219,7 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
public long maxBatchSize = 50 * 1024 * 1024;
public long maxBatchIntervalSec = 60;
+ public int maxQueueSize = 1000;
public String frontendHostPort = "127.0.0.1:8030";
public String user = "root";
public String password = "";
@@ -233,6 +237,9 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
if (properties.containsKey(PROP_MAX_BATCH_INTERVAL_SEC)) {
maxBatchIntervalSec =
Long.valueOf(properties.get(PROP_MAX_BATCH_INTERVAL_SEC));
}
+ if (properties.containsKey(PROP_MAX_QUEUE_SIZE)) {
+ maxQueueSize =
Integer.valueOf(properties.get(PROP_MAX_QUEUE_SIZE));
+ }
if (properties.containsKey(PROP_FRONTEND_HOST_PORT)) {
frontendHostPort = properties.get(PROP_FRONTEND_HOST_PORT);
}
@@ -281,10 +288,10 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
}
}
- public static synchronized String longToTimeString(long timeStamp) {
+ public static String longToTimeString(long timeStamp) {
if (timeStamp <= 0L) {
return "1900-01-01 00:00:00";
}
- return DATETIME_FORMAT.format(new Date(timeStamp));
+ return dateFormatContainer.get().format(new Date(timeStamp));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]