This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a41629 [Audit] Support builtin load audit function to record
successful bulk load job (#5183)
7a41629 is described below
commit 7a41629fbe8995c5688ba3221470f72b862028e3
Author: caiconghui <[email protected]>
AuthorDate: Wed Mar 3 17:01:02 2021 +0800
[Audit] Support builtin load audit function to record successful bulk load
job (#5183)
* [Audit] Support builtin load audit function to record successful bulk
load job
Co-authored-by: caiconghui [蔡聪辉] <[email protected]>
---
.../java/org/apache/doris/common/AuditLog.java | 5 +
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../org/apache/doris/load/loadv2/BulkLoadJob.java | 43 +++++++
.../java/org/apache/doris/load/loadv2/LoadJob.java | 75 +++++++-----
.../java/org/apache/doris/plugin/AuditEvent.java | 3 +-
.../org/apache/doris/plugin/LoadAuditEvent.java | 130 +++++++++++++++++++++
.../java/org/apache/doris/qe/AuditLogBuilder.java | 96 ++++++++++-----
7 files changed, 293 insertions(+), 61 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
index 0e1b609..235d783 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
@@ -25,6 +25,7 @@ public class AuditLog {
public static final AuditLog SLOW_AUDIT = new AuditLog("audit.slow_query");
public static final AuditLog QUERY_AUDIT = new AuditLog("audit.query");
+ public static final AuditLog LOAD_AUDIT = new AuditLog("audit.load");
private Logger logger;
@@ -36,6 +37,10 @@ public class AuditLog {
return SLOW_AUDIT;
}
+ public static AuditLog getLoadAudit() {
+ return LOAD_AUDIT;
+ }
+
public AuditLog(String auditName) {
logger = LogManager.getLogger(auditName);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 3e1ed6d..91000e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -103,7 +103,7 @@ public class Config extends ConfigBase {
*/
@ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR +
"/log";
@ConfField public static int audit_log_roll_num = 90;
- @ConfField public static String[] audit_log_modules = {"slow_query",
"query"};
+ @ConfField public static String[] audit_log_modules = {"slow_query",
"query", "load"};
@ConfField(mutable = true) public static long qe_slow_log_ms = 5000;
@ConfField public static String audit_log_roll_interval = "DAY";
@ConfField public static String audit_log_delete_age = "30d";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index 35334ed..77d7e6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import org.apache.commons.lang.StringUtils;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LoadStmt;
@@ -39,6 +40,8 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.LoadAuditEvent;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
@@ -361,4 +364,44 @@ public abstract class BulkLoadJob extends LoadJob {
public UserIdentity getUserInfo() {
return userInfo;
}
+
+ @Override
+ protected void auditFinishedLoadJob() {
+ try {
+ String dbName = getDb().getFullName();
+ String tableListName = StringUtils.join(getTableNames(), ",");
+ List<String> filePathList = Lists.newArrayList();
+ for (List<BrokerFileGroup> brokerFileGroups :
fileGroupAggInfo.getAggKeyToFileGroups().values()) {
+ for (BrokerFileGroup brokerFileGroup : brokerFileGroups) {
+ filePathList.add("(" +
StringUtils.join(brokerFileGroup.getFilePaths(), ",") + ")");
+ }
+ }
+ String filePathListName = StringUtils.join(filePathList, ",");
+ String brokerUserName = getBrokerUserName();
+ AuditEvent auditEvent = new
LoadAuditEvent.AuditEventBuilder().setEventType(AuditEvent.EventType.LOAD_SUCCEED)
+
.setJobId(id).setLabel(label).setLoadType(jobType.name()).setDb(dbName).setTableList(tableListName)
+
.setFilePathList(filePathListName).setBrokerUser(brokerUserName).setTimestamp(createTimestamp)
+
.setLoadStartTime(loadStartTimestamp).setLoadFinishTime(finishTimestamp)
+
.setScanRows(loadStatistic.getScannedRows()).setScanBytes(loadStatistic.totalFileSizeB)
+ .setFileNumber(loadStatistic.fileNum)
+ .build();
+
Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+ } catch (Exception e) {
+ LOG.warn("audit finished load job info failed", e);
+ }
+ }
+
+ private String getBrokerUserName() {
+ Map<String, String> properties = brokerDesc.getProperties();
+ if (properties.containsKey("kerberos_principal")) {
+ return properties.get("kerberos_principal");
+ } else if (properties.containsKey("username")) {
+ return properties.get("username");
+ } else if (properties.containsKey("bos_accesskey")) {
+ return properties.get("bos_accesskey");
+ } else if (properties.containsKey("fs.s3a.access.key")) {
+ return properties.get("fs.s3a.access.key");
+ }
+ return null;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index db79fa2..75bc2f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -163,7 +163,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
public synchronized void updateLoadProgress(long backendId, TUniqueId
loadId, TUniqueId fragmentId,
- long rows, boolean isDone) {
+ long rows, boolean isDone)
{
if (counterTbl.contains(loadId, fragmentId)) {
counterTbl.put(loadId, fragmentId, rows);
}
@@ -172,6 +172,14 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
}
+ public synchronized long getScannedRows() {
+ long total = 0;
+ for (long rows : counterTbl.values()) {
+ total += rows;
+ }
+ return total;
+ }
+
public synchronized String toJson() {
long total = 0;
for (long rows : counterTbl.values()) {
@@ -243,6 +251,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
public long getDbId() {
return dbId;
}
+
public String getLabel() {
return label;
}
@@ -305,6 +314,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
* The method is invoked by 'checkAuth' when authorization info is null in
job.
* Also it is invoked by 'gatherAuthInfo' which saves the auth info in the
constructor of job.
* Throw MetaNofFoundException when table name could not be found.
+ *
* @return
*/
abstract Set<String> getTableNames() throws MetaNotFoundException;
@@ -385,10 +395,10 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
* create pending task for load job and add pending task into pool
* if job has been cancelled, this step will be ignored
*
- * @throws LabelAlreadyUsedException the job is duplicated
- * @throws BeginTransactionException the limit of load job is exceeded
- * @throws AnalysisException there are error params in job
- * @throws DuplicatedRequestException
+ * @throws LabelAlreadyUsedException the job is duplicated
+ * @throws BeginTransactionException the limit of load job is exceeded
+ * @throws AnalysisException there are error params in job
+ * @throws DuplicatedRequestException
*/
public void execute() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException,
DuplicatedRequestException, LoadException {
@@ -513,15 +523,15 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
if (isCommitting) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("error_msg", "The txn which belongs to
job is committing. "
- + "The job could not be cancelled in
this step").build());
+ .add("error_msg", "The txn which belongs to job is
committing. "
+ + "The job could not be cancelled in this
step").build());
throw new DdlException("Job could not be cancelled while txn
is committing");
}
if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("state", state)
- .add("error_msg", "Job could not be cancelled
when job is " + state)
- .build());
+ .add("state", state)
+ .add("error_msg", "Job could not be cancelled when job
is " + state)
+ .build());
throw new DdlException("Job could not be cancelled when job is
finished or cancelled");
}
@@ -539,9 +549,9 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
return;
}
if
(!Catalog.getCurrentCatalog().getAuth().checkPrivByAuthInfo(ConnectContext.get(),
authorizationInfo,
-
PrivPredicate.LOAD)) {
+ PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
- PaloPrivilege.LOAD_PRIV);
+ PaloPrivilege.LOAD_PRIV);
}
}
@@ -563,18 +573,18 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
if (tableNames.isEmpty()) {
// forward compatibility
if
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(),
db.getFullName(),
-
PrivPredicate.LOAD)) {
+ PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
- PaloPrivilege.LOAD_PRIV);
+ PaloPrivilege.LOAD_PRIV);
}
} else {
for (String tblName : tableNames) {
if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
db.getFullName(),
-
tblName, PrivPredicate.LOAD)) {
+ tblName, PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
- command,
-
ConnectContext.get().getQualifiedUser(),
-
ConnectContext.get().getRemoteIP(), tblName);
+ command,
+ ConnectContext.get().getQualifiedUser(),
+ ConnectContext.get().getRemoteIP(), tblName);
}
}
}
@@ -587,8 +597,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
* This method will cancel job without edit log and lock
*
* @param failMsg
- * @param abortTxn
- * true: abort txn when cancel job, false: only change the
state of job and ignore abort txn
+ * @param abortTxn true: abort txn when cancel job, false: only change the
state of job and ignore abort txn
*/
protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn)
{
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id",
transactionId)
@@ -600,8 +609,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
// get load ids of all loading tasks, we will cancel their coordinator
process later
List<TUniqueId> loadIds = Lists.newArrayList();
for (LoadTask loadTask : idToTasks.values()) {
- if (loadTask instanceof LoadLoadingTask ) {
- loadIds.add(((LoadLoadingTask)loadTask).getLoadId());
+ if (loadTask instanceof LoadLoadingTask) {
+ loadIds.add(((LoadLoadingTask) loadTask).getLoadId());
}
}
idToTasks.clear();
@@ -622,9 +631,9 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
// abort txn
try {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("transaction_id", transactionId)
- .add("msg", "begin to abort txn")
- .build());
+ .add("transaction_id", transactionId)
+ .add("msg", "begin to abort txn")
+ .build());
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, transactionId,
failMsg.getMsg());
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
@@ -633,7 +642,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
.build());
}
}
-
+
// cancel all running coordinators, so that the scheduler's worker
thread will be released
for (TUniqueId loadId : loadIds) {
Coordinator coordinator =
QeProcessorImpl.INSTANCE.getCoordinator(loadId);
@@ -677,7 +686,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
protected void logFinalOperation() {
Catalog.getCurrentCatalog().getEditLog().logEndLoadJob(
new LoadJobFinalOperation(id, loadingStatus, progress,
loadStartTimestamp, finishTimestamp,
- state, failMsg));
+ state, failMsg));
}
public void unprotectReadEndOperation(LoadJobFinalOperation
loadJobFinalOperation) {
@@ -731,7 +740,6 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
// task info
jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" +
getTimeout()
+ "; max_filter_ratio:" + getMaxFilterRatio());
-
// error msg
if (failMsg == null) {
jobInfo.add(FeConstants.null_string);
@@ -904,6 +912,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
replayTxnAttachment(txnState);
updateState(JobState.FINISHED);
+ auditFinishedLoadJob();
}
@Override
@@ -951,10 +960,10 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
LoadJob other = (LoadJob) obj;
return this.id == other.id
- && this.dbId == other.dbId
- && this.label.equals(other.label)
- && this.state.equals(other.state)
- && this.jobType.equals(other.jobType);
+ && this.dbId == other.dbId
+ && this.label.equals(other.label)
+ && this.state.equals(other.state)
+ && this.jobType.equals(other.jobType);
}
@Override
@@ -1098,6 +1107,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
loadStartTimestamp = info.getLoadStartTimestamp();
}
+ protected void auditFinishedLoadJob() {}
+
public static class LoadJobStateUpdateInfo implements Writable {
@SerializedName(value = "jobId")
private long jobId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 1e9f9a7..c02cdd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -35,7 +35,8 @@ public class AuditEvent {
CONNECTION,
DISCONNECTION,
BEFORE_QUERY,
- AFTER_QUERY
+ AFTER_QUERY,
+ LOAD_SUCCEED
}
@Retention(RetentionPolicy.RUNTIME)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
new file mode 100644
index 0000000..a79fa80
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.plugin;
+
+public class LoadAuditEvent extends AuditEvent {
+
+ @AuditField(value = "JobId")
+ public long jobId = -1;
+ @AuditField(value = "Label")
+ public String label = "";
+ @AuditField(value = "LoadType")
+ public String loadType = "";
+ @AuditField(value = "TableList")
+ public String tableList = "";
+ @AuditField(value = "FilePathList")
+ public String filePathList = "";
+ // for Baidu HDFS/AFS it is username
+ // for BOS, it is bos_accesskey
+ // for Apache HDFS, it it username or kerberos_principal
+ // for Amazon S3, it is fs.s3a.access.key
+ @AuditField(value = "BrokerUser")
+ public String brokerUser = "";
+ @AuditField(value = "LoadStartTime")
+ public long loadStartTime = -1;
+ @AuditField(value = "LoadFinishTime")
+ public long loadFinishTime = -1;
+ @AuditField(value = "FileNumber")
+ public long fileNumber = -1;
+
+ public static class AuditEventBuilder {
+
+ private LoadAuditEvent auditEvent = new LoadAuditEvent();
+
+ public AuditEventBuilder() {
+ }
+
+ public void reset() {
+ auditEvent = new LoadAuditEvent();
+ }
+
+ public AuditEventBuilder setEventType(EventType eventType) {
+ auditEvent.type = eventType;
+ return this;
+ }
+
+ public AuditEventBuilder setJobId(long jobId) {
+ auditEvent.jobId = jobId;
+ return this;
+ }
+
+ public AuditEventBuilder setLabel(String label) {
+ auditEvent.label = label;
+ return this;
+ }
+
+ public AuditEventBuilder setLoadType(String loadType) {
+ auditEvent.loadType = loadType;
+ return this;
+ }
+
+ public AuditEventBuilder setDb(String db) {
+ auditEvent.db = db;
+ return this;
+ }
+
+ public AuditEventBuilder setTableList(String tableList) {
+ auditEvent.tableList = tableList;
+ return this;
+ }
+
+ public AuditEventBuilder setFilePathList(String filePathList) {
+ auditEvent.filePathList = filePathList;
+ return this;
+ }
+
+ public AuditEventBuilder setBrokerUser(String brokerUser) {
+ auditEvent.brokerUser = brokerUser;
+ return this;
+ }
+
+ public AuditEventBuilder setTimestamp(long timestamp) {
+ auditEvent.timestamp = timestamp;
+ return this;
+ }
+
+ public AuditEventBuilder setLoadStartTime(long loadStartTime) {
+ auditEvent.loadStartTime = loadStartTime;
+ return this;
+ }
+
+ public AuditEventBuilder setLoadFinishTime(long loadFinishTime) {
+ auditEvent.loadFinishTime = loadFinishTime;
+ return this;
+ }
+
+ public AuditEventBuilder setScanRows(long scanRows) {
+ auditEvent.scanRows = scanRows;
+ return this;
+ }
+
+ public AuditEventBuilder setScanBytes(long scanBytes) {
+ auditEvent.scanBytes = scanBytes;
+ return this;
+ }
+
+ public AuditEventBuilder setFileNumber(long fileNumber) {
+ auditEvent.fileNumber = fileNumber;
+ return this;
+ }
+
+ public AuditEvent build() {
+ return this.auditEvent;
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
index 29214c8..3fd1b0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
@@ -17,6 +17,8 @@
package org.apache.doris.qe;
+import avro.shaded.com.google.common.collect.Maps;
+import avro.shaded.com.google.common.collect.Sets;
import org.apache.doris.common.AuditLog;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DigitalVersion;
@@ -33,6 +35,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
// A builtin Audit plugin, registered when FE start.
// it will receive "AFTER_QUERY" AuditEvent and print it as a log in
fe.audit.log
@@ -41,10 +45,17 @@ public class AuditLogBuilder extends Plugin implements
AuditPlugin {
private PluginInfo pluginInfo;
+ private final String[] LOAD_ANNONATION_NAMES = {"JobId", "Label",
"LoadType", "Db", "TableList",
+ "FilePathList", "BrokerUser", "Timestamp", "LoadStartTime",
"LoadFinishTime", "ScanRows",
+ "ScanBytes", "FileNumber"};
+
+ private Set<String> loadAnnotationSet;
+
public AuditLogBuilder() {
pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX +
"AuditLogBuilder", PluginType.AUDIT,
"builtin audit logger", DigitalVersion.fromString("0.12.0"),
DigitalVersion.fromString("1.8.31"),
AuditLogBuilder.class.getName(), null, null);
+ loadAnnotationSet = Sets.newHashSet(LOAD_ANNONATION_NAMES);
}
public PluginInfo getPluginInfo() {
@@ -53,41 +64,72 @@ public class AuditLogBuilder extends Plugin implements
AuditPlugin {
@Override
public boolean eventFilter(EventType type) {
- return type == EventType.AFTER_QUERY;
+ return type == EventType.AFTER_QUERY || type == EventType.LOAD_SUCCEED;
}
@Override
public void exec(AuditEvent event) {
try {
- StringBuilder sb = new StringBuilder();
- long queryTime = 0;
- // get each field with annotation "AuditField" in AuditEvent
- // and assemble them into a string.
- Field[] fields = event.getClass().getFields();
- for (Field f : fields) {
- AuditField af = f.getAnnotation(AuditField.class);
- if (af == null) {
- continue;
- }
-
- if (af.value().equals("Timestamp")) {
- continue;
- }
-
- if (af.value().equals("Time")) {
- queryTime = (long) f.get(event);
- }
-
sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
+ switch (event.type) {
+ case AFTER_QUERY:
+ auditQueryLog(event);
+ break;
+ case LOAD_SUCCEED:
+ auditLoadLog(event);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ LOG.debug("failed to process audit event", e);
+ }
+ }
+
+ private void auditQueryLog(AuditEvent event) throws IllegalAccessException
{
+ StringBuilder sb = new StringBuilder();
+ long queryTime = 0;
+ // get each field with annotation "AuditField" in AuditEvent
+ // and assemble them into a string.
+ Field[] fields = event.getClass().getFields();
+ for (Field f : fields) {
+ AuditField af = f.getAnnotation(AuditField.class);
+ if (af == null) {
+ continue;
}
- String auditLog = sb.toString();
- AuditLog.getQueryAudit().log(auditLog);
- // slow query
- if (queryTime > Config.qe_slow_log_ms) {
- AuditLog.getSlowAudit().log(auditLog);
+ if (af.value().equals("Timestamp")) {
+ continue;
}
- } catch (Exception e) {
- LOG.debug("failed to process audit event", e);
+
+ if (af.value().equals("Time")) {
+ queryTime = (long) f.get(event);
+ }
+
sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
+ }
+
+ String auditLog = sb.toString();
+ AuditLog.getQueryAudit().log(auditLog);
+ // slow query
+ if (queryTime > Config.qe_slow_log_ms) {
+ AuditLog.getSlowAudit().log(auditLog);
+ }
+ }
+
+ private void auditLoadLog(AuditEvent event) throws IllegalAccessException {
+ Field[] fields = event.getClass().getFields();
+ Map<String, String> annotationToFieldValueMap = Maps.newHashMap();
+ for (Field f : fields) {
+ AuditField af = f.getAnnotation(AuditField.class);
+ if (af == null || !loadAnnotationSet.contains(af.value())) {
+ continue;
+ }
+ annotationToFieldValueMap.put(af.value(),
String.valueOf(f.get(event)));
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String annotation : LOAD_ANNONATION_NAMES) {
+
sb.append("|").append(annotation).append("=").append(annotationToFieldValueMap.get(annotation));
}
+ String auditLog = sb.toString();
+ AuditLog.getLoadAudit().log(auditLog);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]