This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 285041b34c6 branch-4.0: [fix](insert) fix InsertLoadJob memory leak 
caused by jobs permanently stuck in PENDING state (#62890)
285041b34c6 is described below

commit 285041b34c64a722de908c7e0cef6ac49a1c28cf
Author: hui lai <[email protected]>
AuthorDate: Thu May 7 14:41:34 2026 +0800

    branch-4.0: [fix](insert) fix InsertLoadJob memory leak caused by jobs 
permanently stuck in PENDING state (#62890)
    
    pick https://github.com/apache/doris/pull/62282
---
 .../commands/insert/AbstractInsertExecutor.java    | 11 +++-
 .../commands/insert/InsertIntoTableCommand.java    | 12 ++++-
 .../plans/commands/insert/OlapInsertExecutor.java  | 60 ++++++++++++----------
 .../iceberg/write/test_iceberg_write_insert.groovy |  5 ++
 .../test_group_commit_http_stream.groovy           |  4 ++
 .../load_p0/insert/test_insert_statistic.groovy    | 42 +++++++++++----
 6 files changed, 94 insertions(+), 40 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 0af33c009a7..246b5e91ce3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -96,11 +96,18 @@ public abstract class AbstractInsertExecutor {
      */
     public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
             Optional<InsertCommandContext> insertCtx, boolean emptyInsert, 
long jobId) {
+        this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, 
false);
+    }
+
+    /**
+     * constructor
+     */
+    public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx, boolean emptyInsert, 
long jobId, boolean needRegister) {
         this.ctx = ctx;
         this.database = table.getDatabase();
         this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, 
jobId);
-        // Do not add load job if job id is -1.
-        if (jobId != -1) {
+        if (needRegister) {
             ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
         }
         this.coordinator = EnvFactory.getInstance().createCoordinator(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index bdf0d207ed6..f623004141e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -57,6 +57,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
 import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
@@ -422,11 +423,20 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
                     if (getLogicalQuery().containsType(InlineTable.class)) {
                         jobId = -1;
                     }
+                    // Do not register internal group commit loads to 
LoadManager.
+                    // Internal group commit is identified by 
DMLCommandType.GROUP_COMMIT, which is set
+                    // by the parser when the target table is specified via 
tableId (doris_internal_table_id).
+                    // The actual commit is managed by BE group commit 
mechanism, so these jobs will
+                    // never transition to a completed state through FE, 
causing a memory leak if registered.
+                    if (((PhysicalOlapTableSink<?>) 
physicalSink).getDmlCommandType() == DMLCommandType.GROUP_COMMIT) {
+                        jobId = -1;
+                    }
                     executorFactory = ExecutorFactory.from(
                             planner,
                             dataSink,
                             physicalSink,
-                            () -> new OlapInsertExecutor(ctx, olapTable, 
label, planner, insertCtx, emptyInsert, jobId)
+                            () -> new OlapInsertExecutor(
+                                    ctx, olapTable, label, planner, insertCtx, 
emptyInsert, jobId, jobId != -1)
                     );
                 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 550844bd3ec..9bdc98c578f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -88,7 +88,16 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
     public OlapInsertExecutor(ConnectContext ctx, Table table,
             String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
             long jobId) {
-        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
+        this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, 
false);
+    }
+
+    /**
+     * constructor
+     */
+    public OlapInsertExecutor(ConnectContext ctx, Table table,
+            String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
+            long jobId, boolean needRegister) {
+        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, 
needRegister);
         this.olapTable = (OlapTable) table;
     }
 
@@ -303,41 +312,36 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
         }
         String finalErrorMsg = InsertUtils.getFinalErrorMsg(errMsg, 
firstErrorMsgPart, urlPart);
         ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, finalErrorMsg);
+        recordLoadJob(ctx.getCurrentUserIdentity());
     }
 
-    @Override
-    protected void afterExec(StmtExecutor executor) {
-        // Go here, which means:
-        // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
-        // 2. transaction failed but Config.using_old_load_usage_pattern is 
true.
-        // we will record the load job info for these 2 cases
-        try {
-            // the statement parsed by Nereids is saved at 
executor::parsedStmt.
-            StatementBase statement = executor.getParsedStmt();
-            UserIdentity userIdentity;
-            //if we use job scheduler, parse statement will not set user 
identity,so we need to get it from context
-            if (null == statement) {
-                userIdentity = ctx.getCurrentUserIdentity();
-            } else {
-                userIdentity = statement.getUserInfo();
-            }
-            EtlJobType etlJobType = EtlJobType.INSERT;
-            // Do not register job if job id is -1.
-            if (!Config.enable_nereids_load && jobId != -1) {
-                // just record for loadv2 here
+    private void recordLoadJob(UserIdentity userIdentity) {
+        if (!Config.enable_nereids_load && jobId != -1) {
+            try {
                 ctx.getEnv().getLoadManager()
                         .recordFinishedLoadJob(labelName, txnId, 
database.getFullName(),
-                                table.getId(),
-                                etlJobType, createTime, errMsg,
-                                coordinator.getTrackingUrl(),
-                                coordinator.getFirstErrorMsg(),
+                                table.getId(), EtlJobType.INSERT, createTime, 
errMsg,
+                                coordinator.getTrackingUrl(), 
coordinator.getFirstErrorMsg(),
                                 userIdentity, insertLoadJob.getId());
+            } catch (MetaNotFoundException e) {
+                LOG.warn("Record info of insert load with error {}", 
e.getMessage(), e);
+                errMsg = "Record info of insert load with error " + 
e.getMessage();
             }
-        } catch (MetaNotFoundException e) {
-            LOG.warn("Record info of insert load with error {}", 
e.getMessage(), e);
-            errMsg = "Record info of insert load with error " + e.getMessage();
         }
+    }
 
+    @Override
+    protected void afterExec(StmtExecutor executor) {
+        // the statement parsed by Nereids is saved at executor::parsedStmt.
+        StatementBase statement = executor.getParsedStmt();
+        UserIdentity userIdentity;
+        // if we use job scheduler, parse statement will not set user 
identity, so we need to get it from context
+        if (null == statement) {
+            userIdentity = ctx.getCurrentUserIdentity();
+        } else {
+            userIdentity = statement.getUserInfo();
+        }
+        recordLoadJob(userIdentity);
         setReturnInfo();
     }
 
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
index 617d830e16d..4981123c36f 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
@@ -866,4 +866,9 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
         } finally {
         }
     }
+
+    // external table insert should not register a load job in LoadManager
+    sql """ SWITCH internal """
+    def showLoadResult = sql """ SHOW LOAD """
+    assertEquals(0, showLoadResult.size())
 }
diff --git 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index b83ad4f0b6f..b160b478c8e 100644
--- 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -251,6 +251,10 @@ suite("test_group_commit_http_stream") {
 
         getRowCount(19)
         qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+
+        // group commit http stream (SELECT * FROM http_stream(...)) should 
not register load jobs
+        def showLoadResult = sql """ SHOW LOAD FROM ${db} """
+        assertEquals(0, showLoadResult.size())
     } finally {
         // try_sql("DROP TABLE ${tableName}")
     }
diff --git a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy 
b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
index b70146d8eeb..62e8201f00f 100644
--- a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
+++ b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
@@ -75,10 +75,11 @@ suite("test_insert_statistic", "p0") {
        "replication_num"="1"
      );
     """
-    sql """ 
-    INSERT INTO ${insert_tbl}_2 select * from ${insert_tbl}_1
-    """
-    result = sql "SHOW LOAD FROM ${dbName}"
+    def selectLabel = "label_insert_select_" + System.currentTimeMillis()
+    sql """ INSERT INTO ${insert_tbl}_2 WITH LABEL ${selectLabel} select * 
from ${insert_tbl}_1 """
+    result = sql "SHOW LOAD FROM ${dbName} WHERE LABEL = '${selectLabel}'"
+    assertEquals(1, result.size())
+    assertEquals("FINISHED", result[0][2])
     logger.info("JobDetails: " + result[0][14])
     def json = parseJson(result[0][14])
     assertEquals(json.ScannedRows, 3)
@@ -86,6 +87,27 @@ suite("test_insert_statistic", "p0") {
     assertEquals(json.FileSize, 0)
     assertTrue(json.LoadBytes > 0)
 
+    // failed insert into select → job should be CANCELLED
+    sql """ DROP TABLE IF EXISTS ${insert_tbl}_fail"""
+    sql """
+     CREATE TABLE ${insert_tbl}_fail (
+       `k1` varchar(3) NOT NULL
+     ) ENGINE=OLAP
+     DUPLICATE KEY(`k1`)
+     DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+     PROPERTIES ("replication_num"="1");
+    """
+    sql """ set enable_insert_strict = true """
+    try {
+        sql """ INSERT INTO ${insert_tbl}_fail SELECT 
'this_value_is_too_long_for_varchar3' """
+    } catch (Exception e) {
+        logger.info("Expected insert failure: " + e.getMessage())
+    }
+    sql """ set enable_insert_strict = false """
+    result = sql "SHOW LOAD FROM ${dbName}"
+    def cancelledJob = result.find { it[2] == "CANCELLED" }
+    assertNotNull(cancelledJob, "Expected a CANCELLED load job for failed 
insert")
+
     // insert into s3 tvf
     String ak = getS3AK()
     String sk = getS3SK()
@@ -107,7 +129,8 @@ suite("test_insert_statistic", "p0") {
         DISTRIBUTED BY HASH(`k1`) BUCKETS 3
         PROPERTIES ("replication_allocation" = "tag.location.default: 1");
     """
-    sql """ insert into ${insert_tbl}_3 select * from S3 (
+    def s3Label = "label_insert_s3_" + System.currentTimeMillis()
+    sql """ insert into ${insert_tbl}_3 WITH LABEL ${s3Label} select * from S3 
(
                         "uri" = 
"http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv",
                         "ACCESS_KEY"= "${ak}",
                         "SECRET_KEY" = "${sk}",
@@ -117,12 +140,13 @@ suite("test_insert_statistic", "p0") {
                         "region" = "${region}"
                         );
                     """
-    result = sql "SHOW LOAD FROM ${dbName}"
-    logger.info("JobDetails: " + result[1][14])
-    json = parseJson(result[1][14])
+    result = sql "SHOW LOAD FROM ${dbName} WHERE LABEL = '${s3Label}'"
+    assertEquals(1, result.size())
+    assertEquals("FINISHED", result[0][2])
+    logger.info("JobDetails: " + result[0][14])
+    json = parseJson(result[0][14])
     assertEquals(json.ScannedRows, 2)
     assertEquals(json.FileNumber, 1)
     assertEquals(json.FileSize, 86)
-    assertEquals(result.size(), 2)
     assertTrue(json.LoadBytes > 0)
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to