This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 104d427afa1 [cherry-pick][chore](audit) Optimize the SQL (insert into values) length in audit logs (#37894) and let line comment work well (#40599) (#42186)
104d427afa1 is described below

commit 104d427afa1d349c118782e34ac21fc90869d15d
Author: yagagagaga <[email protected]>
AuthorDate: Tue Oct 22 10:16:05 2024 +0800

    [cherry-pick][chore](audit) Optimize the SQL (insert into values) length in audit logs (#37894) and let line comment work well (#40599) (#42186)
    
    ## Proposed changes
    
    cherry-pick from master #37894 and #40599
    
    <!--Describe your changes.-->
---
 .../commands/insert/InsertIntoTableCommand.java    |   4 +
 .../org/apache/doris/plugin/audit/AuditLoader.java |  30 +----
 .../java/org/apache/doris/qe/AuditLogHelper.java   | 132 +++++++++++++++++++--
 .../suites/audit/test_audit_log_behavior.groovy    | 102 ++++++++++++++++
 4 files changed, 227 insertions(+), 41 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 7a46948f1b5..59b99c209d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -95,6 +95,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
         this.cte = cte;
     }
 
+    public LogicalPlan getLogicalQuery() {
+        return logicalQuery;
+    }
+
     public Optional<String> getLabelName() {
         return labelName;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 0aef1a93dba..24eda23fc5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -35,11 +35,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CodingErrorAction;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.concurrent.BlockingQueue;
@@ -169,33 +164,14 @@ public class AuditLoader extends Plugin implements AuditPlugin {
         logBuffer.append(event.sqlDigest).append("\t");
         logBuffer.append(event.peakMemoryBytes).append("\t");
         logBuffer.append(event.workloadGroup).append("\t");
-        // trim the query to avoid too long
-        // use `getBytes().length` to get real byte length
-        String stmt = truncateByBytes(event.stmt).replace("\n", " ")
-                .replace("\t", " ")
-                .replace("\r", " ");
+        // already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog
+        String stmt = event.stmt;
         if (LOG.isDebugEnabled()) {
             LOG.debug("receive audit event with stmt: {}", stmt);
         }
         logBuffer.append(stmt).append("\n");
     }
 
-    private String truncateByBytes(String str) {
-        int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength, str.getBytes().length);
-        if (maxLen >= str.getBytes().length) {
-            return str;
-        }
-        Charset utf8Charset = Charset.forName("UTF-8");
-        CharsetDecoder decoder = utf8Charset.newDecoder();
-        byte[] sb = str.getBytes();
-        ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
-        CharBuffer charBuffer = CharBuffer.allocate(maxLen);
-        decoder.onMalformedInput(CodingErrorAction.IGNORE);
-        decoder.decode(buffer, charBuffer, true);
-        decoder.flush(charBuffer);
-        return new String(charBuffer.array(), 0, charBuffer.position());
-    }
-
     private void loadIfNecessary(AuditStreamLoader loader) {
         long currentTime = System.currentTimeMillis();
 
@@ -229,8 +205,6 @@ public class AuditLoader extends Plugin implements AuditPlugin {
                 }
             }
         }
-
-        return;
     }
 
     private void resetBatch(long currentTime) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 4f221fe42d6..afb5b856403 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.analysis.InsertStmt;
+import org.apache.doris.analysis.NativeInsertStmt;
 import org.apache.doris.analysis.Queriable;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SelectStmt;
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.ValueList;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
@@ -27,6 +30,14 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
+import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -36,25 +47,127 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.util.List;
+
 public class AuditLogHelper {
 
     private static final Logger LOG = LogManager.getLogger(AuditLogHelper.class);
 
-    // Add a new method to wrap original logAuditLog to catch all exceptions. Because write audit
-    // log may write to a doris internal table, we may meet errors. We do not want this affect the
-    // query process. Ignore this error and just write warning log.
+    /**
+     * Add a new method to wrap original logAuditLog to catch all exceptions. Because write audit
+     * log may write to a doris internal table, we may meet errors. We do not want this affect the
+     * query process. Ignore this error and just write warning log.
+     */
     public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt,
             org.apache.doris.proto.Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
         try {
+            origStmt = handleStmt(origStmt, parsedStmt);
             logAuditLogImpl(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables);
         } catch (Throwable t) {
             LOG.warn("Failed to write audit log.", t);
         }
     }
 
+    /**
+     * Truncate sql and if SQL is in the following situations, count the number of rows:
+     * <ul>
+     * <li>{@code insert into tbl values (1), (2), (3)}</li>
+     * </ul>
+     * The final SQL will be:
+     * {@code insert into tbl values (1), (2 ...}
+     */
+    public static String handleStmt(String origStmt, StatementBase parsedStmt) {
+        if (origStmt == null) {
+            return null;
+        }
+        int maxLen = GlobalVariable.auditPluginMaxSqlLength;
+        if (origStmt.length() <= maxLen) {
+            return origStmt.replace("\n", "\\n")
+                .replace("\t", "\\t")
+                .replace("\r", "\\r");
+        }
+        origStmt = truncateByBytes(origStmt)
+            .replace("\n", "\\n")
+            .replace("\t", "\\t")
+            .replace("\r", "\\r");
+        int rowCnt = 0;
+        // old planner
+        if (parsedStmt instanceof NativeInsertStmt) {
+            QueryStmt queryStmt = ((NativeInsertStmt) parsedStmt).getQueryStmt();
+            if (queryStmt instanceof SelectStmt) {
+                ValueList list = ((SelectStmt) queryStmt).getValueList();
+                if (list != null && list.getRows() != null) {
+                    rowCnt = list.getRows().size();
+                }
+            }
+        }
+        // nereids planner
+        if (parsedStmt instanceof LogicalPlanAdapter) {
+            LogicalPlan plan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+            if (plan instanceof InsertIntoTableCommand) {
+                LogicalPlan query = ((InsertIntoTableCommand) plan).getLogicalQuery();
+                if (query instanceof UnboundTableSink) {
+                    rowCnt = countValues(query.children());
+                }
+            }
+        }
+        if (rowCnt > 0) {
+            return origStmt + " ... /* total " + rowCnt + " rows, truncated audit_plugin_max_sql_length="
+                + GlobalVariable.auditPluginMaxSqlLength + " */";
+        } else {
+            return origStmt
+                + " ... /* truncated audit_plugin_max_sql_length="
+                + GlobalVariable.auditPluginMaxSqlLength + " */";
+        }
+    }
+
+    private static String truncateByBytes(String str) {
+        int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength, str.getBytes().length);
+        // use `getBytes().length` to get real byte length
+        if (maxLen >= str.getBytes().length) {
+            return str;
+        }
+        Charset utf8Charset = Charset.forName("UTF-8");
+        CharsetDecoder decoder = utf8Charset.newDecoder();
+        byte[] sb = str.getBytes();
+        ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
+        CharBuffer charBuffer = CharBuffer.allocate(maxLen);
+        decoder.onMalformedInput(CodingErrorAction.IGNORE);
+        decoder.decode(buffer, charBuffer, true);
+        decoder.flush(charBuffer);
+        return new String(charBuffer.array(), 0, charBuffer.position());
+    }
+
+    /**
+     * When SQL is in the following situations, count the number of rows:
+     * <ul>
+     * <li>{@code insert into tbl values (1), (2), (3)}</li>
+     * </ul>
+     */
+    private static int countValues(List<Plan> children) {
+        if (children == null) {
+            return 0;
+        }
+        int cnt = 0;
+        for (Plan child : children) {
+            if (child instanceof UnboundOneRowRelation) {
+                cnt++;
+            } else if (child instanceof LogicalInlineTable) {
+                cnt += ((LogicalInlineTable) child).getConstantExprsList().size();
+            } else if (child instanceof LogicalUnion) {
+                cnt += countValues(child.children());
+            }
+        }
+        return cnt;
+    }
+
     private static void logAuditLogImpl(ConnectContext ctx, String origStmt, StatementBase parsedStmt,
             org.apache.doris.proto.Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
-        origStmt = origStmt.replace("\n", "\\n");
         // slow query
         long endTime = System.currentTimeMillis();
         long elapseMs = endTime - ctx.getStartTime();
@@ -122,14 +235,7 @@ public class AuditLogHelper {
         if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) {
             auditEventBuilder.setStmt(parsedStmt.toSql());
         } else {
-            if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager()
-                    && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
-                // INSERT INTO VALUES may be very long, so we only log at most 1K bytes.
-                int length = Math.min(1024, origStmt.length());
-                auditEventBuilder.setStmt(origStmt.substring(0, length));
-            } else {
-                auditEventBuilder.setStmt(origStmt);
-            }
+            auditEventBuilder.setStmt(origStmt);
         }
         if (!Env.getCurrentEnv().isMaster()) {
             if (ctx.executor.isForwardToMaster()) {
diff --git a/regression-test/suites/audit/test_audit_log_behavior.groovy b/regression-test/suites/audit/test_audit_log_behavior.groovy
new file mode 100644
index 00000000000..a43b456e6d0
--- /dev/null
+++ b/regression-test/suites/audit/test_audit_log_behavior.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_audit_log_behavior") {
+    try {
+        sql "set global enable_audit_plugin = true"
+        sql "set global audit_plugin_max_sql_length = 58"
+        sql "set global audit_plugin_max_batch_interval_sec = 1"
+    } catch (Exception e) {
+        log.warn("skip this case, because " + e.getMessage())
+        assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
+        return
+    }
+
+    sql "drop table if exists audit_log_behavior"
+    sql """
+        CREATE TABLE `audit_log_behavior` (
+          `id` bigint,
+          `name` varchar(32)
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+          "replication_allocation" = "tag.location.default: 1"
+        )
+        """
+
+    int cnt = 0
+    def sqls = [
+            [
+                    "insert into audit_log_behavior values (1, '3F6B9A_${cnt}')",
+                    "insert into audit_log_behavior values (1, '3F6B9A_${cnt++}')"
+            ],
+            [
+                    "insert into audit_log_behavior values (1, '3F6B9A_${cnt}'), (2, 'Jelly')",
+                    "insert into audit_log_behavior values (1, '3F6B9A_${cnt++}'), (2, ... /* total 2 rows, truncated audit_plugin_max_sql_length=58 */"
+            ],
+            [
+                    "insert into audit_log_behavior values (1, '3F6B9A_${cnt}'), (2, 'Jelly'), (3, 'foobar')",
+                    "insert into audit_log_behavior values (1, '3F6B9A_${cnt++}'), (2, ... /* total 3 rows, truncated audit_plugin_max_sql_length=58 */"
+            ],
+            [
+                    "insert into audit_log_behavior select 1, '3F6B9A_${cnt}'",
+                    "insert into audit_log_behavior select 1, '3F6B9A_${cnt++}'"],
+            [
+                    "insert into audit_log_behavior select 1, '3F6B9A_${cnt}' union select 2, 'Jelly'",
+                    "insert into audit_log_behavior select 1, '3F6B9A_${cnt++}' union  ... /* truncated audit_plugin_max_sql_length=58 */"
+            ],
+            [
+                    "insert into audit_log_behavior select 1, '3F6B9A_${cnt}' from audit_log_behavior",
+                    "insert into audit_log_behavior select 1, '3F6B9A_${cnt++}' from a ... /* truncated audit_plugin_max_sql_length=58 */"
+            ],
+            [
+                    "select id, name from audit_log_behavior as loooooooooooooooong_alias",
+                    "select id, name from audit_log_behavior as loooooooooooooo ... /* truncated audit_plugin_max_sql_length=58 */"
+            ]
+    ]
+
+    for (def on : [true, false]) {
+        sql "set enable_nereids_planner=${on}"
+        sql "truncate table  __internal_schema.audit_log"
+        // run queries
+        for (int i = 0; i < cnt; i++) {
+            def tuple2 = sqls.get(i)
+            sql tuple2[0]
+        }
+
+        // check result
+        for (int i = 0; i < cnt; i++) {
+            def tuple2 = sqls.get(i)
+            def retry = 90
+            def res = sql "select stmt from __internal_schema.audit_log where stmt like '%3F6B9A_${i}%' order by time asc limit 1"
+            while (res.isEmpty()) {
+                if (retry-- < 0) {
+                    logger.warn("It has retried a few but still failed, you need to check it")
+                    return
+                }
+                sleep(1000)
+                res = sql "select stmt from __internal_schema.audit_log where stmt like '%3F6B9A_${i}%' order by time asc limit 1"
+            }
+            assertEquals(res[0][0].toString(), tuple2[1].toString())
+        }
+    }
+    sql "set global enable_audit_plugin = false"
+    sql "set global audit_plugin_max_sql_length = 4096"
+    sql "set global audit_plugin_max_batch_interval_sec = 60"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to