This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3231215543e branch-3.0: [Test](Export) add some debug logs for export
#47400 (#47474)
3231215543e is described below
commit 3231215543eed676f2ee5e26018a991e1ca0b6ad
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Jan 31 10:18:14 2025 +0800
branch-3.0: [Test](Export) add some debug logs for export #47400 (#47474)
bp #47400
---------
Co-authored-by: BePPPower <[email protected]>
---
.../org/apache/doris/load/ExportTaskExecutor.java | 34 ++++++++++++++++++++++
regression-test/pipeline/p0/conf/fe.conf | 1 +
2 files changed, 35 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index 94f432f1c16..b4923337402 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -41,6 +41,8 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class ExportTaskExecutor implements TransientTaskExecutor {
+ private static final Logger LOG =
LogManager.getLogger(ExportTaskExecutor.class);
List<StatementBase> selectStmtLists;
@@ -78,22 +81,32 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
@Override
public void execute() throws JobException {
+ LOG.debug("[Export Task] taskId: {} starting execution", taskId);
if (isCanceled.get()) {
+ LOG.debug("[Export Task] taskId: {} was already canceled before
execution", taskId);
throw new JobException("Export executor has been canceled, task
id: {}", taskId);
}
+ LOG.debug("[Export Task] taskId: {} updating state to EXPORTING",
taskId);
exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null,
null, null);
List<OutfileInfo> outfileInfoList = Lists.newArrayList();
for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
+ LOG.debug("[Export Task] taskId: {} processing statement {}/{}",
+ taskId, idx + 1, selectStmtLists.size());
if (isCanceled.get()) {
+ LOG.debug("[Export Task] taskId: {} canceled during execution
at statement {}", taskId, idx + 1);
throw new JobException("Export executor has been canceled,
task id: {}", taskId);
}
// check the version of tablets, skip if the consistency is in
partition level.
if (exportJob.getExportTable().isManagedTable() &&
!exportJob.isPartitionConsistency()) {
+ LOG.debug("[Export Task] taskId: {} checking tablet versions
for statement {}", taskId, idx + 1);
try {
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
OlapTable table =
db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
+ LOG.debug("[Export Lock] taskId: {}, table: {} about to
acquire readLock",
+ taskId, table.getName());
table.readLock();
+ LOG.debug("[Export Lock] taskId: {}, table: {} acquired
readLock", taskId, table.getName());
try {
List<Long> tabletIds;
LogicalPlanAdapter logicalPlanAdapter =
(LogicalPlanAdapter) selectStmtLists.get(idx);
@@ -108,6 +121,8 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
long nowVersion = partition.getVisibleVersion();
long oldVersion =
exportJob.getPartitionToVersion().get(partition.getName());
if (nowVersion != oldVersion) {
+ LOG.debug("[Export Lock] taskId: {}, table: {}
about to release readLock"
+ + "due to version mismatch", taskId,
table.getName());
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
CancelType.RUN_FAIL, "The version of
tablet {" + tabletId + "} has changed");
throw new JobException("Export Job[{}]: Tablet
{} has changed version, old version = {}"
@@ -115,11 +130,17 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
}
}
} catch (Exception e) {
+ LOG.debug("[Export Lock] taskId: {}, table: {} about
to release readLock"
+ + "due to exception: {}", taskId,
table.getName(), e.getMessage());
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL,
e.getMessage());
throw new JobException(e);
} finally {
+ LOG.debug("[Export Lock] taskId: {}, table: {}
releasing readLock in finally block",
+ taskId, table.getName());
table.readUnlock();
+ LOG.debug("[Export Lock] taskId: {}, table: {}
released readLock successfully",
+ taskId, table.getName());
}
} catch (AnalysisException e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED,
taskId, null,
@@ -129,26 +150,39 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
}
try (AutoCloseConnectContext r = buildConnectContext()) {
+ LOG.debug("[Export Task] taskId: {} executing statement {}",
taskId, idx + 1);
stmtExecutor = new StmtExecutor(r.connectContext,
selectStmtLists.get(idx));
stmtExecutor.execute();
if (r.connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
+ LOG.debug("[Export Task] taskId: {} failed with MySQL
error: {}", taskId,
+ r.connectContext.getState().getErrorMessage());
exportJob.updateExportJobState(ExportJobState.CANCELLED,
taskId, null,
ExportFailMsg.CancelType.RUN_FAIL,
r.connectContext.getState().getErrorMessage());
return;
}
+ LOG.debug("[Export Task] taskId: {} statement {} executed
successfully", taskId, idx + 1);
OutfileInfo outfileInfo =
getOutFileInfo(r.connectContext.getResultAttachedInfo());
+ LOG.debug("[Export Task] taskId: {} got outfile info for
statement {}:"
+ + "fileNumber={}, totalRows={}, fileSize={}",
+ taskId, idx + 1, outfileInfo.getFileNumber(),
+ outfileInfo.getTotalRows(), outfileInfo.getFileSize());
outfileInfoList.add(outfileInfo);
} catch (Exception e) {
+ LOG.debug("[Export Task] taskId: {} failed with exception
during statement {}: {}",
+ taskId, idx + 1, e.getMessage(), e);
exportJob.updateExportJobState(ExportJobState.CANCELLED,
taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
}
}
if (isCanceled.get()) {
+ LOG.debug("[Export Task] taskId: {} canceled after processing all
statements", taskId);
throw new JobException("Export executor has been canceled, task
id: {}", taskId);
}
+ LOG.debug("[Export Task] taskId: {} completed successfully, updating
state to FINISHED", taskId);
exportJob.updateExportJobState(ExportJobState.FINISHED, taskId,
outfileInfoList, null, null);
isFinished.getAndSet(true);
+ LOG.debug("[Export Task] taskId: {} execution completed", taskId);
}
@Override
diff --git a/regression-test/pipeline/p0/conf/fe.conf
b/regression-test/pipeline/p0/conf/fe.conf
index 38877abad93..640c9418ac2 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -34,6 +34,7 @@
JAVA_OPTS_FOR_JDK_17="-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m
sys_log_level = INFO
sys_log_mode = NORMAL
+sys_log_verbose_modules =
org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl,org.apache.doris.load.ExportTaskExecutor
arrow_flight_sql_port = 8081
catalog_trash_expire_second=1
#enable ssl for test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]