This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 104d427afa1 [cherry-pick][chore](audit) Optimize the SQL (insert into
values) length in audit logs (#37894) and let line comment work well (#40599)
(#42186)
104d427afa1 is described below
commit 104d427afa1d349c118782e34ac21fc90869d15d
Author: yagagagaga <[email protected]>
AuthorDate: Tue Oct 22 10:16:05 2024 +0800
[cherry-pick][chore](audit) Optimize the SQL (insert into values) length in
audit logs (#37894) and let line comment work well (#40599) (#42186)
## Proposed changes
cherry-pick from master #37894 and #40599
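Statements longer than audit_plugin_max_sql_length are now truncated in
AuditLogHelper (with the total row count appended for `insert into ... values`),
and newlines, tabs and carriage returns are escaped instead of being replaced
by spaces.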
---
.../commands/insert/InsertIntoTableCommand.java | 4 +
.../org/apache/doris/plugin/audit/AuditLoader.java | 30 +----
.../java/org/apache/doris/qe/AuditLogHelper.java | 132 +++++++++++++++++++--
.../suites/audit/test_audit_log_behavior.groovy | 102 ++++++++++++++++
4 files changed, 227 insertions(+), 41 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 7a46948f1b5..59b99c209d2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -95,6 +95,10 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
this.cte = cte;
}
+ public LogicalPlan getLogicalQuery() {
+ return logicalQuery;
+ }
+
public Optional<String> getLabelName() {
return labelName;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 0aef1a93dba..24eda23fc5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -35,11 +35,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CodingErrorAction;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
@@ -169,33 +164,14 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
logBuffer.append(event.sqlDigest).append("\t");
logBuffer.append(event.peakMemoryBytes).append("\t");
logBuffer.append(event.workloadGroup).append("\t");
- // trim the query to avoid too long
- // use `getBytes().length` to get real byte length
- String stmt = truncateByBytes(event.stmt).replace("\n", " ")
- .replace("\t", " ")
- .replace("\r", " ");
+ // already trim the query in
org.apache.doris.qe.AuditLogHelper#logAuditLog
+ String stmt = event.stmt;
if (LOG.isDebugEnabled()) {
LOG.debug("receive audit event with stmt: {}", stmt);
}
logBuffer.append(stmt).append("\n");
}
- private String truncateByBytes(String str) {
- int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength,
str.getBytes().length);
- if (maxLen >= str.getBytes().length) {
- return str;
- }
- Charset utf8Charset = Charset.forName("UTF-8");
- CharsetDecoder decoder = utf8Charset.newDecoder();
- byte[] sb = str.getBytes();
- ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
- CharBuffer charBuffer = CharBuffer.allocate(maxLen);
- decoder.onMalformedInput(CodingErrorAction.IGNORE);
- decoder.decode(buffer, charBuffer, true);
- decoder.flush(charBuffer);
- return new String(charBuffer.array(), 0, charBuffer.position());
- }
-
private void loadIfNecessary(AuditStreamLoader loader) {
long currentTime = System.currentTimeMillis();
@@ -229,8 +205,6 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
}
}
}
-
- return;
}
private void resetBatch(long currentTime) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 4f221fe42d6..afb5b856403 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -17,9 +17,12 @@
package org.apache.doris.qe;
-import org.apache.doris.analysis.InsertStmt;
+import org.apache.doris.analysis.NativeInsertStmt;
import org.apache.doris.analysis.Queriable;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.ValueList;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
@@ -27,6 +30,14 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
+import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.trees.plans.Plan;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -36,25 +47,127 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.util.List;
+
public class AuditLogHelper {
private static final Logger LOG =
LogManager.getLogger(AuditLogHelper.class);
- // Add a new method to wrap original logAuditLog to catch all exceptions.
Because write audit
- // log may write to a doris internal table, we may meet errors. We do not
want this affect the
- // query process. Ignore this error and just write warning log.
+ /**
+ * Add a new method to wrap original logAuditLog to catch all exceptions.
Because write audit
+ * log may write to a doris internal table, we may meet errors. We do not
want this affect the
+ * query process. Ignore this error and just write warning log.
+ */
public static void logAuditLog(ConnectContext ctx, String origStmt,
StatementBase parsedStmt,
org.apache.doris.proto.Data.PQueryStatistics statistics, boolean
printFuzzyVariables) {
try {
+ origStmt = handleStmt(origStmt, parsedStmt);
logAuditLogImpl(ctx, origStmt, parsedStmt, statistics,
printFuzzyVariables);
} catch (Throwable t) {
LOG.warn("Failed to write audit log.", t);
}
}
+ /**
+ * Truncate sql and if SQL is in the following situations, count the
number of rows:
+ * <ul>
+ * <li>{@code insert into tbl values (1), (2), (3)}</li>
+ * </ul>
+ * The final SQL will be:
+ * {@code insert into tbl values (1), (2 ...}
+ */
+ public static String handleStmt(String origStmt, StatementBase parsedStmt)
{
+ if (origStmt == null) {
+ return null;
+ }
+ int maxLen = GlobalVariable.auditPluginMaxSqlLength;
+ if (origStmt.length() <= maxLen) {
+ return origStmt.replace("\n", "\\n")
+ .replace("\t", "\\t")
+ .replace("\r", "\\r");
+ }
+ origStmt = truncateByBytes(origStmt)
+ .replace("\n", "\\n")
+ .replace("\t", "\\t")
+ .replace("\r", "\\r");
+ int rowCnt = 0;
+ // old planner
+ if (parsedStmt instanceof NativeInsertStmt) {
+ QueryStmt queryStmt = ((NativeInsertStmt)
parsedStmt).getQueryStmt();
+ if (queryStmt instanceof SelectStmt) {
+ ValueList list = ((SelectStmt) queryStmt).getValueList();
+ if (list != null && list.getRows() != null) {
+ rowCnt = list.getRows().size();
+ }
+ }
+ }
+ // nereids planner
+ if (parsedStmt instanceof LogicalPlanAdapter) {
+ LogicalPlan plan = ((LogicalPlanAdapter)
parsedStmt).getLogicalPlan();
+ if (plan instanceof InsertIntoTableCommand) {
+ LogicalPlan query = ((InsertIntoTableCommand)
plan).getLogicalQuery();
+ if (query instanceof UnboundTableSink) {
+ rowCnt = countValues(query.children());
+ }
+ }
+ }
+ if (rowCnt > 0) {
+ return origStmt + " ... /* total " + rowCnt + " rows, truncated
audit_plugin_max_sql_length="
+ + GlobalVariable.auditPluginMaxSqlLength + " */";
+ } else {
+ return origStmt
+ + " ... /* truncated audit_plugin_max_sql_length="
+ + GlobalVariable.auditPluginMaxSqlLength + " */";
+ }
+ }
+
+ private static String truncateByBytes(String str) {
+ int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength,
str.getBytes().length);
+ // use `getBytes().length` to get real byte length
+ if (maxLen >= str.getBytes().length) {
+ return str;
+ }
+ Charset utf8Charset = Charset.forName("UTF-8");
+ CharsetDecoder decoder = utf8Charset.newDecoder();
+ byte[] sb = str.getBytes();
+ ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
+ CharBuffer charBuffer = CharBuffer.allocate(maxLen);
+ decoder.onMalformedInput(CodingErrorAction.IGNORE);
+ decoder.decode(buffer, charBuffer, true);
+ decoder.flush(charBuffer);
+ return new String(charBuffer.array(), 0, charBuffer.position());
+ }
+
+ /**
+ * When SQL is in the following situations, count the number of rows:
+ * <ul>
+ * <li>{@code insert into tbl values (1), (2), (3)}</li>
+ * </ul>
+ */
+ private static int countValues(List<Plan> children) {
+ if (children == null) {
+ return 0;
+ }
+ int cnt = 0;
+ for (Plan child : children) {
+ if (child instanceof UnboundOneRowRelation) {
+ cnt++;
+ } else if (child instanceof LogicalInlineTable) {
+ cnt += ((LogicalInlineTable)
child).getConstantExprsList().size();
+ } else if (child instanceof LogicalUnion) {
+ cnt += countValues(child.children());
+ }
+ }
+ return cnt;
+ }
+
private static void logAuditLogImpl(ConnectContext ctx, String origStmt,
StatementBase parsedStmt,
org.apache.doris.proto.Data.PQueryStatistics statistics, boolean
printFuzzyVariables) {
- origStmt = origStmt.replace("\n", "\\n");
// slow query
long endTime = System.currentTimeMillis();
long elapseMs = endTime - ctx.getStartTime();
@@ -122,14 +235,7 @@ public class AuditLogHelper {
if (!ctx.getState().isQuery() && (parsedStmt != null &&
parsedStmt.needAuditEncryption())) {
auditEventBuilder.setStmt(parsedStmt.toSql());
} else {
- if (parsedStmt instanceof InsertStmt && !((InsertStmt)
parsedStmt).needLoadManager()
- && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
- // INSERT INTO VALUES may be very long, so we only log at most
1K bytes.
- int length = Math.min(1024, origStmt.length());
- auditEventBuilder.setStmt(origStmt.substring(0, length));
- } else {
- auditEventBuilder.setStmt(origStmt);
- }
+ auditEventBuilder.setStmt(origStmt);
}
if (!Env.getCurrentEnv().isMaster()) {
if (ctx.executor.isForwardToMaster()) {
diff --git a/regression-test/suites/audit/test_audit_log_behavior.groovy
b/regression-test/suites/audit/test_audit_log_behavior.groovy
new file mode 100644
index 00000000000..a43b456e6d0
--- /dev/null
+++ b/regression-test/suites/audit/test_audit_log_behavior.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_audit_log_behavior") {
+ try {
+ sql "set global enable_audit_plugin = true"
+ sql "set global audit_plugin_max_sql_length = 58"
+ sql "set global audit_plugin_max_batch_interval_sec = 1"
+ } catch (Exception e) {
+ log.warn("skip this case, because " + e.getMessage())
+ assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
+ return
+ }
+
+ sql "drop table if exists audit_log_behavior"
+ sql """
+ CREATE TABLE `audit_log_behavior` (
+ `id` bigint,
+ `name` varchar(32)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ int cnt = 0
+ def sqls = [
+ [
+ "insert into audit_log_behavior values (1,
'3F6B9A_${cnt}')",
+ "insert into audit_log_behavior values (1,
'3F6B9A_${cnt++}')"
+ ],
+ [
+ "insert into audit_log_behavior values (1,
'3F6B9A_${cnt}'), (2, 'Jelly')",
+ "insert into audit_log_behavior values (1,
'3F6B9A_${cnt++}'), (2, ... /* total 2 rows, truncated
audit_plugin_max_sql_length=58 */"
+ ],
+ [
+ "insert into audit_log_behavior values (1,
'3F6B9A_${cnt}'), (2, 'Jelly'), (3, 'foobar')",
+ "insert into audit_log_behavior values (1,
'3F6B9A_${cnt++}'), (2, ... /* total 3 rows, truncated
audit_plugin_max_sql_length=58 */"
+ ],
+ [
+ "insert into audit_log_behavior select 1, '3F6B9A_${cnt}'",
+ "insert into audit_log_behavior select 1,
'3F6B9A_${cnt++}'"],
+ [
+ "insert into audit_log_behavior select 1, '3F6B9A_${cnt}'
union select 2, 'Jelly'",
+ "insert into audit_log_behavior select 1,
'3F6B9A_${cnt++}' union ... /* truncated audit_plugin_max_sql_length=58 */"
+ ],
+ [
+ "insert into audit_log_behavior select 1, '3F6B9A_${cnt}'
from audit_log_behavior",
+ "insert into audit_log_behavior select 1,
'3F6B9A_${cnt++}' from a ... /* truncated audit_plugin_max_sql_length=58 */"
+ ],
+ [
+ "select id, name from audit_log_behavior as
loooooooooooooooong_alias",
+ "select id, name from audit_log_behavior as
loooooooooooooo ... /* truncated audit_plugin_max_sql_length=58 */"
+ ]
+ ]
+
+ for (def on : [true, false]) {
+ sql "set enable_nereids_planner=${on}"
+ sql "truncate table __internal_schema.audit_log"
+ // run queries
+ for (int i = 0; i < cnt; i++) {
+ def tuple2 = sqls.get(i)
+ sql tuple2[0]
+ }
+
+ // check result
+ for (int i = 0; i < cnt; i++) {
+ def tuple2 = sqls.get(i)
+ def retry = 90
+ def res = sql "select stmt from __internal_schema.audit_log where
stmt like '%3F6B9A_${i}%' order by time asc limit 1"
+ while (res.isEmpty()) {
+ if (retry-- < 0) {
+ logger.warn("It has retried a few but still failed, you
need to check it")
+ return
+ }
+ sleep(1000)
+ res = sql "select stmt from __internal_schema.audit_log where
stmt like '%3F6B9A_${i}%' order by time asc limit 1"
+ }
+ assertEquals(res[0][0].toString(), tuple2[1].toString())
+ }
+ }
+ sql "set global enable_audit_plugin = false"
+ sql "set global audit_plugin_max_sql_length = 4096"
+ sql "set global audit_plugin_max_batch_interval_sec = 60"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]