This is an automated email from the ASF dual-hosted git repository.

hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 38430219e62 [fix](audit) serialize audit loader batch assembly (#65107)
38430219e62 is described below

commit 38430219e625b491f47920bf9925c687117c8076
Author: shuke <[email protected]>
AuthorDate: Thu Jul 2 10:17:23 2026 +0800

    [fix](audit) serialize audit loader batch assembly (#65107)
    
    ### What problem does this PR solve?
    
    `AuditLoader.loadIfNecessary()` is synchronized and resets
    `auditLogBuffer` after stream load, but `assembleAudit()` appended
    events to the same shared `StringBuilder` without holding the same
    monitor.
    
    When `call flush_audit_log()` forces a load while the audit loader
    worker is assembling a new event, the worker may append to the old
    buffer after the payload has already been materialized for stream load
    and before `resetBatch()` replaces the buffer. That event is then
    neither included in the current load nor retained for the next load.
    
    This can make `flush_audit_log()` miss audit events that have already
    reached the audit plugin pipeline, which matches the flaky
    `test_audit_log_queue_time` symptom.
    
    Related: DORIS-25958
    
    ### Check List
    
    - [x] Added test
    - [ ] This is a behavior change and it is documented
    - [ ] This is a new feature and it is documented
    - [ ] This needs upgrade
    - [ ] This needs downgrade
    
    ### Release note
    
    None
    
    ### Testing
    
    - `git diff --check`
    - Not run: `run-fe-ut.sh --run
    org.apache.doris.plugin.audit.AuditLoaderTest` because local
    `thirdparty/installed/bin/protoc` is missing, and `fe/AGENTS.md`
    requires stopping FE build/test in that case.
---
 .../org/apache/doris/plugin/audit/AuditLoader.java |  3 +-
 .../apache/doris/plugin/audit/AuditLoaderTest.java | 82 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 6f43ecdd574..6a09fdfa1fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -140,7 +140,7 @@ public class AuditLoader extends Plugin implements 
AuditPlugin {
         }
     }
 
-    private void assembleAudit(AuditEvent event) {
+    private synchronized void assembleAudit(AuditEvent event) {
         fillLogBuffer(event, auditLogBuffer);
         ++auditLogNum;
     }
@@ -287,4 +287,3 @@ public class AuditLoader extends Plugin implements 
AuditPlugin {
         }
     }
 }
-
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLoaderTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLoaderTest.java
new file mode 100644
index 00000000000..63bab9c3f95
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLoaderTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.plugin.audit;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.plugin.AuditEvent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AuditLoaderTest {
+
+    @Test
+    public void testAssembleAuditIsSerializedWithLoadLock() throws Exception {
+        AuditLoader auditLoader = new AuditLoader();
+        AuditEvent auditEvent = new AuditEvent.AuditEventBuilder()
+                .setQueryId("query-in-shared-monitor-test")
+                .setTimestamp(1L)
+                .setStmt("select 1")
+                .build();
+
+        CountDownLatch started = new CountDownLatch(1);
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        Thread assembleThread = new Thread(() -> {
+            started.countDown();
+            try {
+                Deencapsulation.invoke(auditLoader, "assembleAudit", 
auditEvent);
+            } catch (Throwable t) {
+                error.set(t);
+            }
+        });
+
+        synchronized (auditLoader) {
+            assembleThread.start();
+            Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+            Assert.assertTrue(waitForBlocked(assembleThread));
+            
Assert.assertFalse(getAuditLogBuffer(auditLoader).contains(auditEvent.queryId));
+        }
+
+        assembleThread.join(5000);
+        Assert.assertFalse(assembleThread.isAlive());
+        if (error.get() != null) {
+            throw new AssertionError("failed to assemble audit event", 
error.get());
+        }
+        
Assert.assertTrue(getAuditLogBuffer(auditLoader).contains(auditEvent.queryId));
+    }
+
+    private boolean waitForBlocked(Thread thread) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + 5000;
+        while (System.currentTimeMillis() < deadline) {
+            if (thread.getState() == Thread.State.BLOCKED) {
+                return true;
+            }
+            Thread.sleep(10);
+        }
+        return false;
+    }
+
+    private String getAuditLogBuffer(AuditLoader auditLoader) {
+        StringBuilder buffer = Deencapsulation.getField(auditLoader, 
"auditLogBuffer");
+        return buffer.toString();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to