This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 285041b34c6 branch-4.0: [fix](insert) fix InsertLoadJob memory leak
caused by jobs permanently stuck in PENDING state (#62890)
285041b34c6 is described below
commit 285041b34c64a722de908c7e0cef6ac49a1c28cf
Author: hui lai <[email protected]>
AuthorDate: Thu May 7 14:41:34 2026 +0800
branch-4.0: [fix](insert) fix InsertLoadJob memory leak caused by jobs
permanently stuck in PENDING state (#62890)
pick https://github.com/apache/doris/pull/62282
---
.../commands/insert/AbstractInsertExecutor.java | 11 +++-
.../commands/insert/InsertIntoTableCommand.java | 12 ++++-
.../plans/commands/insert/OlapInsertExecutor.java | 60 ++++++++++++----------
.../iceberg/write/test_iceberg_write_insert.groovy | 5 ++
.../test_group_commit_http_stream.groovy | 4 ++
.../load_p0/insert/test_insert_statistic.groovy | 42 +++++++++++----
6 files changed, 94 insertions(+), 40 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 0af33c009a7..246b5e91ce3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -96,11 +96,18 @@ public abstract class AbstractInsertExecutor {
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
long jobId) {
+ this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId,
false);
+ }
+
+ /**
+ * constructor
+ */
+ public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
long jobId, boolean needRegister) {
this.ctx = ctx;
this.database = table.getDatabase();
this.insertLoadJob = new InsertLoadJob(database.getId(), labelName,
jobId);
- // Do not add load job if job id is -1.
- if (jobId != -1) {
+ if (needRegister) {
ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
}
this.coordinator = EnvFactory.getInstance().createCoordinator(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index bdf0d207ed6..f623004141e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -57,6 +57,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import
org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
@@ -422,11 +423,20 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
if (getLogicalQuery().containsType(InlineTable.class)) {
jobId = -1;
}
+ // Do not register internal group commit loads to LoadManager.
+ // Internal group commit is identified by DMLCommandType.GROUP_COMMIT, which is set
+ // by the parser when the target table is specified via tableId (doris_internal_table_id).
+ // The actual commit is managed by BE group commit mechanism, so these jobs will
+ // never transition to a completed state through FE, causing a memory leak if registered.
+ if (((PhysicalOlapTableSink<?>)
physicalSink).getDmlCommandType() == DMLCommandType.GROUP_COMMIT) {
+ jobId = -1;
+ }
executorFactory = ExecutorFactory.from(
planner,
dataSink,
physicalSink,
- () -> new OlapInsertExecutor(ctx, olapTable,
label, planner, insertCtx, emptyInsert, jobId)
+ () -> new OlapInsertExecutor(
+ ctx, olapTable, label, planner, insertCtx,
emptyInsert, jobId, jobId != -1)
);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 550844bd3ec..9bdc98c578f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -88,7 +88,16 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
public OlapInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
long jobId) {
- super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
+ this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId,
false);
+ }
+
+ /**
+ * constructor
+ */
+ public OlapInsertExecutor(ConnectContext ctx, Table table,
+ String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
+ long jobId, boolean needRegister) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId,
needRegister);
this.olapTable = (OlapTable) table;
}
@@ -303,41 +312,36 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
}
String finalErrorMsg = InsertUtils.getFinalErrorMsg(errMsg,
firstErrorMsgPart, urlPart);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, finalErrorMsg);
+ recordLoadJob(ctx.getCurrentUserIdentity());
}
- @Override
- protected void afterExec(StmtExecutor executor) {
- // Go here, which means:
- // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
- // 2. transaction failed but Config.using_old_load_usage_pattern is true.
- // we will record the load job info for these 2 cases
- try {
- // the statement parsed by Nereids is saved at executor::parsedStmt.
- StatementBase statement = executor.getParsedStmt();
- UserIdentity userIdentity;
- //if we use job scheduler, parse statement will not set user identity,so we need to get it from context
- if (null == statement) {
- userIdentity = ctx.getCurrentUserIdentity();
- } else {
- userIdentity = statement.getUserInfo();
- }
- EtlJobType etlJobType = EtlJobType.INSERT;
- // Do not register job if job id is -1.
- if (!Config.enable_nereids_load && jobId != -1) {
- // just record for loadv2 here
+ private void recordLoadJob(UserIdentity userIdentity) {
+ if (!Config.enable_nereids_load && jobId != -1) {
+ try {
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId,
database.getFullName(),
- table.getId(),
- etlJobType, createTime, errMsg,
- coordinator.getTrackingUrl(),
- coordinator.getFirstErrorMsg(),
+ table.getId(), EtlJobType.INSERT, createTime,
errMsg,
+ coordinator.getTrackingUrl(),
coordinator.getFirstErrorMsg(),
userIdentity, insertLoadJob.getId());
+ } catch (MetaNotFoundException e) {
+ LOG.warn("Record info of insert load with error {}",
e.getMessage(), e);
+ errMsg = "Record info of insert load with error " +
e.getMessage();
}
- } catch (MetaNotFoundException e) {
- LOG.warn("Record info of insert load with error {}",
e.getMessage(), e);
- errMsg = "Record info of insert load with error " + e.getMessage();
}
+ }
+ @Override
+ protected void afterExec(StmtExecutor executor) {
+ // the statement parsed by Nereids is saved at executor::parsedStmt.
+ StatementBase statement = executor.getParsedStmt();
+ UserIdentity userIdentity;
+ // if we use job scheduler, parse statement will not set user identity, so we need to get it from context
+ if (null == statement) {
+ userIdentity = ctx.getCurrentUserIdentity();
+ } else {
+ userIdentity = statement.getUserInfo();
+ }
+ recordLoadJob(userIdentity);
setReturnInfo();
}
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
index 617d830e16d..4981123c36f 100644
---
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
@@ -866,4 +866,9 @@ suite("test_iceberg_write_insert",
"p0,external,iceberg,external_docker,external
} finally {
}
}
+
+ // external table insert should not register a load job in LoadManager
+ sql """ SWITCH internal """
+ def showLoadResult = sql """ SHOW LOAD """
+ assertEquals(0, showLoadResult.size())
}
diff --git
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index b83ad4f0b6f..b160b478c8e 100644
---
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -251,6 +251,10 @@ suite("test_group_commit_http_stream") {
getRowCount(19)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+
+ // group commit http stream (SELECT * FROM http_stream(...)) should not register load jobs
+ def showLoadResult = sql """ SHOW LOAD FROM ${db} """
+ assertEquals(0, showLoadResult.size())
} finally {
// try_sql("DROP TABLE ${tableName}")
}
diff --git a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
index b70146d8eeb..62e8201f00f 100644
--- a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
+++ b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy
@@ -75,10 +75,11 @@ suite("test_insert_statistic", "p0") {
"replication_num"="1"
);
"""
- sql """
- INSERT INTO ${insert_tbl}_2 select * from ${insert_tbl}_1
- """
- result = sql "SHOW LOAD FROM ${dbName}"
+ def selectLabel = "label_insert_select_" + System.currentTimeMillis()
+ sql """ INSERT INTO ${insert_tbl}_2 WITH LABEL ${selectLabel} select *
from ${insert_tbl}_1 """
+ result = sql "SHOW LOAD FROM ${dbName} WHERE LABEL = '${selectLabel}'"
+ assertEquals(1, result.size())
+ assertEquals("FINISHED", result[0][2])
logger.info("JobDetails: " + result[0][14])
def json = parseJson(result[0][14])
assertEquals(json.ScannedRows, 3)
@@ -86,6 +87,27 @@ suite("test_insert_statistic", "p0") {
assertEquals(json.FileSize, 0)
assertTrue(json.LoadBytes > 0)
+ // failed insert into select → job should be CANCELLED
+ sql """ DROP TABLE IF EXISTS ${insert_tbl}_fail"""
+ sql """
+ CREATE TABLE ${insert_tbl}_fail (
+ `k1` varchar(3) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES ("replication_num"="1");
+ """
+ sql """ set enable_insert_strict = true """
+ try {
+ sql """ INSERT INTO ${insert_tbl}_fail SELECT
'this_value_is_too_long_for_varchar3' """
+ } catch (Exception e) {
+ logger.info("Expected insert failure: " + e.getMessage())
+ }
+ sql """ set enable_insert_strict = false """
+ result = sql "SHOW LOAD FROM ${dbName}"
+ def cancelledJob = result.find { it[2] == "CANCELLED" }
+ assertNotNull(cancelledJob, "Expected a CANCELLED load job for failed
insert")
+
// insert into s3 tvf
String ak = getS3AK()
String sk = getS3SK()
@@ -107,7 +129,8 @@ suite("test_insert_statistic", "p0") {
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
- sql """ insert into ${insert_tbl}_3 select * from S3 (
+ def s3Label = "label_insert_s3_" + System.currentTimeMillis()
+ sql """ insert into ${insert_tbl}_3 WITH LABEL ${s3Label} select * from S3
(
"uri" =
"http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
@@ -117,12 +140,13 @@ suite("test_insert_statistic", "p0") {
"region" = "${region}"
);
"""
- result = sql "SHOW LOAD FROM ${dbName}"
- logger.info("JobDetails: " + result[1][14])
- json = parseJson(result[1][14])
+ result = sql "SHOW LOAD FROM ${dbName} WHERE LABEL = '${s3Label}'"
+ assertEquals(1, result.size())
+ assertEquals("FINISHED", result[0][2])
+ logger.info("JobDetails: " + result[0][14])
+ json = parseJson(result[0][14])
assertEquals(json.ScannedRows, 2)
assertEquals(json.FileNumber, 1)
assertEquals(json.FileSize, 86)
- assertEquals(result.size(), 2)
assertTrue(json.LoadBytes > 0)
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]