This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d985f92c725 branch-3.0: [fix](cloud-schema-change) Make SC tablet job abort logic really work #50908 (#51203)
d985f92c725 is described below
commit d985f92c7258718c78d18826e355e888d2fb86e8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 24 10:37:27 2025 +0800
branch-3.0: [fix](cloud-schema-change) Make SC tablet job abort logic really work #50908 (#51203)
Cherry-picked from #50908
Co-authored-by: Siyang Tang <[email protected]>
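In short (illustrated by the diff below): when the FE cancels a schema change, it now passes the alter job id to the meta service, and the meta service skips the initiator check for ABORT actions, because an FE-issued abort necessarily comes from a different initiator than the BE that started the job. A minimal, hypothetical C++ sketch of that matching rule (illustrative only, not the actual meta-service code):

    #include <cstdint>
    #include <string>

    // Returns true if a FinishTabletJob request may act on the recorded job.
    // For ABORT (issued by the FE) only the job id has to match; for other
    // actions the initiator must also match, so a retried BE can still commit.
    bool schema_change_job_matches(const std::string& recorded_id, const std::string& given_id,
                                   int64_t recorded_initiator, int64_t given_initiator,
                                   bool is_abort) {
        if (recorded_id != given_id) {
            return false;
        }
        return is_abort || recorded_initiator == given_initiator;
    }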
---
be/src/cloud/cloud_schema_change_job.cpp | 7 ++
cloud/src/meta-service/meta_service_job.cpp | 4 +-
.../org/apache/doris/alter/CloudRollupJobV2.java | 2 +-
.../apache/doris/alter/CloudSchemaChangeJobV2.java | 4 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 6 +-
.../cloud/datasource/CloudInternalCatalog.java | 3 +-
.../test_base_compaction_after_sc_fail.groovy | 100 +++++++++++++++++++++
7 files changed, 119 insertions(+), 7 deletions(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index bd043a0d919..ce88be52649 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -122,6 +122,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
}
return st;
}
+    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", {
+        auto res =
+                Status::InternalError("inject alter tablet failed. base_tablet={}, new_tablet={}",
+                                      request.base_tablet_id, request.new_tablet_id);
+        LOG(WARNING) << "inject error. res=" << res;
+        return res;
+    });
if (request.alter_version > 1) {
// [0-1] is a placeholder rowset, no need to convert
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 1fcfa24deca..29f1c9993fd 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1086,7 +1086,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
// MUST check initiator to let the retried BE commit this schema_change job.
if (schema_change.id() != recorded_schema_change.id() ||
- schema_change.initiator() != recorded_schema_change.initiator()) {
+ (schema_change.initiator() != recorded_schema_change.initiator() &&
+ request->action() != FinishTabletJobRequest::ABORT)) {
+ // abort is from FE, so initiator differ from the original one, just skip this check
SS << "unmatched job id or initiator, recorded_id=" <<
recorded_schema_change.id()
<< " given_id=" << schema_change.id()
<< " recorded_job=" << proto_to_json(recorded_schema_change)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 7873632f468..45be8fdec2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -137,7 +137,7 @@ public class CloudRollupJobV2 extends RollupJobV2 {
Long rollupTabletId = tabletEntry.getKey();
Long baseTabletId = tabletEntry.getValue();
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-        .removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
+        .removeSchemaChangeJob(jobId, dbId, tableId, baseIndexId, rollupIndexId,
partitionId, baseTabletId, rollupTabletId);
}
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 4e5ba45e8f2..2ec6e473987 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -140,8 +140,8 @@ public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 {
Long shadowTabletId = entry.getKey();
Long originTabletId = entry.getValue();
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-        .removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
-                partitionId, originTabletId, shadowTabletId);
+        .removeSchemaChangeJob(jobId, dbId, tableId, originIndexId, shadowIndexId,
+                partitionId, originTabletId, shadowTabletId);
}
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in
ms."
+ "dbId:{}, tableId:{}, originIndexId:{},
partitionId:{}. tabletSize:{}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 8141936033c..ea53a931131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -665,7 +665,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
commitShadowIndex();
// all partitions are good
onFinished(tbl);
- pruneMeta();
LOG.info("schema change job finished: {}", jobId);
@@ -677,6 +676,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// Write edit log with table's write lock held, to avoid adding partitions before writing edit log,
// else it will try to transform index in newly added partition while replaying and result in failure.
Env.getCurrentEnv().getEditLog().logAlterJob(this);
+ pruneMeta();
} finally {
tbl.writeUnlock();
}
@@ -791,7 +791,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
cancelInternal();
- pruneMeta();
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
changeTableState(dbId, tableId, OlapTableState.NORMAL);
@@ -800,6 +799,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
onCancel();
+ pruneMeta();
return true;
}
@@ -937,6 +937,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("replay finished schema change job: {} table id: {}", jobId,
tableId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when replay finished, table id:
{}, job id: {}", tableId, jobId);
+ pruneMeta();
}
/**
@@ -952,6 +953,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("replay cancelled schema change job: {}", jobId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when replay cancelled, table id:
{}, job id: {}", tableId, jobId);
+ pruneMeta();
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 7867931f071..aa3286a74ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -838,7 +838,7 @@ public class CloudInternalCatalog extends InternalCatalog {
}
}
- public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long newIndexId,
+ public void removeSchemaChangeJob(long jobId, long dbId, long tableId, long indexId, long newIndexId,
long partitionId, long tabletId, long newTabletId)
throws DdlException {
Cloud.FinishTabletJobRequest.Builder finishTabletJobRequestBuilder = Cloud.FinishTabletJobRequest.newBuilder();
@@ -867,6 +867,7 @@ public class CloudInternalCatalog extends InternalCatalog {
newtabletIndexPBBuilder.setTabletId(newTabletId);
final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build();
schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
+ schemaChangeJobPBBuilder.setId(String.valueOf(jobId));
final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb = schemaChangeJobPBBuilder.build();
diff --git a/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy b/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy
new file mode 100644
index 00000000000..815bfc6281e
--- /dev/null
+++ b/regression-test/suites/compaction/test_base_compaction_after_sc_fail.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.NodeType
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_base_compaction_after_sc_fail", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def tableName = "test_base_compaction_after_sc_fail"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1");
+ """
+
+ def injectBe = null
+ def backends = sql_return_maparray('show backends')
+ def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+ def injectBeId = array[0].BackendId
+ def originTabletId = array[0].TabletId
injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null)
+ assertNotNull(injectBe)
+
+ def injectName = "CloudSchemaChangeJob::process_alter_tablet.alter_fail"
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injectName)
+
+ sql """ ALTER TABLE ${tableName} MODIFY COLUMN c1 VARCHAR(44) """
+
+ def wait_for_schema_change = {
+ def try_times=1000
+ while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(10)
+ if(res[0][9].toString() == "CANCELLED") {
+ break;
+ }
+ assert(try_times>0)
+ try_times--
+ }
+ }
+ wait_for_schema_change()
+
+ def insert_data = {
+ for (i in 0..100) {
+ sql """ INSERT INTO ${tableName} VALUES(1, "2", 3, 4) """
+ sql """ DELETE FROM ${tableName} WHERE k1=1 """
+ }
+ }
+
+ insert_data()
+
+ trigger_and_wait_compaction(tableName, "cumulative")
+
+ insert_data()
+
+ trigger_and_wait_compaction(tableName, "cumulative")
+
+ insert_data()
+
+ trigger_and_wait_compaction(tableName, "cumulative")
+
+ trigger_and_wait_compaction(tableName, "base")
+
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injectName)
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]