This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e85e6c89ba6 [fix](auditlog) add missing audit log fields and duplicate audit log error #42262 (#43015)
e85e6c89ba6 is described below
commit e85e6c89ba64e496fbdbecdf7bc19ec4d1899213
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Nov 1 08:24:55 2024 +0800
[fix](auditlog) add missing audit log fields and duplicate audit log error #42262 (#43015)
bp #42262
---
.../org/apache/doris/catalog/InternalSchema.java | 4 ++
.../nereids/trees/plans/commands/CallCommand.java | 12 +++++
.../plans/commands/call/CallFlushAuditLogFunc.java | 57 ++++++++++++++++++++++
.../trees/plans/commands/call/CallFunc.java | 2 +
.../java/org/apache/doris/plugin/AuditEvent.java | 8 ++-
.../java/org/apache/doris/plugin/PluginMgr.java | 13 ++++-
.../org/apache/doris/plugin/audit/AuditLoader.java | 26 +++++-----
.../apache/doris/plugin/audit/AuditLogBuilder.java | 2 +-
.../java/org/apache/doris/qe/AuditLogHelper.java | 1 +
.../data/audit/test_audit_log_behavior.out | 30 ++++++++++++
.../suites/audit/test_audit_log_behavior.groovy | 9 +++-
11 files changed, 142 insertions(+), 22 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
index 3e10b961ba5..afe1f9af2da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -80,14 +80,18 @@ public class InternalSchema {
AUDIT_SCHEMA.add(new ColumnDef("scan_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("scan_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("return_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("stmt_id",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("is_query",
TypeDef.create(PrimitiveType.TINYINT), true));
+ AUDIT_SCHEMA.add(new ColumnDef("is_nereids",
TypeDef.create(PrimitiveType.TINYINT), true));
AUDIT_SCHEMA.add(new ColumnDef("frontend_ip",
TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128),
true));
AUDIT_SCHEMA.add(new ColumnDef("sql_digest",
TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("workload_group",
TypeDef.create(PrimitiveType.STRING), true));
+ // Keep stmt as last column. So that in fe.audit.log, it will be easier to get sql string
AUDIT_SCHEMA.add(new ColumnDef("stmt",
TypeDef.create(PrimitiveType.STRING), true));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java
index 60886acd92f..1afc9253585 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands;
+import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.call.CallFunc;
@@ -58,4 +59,15 @@ public class CallCommand extends Command implements
ForwardWithSync, NotAllowFal
return visitor.visitCallCommand(this, context);
}
+ @Override
+ public RedirectStatus toRedirectStatus() {
+ // Some of call statements may need to be redirected, some may not
+ String funcName = unboundFunction.getName().toUpperCase();
+ switch (funcName) {
+ case "FLUSH_AUDIT_LOG":
+ return RedirectStatus.NO_FORWARD;
+ default:
+ return RedirectStatus.FORWARD_WITH_SYNC;
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java
new file mode 100644
index 00000000000..60cae55e7f5
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.call;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import java.util.List;
+
+/**
+ * call flush_audit_log()
+ * This will flush audit log immediately to the audit_log table.
+ * Mainly for test cases, so that we don't need to wait 60 sec to flush the audit log.
+ */
+public class CallFlushAuditLogFunc extends CallFunc {
+
+ private UserIdentity user;
+
+ private CallFlushAuditLogFunc(UserIdentity user) {
+ this.user = user;
+ }
+
+ public static CallFunc create(UserIdentity user, List<Expression> args) {
+ if (!args.isEmpty()) {
+ throw new AnalysisException("FLUSH_AUDIT_LOG function requires no parameter");
+ }
+ return new CallFlushAuditLogFunc(user);
+ }
+
+ @Override
+ public void run() {
+ // check priv
+ if (!Env.getCurrentEnv().getAuth().checkGlobalPriv(user, PrivPredicate.ADMIN)) {
+ throw new AnalysisException("Only admin can flush audit log");
+ }
+ // flush audit log
+ Env.getCurrentEnv().getPluginMgr().flushAuditLog();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java
index 4a8cf560c28..4eac9c6f9b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java
@@ -36,6 +36,8 @@ public abstract class CallFunc {
// TODO, built-in functions require a separate management
case "EXECUTE_STMT": // Call built-in functions first
return CallExecuteStmtFunc.create(user, unboundFunction.getArguments());
+ case "FLUSH_AUDIT_LOG":
+ return CallFlushAuditLogFunc.create(user, unboundFunction.getArguments());
default:
return CallProcedure.create(ctx, originSql);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 9e5fc6ec528..5fc735a0bb8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -80,9 +80,9 @@ public class AuditEvent {
public String queryId = "";
@AuditField(value = "IsQuery")
public boolean isQuery = false;
- @AuditField(value = "isNereids")
+ @AuditField(value = "IsNereids")
public boolean isNereids = false;
- @AuditField(value = "feIp")
+ @AuditField(value = "FeIp")
public String feIp = "";
@AuditField(value = "Stmt")
public String stmt = "";
@@ -94,12 +94,10 @@ public class AuditEvent {
public long shuffleSendRows = -1;
@AuditField(value = "SqlHash")
public String sqlHash = "";
- @AuditField(value = "peakMemoryBytes")
+ @AuditField(value = "PeakMemoryBytes")
public long peakMemoryBytes = -1;
@AuditField(value = "SqlDigest")
public String sqlDigest = "";
- @AuditField(value = "TraceId")
- public String traceId = "";
@AuditField(value = "WorkloadGroup")
public String workloadGroup = "";
// note: newly added fields should be always before fuzzyVariables
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
index ea69b247e66..904c8352f4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
@@ -61,6 +61,9 @@ public class PluginMgr implements Writable {
// all dynamic plugins should have unique names,
private final Set<String> dynamicPluginNames;
+ // Save this handler for external call
+ private AuditLoader auditLoader = null;
+
public PluginMgr() {
plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE];
for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) {
@@ -113,8 +116,8 @@ public class PluginMgr implements Writable {
}
// AuditLoader: log audit log to internal table
- AuditLoader auditLoaderPlugin = new AuditLoader();
- if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
+ this.auditLoader = new AuditLoader();
+ if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) {
LOG.warn("failed to register audit log builder");
}
@@ -359,6 +362,12 @@ public class PluginMgr implements Writable {
return rows;
}
+ public void flushAuditLog() {
+ if (auditLoader != null) {
+ auditLoader.loadIfNecessary(true);
+ }
+ }
+
public void readFields(DataInputStream dis) throws IOException {
int size = dis.readInt();
for (int i = 0; i < size; i++) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 24eda23fc5b..55dbba9805e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -35,8 +35,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -48,9 +46,6 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
public static final String AUDIT_LOG_TABLE = "audit_log";
- private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
- .withZone(ZoneId.systemDefault());
-
private StringBuilder auditLogBuffer = new StringBuilder();
private int auditLogNum = 0;
private long lastLoadTimeAuditLog = 0;
@@ -90,7 +85,7 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
// GlobalVariable.audit_plugin_max_batch_bytes.
this.auditEventQueue = Queues.newLinkedBlockingDeque(100000);
this.streamLoader = new AuditStreamLoader();
- this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
+ this.loadThread = new Thread(new LoadWorker(), "audit loader thread");
this.loadThread.start();
isInit = true;
@@ -143,6 +138,7 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
}
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
+ // should be same order as InternalSchema.AUDIT_SCHEMA
logBuffer.append(event.queryId).append("\t");
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
logBuffer.append(event.clientIp).append("\t");
@@ -156,8 +152,11 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
logBuffer.append(event.scanBytes).append("\t");
logBuffer.append(event.scanRows).append("\t");
logBuffer.append(event.returnRows).append("\t");
+ logBuffer.append(event.shuffleSendRows).append("\t");
+ logBuffer.append(event.shuffleSendBytes).append("\t");
logBuffer.append(event.stmtId).append("\t");
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
+ logBuffer.append(event.isNereids ? 1 : 0).append("\t");
logBuffer.append(event.feIp).append("\t");
logBuffer.append(event.cpuTimeMs).append("\t");
logBuffer.append(event.sqlHash).append("\t");
@@ -172,10 +171,12 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
logBuffer.append(stmt).append("\n");
}
- private void loadIfNecessary(AuditStreamLoader loader) {
+ // public for external call.
+ // synchronized to avoid concurrent load.
+ public synchronized void loadIfNecessary(boolean force) {
long currentTime = System.currentTimeMillis();
- if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
+ if (force || auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
|| currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) {
// begin to load
try {
@@ -188,7 +189,7 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
discardLogNum += auditLogNum;
return;
}
- AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token);
+ AuditStreamLoader.LoadResponse response = streamLoader.loadBatch(auditLogBuffer, token);
if (LOG.isDebugEnabled()) {
LOG.debug("audit loader response: {}", response);
}
@@ -214,10 +215,8 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
}
private class LoadWorker implements Runnable {
- private AuditStreamLoader loader;
- public LoadWorker(AuditStreamLoader loader) {
- this.loader = loader;
+ public LoadWorker() {
}
public void run() {
@@ -227,7 +226,7 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
if (event != null) {
assembleAudit(event);
// process all audit logs
- loadIfNecessary(loader);
+ loadIfNecessary(false);
}
} catch (InterruptedException ie) {
if (LOG.isDebugEnabled()) {
@@ -240,3 +239,4 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
}
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
index 8d9e2c9d96e..161a5830b9c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
@@ -116,7 +116,7 @@ public class AuditLogBuilder extends Plugin implements
AuditPlugin {
if (af.value().equals("Time(ms)")) {
queryTime = (long) f.get(event);
}
- sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
+ sb.append("|").append(af.value()).append("=").append(f.get(event));
}
String auditLog = sb.toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index afb5b856403..02a66a5f6e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -174,6 +174,7 @@ public class AuditLogHelper {
CatalogIf catalog = ctx.getCurrentCatalog();
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
+ // ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
auditEventBuilder.reset();
auditEventBuilder
.setTimestamp(ctx.getStartTime())
diff --git a/regression-test/data/audit/test_audit_log_behavior.out
b/regression-test/data/audit/test_audit_log_behavior.out
new file mode 100644
index 00000000000..96ad13a4c7c
--- /dev/null
+++ b/regression-test/data/audit/test_audit_log_behavior.out
@@ -0,0 +1,30 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !audit_log_schema --
+query_id varchar(48) Yes true \N
+time datetime(3) Yes true \N
+client_ip varchar(128) Yes true \N
+user varchar(128) Yes false \N NONE
+catalog varchar(128) Yes false \N NONE
+db varchar(128) Yes false \N NONE
+state varchar(128) Yes false \N NONE
+error_code int Yes false \N NONE
+error_message text Yes false \N NONE
+query_time bigint Yes false \N NONE
+scan_bytes bigint Yes false \N NONE
+scan_rows bigint Yes false \N NONE
+return_rows bigint Yes false \N NONE
+shuffle_send_rows bigint Yes false \N NONE
+shuffle_send_bytes bigint Yes false \N NONE
+stmt_id bigint Yes false \N NONE
+stmt_type varchar(48) Yes false \N NONE
+is_query tinyint Yes false \N NONE
+is_nereids tinyint Yes false \N NONE
+frontend_ip varchar(128) Yes false \N NONE
+cpu_time_ms bigint Yes false \N NONE
+sql_hash varchar(128) Yes false \N NONE
+sql_digest varchar(128) Yes false \N NONE
+peak_memory_bytes bigint Yes false \N NONE
+workload_group text Yes false \N NONE
+compute_group text Yes false \N NONE
+stmt text Yes false \N NONE
+
diff --git a/regression-test/suites/audit/test_audit_log_behavior.groovy
b/regression-test/suites/audit/test_audit_log_behavior.groovy
index a43b456e6d0..163de0c931e 100644
--- a/regression-test/suites/audit/test_audit_log_behavior.groovy
+++ b/regression-test/suites/audit/test_audit_log_behavior.groovy
@@ -19,7 +19,7 @@ suite("test_audit_log_behavior") {
try {
sql "set global enable_audit_plugin = true"
sql "set global audit_plugin_max_sql_length = 58"
- sql "set global audit_plugin_max_batch_interval_sec = 1"
+ // sql "set global audit_plugin_max_batch_interval_sec = 1"
} catch (Exception e) {
log.warn("skip this case, because " + e.getMessage())
assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
@@ -71,6 +71,8 @@ suite("test_audit_log_behavior") {
]
]
+ qt_audit_log_schema """desc internal.__internal_schema.audit_log"""
+
for (def on : [true, false]) {
sql "set enable_nereids_planner=${on}"
sql "truncate table __internal_schema.audit_log"
@@ -80,6 +82,10 @@ suite("test_audit_log_behavior") {
sql tuple2[0]
}
+ // make sure audit event is created.
+ // see WorkloadRuntimeStatusMgr.getQueryNeedAudit()
+ Thread.sleep(6000)
+ sql """call flush_audit_log()"""
// check result
for (int i = 0; i < cnt; i++) {
def tuple2 = sqls.get(i)
@@ -96,6 +102,7 @@ suite("test_audit_log_behavior") {
assertEquals(res[0][0].toString(), tuple2[1].toString())
}
}
+ // do not turn off
sql "set global enable_audit_plugin = false"
sql "set global audit_plugin_max_sql_length = 4096"
sql "set global audit_plugin_max_batch_interval_sec = 60"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]