This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new bcb6e683fdd [fix](audit) fix potential audit log missing issue
(#50357) (#50534)
bcb6e683fdd is described below
commit bcb6e683fdd3385b2095f0f6d8b8df4c74c626a2
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue May 6 09:54:15 2025 +0800
[fix](audit) fix potential audit log missing issue (#50357) (#50534)
bp #50357
---
.../apache/doris/plugin/audit/AuditLogBuilder.java | 4 +---
.../org/apache/doris/qe/AuditEventProcessor.java | 27 ++++++++++------------
.../java/org/apache/doris/qe/AuditLogHelper.java | 7 +++++-
.../java/org/apache/doris/qe/ConnectProcessor.java | 1 -
.../org/apache/doris/qe/MysqlConnectProcessor.java | 2 +-
.../WorkloadRuntimeStatusMgr.java | 11 +++++----
6 files changed, 26 insertions(+), 26 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
index 56465f9c17b..4208d5def2e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
@@ -93,9 +93,7 @@ public class AuditLogBuilder extends Plugin implements
AuditPlugin {
break;
}
} catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("failed to process audit event", e);
- }
+ LOG.warn("failed to process audit event: {}", event.queryId, e);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
index 5cb826dc86c..9a1e350d411 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java
@@ -48,7 +48,7 @@ public class AuditEventProcessor {
private List<Plugin> auditPlugins;
private long lastUpdateTime = 0;
- private BlockingQueue<AuditEvent> eventQueue =
Queues.newLinkedBlockingDeque(10000);
+ private BlockingQueue<AuditEvent> eventQueue =
Queues.newLinkedBlockingDeque();
private Thread workerThread;
private volatile boolean isStopped = false;
@@ -89,22 +89,23 @@ public class AuditEventProcessor {
}
public boolean handleAuditEvent(AuditEvent auditEvent) {
- return handleAuditEvent(auditEvent, false);
- }
-
- public boolean handleAuditEvent(AuditEvent auditEvent, boolean
ignoreQueueFullLog) {
if (skipAuditUsers.contains(auditEvent.user)) {
// return true to ignore this event
return true;
}
boolean isAddSucc = true;
try {
- eventQueue.add(auditEvent);
+ if (eventQueue.size() >= Config.audit_event_log_queue_size) {
+ isAddSucc = false;
+ LOG.warn("the audit event queue is full with size {}, discard
the audit event: {}",
+ eventQueue.size(), auditEvent.queryId);
+ } else {
+ eventQueue.add(auditEvent);
+ }
} catch (Exception e) {
isAddSucc = false;
- if (!ignoreQueueFullLog) {
- LOG.warn("encounter exception when handle audit event {},
ignore", auditEvent.type, e);
- }
+ LOG.warn("encounter exception when handle audit event {}, discard
the event",
+ auditEvent.queryId, e);
}
return isAddSucc;
}
@@ -130,9 +131,7 @@ public class AuditEventProcessor {
continue;
}
} catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("encounter exception when getting audit
event from queue, ignore", e);
- }
+ LOG.warn("encounter exception when getting audit event
from queue, ignore", e);
continue;
}
@@ -143,9 +142,7 @@ public class AuditEventProcessor {
}
}
} catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("encounter exception when processing audit
event.", e);
- }
+ LOG.warn("encounter exception when processing audit
events. ignore", e);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 2694660330b..df95d3f3173 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -41,6 +41,7 @@ import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
+import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -284,7 +285,11 @@ public class AuditLogHelper {
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE &&
ctx.getState().getErrorCode() == null) {
auditEventBuilder.setState(String.valueOf(MysqlStateType.OK));
}
-
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build());
+ AuditEvent event = auditEventBuilder.build();
+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("submit audit event: {}", event.queryId);
+ }
}
private static String getStmtType(StatementBase stmt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 55c4488e7ac..cbc0aee98a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -377,7 +377,6 @@ public abstract class ConnectProcessor {
}
auditAfterExec(auditStmt, executor.getParsedStmt(),
executor.getQueryStatisticsForAuditLog(),
true);
- LOG.debug("Write audit logs for query {}",
DebugUtil.printId(ctx.queryId));
// execute failed, skip remaining stmts
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
index 9ded1f04cbc..97b5061a212 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
@@ -183,7 +183,7 @@ public class MysqlConnectProcessor extends ConnectProcessor
{
}
ctx.setStartTime();
- // nererids
+ // nereids
PreparedStatementContext preparedStatementContext =
ctx.getPreparedStementContext(String.valueOf(stmtId));
if (preparedStatementContext == null) {
LOG.warn("No such statement in context, stmtId:{}", stmtId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 695bf983dc6..0c2e4f458bc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -87,7 +87,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
auditEvent.shuffleSendBytes =
queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
- boolean ret =
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
+ boolean ret =
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
if (!ret) {
missedLogCount++;
} else {
@@ -110,9 +110,10 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon
{
queryAuditEventLogWriteLock();
try {
if (queryAuditEventList.size() >=
Config.audit_event_log_queue_size) {
- LOG.warn("audit log event queue size {} is full, this may
cause audit log missed."
- + "you can check whether qps is too high or
reset audit_event_log_queue_size",
- queryAuditEventList.size());
+ LOG.warn("audit log event queue size {} is full, this may
cause audit log missing statistics."
+ + "you can check whether qps is too high or "
+ + "set audit_event_log_queue_size to a larger
value in fe.conf. query id: {}",
+ queryAuditEventList.size(), event.queryId);
return;
}
event.pushToAuditLogQueueTime = System.currentTimeMillis();
@@ -122,7 +123,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
}
}
- public List<AuditEvent> getQueryNeedAudit() {
+ private List<AuditEvent> getQueryNeedAudit() {
List<AuditEvent> ret = new ArrayList<>();
long currentTime = System.currentTimeMillis();
queryAuditEventLogWriteLock();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]