This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 484731e28ae [fix](insert) fix insert into statistic never update 
(#56412)
484731e28ae is described below

commit 484731e28aeed98eb09efd7617790f5261f4c5ec
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 29 10:20:22 2025 +0800

    [fix](insert) fix insert into statistic never update (#56412)
    
    ### What problem does this PR solve?
    
    1. Support observing insert operations via the `show load` command while
    the load is running.
    2. Fix insert statistics never updating: previously, if a user executed an
    insert command and then ran `show load`, the reported statistics were empty.
    
    Before the fix:
    ```
    JobDetails: {"Unfinished 
backends":{},"ScannedRows":0,"TaskNumber":0,"LoadBytes":0,"All 
backends":{},"FileNumber":0,"FileSize":0}
    ```
    
    After the fix:
    ```
    JobDetails: {"Unfinished 
backends":{"1c59379fc9214ec0-866797d1c03a7e4e":[]},"ScannedRows":5,"TaskNumber":1,"LoadBytes":156,"All
 
backends":{"1c59379fc9214ec0-866797d1c03a7e4e":[1754377661179]},"FileNumber":0,"FileSize":0}
    ```
---
 .../java/org/apache/doris/catalog/EnvFactory.java  |   8 ++
 .../doris/cloud/catalog/CloudEnvFactory.java       |   9 ++
 .../apache/doris/load/loadv2/InsertLoadJob.java    |  28 ++----
 .../org/apache/doris/load/loadv2/LoadManager.java  |  58 +++++++----
 .../commands/insert/AbstractInsertExecutor.java    |   9 +-
 .../plans/commands/insert/OlapInsertExecutor.java  |   2 +-
 .../org/apache/doris/qe/NereidsCoordinator.java    |  19 +++-
 regression-test/suites/ddl_p0/test_ctas.groovy     |   8 +-
 .../load_p0/insert/test_insert_statistic.groovy    | 111 +++++++++++++++++++++
 9 files changed, 203 insertions(+), 49 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index f095e98cf8c..08c78986bcb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -147,6 +147,14 @@ public class EnvFactory {
         return new Coordinator(context, planner, statsErrorEstimator);
     }
 
+    public Coordinator createCoordinator(ConnectContext context, Planner 
planner,
+                                         StatsErrorEstimator 
statsErrorEstimator, long jobId) {
+        if (planner instanceof NereidsPlanner && 
SessionVariable.canUseNereidsDistributePlanner()) {
+            return new NereidsCoordinator(context, (NereidsPlanner) planner, 
statsErrorEstimator, jobId);
+        }
+        return new Coordinator(context, planner, statsErrorEstimator);
+    }
+
     // Used for broker load task/export task/update coordinator
     public Coordinator createCoordinator(Long jobId, TUniqueId queryId, 
DescriptorTable descTable,
                                          List<PlanFragment> fragments, 
List<ScanNode> scanNodes,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 25bc8dcb0e4..958c4fd5e1f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -158,6 +158,15 @@ public class CloudEnvFactory extends EnvFactory {
         return new CloudCoordinator(context, planner, statsErrorEstimator);
     }
 
+    @Override
+    public Coordinator createCoordinator(ConnectContext context, Planner 
planner,
+                                         StatsErrorEstimator 
statsErrorEstimator, long jobId) {
+        if (planner instanceof NereidsPlanner && 
SessionVariable.canUseNereidsDistributePlanner()) {
+            return new NereidsCoordinator(context, (NereidsPlanner) planner, 
statsErrorEstimator, jobId);
+        }
+        return new CloudCoordinator(context, planner, statsErrorEstimator);
+    }
+
     @Override
     public Coordinator createCoordinator(Long jobId, TUniqueId queryId, 
DescriptorTable descTable,
                                          List<PlanFragment> fragments, 
List<ScanNode> scanNodes,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index 737b3134270..e52a87fe926 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -53,33 +53,27 @@ public class InsertLoadJob extends LoadJob {
         super(EtlJobType.INSERT);
     }
 
+    public InsertLoadJob(long dbId, String label) {
+        super(EtlJobType.INSERT, dbId, label);
+    }
+
     public InsertLoadJob(String label, long transactionId, long dbId, long 
tableId,
             long createTimestamp, String failMsg, String trackingUrl, String 
firstErrorMsg,
             UserIdentity userInfo) throws MetaNotFoundException {
         super(EtlJobType.INSERT, dbId, label);
-        this.tableId = tableId;
-        this.transactionId = transactionId;
-        this.createTimestamp = createTimestamp;
-        this.loadStartTimestamp = createTimestamp;
-        this.finishTimestamp = System.currentTimeMillis();
-        if (Strings.isNullOrEmpty(failMsg)) {
-            this.state = JobState.FINISHED;
-            this.progress = 100;
-        } else {
-            this.state = JobState.CANCELLED;
-            this.failMsg = new FailMsg(CancelType.LOAD_RUN_FAIL, failMsg);
-            this.progress = 0;
-        }
-        this.authorizationInfo = gatherAuthInfo();
-        this.loadingStatus.setTrackingUrl(trackingUrl);
-        this.loadingStatus.setFirstErrorMsg(firstErrorMsg);
-        this.userInfo = userInfo;
+        setJobProperties(transactionId, tableId, createTimestamp, failMsg, 
trackingUrl, firstErrorMsg, userInfo);
     }
 
     public InsertLoadJob(String label, long transactionId, long dbId, long 
tableId,
                          long createTimestamp, String failMsg, String 
trackingUrl, String firstErrorMsg,
                          UserIdentity userInfo, Long jobId) throws 
MetaNotFoundException {
         super(EtlJobType.INSERT_JOB, dbId, label, jobId);
+        setJobProperties(transactionId, tableId, createTimestamp, failMsg, 
trackingUrl, firstErrorMsg, userInfo);
+    }
+
+    public void setJobProperties(long transactionId, long tableId, long 
createTimestamp,
+                                        String failMsg, String trackingUrl, 
String firstErrorMsg,
+                                        UserIdentity userInfo) throws 
MetaNotFoundException {
         this.tableId = tableId;
         this.transactionId = transactionId;
         this.createTimestamp = createTimestamp;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 8aa6b85c975..43ab01fb942 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -162,7 +162,10 @@ public class LoadManager implements Writable {
 
     private long unprotectedGetUnfinishedJobNum() {
         return idToLoadJob.values().stream()
-                .filter(j -> (j.getState() != JobState.FINISHED && 
j.getState() != JobState.CANCELLED)).count();
+                .filter(j -> (j.getJobType() != EtlJobType.INSERT
+                                && j.getJobType() != EtlJobType.INSERT_JOB
+                                && j.getState() != JobState.FINISHED
+                                && j.getState() != 
JobState.CANCELLED)).count();
     }
 
     public MysqlLoadManager getMysqlLoadManager() {
@@ -189,17 +192,20 @@ public class LoadManager implements Writable {
         }
     }
 
-    private void addLoadJob(LoadJob loadJob) {
-        idToLoadJob.put(loadJob.getId(), loadJob);
-        long dbId = loadJob.getDbId();
-        if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
-            dbIdToLabelToLoadJobs.put(loadJob.getDbId(), new 
ConcurrentHashMap<>());
-        }
-        Map<String, List<LoadJob>> labelToLoadJobs = 
dbIdToLabelToLoadJobs.get(dbId);
-        if (!labelToLoadJobs.containsKey(loadJob.getLabel())) {
-            labelToLoadJobs.put(loadJob.getLabel(), new ArrayList<>());
+    public void addLoadJob(LoadJob loadJob) {
+        // Insert label may be null in txn mode, we add txn insert job after 
success.
+        if (loadJob.getLabel() != null) {
+            idToLoadJob.put(loadJob.getId(), loadJob);
+            long dbId = loadJob.getDbId();
+            if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
+                dbIdToLabelToLoadJobs.put(loadJob.getDbId(), new 
ConcurrentHashMap<>());
+            }
+            Map<String, List<LoadJob>> labelToLoadJobs = 
dbIdToLabelToLoadJobs.get(dbId);
+            if (!labelToLoadJobs.containsKey(loadJob.getLabel())) {
+                labelToLoadJobs.put(loadJob.getLabel(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
         }
-        labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
     }
 
     /**
@@ -213,17 +219,25 @@ public class LoadManager implements Writable {
         Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
 
         LoadJob loadJob;
-        switch (jobType) {
-            case INSERT:
-                loadJob = new InsertLoadJob(label, transactionId, db.getId(), 
tableId, createTimestamp, failMsg,
-                        trackingUrl, firstErrorMsg, userInfo);
-                break;
-            case INSERT_JOB:
-                loadJob = new InsertLoadJob(label, transactionId, db.getId(), 
tableId, createTimestamp, failMsg,
-                        trackingUrl, firstErrorMsg, userInfo, jobId);
-                break;
-            default:
-                return;
+        if (idToLoadJob.containsKey(jobId)) {
+            loadJob = idToLoadJob.get(jobId);
+            if (loadJob instanceof InsertLoadJob) {
+                ((InsertLoadJob) loadJob).setJobProperties(transactionId, 
tableId, createTimestamp,
+                        failMsg, trackingUrl, firstErrorMsg, userInfo);
+            }
+        } else {
+            switch (jobType) {
+                case INSERT:
+                    loadJob = new InsertLoadJob(label, transactionId, 
db.getId(), tableId, createTimestamp, failMsg,
+                            trackingUrl, firstErrorMsg, userInfo);
+                    break;
+                case INSERT_JOB:
+                    loadJob = new InsertLoadJob(label, transactionId, 
db.getId(), tableId, createTimestamp, failMsg,
+                            trackingUrl, firstErrorMsg, userInfo, jobId);
+                    break;
+                default:
+                    return;
+            }
         }
         addLoadJob(loadJob);
         // persistent
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index dfa2879efef..bc032a9d61f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.load.loadv2.InsertLoadJob;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
 import org.apache.doris.planner.DataSink;
@@ -56,6 +57,7 @@ public abstract class AbstractInsertExecutor {
     protected long jobId;
     protected final ConnectContext ctx;
     protected final Coordinator coordinator;
+    protected InsertLoadJob insertLoadJob;
     protected String labelName;
     protected final DatabaseIf database;
     protected final TableIf table;
@@ -74,10 +76,13 @@ public abstract class AbstractInsertExecutor {
     public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
             Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
         this.ctx = ctx;
-        this.coordinator = EnvFactory.getInstance().createCoordinator(ctx, 
planner, ctx.getStatsErrorEstimator());
+        this.database = table.getDatabase();
+        this.insertLoadJob = new InsertLoadJob(database.getId(), labelName);
+        ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
+        this.coordinator = EnvFactory.getInstance().createCoordinator(
+                ctx, planner, ctx.getStatsErrorEstimator(), 
insertLoadJob.getId());
         this.labelName = labelName;
         this.table = table;
-        this.database = table.getDatabase();
         this.insertCtx = insertCtx;
         this.emptyInsert = emptyInsert;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 22b77da1d0a..357523a7b4b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -298,7 +298,7 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                                 etlJobType, createTime, errMsg,
                                 coordinator.getTrackingUrl(),
                                 coordinator.getFirstErrorMsg(),
-                                userIdentity, jobId);
+                                userIdentity, insertLoadJob.getId());
             }
         } catch (MetaNotFoundException e) {
             LOG.warn("Record info of insert load with error {}", 
e.getMessage(), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index 7464079084a..858b3de08a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -91,7 +91,19 @@ public class NereidsCoordinator extends Coordinator {
         super(context, planner, statsErrorEstimator);
 
         this.coordinatorContext = CoordinatorContext.buildForSql(planner, 
this);
-        
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));
+        
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext, 
-1L));
+        this.needEnqueue = true;
+
+        Preconditions.checkState(!planner.getFragments().isEmpty()
+                && coordinatorContext.instanceNum.get() > 0, "Fragment and 
Instance can not be empty˚");
+    }
+
+    public NereidsCoordinator(ConnectContext context,
+            NereidsPlanner planner, StatsErrorEstimator statsErrorEstimator, 
long jobId) {
+        super(context, planner, statsErrorEstimator);
+
+        this.coordinatorContext = CoordinatorContext.buildForSql(planner, 
this);
+        
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext, 
jobId));
         this.needEnqueue = true;
 
         Preconditions.checkState(!planner.getFragments().isEmpty()
@@ -516,13 +528,12 @@ public class NereidsCoordinator extends Coordinator {
         return false;
     }
 
-    private JobProcessor buildJobProcessor(CoordinatorContext 
coordinatorContext) {
+    private JobProcessor buildJobProcessor(CoordinatorContext 
coordinatorContext, long jobId) {
         DataSink dataSink = coordinatorContext.dataSink;
         if ((dataSink instanceof ResultSink || dataSink instanceof 
ResultFileSink)) {
             return QueryProcessor.build(coordinatorContext);
         } else {
-            // insert statement has jobId == -1
-            return new LoadProcessor(coordinatorContext, -1L);
+            return new LoadProcessor(coordinatorContext, jobId);
         }
     }
 
diff --git a/regression-test/suites/ddl_p0/test_ctas.groovy 
b/regression-test/suites/ddl_p0/test_ctas.groovy
index 6d176e30b4f..0bf5678e4d3 100644
--- a/regression-test/suites/ddl_p0/test_ctas.groovy
+++ b/regression-test/suites/ddl_p0/test_ctas.groovy
@@ -104,9 +104,11 @@ suite("test_ctas") {
             result([[null]])
         }
 
-        test {
-            sql """show load from ${dbname}"""
-            rowNum isGroupCommitMode() ? 4: 6
+        res = sql """show load from ${dbname}"""
+        if (isGroupCommitMode()) {
+            assertTrue(res.size() > 4)
+        } else {
+            assertTrue(res.size() > 6)
         }
 
         sql """
diff --git a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy 
b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
new file mode 100644
index 00000000000..61df308c58e
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_statistic", "p0") {
+    def dbName = "test_insert_statistic_db"
+    def insert_tbl = "test_insert_statistic_tbl"
+    def label = "test_insert_statistic_label"
+    sql """CREATE DATABASE IF NOT EXISTS ${dbName}"""
+    sql """use ${dbName}"""
+
+    // insert into value
+    sql """ DROP TABLE IF EXISTS ${insert_tbl}_1"""
+    sql """
+     CREATE TABLE ${insert_tbl}_1 (
+       `k1` char(5) NULL,
+       `k2` int(11) NULL,
+       `k3` tinyint(4) NULL,
+       `k4` int(11) NULL
+     ) ENGINE=OLAP
+     DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
+     COMMENT 'OLAP'
+     DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+     PROPERTIES (
+       "replication_num"="1"
+     );
+    """
+    sql """ 
+    INSERT INTO ${insert_tbl}_1 values(1, 1, 1, 1)
+    """
+    def result = sql "SHOW LOAD FROM ${dbName}"
+    logger.info("JobDetails: " + result[0][14])
+    def json = parseJson(result[0][14])
+    assertEquals(json.ScannedRows, 1)
+    assertTrue(json.LoadBytes > 0)
+
+    // insert into select
+    sql """ DROP TABLE IF EXISTS ${insert_tbl}_2"""
+    sql """
+     CREATE TABLE ${insert_tbl}_2 (
+       `k1` char(5) NULL,
+       `k2` int(11) NULL,
+       `k3` tinyint(4) NULL,
+       `k4` int(11) NULL
+     ) ENGINE=OLAP
+     DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
+     COMMENT 'OLAP'
+     DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+     PROPERTIES (
+       "replication_num"="1"
+     );
+    """
+    sql """ 
+    INSERT INTO ${insert_tbl}_2 select * from ${insert_tbl}_1
+    """
+    result = sql "SHOW LOAD FROM ${dbName}"
+    logger.info("JobDetails: " + result[1][14])
+    json = parseJson(result[1][14])
+    assertEquals(json.ScannedRows, 1)
+    assertTrue(json.LoadBytes > 0)
+
+    // insert into s3 tvf
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+    sql """ DROP TABLE IF EXISTS ${insert_tbl}_3 """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${insert_tbl}_3 (
+            `k1` int NULL,
+            `k2` varchar(50) NULL,
+            `v1` varchar(50)  NULL,
+            `v2` varchar(50)  NULL,
+            `v3` varchar(50)  NULL,
+            `v4` varchar(50)  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+    sql """ insert into ${insert_tbl}_3 select * from S3 (
+                        "uri" = 
"http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv";,
+                        "ACCESS_KEY"= "${ak}",
+                        "SECRET_KEY" = "${sk}",
+                        "format" = "csv",
+                        "empty_field_as_null" = "true",
+                        "column_separator" = ",",
+                        "region" = "${region}"
+                        );
+                    """
+    result = sql "SHOW LOAD FROM ${dbName}"
+    logger.info("JobDetails: " + result[2][14])
+    json = parseJson(result[2][14])
+    assertEquals(json.ScannedRows, 1)
+    assertTrue(json.LoadBytes > 0)
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to