This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d68997346bf [fix](job) fix routine load job incorrectly cancelled on 
FE restart after swap table (#61046)
d68997346bf is described below

commit d68997346bf5f8cca394b835a855b98b845f7a97
Author: hui lai <[email protected]>
AuthorDate: Mon Mar 9 09:52:40 2026 +0800

    [fix](job) fix routine load job incorrectly cancelled on FE restart after 
swap table (#61046)
    
    ### Problem
    
    After performing `REPLACE TABLE` (swap) followed by `DROP TABLE` on the
    original table name,
    restarting FE incorrectly sets all routine load jobs targeting the original
    table (which still exists under its post-swap name) to `CANCELLED`.
    
    ### Steps to Reproduce
    
    1. Create table `tbl_test`
    2. Create a routine load job on `tbl_test`
    3. `ALTER TABLE tbl_main REPLACE WITH TABLE tbl_test PROPERTIES('swap' =
    'true')`
       → `tbl_test` (by ID) is now registered under the name `tbl_main`
       → `tbl_main` (by ID) is now registered under the name `tbl_test`
    4. `DROP TABLE tbl_test` (drops the old `tbl_main` data, now named
    `tbl_test`)
    5. Restart FE → routine load job state becomes `CANCELLED`
    
    ### Root Cause
    
    `RoutineLoadJob.gsonPostProcess()` is called during image replay on FE
    restart.
    It re-parses the original `CREATE ROUTINE LOAD` SQL (`origStmt`) and
    calls
    `CreateRoutineLoadInfo.validate()`, which calls `checkDBTable()` to look
    up the
    target table **by name**.
    
    The name stored in `origStmt` is `tbl_test` (the name at creation time).
    After the
    swap + drop sequence, this name no longer exists in the catalog. The
    lookup throws
    `AnalysisException: Unknown table 'tbl_test'`, which is caught and the
    job state is
    set to `CANCELLED`.
    
    The `tableId` field is already correctly persisted and still points to
    the original
    table object (now registered under `tbl_main`), but `gsonPostProcess()`
    never uses it.
    
    ### Fix
    
    Before calling `validate()` in `gsonPostProcess()`, resolve the
    **current** table name
    from the catalog using the persisted `tableId`, and override the stale
    name parsed from
    `origStmt`. If `tableId` resolves to a valid table, validation succeeds
    with the correct
    name. If the table has genuinely been deleted (no entry for `tableId`),
    validation still
    fails and the job is correctly cancelled.
    
    ### Changes
    
    - `RoutineLoadJob.java`: in `gsonPostProcess()`, look up the current table
      name by `tableId` before calling `validate()`, overriding the stale name
      from `origStmt`
    - `CreateRoutineLoadInfo.java`: add `setTableName()` to allow the name
    override
    - Add regression test `test_routine_load_swap_table_restart`
---
 .../doris/load/routineload/RoutineLoadJob.java     |  13 +++
 .../plans/commands/info/CreateRoutineLoadInfo.java |   4 +
 .../test_routine_load_swap_table_restart.groovy    | 104 +++++++++++++++++++++
 3 files changed, 121 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 796c427a16c..406c3f84249 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1990,6 +1990,19 @@ public abstract class RoutineLoadJob
                 CreateRoutineLoadCommand command = (CreateRoutineLoadCommand) 
nereidsParser.parseSingle(
                         origStmt.originStmt);
                 CreateRoutineLoadInfo createRoutineLoadInfo = 
command.getCreateRoutineLoadInfo();
+                // If tableId is set, resolve the current table name by ID so 
that
+                // table rename / SWAP TABLE won't cause replay to fail with 
stale name in origStmt.
+                if (!isMultiTable && tableId != 0) {
+                    try {
+                        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDb(dbId).orElse(null);
+                        if (db != null) {
+                            db.getTable(tableId).ifPresent(
+                                    table -> 
createRoutineLoadInfo.setTableName(table.getName()));
+                        }
+                    } catch (Exception ignored) {
+                        // fall through; let validate() surface the real error
+                    }
+                }
                 createRoutineLoadInfo.validate(ctx);
                 setRoutineLoadDesc(createRoutineLoadInfo.getRoutineLoadDesc());
             } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 450386d3fb4..f740a481b03 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -278,6 +278,10 @@ public class CreateRoutineLoadInfo {
         return tableName;
     }
 
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
     public String getDBName() {
         return dbName;
     }
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
new file mode 100644
index 00000000000..d51f9cff139
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+
+suite("test_routine_load_swap_table_restart", "docker") {
+    if (!RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        return
+    }
+
+    def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+    def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+    def topic = "test_routine_load_swap_table_restart"
+    def tblTest = "rl_swap_tbl_test"
+    def tblMain = "rl_swap_tbl_main"
+    def jobName = "test_rl_swap_table_job"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+
+    docker(options) {
+        def runSql = { String q -> sql q }
+
+        sql "DROP TABLE IF EXISTS ${tblTest}"
+        sql "DROP TABLE IF EXISTS ${tblMain}"
+        def createTableSql = { name ->
+            """
+            CREATE TABLE IF NOT EXISTS ${name} (
+                `k1` int(20)  NULL,
+                `k2` string   NULL,
+                `v1` date     NULL,
+                `v2` string   NULL,
+                `v3` datetime NULL,
+                `v4` string   NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_num" = "1");
+            """
+        }
+        sql createTableSql(tblTest)
+        sql createTableSql(tblMain)
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tblTest}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES ("max_batch_interval" = "5")
+                FROM KAFKA
+                (
+                    "kafka_broker_list"              = "${kafka_broker}",
+                    "kafka_topic"                    = "${topic}",
+                    "property.group.id"              = 
"test-swap-table-consumer-group",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // Send data and wait for routine load to be RUNNING with rows 
loaded
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, [topic])
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, jobName, tblTest, 0)
+
+            // SWAP tblMain with tblTest:
+            //   after swap, tableId of tblTest (the RL target) is now 
registered under tblMain name
+            sql "ALTER TABLE ${tblMain} REPLACE WITH TABLE ${tblTest} 
PROPERTIES('swap' = 'true')"
+            // DROP tblTest (now holds the original empty tblMain data)
+            sql "DROP TABLE IF EXISTS ${tblTest}"
+            logger.info("Swapped and dropped ${tblTest}")
+        } finally {
+            // Restart FE to trigger gsonPostProcess() replay
+            cluster.restartFrontends()
+            sleep(30000)
+            context.reconnectFe()
+            logger.info("FE restarted and reconnected")
+
+            def res = sql "SHOW ROUTINE LOAD FOR ${jobName}"
+            def stateAfterRestart = res[0][8].toString()
+            logger.info("Routine load state after restart: 
${stateAfterRestart}, reason: ${res[0][17]}")
+
+            assertNotEquals("CANCELLED", stateAfterRestart,
+                "Routine load must NOT be CANCELLED after FE restart following 
SWAP TABLE + DROP TABLE")
+
+            sql "STOP ROUTINE LOAD FOR ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tblMain}"
+        }
+    }
+
+    producer.close()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to