This is an automated email from the ASF dual-hosted git repository.
cambyzju pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new cc1770ca359 [fix](audit) duplicate audit log for multi-statements at
the same query (#37933)
cc1770ca359 is described below
commit cc1770ca359b2d6680c92a51946f69ccd256bf5d
Author: camby <[email protected]>
AuthorDate: Wed Jul 17 19:32:41 2024 +0800
[fix](audit) duplicate audit log for multi-statements at the same query
(#37933)
---
.../java/org/apache/doris/qe/AuditLogHelper.java | 33 ++++----
.../java/org/apache/doris/qe/ConnectProcessor.java | 90 +---------------------
2 files changed, 21 insertions(+), 102 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index dcfedc26792..b0a038dbf5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -25,6 +25,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.FrontendOptions;
@@ -44,7 +45,13 @@ public class AuditLogHelper {
long elapseMs = endTime - ctx.getStartTime();
SpanContext spanContext =
Span.fromContext(Context.current()).getSpanContext();
- ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
+ AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
+ auditEventBuilder.reset();
+ auditEventBuilder.setTimestamp(ctx.getStartTime())
+ .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
+
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
+ .setSqlHash(ctx.getSqlHash())
+ .setEventType(EventType.AFTER_QUERY)
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 :
ctx.getState().getErrorCode().getCode())
@@ -78,10 +85,10 @@ public class AuditLogHelper {
if (elapseMs > Config.qe_slow_log_ms) {
String sqlDigest = DigestUtils.md5Hex(((Queriable)
parsedStmt).toDigest());
- ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
+ auditEventBuilder.setSqlDigest(sqlDigest);
}
}
- ctx.getAuditEventBuilder().setIsQuery(true);
+ auditEventBuilder.setIsQuery(true);
if (ctx.getQueryDetail() != null) {
ctx.getQueryDetail().setEventTime(endTime);
ctx.getQueryDetail().setEndTime(endTime);
@@ -91,35 +98,35 @@ public class AuditLogHelper {
ctx.setQueryDetail(null);
}
} else {
- ctx.getAuditEventBuilder().setIsQuery(false);
+ auditEventBuilder.setIsQuery(false);
}
- ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
+ auditEventBuilder.setIsNereids(ctx.getState().isNereids);
-
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
+ auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress());
// We put origin query stmt at the end of audit log, for parsing the
log more convenient.
if (!ctx.getState().isQuery() && (parsedStmt != null &&
parsedStmt.needAuditEncryption())) {
- ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
+ auditEventBuilder.setStmt(parsedStmt.toSql());
} else {
if (parsedStmt instanceof InsertStmt && !((InsertStmt)
parsedStmt).needLoadManager()
&& ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
// INSERT INTO VALUES may be very long, so we only log at most
1K bytes.
int length = Math.min(1024, origStmt.length());
- ctx.getAuditEventBuilder().setStmt(origStmt.substring(0,
length));
+ auditEventBuilder.setStmt(origStmt.substring(0, length));
} else {
- ctx.getAuditEventBuilder().setStmt(origStmt);
+ auditEventBuilder.setStmt(origStmt);
}
}
if (!Env.getCurrentEnv().isMaster()) {
if (ctx.executor.isForwardToMaster()) {
-
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
+ auditEventBuilder.setState(ctx.executor.getProxyStatus());
int proxyStatusCode = ctx.executor.getProxyStatusCode();
if (proxyStatusCode != 0) {
- ctx.getAuditEventBuilder().setErrorCode(proxyStatusCode);
-
ctx.getAuditEventBuilder().setErrorMessage(ctx.executor.getProxyErrMsg());
+ auditEventBuilder.setErrorCode(proxyStatusCode);
+
auditEventBuilder.setErrorMessage(ctx.executor.getProxyErrMsg());
}
}
}
-
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index ba4fbfa1fca..d306a533bb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
-import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@@ -34,7 +33,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -56,10 +54,8 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
-import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.proto.Data;
import org.apache.doris.qe.QueryState.MysqlStateType;
-import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
@@ -68,7 +64,6 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
@@ -276,84 +271,7 @@ public class ConnectProcessor {
private void auditAfterExec(String origStmt, StatementBase parsedStmt,
Data.PQueryStatistics statistics, boolean
printFuzzyVariables) {
- origStmt = origStmt.replace("\n", " ");
- // slow query
- long endTime = System.currentTimeMillis();
- long elapseMs = endTime - ctx.getStartTime();
- SpanContext spanContext =
Span.fromContext(Context.current()).getSpanContext();
-
- ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
- .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
- .setState(ctx.getState().toString())
- .setErrorCode(ctx.getState().getErrorCode() == null ? 0 :
ctx.getState().getErrorCode().getCode())
- .setErrorMessage((ctx.getState().getErrorMessage() == null ?
"" :
- ctx.getState().getErrorMessage().replace("\n", "
").replace("\t", " ")))
- .setQueryTime(elapseMs)
- .setScanBytes(statistics == null ? 0 :
statistics.getScanBytes())
- .setScanRows(statistics == null ? 0 : statistics.getScanRows())
- .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
- .setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
- .setReturnRows(ctx.getReturnRows())
- .setStmtId(ctx.getStmtId())
- .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
- .setTraceId(spanContext.isValid() ? spanContext.getTraceId() :
"")
- .setWorkloadGroup(ctx.getWorkloadGroupName())
- .setFuzzyVariables(!printFuzzyVariables ? "" :
ctx.getSessionVariable().printFuzzyVariables());
-
- if (ctx.getState().isQuery()) {
- MetricRepo.COUNTER_QUERY_ALL.increase(1L);
-
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
- if (ctx.getState().getStateType() == MysqlStateType.ERR
- && ctx.getState().getErrType() !=
QueryState.ErrType.ANALYSIS_ERR) {
- // err query
- MetricRepo.COUNTER_QUERY_ERR.increase(1L);
-
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
- } else if (ctx.getState().getStateType() == MysqlStateType.OK
- || ctx.getState().getStateType() == MysqlStateType.EOF) {
- // ok query
- MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
-
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
-
- if (elapseMs > Config.qe_slow_log_ms) {
- String sqlDigest = DigestUtils.md5Hex(((Queriable)
parsedStmt).toDigest());
- ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
- }
- }
- ctx.getAuditEventBuilder().setIsQuery(true);
- if (ctx.getQueryDetail() != null) {
- ctx.getQueryDetail().setEventTime(endTime);
- ctx.getQueryDetail().setEndTime(endTime);
- ctx.getQueryDetail().setLatency(elapseMs);
-
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
- QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
- ctx.setQueryDetail(null);
- }
- } else {
- ctx.getAuditEventBuilder().setIsQuery(false);
- }
- ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
-
-
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
-
- // We put origin query stmt at the end of audit log, for parsing the
log more convenient.
- if (!ctx.getState().isQuery() && (parsedStmt != null &&
parsedStmt.needAuditEncryption())) {
- ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
- } else {
- if (parsedStmt instanceof InsertStmt && !((InsertStmt)
parsedStmt).needLoadManager()
- && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
- // INSERT INTO VALUES may be very long, so we only log at most
1K bytes.
- int length = Math.min(1024, origStmt.length());
- ctx.getAuditEventBuilder().setStmt(origStmt.substring(0,
length));
- } else {
- ctx.getAuditEventBuilder().setStmt(origStmt);
- }
- }
- if (!Env.getCurrentEnv().isMaster()) {
- if (ctx.executor.isForwardToMaster()) {
-
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
- }
- }
- AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, null, true);
+ AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics,
printFuzzyVariables);
}
// Process COM_QUERY statement,
@@ -372,12 +290,6 @@ public class ConnectProcessor {
String sqlHash = DigestUtils.md5Hex(originStmt);
ctx.setSqlHash(sqlHash);
- ctx.getAuditEventBuilder().reset();
- ctx.getAuditEventBuilder()
- .setTimestamp(System.currentTimeMillis())
- .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
-
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
- .setSqlHash(ctx.getSqlHash());
List<StatementBase> stmts = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]