This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1fe567275f [feat](audit log) add queue_time_ms in audit_log (#60418)
b1fe567275f is described below
commit b1fe567275fdd50992a3a05c048b0db327fb101b
Author: Fy <[email protected]>
AuthorDate: Tue Feb 24 18:47:10 2026 +0800
[feat](audit log) add queue_time_ms in audit_log (#60418)
Add a queue_time_ms column to table __internal_schema.audit_log to track
per-query queueing behavior and elapsed time.
---
.../org/apache/doris/catalog/InternalSchema.java | 2 +
.../java/org/apache/doris/plugin/AuditEvent.java | 7 ++
.../org/apache/doris/plugin/audit/AuditLoader.java | 1 +
.../java/org/apache/doris/qe/AuditLogHelper.java | 12 ++
.../apache/doris/qe/AuditEventProcessorTest.java | 2 +
.../data/audit/test_audit_log_behavior.out | 1 +
.../suites/audit/test_audit_log_queue_time.groovy | 129 +++++++++++++++++++++
7 files changed, 154 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
index 955e021fa44..a7ae010f863 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -163,6 +163,8 @@ public class InternalSchema {
// execution info
AUDIT_SCHEMA.add(new ColumnDef("query_time",
ScalarType.createType(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("queue_time_ms",
+ ScalarType.createType(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms",
ScalarType.createType(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 3b08dc382d4..c54c32f6a18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -84,6 +84,8 @@ public class AuditEvent {
// execution info
@AuditField(value = "Time(ms)", colName = "query_time")
public long queryTime = -1;
+ @AuditField(value = "QueueTimeMs", colName = "queue_time_ms")
+ public long queueTimeMs = -1;
@AuditField(value = "CpuTimeMS", colName = "cpu_time_ms")
public long cpuTimeMs = -1;
@AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes")
@@ -231,6 +233,11 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setQueueTimeMs(long queueTimeMs) {
+ auditEvent.queueTimeMs = queueTimeMs;
+ return this;
+ }
+
public AuditEventBuilder setScanBytes(long scanBytes) {
auditEvent.scanBytes = scanBytes;
return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index b4b8195b608..6f43ecdd574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -168,6 +168,7 @@ public class AuditLoader extends Plugin implements AuditPlugin {
// execution info
logBuffer.append(event.queryTime).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.queueTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.cpuTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.peakMemoryBytes).append(AUDIT_TABLE_COL_SEPARATOR);
logBuffer.append(event.scanBytes).append(AUDIT_TABLE_COL_SEPARATOR);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 8de74fd6fbe..2c7a35b994f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -47,6 +47,7 @@ import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.FrontendOptions;
import com.google.common.base.Strings;
@@ -234,6 +235,8 @@ public class AuditLogHelper {
}
String cluster = Config.isCloudMode() ? cloudCluster : "";
String stmtType = getStmtType(parsedStmt);
+ long queueTimeMs = getQueueTimeMs(ctx);
+
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
@@ -252,6 +255,7 @@ public class AuditLogHelper {
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
.setQueryTime(elapseMs)
+ .setQueueTimeMs(queueTimeMs)
.setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
.setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
.setScanBytes(statistics == null ? 0 :
statistics.getScanBytes())
@@ -447,6 +451,14 @@ public class AuditLogHelper {
}
}
+ private static long getQueueTimeMs(ConnectContext ctx) {
+ QueueToken queueToken = null;
+ if (ctx.getExecutor() != null && ctx.getExecutor().getCoord() != null) {
+ queueToken = ctx.getExecutor().getCoord().getQueueToken();
+ }
+ return queueToken == null ? -1 : queueToken.getQueueEndTime() - queueToken.getQueueStartTime();
+ }
+
private static String getStmtType(StatementBase stmt) {
if (stmt == null) {
return StmtType.OTHER.name();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
index f80b485609c..884f9a5badf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
@@ -58,6 +58,7 @@ public class AuditEventProcessorTest {
.setDb("db1")
.setState("EOF")
.setQueryTime(2000)
+ .setQueueTimeMs(2000)
.setScanBytes(100000)
.setScanRows(200000)
.setReturnRows(1)
@@ -68,6 +69,7 @@ public class AuditEventProcessorTest {
Assert.assertEquals("127.0.0.1", event.clientIp);
Assert.assertEquals(200000, event.scanRows);
Assert.assertEquals("SELECT", event.stmtType);
+ Assert.assertEquals(2000, event.queueTimeMs);
}
@Test
diff --git a/regression-test/data/audit/test_audit_log_behavior.out b/regression-test/data/audit/test_audit_log_behavior.out
index bfb8d22da9d..f9b8c96323c 100644
--- a/regression-test/data/audit/test_audit_log_behavior.out
+++ b/regression-test/data/audit/test_audit_log_behavior.out
@@ -11,6 +11,7 @@ state varchar(128) Yes false \N NONE
error_code int Yes false \N NONE
error_message text Yes false \N NONE
query_time bigint Yes false \N NONE
+queue_time_ms bigint Yes false \N NONE
cpu_time_ms bigint Yes false \N NONE
peak_memory_bytes bigint Yes false \N NONE
scan_bytes bigint Yes false \N NONE
diff --git a/regression-test/suites/audit/test_audit_log_queue_time.groovy b/regression-test/suites/audit/test_audit_log_queue_time.groovy
new file mode 100644
index 00000000000..e40d1688c42
--- /dev/null
+++ b/regression-test/suites/audit/test_audit_log_queue_time.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_audit_log_queue_time", "nonConcurrent") {
+ // Check admin privilege
+ try {
+ sql "set global enable_audit_plugin = true"
+ } catch (Exception e) {
+ log.warn("skip this case, because " + e.getMessage())
+ assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
+ return
+ }
+
+ def tableName = "audit_queue_time_test"
+ def wgName = "test_queue_time_wg"
+ def testMarker = UUID.randomUUID().toString().substring(0, 8)
+
+ // Cleanup environment
+ sql "drop table if exists ${tableName}"
+ sql "drop workload group if exists ${wgName}"
+
+ // Create test table
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` bigint,
+ `name` varchar(32)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ sql "insert into ${tableName} values (1, 'test')"
+
+ def maxConcurrency = 1
+ // Create workload group: max_concurrency=1 ensures queries queue up
+ sql """
+ create workload group ${wgName}
+ properties (
+ 'max_concurrency' = '${maxConcurrency}',
+ 'max_queue_size' = '10',
+ 'queue_timeout' = '30000'
+ )
+ """
+
+ // Wait for workload group to take effect
+ Thread.sleep(5000)
+
+ // Truncate audit_log for easier testing
+ sql "truncate table __internal_schema.audit_log"
+
+ // Submit concurrent queries with marker for later lookup
+ def sqlSleepTime = 5
+ def queuedSqlCnt = 1
+ def threads = []
+ for (int i = 0; i < maxConcurrency + queuedSqlCnt; i++) {
+ def idx = i
+ threads << Thread.start {
+ try {
+ sql "set workload_group=${wgName}"
+ // Use sleep function to simulate long query, ensuring subsequent queries need to queue
+ sql """
+ select sleep(${sqlSleepTime}), '${testMarker}_${idx}' as marker
+ from ${tableName} limit 1
+ """
+ } catch (Exception e) {
+ log.warn("Query ${idx} failed: ${e.getMessage()}")
+ }
+ }
+ }
+
+ // Wait for all queries to complete
+ threads.each { it.join() }
+
+ // Wait for audit log to flush
+ Thread.sleep(5000)
+ sql "call flush_audit_log()"
+ Thread.sleep(5000)
+
+ // Verify queue_time_ms column exists
+ def schemaResult = sql "desc internal.__internal_schema.audit_log"
+ def hasQueueTimeMs = schemaResult.any { it[0] == "queue_time_ms" }
+ assertTrue(hasQueueTimeMs)
+
+ // check result
+ def retry = 10
+ def query = """
+ select query_id, queue_time_ms, stmt
+ from __internal_schema.audit_log
+ where stmt like '%${testMarker}%'
+ and queue_time_ms > 0
+ order by time
+ """
+ def auditResult = sql "${query}"
+
+ while (auditResult.isEmpty()) {
+ if (retry-- < 0) {
+ throw new RuntimeException("It has retried a few but still failed, you need to check it")
+ }
+ sql "call flush_audit_log()"
+ sleep(3000)
+ auditResult = sql "${query}"
+ }
+
+ auditResult.each { row ->
+ assertTrue(row[1] >= sqlSleepTime * 1000)
+ }
+
+ assertTrue(auditResult.size() >= queuedSqlCnt)
+
+ // Cleanup
+ sql "drop table if exists ${tableName}"
+ sql "drop workload group if exists ${wgName}"
+ sql "set global enable_audit_plugin = false"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]