This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f1282b18098 branch-4.0: [Fix](insert) add LoadStatistic fileNumber and
fileSize for insert into from s3 #57378 (#57455)
f1282b18098 is described below
commit f1282b1809821d5b362813cb4013010680dedb9f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 31 09:29:19 2025 +0800
branch-4.0: [Fix](insert) add LoadStatistic fileNumber and fileSize for
insert into from s3 #57378 (#57455)
Cherry-picked from #57378
Co-authored-by: wudi <[email protected]>
---
.../org/apache/doris/datasource/FileScanNode.java | 4 ++++
.../java/org/apache/doris/load/loadv2/LoadJob.java | 5 +++++
.../commands/insert/InsertIntoTableCommand.java | 21 +++++++++++++++++++++
.../streaming_job/test_streaming_insert_job.groovy | 2 +-
.../test_streaming_insert_job_offset.groovy | 4 ++--
.../test_streaming_job_restart_fe.groovy | 4 ++--
6 files changed, 35 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8cfe183ebf0..d09c1ef1c85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -89,6 +89,10 @@ public abstract class FileScanNode extends ExternalScanNode {
return tableLevelRowCount;
}
+    /**
+     * Returns the value of the {@code totalFileSize} field of this scan node.
+     *
+     * @return the current {@code totalFileSize}
+     */
+    public long getTotalFileSize() {
+        return this.totalFileSize;
+    }
+
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
StringBuilder output = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 02a7af54da0..9e90df00403 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -248,6 +248,11 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback
this.loadStatistic.totalFileSizeB = fileSize;
}
+    /**
+     * Adds the given file count and file size to this job's load statistic.
+     *
+     * <p>Both values are accumulated onto the existing {@code fileNum} and
+     * {@code totalFileSizeB} counters; they do not replace them.
+     *
+     * @param fileNum amount added to {@code loadStatistic.fileNum}
+     * @param fileSize amount added to {@code loadStatistic.totalFileSizeB}
+     */
+    public void addLoadFileInfo(int fileNum, long fileSize) {
+        loadStatistic.totalFileSizeB += fileSize;
+        loadStatistic.fileNum += fileNum;
+    }
+
/**
* Show table names for frontend
* If table name could not be found by id, the table id will be used
instead.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index ba666a1d5c6..e618d1dffbc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -29,10 +29,12 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.dictionary.Dictionary;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
@@ -67,6 +69,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator;
@@ -508,11 +511,29 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
LOG.debug("insert into plan for query_id: {} is: {}.",
DebugUtil.printId(ctx.queryId()),
planner.getPhysicalPlan().treeString());
}
+
// step 4
BuildInsertExecutorResult build = executorFactoryRef.get().build();
+
+ // apply insert plan Statistic
+ applyInsertPlanStatistic(planner);
return build;
}
+    /**
+     * Records file-scan statistics from the insert plan on the load job of this command.
+     *
+     * <p>The load job is looked up by {@code getJobId()}; when no job is found the method
+     * returns without doing anything. Otherwise, for every plan fragment whose root node is a
+     * {@link FileScanNode}, the node's selected split number and total file size are added to
+     * the job through {@code addLoadFileInfo}.
+     *
+     * @param planner planner that holds the fragments of the insert plan
+     */
+    private void applyInsertPlanStatistic(FastInsertIntoValuesPlanner planner) {
+        LoadJob loadJob = Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId());
+        if (loadJob == null) {
+            return;
+        }
+        for (PlanFragment fragment : planner.getFragments()) {
+            // Only the root node of each fragment is inspected; file scan nodes located
+            // deeper inside a fragment are not counted here.
+            if (fragment.getPlanRoot() instanceof FileScanNode) {
+                FileScanNode fileScanNode = (FileScanNode) fragment.getPlanRoot();
+                // Reuse the job that was null-checked above. Looking it up a second time is
+                // redundant, and the result of that second lookup was dereferenced unchecked.
+                loadJob.addLoadFileInfo((int) fileScanNode.getSelectedSplitNum(),
+                        fileScanNode.getTotalFileSize());
+            }
+        }
+    }
+
private void runInternal(ConnectContext ctx, StmtExecutor executor) throws
Exception {
AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
// if the insert stmt data source is empty, directly return, no need
to be executed.
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 1e65397742a..18632929329 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -108,7 +108,7 @@ suite("test_streaming_insert_job") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
// alter streaming job
sql """
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index f1c2f91e00b..f0dc425abdd 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -173,7 +173,7 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"
// alter job init offset, Lexicographic order includes example_[0-1]
@@ -211,7 +211,7 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
// has double example_1.csv and example_0.csv data
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index 34941325ca7..f2e8bef87ab 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -97,7 +97,7 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
// Restart FE
cluster.restartFrontends()
@@ -115,7 +115,7 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
sql """drop table if exists `${tableName}` force"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]