This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cf68a6485a8 [opt](audit) use one line in audit log and origin 
statement in audit table (#52032) (#52205)
cf68a6485a8 is described below

commit cf68a6485a84b432481f2ee6aab64fdb395b1b53
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jun 24 19:43:32 2025 +0800

    [opt](audit) use one line in audit log and origin statement in audit table 
(#52032) (#52205)
    
    bp #52032
---
 .../java/org/apache/doris/plugin/AuditEvent.java   | 116 ++++++++++++---------
 .../org/apache/doris/plugin/audit/AuditLoader.java |  66 ++++++------
 .../apache/doris/plugin/audit/AuditLogBuilder.java |   6 ++
 .../doris/plugin/audit/AuditStreamLoader.java      |   2 +
 .../java/org/apache/doris/qe/AuditLogHelper.java   |   7 +-
 .../doris/plugin/audit/AuditLogBuilderTest.java    |  24 ++---
 6 files changed, 119 insertions(+), 102 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 20c05d982f8..7a2f48d4e6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -42,77 +42,95 @@ public class AuditEvent {
     }
 
     @Retention(RetentionPolicy.RUNTIME)
-    public static @interface AuditField {
+    public @interface AuditField {
         String value() default "";
+
+        String colName() default "";
     }
 
     public EventType type;
 
-    // all fields which is about to be audit should be annotated by 
"@AuditField"
+    // all fields that are about to be audited should be annotated with 
"@AuditField"
     // make them all "public" so that easy to visit.
-    @AuditField(value = "Timestamp")
+
+    // uuid and time
+    @AuditField(value = "QueryId", colName = "query_id")
+    public String queryId = "";
+    @AuditField(value = "Timestamp", colName = "time")
     public long timestamp = -1;
-    @AuditField(value = "Client")
+
+    // cs info
+    @AuditField(value = "Client", colName = "client_ip")
     public String clientIp = "";
-    @AuditField(value = "User")
+    @AuditField(value = "User", colName = "user")
     public String user = "";
-    @AuditField(value = "Ctl")
+    @AuditField(value = "FeIp", colName = "frontend_ip")
+    public String feIp = "";
+
+    // default ctl and db
+    @AuditField(value = "Ctl", colName = "catalog")
     public String ctl = "";
-    @AuditField(value = "Db")
+    @AuditField(value = "Db", colName = "db")
     public String db = "";
-    @AuditField(value = "CommandType")
-    public String commandType = "";
-    @AuditField(value = "State")
+
+    // query state
+    @AuditField(value = "State", colName = "state")
     public String state = "";
-    @AuditField(value = "ErrorCode")
+    @AuditField(value = "ErrorCode", colName = "error_code")
     public int errorCode = 0;
-    @AuditField(value = "ErrorMessage")
+    @AuditField(value = "ErrorMessage", colName = "error_message")
     public String errorMessage = "";
-    @AuditField(value = "Time(ms)")
+
+    // execution info
+    @AuditField(value = "Time(ms)", colName = "query_time")
     public long queryTime = -1;
-    @AuditField(value = "ScanBytes")
+    @AuditField(value = "CpuTimeMS", colName = "cpu_time_ms")
+    public long cpuTimeMs = -1;
+    @AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes")
+    public long peakMemoryBytes = -1;
+    @AuditField(value = "ScanBytes", colName = "scan_bytes")
     public long scanBytes = -1;
-    @AuditField(value = "ScanRows")
+    @AuditField(value = "ScanRows", colName = "scan_rows")
     public long scanRows = -1;
-    @AuditField(value = "ReturnRows")
+    @AuditField(value = "ReturnRows", colName = "return_rows")
     public long returnRows = -1;
-    @AuditField(value = "StmtId")
-    public long stmtId = -1;
-    @AuditField(value = "QueryId")
-    public String queryId = "";
-    @AuditField(value = "IsQuery")
-    public boolean isQuery = false;
-    @AuditField(value = "IsNereids")
-    public boolean isNereids = false;
-    @AuditField(value = "FeIp")
-    public String feIp = "";
-    @AuditField(value = "StmtType")
-    public String stmtType = "";
-    @AuditField(value = "Stmt")
-    public String stmt = "";
-    @AuditField(value = "CpuTimeMS")
-    public long cpuTimeMs = -1;
-    @AuditField(value = "ShuffleSendBytes")
-    public long shuffleSendBytes = -1;
-    @AuditField(value = "ShuffleSendRows")
+    @AuditField(value = "ShuffleSendRows", colName = "shuffle_send_rows")
     public long shuffleSendRows = -1;
-    @AuditField(value = "SqlHash")
+    @AuditField(value = "ShuffleSendBytes", colName = "shuffle_send_bytes")
+    public long shuffleSendBytes = -1;
+    @AuditField(value = "ScanBytesFromLocalStorage", colName = 
"scan_bytes_from_local_storage")
+    public long scanBytesFromLocalStorage = -1;
+    @AuditField(value = "ScanBytesFromRemoteStorage", colName = 
"scan_bytes_from_remote_storage")
+    public long scanBytesFromRemoteStorage = -1;
+
+    @AuditField(value = "FuzzyVariables")
+    public String fuzzyVariables = "";
+
+    // type and digest
+    @AuditField(value = "CommandType")
+    public String commandType = "";
+    @AuditField(value = "StmtType", colName = "stmt_type")
+    public String stmtType = "";
+    @AuditField(value = "StmtId", colName = "stmt_id")
+    public long stmtId = -1;
+    @AuditField(value = "SqlHash", colName = "sql_hash")
     public String sqlHash = "";
-    @AuditField(value = "PeakMemoryBytes")
-    public long peakMemoryBytes = -1;
-    @AuditField(value = "SqlDigest")
+    @AuditField(value = "SqlDigest", colName = "sql_digest")
     public String sqlDigest = "";
-    @AuditField(value = "ComputeGroupName")
-    public String cloudClusterName = "";
-    @AuditField(value = "WorkloadGroup")
+    @AuditField(value = "IsQuery", colName = "is_query")
+    public boolean isQuery = false;
+    @AuditField(value = "IsNereids", colName = "is_nereids")
+    public boolean isNereids = false;
+
+    // resource
+    @AuditField(value = "WorkloadGroup", colName = "workload_group")
     public String workloadGroup = "";
-    // note: newly added fields should be always before fuzzyVariables
-    @AuditField(value = "FuzzyVariables")
-    public String fuzzyVariables = "";
-    @AuditField(value = "ScanBytesFromLocalStorage")
-    public long scanBytesFromLocalStorage = -1;
-    @AuditField(value = "ScanBytesFromRemoteStorage")
-    public long scanBytesFromRemoteStorage = -1;
+    @AuditField(value = "ComputeGroupName", colName = "compute_group")
+    public String cloudClusterName = "";
+
+    // stmt should be the last one
+    @AuditField(value = "Stmt", colName = "stmt")
+    public String stmt = "";
 
     public long pushToAuditLogQueueTime;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 722ab48669b..c1047bec1b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -46,6 +46,14 @@ public class AuditLoader extends Plugin implements 
AuditPlugin {
 
     public static final String AUDIT_LOG_TABLE = "audit_log";
 
+    // the control characters U+001F and U+001E separate columns and lines in audit log data
+    public static final String AUDIT_TABLE_COL_SEPARATOR = "\u001F";
+    public static final String AUDIT_TABLE_LINE_DELIMITER = "\u001E";
+    // the 4-character texts "\x1F" and "\x1E" are sent as the column and line delimiter of the stream load
+    // request, and correspond to the U+001F and U+001E characters used in audit log data.
+    public static final String AUDIT_TABLE_COL_SEPARATOR_STR = "\\x1F";
+    public static final String AUDIT_TABLE_LINE_DELIMITER_STR = "\\x1E";
+
     private StringBuilder auditLogBuffer = new StringBuilder();
     private int auditLogNum = 0;
     private long lastLoadTimeAuditLog = 0;
@@ -139,40 +147,40 @@ public class AuditLoader extends Plugin implements 
AuditPlugin {
 
     private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
         // should be same order as InternalSchema.AUDIT_SCHEMA
-        logBuffer.append(event.queryId).append("\t");
-        
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
-        logBuffer.append(event.clientIp).append("\t");
-        logBuffer.append(event.user).append("\t");
-        logBuffer.append(event.ctl).append("\t");
-        logBuffer.append(event.db).append("\t");
-        logBuffer.append(event.state).append("\t");
-        logBuffer.append(event.errorCode).append("\t");
-        logBuffer.append(event.errorMessage).append("\t");
-        logBuffer.append(event.queryTime).append("\t");
-        logBuffer.append(event.scanBytes).append("\t");
-        logBuffer.append(event.scanRows).append("\t");
-        logBuffer.append(event.returnRows).append("\t");
-        logBuffer.append(event.shuffleSendRows).append("\t");
-        logBuffer.append(event.shuffleSendBytes).append("\t");
-        logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
-        logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
-        logBuffer.append(event.stmtId).append("\t");
-        logBuffer.append(event.stmtType).append("\t");
-        logBuffer.append(event.isQuery ? 1 : 0).append("\t");
-        logBuffer.append(event.isNereids ? 1 : 0).append("\t");
-        logBuffer.append(event.feIp).append("\t");
-        logBuffer.append(event.cpuTimeMs).append("\t");
-        logBuffer.append(event.sqlHash).append("\t");
-        logBuffer.append(event.sqlDigest).append("\t");
-        logBuffer.append(event.peakMemoryBytes).append("\t");
-        logBuffer.append(event.workloadGroup).append("\t");
-        logBuffer.append(event.cloudClusterName).append("\t");
+        logBuffer.append(event.queryId).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.clientIp).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.user).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.ctl).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.db).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.state).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.errorCode).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.errorMessage).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.queryTime).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.scanBytes).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.scanRows).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.returnRows).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.shuffleSendRows).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.shuffleSendBytes).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.scanBytesFromLocalStorage).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.scanBytesFromRemoteStorage).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.stmtId).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.stmtType).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.isQuery ? 1 : 
0).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.isNereids ? 1 : 
0).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.feIp).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.cpuTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.sqlHash).append(AUDIT_TABLE_COL_SEPARATOR);
+        logBuffer.append(event.sqlDigest).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.peakMemoryBytes).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.workloadGroup).append(AUDIT_TABLE_COL_SEPARATOR);
+        
logBuffer.append(event.cloudClusterName).append(AUDIT_TABLE_COL_SEPARATOR);
         // already trim the query in 
org.apache.doris.qe.AuditLogHelper#logAuditLog
         String stmt = event.stmt;
         if (LOG.isDebugEnabled()) {
             LOG.debug("receive audit event with stmt: {}", stmt);
         }
-        logBuffer.append(stmt).append("\n");
+        logBuffer.append(stmt).append(AUDIT_TABLE_LINE_DELIMITER);
     }
 
     // public for external call.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
index 4208d5def2e..94d7973f294 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
@@ -121,6 +121,12 @@ public class AuditLogBuilder extends Plugin implements 
AuditPlugin {
                 }
             }
 
+            // replace newline characters with escaped characters to make 
sure the stmt stays on one line
+            if (af.value().equals("Stmt")) {
+                fieldValue = ((String) fieldValue).replace("\n", "\\n")
+                        .replace("\r", "\\r");
+            }
+
             sb.append("|").append(af.value()).append("=").append(fieldValue);
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
index 0b70e9591d5..d2576937d98 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -68,6 +68,8 @@ public class AuditStreamLoader {
                 InternalSchema.AUDIT_SCHEMA.stream().map(c -> 
c.getName()).collect(
                         Collectors.joining(",")));
         conn.addRequestProperty("redirect-policy", "random-be");
+        conn.addRequestProperty("column_separator", 
AuditLoader.AUDIT_TABLE_COL_SEPARATOR_STR);
+        conn.addRequestProperty("line_delimiter", 
AuditLoader.AUDIT_TABLE_LINE_DELIMITER_STR);
         conn.setDoOutput(true);
         conn.setDoInput(true);
         return conn;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index f1470d444b9..ecfd08aaa71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -101,9 +101,7 @@ public class AuditLogHelper {
         int maxLen = GlobalVariable.auditPluginMaxSqlLength;
         origStmt = truncateByBytes(origStmt, maxLen, " ... /* truncated. 
audit_plugin_max_sql_length=" + maxLen
                 + " */");
-        return origStmt.replace("\n", "\\n")
-                .replace("\t", "\\t")
-                .replace("\r", "\\r");
+        return origStmt;
     }
 
     private static Optional<String> handleInsertStmt(String origStmt, 
StatementBase parsedStmt) {
@@ -134,9 +132,6 @@ public class AuditLogHelper {
                     Math.min(GlobalVariable.auditPluginMaxInsertStmtLength, 
GlobalVariable.auditPluginMaxSqlLength));
             origStmt = truncateByBytes(origStmt, maxLen, " ... /* total " + 
rowCnt
                     + " rows, truncated. audit_plugin_max_insert_stmt_length=" 
+ maxLen + " */");
-            origStmt = origStmt.replace("\n", "\\n")
-                    .replace("\t", "\\t")
-                    .replace("\r", "\\r");
             return Optional.of(origStmt);
         } else {
             return Optional.empty();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
index 8c678447c3a..f3e71c248df 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
@@ -80,12 +80,9 @@ public class AuditLogBuilderTest {
             // 4. Test statement with newlines, tabs, carriage returns
             String stmtWithSpecialChars = "SELECT *\nFROM table1\tWHERE id = 
1\r";
             result = AuditLogHelper.handleStmt(stmtWithSpecialChars, 
nonInsertStmt);
-            Assert.assertTrue("Should escape newlines", 
result.contains("\\n"));
-            Assert.assertTrue("Should escape tabs", result.contains("\\t"));
-            Assert.assertTrue("Should escape carriage returns", 
result.contains("\\r"));
-            Assert.assertFalse("Should not contain actual newlines", 
result.contains("\n"));
-            Assert.assertFalse("Should not contain actual tabs", 
result.contains("\t"));
-            Assert.assertFalse("Should not contain actual carriage returns", 
result.contains("\r"));
+            Assert.assertTrue("Should contain actual newlines", 
result.contains("\n"));
+            Assert.assertTrue("Should contain actual tabs", 
result.contains("\t"));
+            Assert.assertTrue("Should contain actual carriage returns", 
result.contains("\r"));
 
             // 5. Test long statement with Chinese characters truncation
             String chineseStmt
@@ -118,12 +115,6 @@ public class AuditLogBuilderTest {
             String emptyStmt = "";
             result = AuditLogHelper.handleStmt(emptyStmt, nonInsertStmt);
             Assert.assertEquals("Empty string should remain empty", "", 
result);
-
-            // 9. Test statement with only special characters
-            String specialCharsStmt = "\n\t\r\n\t\r";
-            result = AuditLogHelper.handleStmt(specialCharsStmt, 
nonInsertStmt);
-            Assert.assertEquals("Should escape all special characters", 
"\\n\\t\\r\\n\\t\\r", result);
-
         } finally {
             // Restore original values
             GlobalVariable.auditPluginMaxSqlLength = originalMaxSqlLength;
@@ -172,12 +163,9 @@ public class AuditLogBuilderTest {
             result = AuditLogHelper.handleStmt(insertWithSpecialChars, 
insertStmt);
 
             // Verify special characters are properly escaped
-            Assert.assertTrue("Should escape newlines in INSERT", 
result.contains("\\n"));
-            Assert.assertTrue("Should escape tabs in INSERT", 
result.contains("\\t"));
-            Assert.assertTrue("Should escape carriage returns in INSERT", 
result.contains("\\r"));
-            Assert.assertFalse("Should not contain actual newlines", 
result.contains("\n"));
-            Assert.assertFalse("Should not contain actual tabs", 
result.contains("\t"));
-            Assert.assertFalse("Should not contain actual carriage returns", 
result.contains("\r"));
+            Assert.assertTrue("Should contain actual newlines", 
result.contains("\n"));
+            Assert.assertTrue("Should contain actual tabs", 
result.contains("\t"));
+            Assert.assertTrue("Should contain actual carriage returns", 
result.contains("\r"));
 
             // 4. Test comparison: same length statements, different handling 
for INSERT vs non-INSERT
             // Create a statement with length between 80-200


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to