This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8be5772a9f4 [Fix](insert) add LoadStatistic fileNumber and fileSize
for insert into from s3 (#57378)
8be5772a9f4 is described below
commit 8be5772a9f4483f04b9bbb6fea685ee94d9833b1
Author: wudi <[email protected]>
AuthorDate: Wed Oct 29 15:05:58 2025 +0800
[Fix](insert) add LoadStatistic fileNumber and fileSize for insert into
from s3 (#57378)
### What problem does this PR solve?
Adds fileNumber and fileSize to LoadStatistic for statements of the form
`INSERT INTO tbl SELECT * FROM S3()`.
---
.../org/apache/doris/datasource/FileScanNode.java | 4 ++++
.../java/org/apache/doris/load/loadv2/LoadJob.java | 5 +++++
.../commands/insert/InsertIntoTableCommand.java | 21 +++++++++++++++++++++
.../streaming_job/test_streaming_insert_job.groovy | 2 +-
.../test_streaming_insert_job_offset.groovy | 4 ++--
.../test_streaming_job_restart_fe.groovy | 4 ++--
6 files changed, 35 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8cfe183ebf0..d09c1ef1c85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -89,6 +89,10 @@ public abstract class FileScanNode extends ExternalScanNode {
return tableLevelRowCount;
}
+ public long getTotalFileSize() {
+ return totalFileSize;
+ }
+
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
StringBuilder output = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index f8c849dc741..947ed64244d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -248,6 +248,11 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback
this.loadStatistic.totalFileSizeB = fileSize;
}
+ public void addLoadFileInfo(int fileNum, long fileSize) {
+ this.loadStatistic.fileNum += fileNum;
+ this.loadStatistic.totalFileSizeB += fileSize;
+ }
+
/**
* Show table names for frontend
* If table name could not be found by id, the table id will be used
instead.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index f4f880e2027..cbe69bd598c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -29,10 +29,12 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.dictionary.Dictionary;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
@@ -69,6 +71,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator;
@@ -529,11 +532,29 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
LOG.debug("insert into plan for query_id: {} is: {}.",
DebugUtil.printId(ctx.queryId()),
planner.getPhysicalPlan().treeString());
}
+
// step 4
BuildInsertExecutorResult build = executorFactoryRef.get().build();
+
+ // apply insert plan Statistic
+ applyInsertPlanStatistic(planner);
return build;
}
+ private void applyInsertPlanStatistic(FastInsertIntoValuesPlanner planner)
{
+ LoadJob loadJob =
Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId());
+ if (loadJob == null) {
+ return;
+ }
+ for (PlanFragment fragment : planner.getFragments()) {
+ if (fragment.getPlanRoot() instanceof FileScanNode) {
+ FileScanNode fileScanNode = (FileScanNode)
fragment.getPlanRoot();
+ Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId())
+ .addLoadFileInfo((int)
fileScanNode.getSelectedSplitNum(), fileScanNode.getTotalFileSize());
+ }
+ }
+ }
+
private void runInternal(ConnectContext ctx, StmtExecutor executor) throws
Exception {
AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
// if the insert stmt data source is empty, directly return, no need
to be executed.
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 93dc64dfd4e..d87b5aa17b8 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -108,7 +108,7 @@ suite("test_streaming_insert_job") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
// alter streaming job
sql """
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index 31f8421ff42..2971222e9a0 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -173,7 +173,7 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"
// alter job init offset, Lexicographic order includes example_[0-1]
@@ -211,7 +211,7 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
// has double example_1.csv and example_0.csv data
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index 34941325ca7..f2e8bef87ab 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -97,7 +97,7 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
// Restart FE
cluster.restartFrontends()
@@ -115,7 +115,7 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
sql """drop table if exists `${tableName}` force"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]