This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b1fe567275f [feat](audit log) add queue_time_ms in audit_log (#60418)
b1fe567275f is described below

commit b1fe567275fdd50992a3a05c048b0db327fb101b
Author: Fy <[email protected]>
AuthorDate: Tue Feb 24 18:47:10 2026 +0800

    [feat](audit log) add queue_time_ms in audit_log (#60418)
    
    Add a queue_time_ms column to table __internal_schema.audit_log to track
    per-query queueing behavior and elapsed time.
---
 .../org/apache/doris/catalog/InternalSchema.java   |   2 +
 .../java/org/apache/doris/plugin/AuditEvent.java   |   7 ++
 .../org/apache/doris/plugin/audit/AuditLoader.java |   1 +
 .../java/org/apache/doris/qe/AuditLogHelper.java   |  12 ++
 .../apache/doris/qe/AuditEventProcessorTest.java   |   2 +
 .../data/audit/test_audit_log_behavior.out         |   1 +
 .../suites/audit/test_audit_log_queue_time.groovy  | 129 +++++++++++++++++++++
 7 files changed, 154 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
index 955e021fa44..a7ae010f863 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -163,6 +163,8 @@ public class InternalSchema {
         // execution info
         AUDIT_SCHEMA.add(new ColumnDef("query_time",
                 ScalarType.createType(PrimitiveType.BIGINT), 
ColumnNullableType.NULLABLE));
+        AUDIT_SCHEMA.add(new ColumnDef("queue_time_ms",
+                ScalarType.createType(PrimitiveType.BIGINT), 
ColumnNullableType.NULLABLE));
         AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms",
                 ScalarType.createType(PrimitiveType.BIGINT), 
ColumnNullableType.NULLABLE));
         AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 3b08dc382d4..c54c32f6a18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -84,6 +84,8 @@ public class AuditEvent {
     // execution info
     @AuditField(value = "Time(ms)", colName = "query_time")
     public long queryTime = -1;
+    @AuditField(value = "QueueTimeMs", colName = "queue_time_ms")
+    public long queueTimeMs = -1;
     @AuditField(value = "CpuTimeMS", colName = "cpu_time_ms")
     public long cpuTimeMs = -1;
     @AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes")
@@ -231,6 +233,11 @@ public class AuditEvent {
             return this;
         }
 
+        public AuditEventBuilder setQueueTimeMs(long queueTimeMs) {
+            auditEvent.queueTimeMs = queueTimeMs;
+            return this;
+        }
+
         public AuditEventBuilder setScanBytes(long scanBytes) {
             auditEvent.scanBytes = scanBytes;
             return this;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index b4b8195b608..6f43ecdd574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -168,6 +168,7 @@ public class AuditLoader extends Plugin implements 
AuditPlugin {
 
         // execution info
         logBuffer.append(event.queryTime).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.queueTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
         logBuffer.append(event.cpuTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
         
logBuffer.append(event.peakMemoryBytes).append(AUDIT_TABLE_COL_SEPARATOR);
         logBuffer.append(event.scanBytes).append(AUDIT_TABLE_COL_SEPARATOR);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 8de74fd6fbe..2c7a35b994f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -47,6 +47,7 @@ import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.resource.workloadgroup.QueueToken;
 import org.apache.doris.service.FrontendOptions;
 
 import com.google.common.base.Strings;
@@ -234,6 +235,8 @@ public class AuditLogHelper {
         }
         String cluster = Config.isCloudMode() ? cloudCluster : "";
         String stmtType = getStmtType(parsedStmt);
+        long queueTimeMs = getQueueTimeMs(ctx);
+
 
         AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
         // ATTN: MUST reset, otherwise, the same AuditEventBuilder instance 
will be used in the next query.
@@ -252,6 +255,7 @@ public class AuditLogHelper {
                 .setErrorMessage((ctx.getState().getErrorMessage() == null ? 
"" :
                         ctx.getState().getErrorMessage().replace("\n", " 
").replace("\t", " ")))
                 .setQueryTime(elapseMs)
+                .setQueueTimeMs(queueTimeMs)
                 .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
                 .setPeakMemoryBytes(statistics == null ? 0 : 
statistics.getMaxPeakMemoryBytes())
                 .setScanBytes(statistics == null ? 0 : 
statistics.getScanBytes())
@@ -447,6 +451,14 @@ public class AuditLogHelper {
         }
     }
 
+    private static long getQueueTimeMs(ConnectContext ctx) {
+        QueueToken queueToken = null;
+        if (ctx.getExecutor() != null && ctx.getExecutor().getCoord() != null) 
{
+            queueToken = ctx.getExecutor().getCoord().getQueueToken();
+        }
+        return queueToken == null ? -1 : queueToken.getQueueEndTime() - 
queueToken.getQueueStartTime();
+    }
+
     private static String getStmtType(StatementBase stmt) {
         if (stmt == null) {
             return StmtType.OTHER.name();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
index f80b485609c..884f9a5badf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java
@@ -58,6 +58,7 @@ public class AuditEventProcessorTest {
                 .setDb("db1")
                 .setState("EOF")
                 .setQueryTime(2000)
+                .setQueueTimeMs(2000)
                 .setScanBytes(100000)
                 .setScanRows(200000)
                 .setReturnRows(1)
@@ -68,6 +69,7 @@ public class AuditEventProcessorTest {
         Assert.assertEquals("127.0.0.1", event.clientIp);
         Assert.assertEquals(200000, event.scanRows);
         Assert.assertEquals("SELECT", event.stmtType);
+        Assert.assertEquals(2000, event.queueTimeMs);
     }
 
     @Test
diff --git a/regression-test/data/audit/test_audit_log_behavior.out 
b/regression-test/data/audit/test_audit_log_behavior.out
index bfb8d22da9d..f9b8c96323c 100644
--- a/regression-test/data/audit/test_audit_log_behavior.out
+++ b/regression-test/data/audit/test_audit_log_behavior.out
@@ -11,6 +11,7 @@ state varchar(128)    Yes     false   \N      NONE
 error_code     int     Yes     false   \N      NONE
 error_message  text    Yes     false   \N      NONE
 query_time     bigint  Yes     false   \N      NONE
+queue_time_ms  bigint  Yes     false   \N      NONE
 cpu_time_ms    bigint  Yes     false   \N      NONE
 peak_memory_bytes      bigint  Yes     false   \N      NONE
 scan_bytes     bigint  Yes     false   \N      NONE
diff --git a/regression-test/suites/audit/test_audit_log_queue_time.groovy 
b/regression-test/suites/audit/test_audit_log_queue_time.groovy
new file mode 100644
index 00000000000..e40d1688c42
--- /dev/null
+++ b/regression-test/suites/audit/test_audit_log_queue_time.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_audit_log_queue_time", "nonConcurrent") {
+    // Check admin privilege
+    try {
+        sql "set global enable_audit_plugin = true"
+    } catch (Exception e) {
+        log.warn("skip this case, because " + e.getMessage())
+        assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
+        return
+    }
+
+    def tableName = "audit_queue_time_test"
+    def wgName = "test_queue_time_wg"
+    def testMarker = UUID.randomUUID().toString().substring(0, 8)
+
+    // Cleanup environment
+    sql "drop table if exists ${tableName}"
+    sql "drop workload group if exists ${wgName}"
+
+    // Create test table
+    sql """
+        CREATE TABLE `${tableName}` (
+          `id` bigint,
+          `name` varchar(32)
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    sql "insert into ${tableName} values (1, 'test')"
+
+    def maxConcurrency = 1
+    // Create workload group: max_concurrency=1 ensures queries queue up
+    sql """
+        create workload group ${wgName}
+        properties (
+            'max_concurrency' = '${maxConcurrency}',
+            'max_queue_size' = '10',
+            'queue_timeout' = '30000'
+        )
+    """
+
+    // Wait for workload group to take effect
+    Thread.sleep(5000)
+
+    // Truncate audit_log for easier testing
+    sql "truncate table __internal_schema.audit_log"
+
+    // Submit concurrent queries with marker for later lookup
+    def sqlSleepTime = 5
+    def queuedSqlCnt = 1
+    def threads = []
+    for (int i = 0; i < maxConcurrency + queuedSqlCnt; i++) {
+        def idx = i
+        threads << Thread.start {
+            try {
+                sql "set workload_group=${wgName}"
+                // Use sleep function to simulate long query, ensuring 
subsequent queries need to queue
+                sql """
+                    select sleep(${sqlSleepTime}), '${testMarker}_${idx}' as 
marker
+                    from ${tableName} limit 1
+                """
+            } catch (Exception e) {
+                log.warn("Query ${idx} failed: ${e.getMessage()}")
+            }
+        }
+    }
+
+    // Wait for all queries to complete
+    threads.each { it.join() }
+
+    // Wait for audit log to flush
+    Thread.sleep(5000)
+    sql "call flush_audit_log()"
+    Thread.sleep(5000)
+
+    // Verify queue_time_ms column exists
+    def schemaResult = sql "desc internal.__internal_schema.audit_log"
+    def hasQueueTimeMs = schemaResult.any { it[0] == "queue_time_ms" }
+    assertTrue(hasQueueTimeMs)
+
+    // check result
+    def retry = 10
+    def query = """
+        select query_id, queue_time_ms, stmt
+        from __internal_schema.audit_log
+        where stmt like '%${testMarker}%'
+        and queue_time_ms > 0
+        order by time
+    """
+    def auditResult = sql "${query}"
+
+    while (auditResult.isEmpty()) {
+        if (retry-- < 0) {
+            throw new RuntimeException("It has retried a few but still failed, 
you need to check it")
+        }
+        sql "call flush_audit_log()"
+        sleep(3000)
+        auditResult = sql "${query}"
+    }
+
+    auditResult.each { row ->
+        assertTrue(row[1] >= sqlSleepTime * 1000)
+    }
+
+    assertTrue(auditResult.size() >= queuedSqlCnt)
+
+    // Cleanup
+    sql "drop table if exists ${tableName}"
+    sql "drop workload group if exists ${wgName}"
+    sql "set global enable_audit_plugin = false"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to