This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d985f92c725 branch-3.0: [fix](cloud-schema-change) Make SC tablet job abort logic really work #50908 (#51203)
d985f92c725 is described below

commit d985f92c7258718c78d18826e355e888d2fb86e8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 24 10:37:27 2025 +0800

    branch-3.0: [fix](cloud-schema-change) Make SC tablet job abort logic really work #50908 (#51203)
    
    Cherry-picked from #50908
    
    Co-authored-by: Siyang Tang <[email protected]>
---
 be/src/cloud/cloud_schema_change_job.cpp           |   7 ++
 cloud/src/meta-service/meta_service_job.cpp        |   4 +-
 .../org/apache/doris/alter/CloudRollupJobV2.java   |   2 +-
 .../apache/doris/alter/CloudSchemaChangeJobV2.java |   4 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   6 +-
 .../cloud/datasource/CloudInternalCatalog.java     |   3 +-
 .../test_base_compaction_after_sc_fail.groovy      | 100 +++++++++++++++++++++
 7 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index bd043a0d919..ce88be52649 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -122,6 +122,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
         }
         return st;
     }
+    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", {
+        auto res =
+                Status::InternalError("inject alter tablet failed. base_tablet={}, new_tablet={}",
+                                      request.base_tablet_id, request.new_tablet_id);
+        LOG(WARNING) << "inject error. res=" << res;
+        return res;
+    });
     if (request.alter_version > 1) {
         // [0-1] is a placeholder rowset, no need to convert
         RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 1fcfa24deca..29f1c9993fd 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1086,7 +1086,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
 
     // MUST check initiator to let the retried BE commit this schema_change job.
     if (schema_change.id() != recorded_schema_change.id() ||
-        schema_change.initiator() != recorded_schema_change.initiator()) {
+        (schema_change.initiator() != recorded_schema_change.initiator() &&
+         request->action() != FinishTabletJobRequest::ABORT)) {
+        // abort is from FE, so initiator differ from the original one, just skip this check
         SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
            << " given_id=" << schema_change.id()
            << " recorded_job=" << proto_to_json(recorded_schema_change)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 7873632f468..45be8fdec2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -137,7 +137,7 @@ public class CloudRollupJobV2 extends RollupJobV2 {
                         Long rollupTabletId = tabletEntry.getKey();
                         Long baseTabletId = tabletEntry.getValue();
                         ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                                .removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
+                                .removeSchemaChangeJob(jobId, dbId, tableId, baseIndexId, rollupIndexId,
                                     partitionId, baseTabletId, rollupTabletId);
                     }
                     LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 4e5ba45e8f2..2ec6e473987 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -140,8 +140,8 @@ public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 {
                         Long shadowTabletId = entry.getKey();
                         Long originTabletId = entry.getValue();
                         ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                                .removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
-                                    partitionId, originTabletId, shadowTabletId);
+                                .removeSchemaChangeJob(jobId, dbId, tableId, originIndexId, shadowIndexId,
+                                        partitionId, originTabletId, shadowTabletId);
                     }
                     LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms."
                             + "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 8141936033c..ea53a931131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -665,7 +665,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             commitShadowIndex();
             // all partitions are good
             onFinished(tbl);
-            pruneMeta();
 
             LOG.info("schema change job finished: {}", jobId);
 
@@ -677,6 +676,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             // Write edit log with table's write lock held, to avoid adding partitions before writing edit log,
             // else it will try to transform index in newly added partition while replaying and result in failure.
             Env.getCurrentEnv().getEditLog().logAlterJob(this);
+            pruneMeta();
         } finally {
             tbl.writeUnlock();
         }
@@ -791,7 +791,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         cancelInternal();
 
-        pruneMeta();
         this.errMsg = errMsg;
         this.finishedTimeMs = System.currentTimeMillis();
         changeTableState(dbId, tableId, OlapTableState.NORMAL);
@@ -800,6 +799,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         Env.getCurrentEnv().getEditLog().logAlterJob(this);
         LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
         onCancel();
+        pruneMeta();
 
         return true;
     }
@@ -937,6 +937,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId);
         changeTableState(dbId, tableId, OlapTableState.NORMAL);
         LOG.info("set table's state to NORMAL when replay finished, table id: {}, job id: {}", tableId, jobId);
+        pruneMeta();
     }
 
     /**
@@ -952,6 +953,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay cancelled schema change job: {}", jobId);
         changeTableState(dbId, tableId, OlapTableState.NORMAL);
         LOG.info("set table's state to NORMAL when replay cancelled, table id: {}, job id: {}", tableId, jobId);
+        pruneMeta();
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 7867931f071..aa3286a74ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -838,7 +838,7 @@ public class CloudInternalCatalog extends InternalCatalog {
         }
     }
 
-    public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long newIndexId,
+    public void removeSchemaChangeJob(long jobId, long dbId, long tableId, long indexId, long newIndexId,
             long partitionId, long tabletId, long newTabletId)
             throws DdlException {
         Cloud.FinishTabletJobRequest.Builder finishTabletJobRequestBuilder = Cloud.FinishTabletJobRequest.newBuilder();
@@ -867,6 +867,7 @@ public class CloudInternalCatalog extends InternalCatalog {
         newtabletIndexPBBuilder.setTabletId(newTabletId);
         final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build();
         schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
+        schemaChangeJobPBBuilder.setId(String.valueOf(jobId));
         final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
                 schemaChangeJobPBBuilder.build();
 
diff --git a/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy b/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy
new file mode 100644
index 00000000000..815bfc6281e
--- /dev/null
+++ b/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.NodeType
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_base_compaction_after_sc_fail", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+    
+    def tableName = "test_base_compaction_after_sc_fail"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+                ) DUPLICATE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1");
+    """
+
+    def injectBe = null
+    def backends = sql_return_maparray('show backends')
+    def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+    def injectBeId = array[0].BackendId
+    def originTabletId = array[0].TabletId
+    injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null)
+    assertNotNull(injectBe)
+
+    def injectName = "CloudSchemaChangeJob::process_alter_tablet.alter_fail"
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs(injectName)
+
+        sql """ ALTER TABLE ${tableName} MODIFY COLUMN c1 VARCHAR(44) """
+
+        def wait_for_schema_change = {
+                    def try_times=1000
+                    while(true){
+                        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+                        Thread.sleep(10)
+                        if(res[0][9].toString() == "CANCELLED") {
+                            break;
+                        }
+                        assert(try_times>0)
+                        try_times--
+                    }
+                }
+        wait_for_schema_change()
+
+        def insert_data = {
+            for (i in 0..100) {
+                sql """ INSERT INTO ${tableName} VALUES(1, "2", 3, 4) """
+                sql """ DELETE FROM ${tableName} WHERE k1=1 """
+            }
+        }
+
+        insert_data()
+
+        trigger_and_wait_compaction(tableName, "cumulative")
+
+        insert_data()
+
+        trigger_and_wait_compaction(tableName, "cumulative")
+
+        insert_data()
+
+        trigger_and_wait_compaction(tableName, "cumulative")
+
+        trigger_and_wait_compaction(tableName, "base")
+
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs(injectName)
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to