This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 484731e28ae [fix](insert) fix insert into statistic never update
(#56412)
484731e28ae is described below
commit 484731e28aeed98eb09efd7617790f5261f4c5ec
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 29 10:20:22 2025 +0800
[fix](insert) fix insert into statistic never update (#56412)
### What problem does this PR solve?
1. Make INSERT jobs observable through the `show load` command while they are still loading.
2. Fix INSERT INTO statistics never being updated. For example, if a user executes an INSERT
command and then runs `show load` after it completes, the job details look like this:
Before the fix:
```
JobDetails: {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"LoadBytes":0,"All backends":{},"FileNumber":0,"FileSize":0}
```
After the fix:
```
JobDetails: {"Unfinished backends":{"1c59379fc9214ec0-866797d1c03a7e4e":[]},"ScannedRows":5,"TaskNumber":1,"LoadBytes":156,"All backends":{"1c59379fc9214ec0-866797d1c03a7e4e":[1754377661179]},"FileNumber":0,"FileSize":0}
```
---
.../java/org/apache/doris/catalog/EnvFactory.java | 8 ++
.../doris/cloud/catalog/CloudEnvFactory.java | 9 ++
.../apache/doris/load/loadv2/InsertLoadJob.java | 28 ++----
.../org/apache/doris/load/loadv2/LoadManager.java | 58 +++++++----
.../commands/insert/AbstractInsertExecutor.java | 9 +-
.../plans/commands/insert/OlapInsertExecutor.java | 2 +-
.../org/apache/doris/qe/NereidsCoordinator.java | 19 +++-
regression-test/suites/ddl_p0/test_ctas.groovy | 8 +-
.../load_p0/insert/test_insert_statistic.groovy | 111 +++++++++++++++++++++
9 files changed, 203 insertions(+), 49 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index f095e98cf8c..08c78986bcb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -147,6 +147,14 @@ public class EnvFactory {
return new Coordinator(context, planner, statsErrorEstimator);
}
+ public Coordinator createCoordinator(ConnectContext context, Planner
planner,
+ StatsErrorEstimator
statsErrorEstimator, long jobId) {
+ if (planner instanceof NereidsPlanner &&
SessionVariable.canUseNereidsDistributePlanner()) {
+ return new NereidsCoordinator(context, (NereidsPlanner) planner,
statsErrorEstimator, jobId);
+ }
+ return new Coordinator(context, planner, statsErrorEstimator);
+ }
+
// Used for broker load task/export task/update coordinator
public Coordinator createCoordinator(Long jobId, TUniqueId queryId,
DescriptorTable descTable,
List<PlanFragment> fragments,
List<ScanNode> scanNodes,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 25bc8dcb0e4..958c4fd5e1f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -158,6 +158,15 @@ public class CloudEnvFactory extends EnvFactory {
return new CloudCoordinator(context, planner, statsErrorEstimator);
}
+ @Override
+ public Coordinator createCoordinator(ConnectContext context, Planner
planner,
+ StatsErrorEstimator
statsErrorEstimator, long jobId) {
+ if (planner instanceof NereidsPlanner &&
SessionVariable.canUseNereidsDistributePlanner()) {
+ return new NereidsCoordinator(context, (NereidsPlanner) planner,
statsErrorEstimator, jobId);
+ }
+ return new CloudCoordinator(context, planner, statsErrorEstimator);
+ }
+
@Override
public Coordinator createCoordinator(Long jobId, TUniqueId queryId,
DescriptorTable descTable,
List<PlanFragment> fragments,
List<ScanNode> scanNodes,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index 737b3134270..e52a87fe926 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -53,33 +53,27 @@ public class InsertLoadJob extends LoadJob {
super(EtlJobType.INSERT);
}
+ public InsertLoadJob(long dbId, String label) {
+ super(EtlJobType.INSERT, dbId, label);
+ }
+
public InsertLoadJob(String label, long transactionId, long dbId, long
tableId,
long createTimestamp, String failMsg, String trackingUrl, String
firstErrorMsg,
UserIdentity userInfo) throws MetaNotFoundException {
super(EtlJobType.INSERT, dbId, label);
- this.tableId = tableId;
- this.transactionId = transactionId;
- this.createTimestamp = createTimestamp;
- this.loadStartTimestamp = createTimestamp;
- this.finishTimestamp = System.currentTimeMillis();
- if (Strings.isNullOrEmpty(failMsg)) {
- this.state = JobState.FINISHED;
- this.progress = 100;
- } else {
- this.state = JobState.CANCELLED;
- this.failMsg = new FailMsg(CancelType.LOAD_RUN_FAIL, failMsg);
- this.progress = 0;
- }
- this.authorizationInfo = gatherAuthInfo();
- this.loadingStatus.setTrackingUrl(trackingUrl);
- this.loadingStatus.setFirstErrorMsg(firstErrorMsg);
- this.userInfo = userInfo;
+ setJobProperties(transactionId, tableId, createTimestamp, failMsg,
trackingUrl, firstErrorMsg, userInfo);
}
public InsertLoadJob(String label, long transactionId, long dbId, long
tableId,
long createTimestamp, String failMsg, String
trackingUrl, String firstErrorMsg,
UserIdentity userInfo, Long jobId) throws
MetaNotFoundException {
super(EtlJobType.INSERT_JOB, dbId, label, jobId);
+ setJobProperties(transactionId, tableId, createTimestamp, failMsg,
trackingUrl, firstErrorMsg, userInfo);
+ }
+
+ public void setJobProperties(long transactionId, long tableId, long
createTimestamp,
+ String failMsg, String trackingUrl,
String firstErrorMsg,
+ UserIdentity userInfo) throws
MetaNotFoundException {
this.tableId = tableId;
this.transactionId = transactionId;
this.createTimestamp = createTimestamp;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 8aa6b85c975..43ab01fb942 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -162,7 +162,10 @@ public class LoadManager implements Writable {
private long unprotectedGetUnfinishedJobNum() {
return idToLoadJob.values().stream()
- .filter(j -> (j.getState() != JobState.FINISHED &&
j.getState() != JobState.CANCELLED)).count();
+ .filter(j -> (j.getJobType() != EtlJobType.INSERT
+ && j.getJobType() != EtlJobType.INSERT_JOB
+ && j.getState() != JobState.FINISHED
+ && j.getState() !=
JobState.CANCELLED)).count();
}
public MysqlLoadManager getMysqlLoadManager() {
@@ -189,17 +192,20 @@ public class LoadManager implements Writable {
}
}
- private void addLoadJob(LoadJob loadJob) {
- idToLoadJob.put(loadJob.getId(), loadJob);
- long dbId = loadJob.getDbId();
- if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
- dbIdToLabelToLoadJobs.put(loadJob.getDbId(), new
ConcurrentHashMap<>());
- }
- Map<String, List<LoadJob>> labelToLoadJobs =
dbIdToLabelToLoadJobs.get(dbId);
- if (!labelToLoadJobs.containsKey(loadJob.getLabel())) {
- labelToLoadJobs.put(loadJob.getLabel(), new ArrayList<>());
+ public void addLoadJob(LoadJob loadJob) {
+ // Insert label may be null in txn mode, we add txn insert job after
success.
+ if (loadJob.getLabel() != null) {
+ idToLoadJob.put(loadJob.getId(), loadJob);
+ long dbId = loadJob.getDbId();
+ if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
+ dbIdToLabelToLoadJobs.put(loadJob.getDbId(), new
ConcurrentHashMap<>());
+ }
+ Map<String, List<LoadJob>> labelToLoadJobs =
dbIdToLabelToLoadJobs.get(dbId);
+ if (!labelToLoadJobs.containsKey(loadJob.getLabel())) {
+ labelToLoadJobs.put(loadJob.getLabel(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
}
- labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
}
/**
@@ -213,17 +219,25 @@ public class LoadManager implements Writable {
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
LoadJob loadJob;
- switch (jobType) {
- case INSERT:
- loadJob = new InsertLoadJob(label, transactionId, db.getId(),
tableId, createTimestamp, failMsg,
- trackingUrl, firstErrorMsg, userInfo);
- break;
- case INSERT_JOB:
- loadJob = new InsertLoadJob(label, transactionId, db.getId(),
tableId, createTimestamp, failMsg,
- trackingUrl, firstErrorMsg, userInfo, jobId);
- break;
- default:
- return;
+ if (idToLoadJob.containsKey(jobId)) {
+ loadJob = idToLoadJob.get(jobId);
+ if (loadJob instanceof InsertLoadJob) {
+ ((InsertLoadJob) loadJob).setJobProperties(transactionId,
tableId, createTimestamp,
+ failMsg, trackingUrl, firstErrorMsg, userInfo);
+ }
+ } else {
+ switch (jobType) {
+ case INSERT:
+ loadJob = new InsertLoadJob(label, transactionId,
db.getId(), tableId, createTimestamp, failMsg,
+ trackingUrl, firstErrorMsg, userInfo);
+ break;
+ case INSERT_JOB:
+ loadJob = new InsertLoadJob(label, transactionId,
db.getId(), tableId, createTimestamp, failMsg,
+ trackingUrl, firstErrorMsg, userInfo, jobId);
+ break;
+ default:
+ return;
+ }
}
addLoadJob(loadJob);
// persistent
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index dfa2879efef..bc032a9d61f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.load.loadv2.InsertLoadJob;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
@@ -56,6 +57,7 @@ public abstract class AbstractInsertExecutor {
protected long jobId;
protected final ConnectContext ctx;
protected final Coordinator coordinator;
+ protected InsertLoadJob insertLoadJob;
protected String labelName;
protected final DatabaseIf database;
protected final TableIf table;
@@ -74,10 +76,13 @@ public abstract class AbstractInsertExecutor {
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
this.ctx = ctx;
- this.coordinator = EnvFactory.getInstance().createCoordinator(ctx,
planner, ctx.getStatsErrorEstimator());
+ this.database = table.getDatabase();
+ this.insertLoadJob = new InsertLoadJob(database.getId(), labelName);
+ ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
+ this.coordinator = EnvFactory.getInstance().createCoordinator(
+ ctx, planner, ctx.getStatsErrorEstimator(),
insertLoadJob.getId());
this.labelName = labelName;
this.table = table;
- this.database = table.getDatabase();
this.insertCtx = insertCtx;
this.emptyInsert = emptyInsert;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 22b77da1d0a..357523a7b4b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -298,7 +298,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
etlJobType, createTime, errMsg,
coordinator.getTrackingUrl(),
coordinator.getFirstErrorMsg(),
- userIdentity, jobId);
+ userIdentity, insertLoadJob.getId());
}
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}",
e.getMessage(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index 7464079084a..858b3de08a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -91,7 +91,19 @@ public class NereidsCoordinator extends Coordinator {
super(context, planner, statsErrorEstimator);
this.coordinatorContext = CoordinatorContext.buildForSql(planner,
this);
-
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));
+
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext,
-1L));
+ this.needEnqueue = true;
+
+ Preconditions.checkState(!planner.getFragments().isEmpty()
+ && coordinatorContext.instanceNum.get() > 0, "Fragment and
Instance can not be empty˚");
+ }
+
+ public NereidsCoordinator(ConnectContext context,
+ NereidsPlanner planner, StatsErrorEstimator statsErrorEstimator,
long jobId) {
+ super(context, planner, statsErrorEstimator);
+
+ this.coordinatorContext = CoordinatorContext.buildForSql(planner,
this);
+
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext,
jobId));
this.needEnqueue = true;
Preconditions.checkState(!planner.getFragments().isEmpty()
@@ -516,13 +528,12 @@ public class NereidsCoordinator extends Coordinator {
return false;
}
- private JobProcessor buildJobProcessor(CoordinatorContext
coordinatorContext) {
+ private JobProcessor buildJobProcessor(CoordinatorContext
coordinatorContext, long jobId) {
DataSink dataSink = coordinatorContext.dataSink;
if ((dataSink instanceof ResultSink || dataSink instanceof
ResultFileSink)) {
return QueryProcessor.build(coordinatorContext);
} else {
- // insert statement has jobId == -1
- return new LoadProcessor(coordinatorContext, -1L);
+ return new LoadProcessor(coordinatorContext, jobId);
}
}
diff --git a/regression-test/suites/ddl_p0/test_ctas.groovy
b/regression-test/suites/ddl_p0/test_ctas.groovy
index 6d176e30b4f..0bf5678e4d3 100644
--- a/regression-test/suites/ddl_p0/test_ctas.groovy
+++ b/regression-test/suites/ddl_p0/test_ctas.groovy
@@ -104,9 +104,11 @@ suite("test_ctas") {
result([[null]])
}
- test {
- sql """show load from ${dbname}"""
- rowNum isGroupCommitMode() ? 4: 6
+ res = sql """show load from ${dbname}"""
+ if (isGroupCommitMode()) {
+ assertTrue(res.size() > 4)
+ } else {
+ assertTrue(res.size() > 6)
}
sql """
diff --git a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
new file mode 100644
index 00000000000..61df308c58e
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_statistic", "p0") {
+ def dbName = "test_insert_statistic_db"
+ def insert_tbl = "test_insert_statistic_tbl"
+ def label = "test_insert_statistic_label"
+ sql """CREATE DATABASE IF NOT EXISTS ${dbName}"""
+ sql """use ${dbName}"""
+
+ // insert into value
+ sql """ DROP TABLE IF EXISTS ${insert_tbl}_1"""
+ sql """
+ CREATE TABLE ${insert_tbl}_1 (
+ `k1` char(5) NULL,
+ `k2` int(11) NULL,
+ `k3` tinyint(4) NULL,
+ `k4` int(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ INSERT INTO ${insert_tbl}_1 values(1, 1, 1, 1)
+ """
+ def result = sql "SHOW LOAD FROM ${dbName}"
+ logger.info("JobDetails: " + result[0][14])
+ def json = parseJson(result[0][14])
+ assertEquals(json.ScannedRows, 1)
+ assertTrue(json.LoadBytes > 0)
+
+ // insert into select
+ sql """ DROP TABLE IF EXISTS ${insert_tbl}_2"""
+ sql """
+ CREATE TABLE ${insert_tbl}_2 (
+ `k1` char(5) NULL,
+ `k2` int(11) NULL,
+ `k3` tinyint(4) NULL,
+ `k4` int(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ INSERT INTO ${insert_tbl}_2 select * from ${insert_tbl}_1
+ """
+ result = sql "SHOW LOAD FROM ${dbName}"
+ logger.info("JobDetails: " + result[1][14])
+ json = parseJson(result[1][14])
+ assertEquals(json.ScannedRows, 1)
+ assertTrue(json.LoadBytes > 0)
+
+ // insert into s3 tvf
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+ sql """ DROP TABLE IF EXISTS ${insert_tbl}_3 """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${insert_tbl}_3 (
+ `k1` int NULL,
+ `k2` varchar(50) NULL,
+ `v1` varchar(50) NULL,
+ `v2` varchar(50) NULL,
+ `v3` varchar(50) NULL,
+ `v4` varchar(50) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ sql """ insert into ${insert_tbl}_3 select * from S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "empty_field_as_null" = "true",
+ "column_separator" = ",",
+ "region" = "${region}"
+ );
+ """
+ result = sql "SHOW LOAD FROM ${dbName}"
+ logger.info("JobDetails: " + result[2][14])
+ json = parseJson(result[2][14])
+ assertEquals(json.ScannedRows, 1)
+ assertTrue(json.LoadBytes > 0)
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]