This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d68997346bf [fix](job) fix routine load job incorrectly cancelled on
FE restart after swap table (#61046)
d68997346bf is described below
commit d68997346bf5f8cca394b835a855b98b845f7a97
Author: hui lai <[email protected]>
AuthorDate: Mon Mar 9 09:52:40 2026 +0800
[fix](job) fix routine load job incorrectly cancelled on FE restart after
swap table (#61046)
### Problem
After performing `REPLACE TABLE` (swap) followed by `DROP TABLE` on the original table name,
restarting FE causes all routine load jobs that target the original table to be set to `CANCELLED`.
### Steps to Reproduce
1. Create table `tbl_test`
2. Create a routine load job on `tbl_test`
3. `ALTER TABLE tbl_main REPLACE WITH TABLE tbl_test PROPERTIES('swap' = 'true')`
   → the table originally created as `tbl_test` (by ID) is now registered under the name `tbl_main`
   → the table originally created as `tbl_main` (by ID) is now registered under the name `tbl_test`
4. `DROP TABLE tbl_test` (drops the old `tbl_main` data, which is now named `tbl_test`)
5. Restart FE → routine load job state becomes `CANCELLED`
### Root Cause
`RoutineLoadJob.gsonPostProcess()` is called during image replay on FE restart.
It re-parses the original `CREATE ROUTINE LOAD` SQL (`origStmt`) and calls
`CreateRoutineLoadInfo.validate()`, which in turn calls `checkDBTable()` to look up
the target table **by name**.
The name stored in `origStmt` is `tbl_test` (the name at creation time).
After the swap + drop sequence, this name no longer exists in the catalog.
The lookup throws `AnalysisException: Unknown table 'tbl_test'`, which is caught,
and the job state is set to `CANCELLED`.
The `tableId` field is already correctly persisted and still points to the original
table object (now registered under `tbl_main`), but `gsonPostProcess()` never uses it.
### Fix
Before calling `validate()` in `gsonPostProcess()`, resolve the **current** table name
from the catalog using the persisted `tableId`, and use it to override the stale name
parsed from `origStmt`. The override is applied only to single-table jobs
(`!isMultiTable`) whose `tableId` is non-zero. If `tableId` resolves to a valid table,
validation succeeds with the correct name. If the table has genuinely been deleted
(no entry for `tableId`), the name is left unchanged, validation still fails, and the
job is correctly cancelled.
### Changes
- `RoutineLoadJob.java`: in `gsonPostProcess()`, look up the current table name by `tableId`
  before calling `validate()`, overriding the stale name from `origStmt`
- `CreateRoutineLoadInfo.java`: add `setTableName()` to allow the name override
- Add regression test `test_routine_load_swap_table_restart`
---
.../doris/load/routineload/RoutineLoadJob.java | 13 +++
.../plans/commands/info/CreateRoutineLoadInfo.java | 4 +
.../test_routine_load_swap_table_restart.groovy | 104 +++++++++++++++++++++
3 files changed, 121 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 796c427a16c..406c3f84249 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1990,6 +1990,19 @@ public abstract class RoutineLoadJob
CreateRoutineLoadCommand command = (CreateRoutineLoadCommand)
nereidsParser.parseSingle(
origStmt.originStmt);
CreateRoutineLoadInfo createRoutineLoadInfo =
command.getCreateRoutineLoadInfo();
+ // If tableId is set, resolve the current table name by ID so
that
+ // table rename / SWAP TABLE won't cause replay to fail with
stale name in origStmt.
+ if (!isMultiTable && tableId != 0) {
+ try {
+ Database db =
Env.getCurrentEnv().getInternalCatalog().getDb(dbId).orElse(null);
+ if (db != null) {
+ db.getTable(tableId).ifPresent(
+ table ->
createRoutineLoadInfo.setTableName(table.getName()));
+ }
+ } catch (Exception ignored) {
+ // fall through; let validate() surface the real error
+ }
+ }
createRoutineLoadInfo.validate(ctx);
setRoutineLoadDesc(createRoutineLoadInfo.getRoutineLoadDesc());
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 450386d3fb4..f740a481b03 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -278,6 +278,10 @@ public class CreateRoutineLoadInfo {
return tableName;
}
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
public String getDBName() {
return dbName;
}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
new file mode 100644
index 00000000000..d51f9cff139
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+
+suite("test_routine_load_swap_table_restart", "docker") {
+ if (!RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ return
+ }
+
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+ def topic = "test_routine_load_swap_table_restart"
+ def tblTest = "rl_swap_tbl_test"
+ def tblMain = "rl_swap_tbl_main"
+ def jobName = "test_rl_swap_table_job"
+
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+
+ docker(options) {
+ def runSql = { String q -> sql q }
+
+ sql "DROP TABLE IF EXISTS ${tblTest}"
+ sql "DROP TABLE IF EXISTS ${tblMain}"
+ def createTableSql = { name ->
+ """
+ CREATE TABLE IF NOT EXISTS ${name} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_num" = "1");
+ """
+ }
+ sql createTableSql(tblTest)
+ sql createTableSql(tblMain)
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tblTest}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES ("max_batch_interval" = "5")
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${topic}",
+ "property.group.id" =
"test-swap-table-consumer-group",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // Send data and wait for routine load to be RUNNING with rows
loaded
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, [topic])
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, jobName, tblTest, 0)
+
+ // SWAP tblMain with tblTest:
+ // after swap, tableId of tblTest (the RL target) is now
registered under tblMain name
+ sql "ALTER TABLE ${tblMain} REPLACE WITH TABLE ${tblTest}
PROPERTIES('swap' = 'true')"
+ // DROP tblTest (now holds the original empty tblMain data)
+ sql "DROP TABLE IF EXISTS ${tblTest}"
+ logger.info("Swapped and dropped ${tblTest}")
+ } finally {
+ // Restart FE to trigger gsonPostProcess() replay
+ cluster.restartFrontends()
+ sleep(30000)
+ context.reconnectFe()
+ logger.info("FE restarted and reconnected")
+
+ def res = sql "SHOW ROUTINE LOAD FOR ${jobName}"
+ def stateAfterRestart = res[0][8].toString()
+ logger.info("Routine load state after restart:
${stateAfterRestart}, reason: ${res[0][17]}")
+
+ assertNotEquals("CANCELLED", stateAfterRestart,
+ "Routine load must NOT be CANCELLED after FE restart following
SWAP TABLE + DROP TABLE")
+
+ sql "STOP ROUTINE LOAD FOR ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tblMain}"
+ }
+ }
+
+ producer.close()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]