This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new b6f61e99082 branch-4.1: [fix](cloud) Refresh base tablet before schema change V1 #64312 (#64430)
b6f61e99082 is described below
commit b6f61e99082e0b66036924a060f039c8ca8fa8f2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 16 16:06:18 2026 +0800
branch-4.1: [fix](cloud) Refresh base tablet before schema change V1 #64312 (#64430)
Cherry-picked from #64312
Co-authored-by: Jimmy <[email protected]>
---
be/src/cloud/cloud_schema_change_job.cpp | 3 +-
be/src/cloud/cloud_tablet.cpp | 12 ++
be/test/cloud/cloud_schema_change_job_test.cpp | 78 ++++++++++
...c_compaction_cross_v1_stale_base_refresh.groovy | 158 +++++++++++++++++++++
4 files changed, 250 insertions(+), 1 deletion(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index e6b83a1d5fe..87d11264d10 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -105,7 +105,8 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
}
// MUST sync rowsets before capturing rowset readers and building DeleteHandler
SyncOptions options;
- options.query_version = request.alter_version;
+ // The SC boundary (V1) must be calculated from the latest visible rowsets of the base
+ // tablet. Do not cap this sync by request.alter_version, which may be stale across retries.
RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
// ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1]
_output_cumulative_point = _base_tablet->cumulative_layer_point();
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 92ace798789..efd766a4a81 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -289,6 +289,18 @@ Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* st
RETURN_IF_ERROR(sync_if_not_running(stats));
if (options.query_version > 0) {
+
DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ auto stale_version = dp->param<int64_t>("version", -1);
+ if (target_tablet_id == tablet_id() && stale_version >= 0) {
+ std::unique_lock wlock(_meta_lock);
+ LOG(INFO) << "override cloud tablet local max_version for
query_version sync"
+ << ", tablet_id=" << tablet_id() << ",
old_max_version=" << _max_version
+ << ", stale_version=" << stale_version
+ << ", query_version=" << options.query_version;
+ _max_version = stale_version;
+ }
+ });
auto lock_start = std::chrono::steady_clock::now();
std::shared_lock rlock(_meta_lock);
if (stats) {
diff --git a/be/test/cloud/cloud_schema_change_job_test.cpp b/be/test/cloud/cloud_schema_change_job_test.cpp
index 972ff2af255..74b27bed7ad 100644
--- a/be/test/cloud/cloud_schema_change_job_test.cpp
+++ b/be/test/cloud/cloud_schema_change_job_test.cpp
@@ -205,6 +205,84 @@ TEST_F(CloudSchemaChangeJobTest, FillVersionHolesBeforeNewTabletRunning) {
ASSERT_EQ(versions[1], Version(4, 4));
}
+TEST_F(CloudSchemaChangeJobTest,
RefreshBaseTabletBeforeRegisteringSchemaChangeJob) { // Verifies V1 comes from the base tablet's latest rowsets, not from a stale request.alter_version.
+ int64_t base_tablet_id = 50001;
+ int64_t new_tablet_id = 50002;
+
+ TabletMetaSharedPtr base_meta(new TabletMeta(
+ 1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(),
6, {{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ TabletMetaSharedPtr new_meta(new TabletMeta(
+ 1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(),
6, {{7, 8}},
+ UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+
+ auto base_tablet = std::make_shared<CloudTablet>(_engine,
std::move(base_meta));
+ auto new_tablet = std::make_shared<CloudTablet>(_engine,
std::move(new_meta));
+ static_cast<void>(new_tablet->set_tablet_state(TABLET_NOTREADY));
+
+ auto base_initial_rowset = create_rowset(base_tablet->tablet_schema(),
base_tablet_id, 2, 6); // presumably versions [2-6]; handed out by the first base sync
+ auto base_latest_rowset = create_rowset(base_tablet->tablet_schema(),
base_tablet_id, 7, 10); // presumably versions [7-10]; handed out by any later base sync
+
+ auto* sp = SyncPoint::get_instance();
+ sp->clear_all_call_backs();
+ sp->enable_processing();
+
+ sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) { // mocked lookup: serve both tablets' metas from memory
+ auto tablet_id = try_any_cast<int64_t>(args[0]);
+ auto* meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
+ if (tablet_id == base_tablet_id) {
+ *meta_ptr = base_tablet->tablet_meta();
+ } else if (tablet_id == new_tablet_id) {
+ *meta_ptr = new_tablet->tablet_meta();
+ }
+ try_any_cast_ret<Status>(args)->second = true;
+ });
+
+ int base_sync_count = 0; // number of mocked rowset syncs observed for the base tablet
+ sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome)
{
+ auto* tablet = try_any_cast<CloudTablet*>(outcome[0]);
+ if (tablet->tablet_id() == base_tablet_id) {
+ ++base_sync_count;
+ std::unique_lock lock(tablet->get_header_lock());
+ if (base_sync_count == 1) { // first sync only sees the initial rowset
+ tablet->add_rowsets({base_initial_rowset}, false, lock);
+ } else { // any later sync sees the newer rowset
+ tablet->add_rowsets({base_latest_rowset}, false, lock);
+ }
+ }
+ auto* pairs = try_any_cast_ret<Status>(outcome);
+ pairs->second = true;
+ pairs->first = Status::OK();
+ });
+
+ int64_t prepared_alter_version = -1; // alter_version carried by the prepare_tablet_job request
+ sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [&](auto&& outcome) {
+ auto job = try_any_cast<cloud::TabletJobInfoPB>(outcome[0]);
+ ASSERT_TRUE(job.has_schema_change());
+ prepared_alter_version = job.schema_change().alter_version();
+
+ auto* pairs = try_any_cast_ret<Status>(outcome);
+ pairs->second = true;
+ pairs->first = Status::InternalError("mock job already success"); // paired with JOB_ALREADY_SUCCESS below; the test still expects OK overall
+
+ auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
+ resp->mutable_status()->set_code(cloud::JOB_ALREADY_SUCCESS);
+ });
+
+ TAlterTabletReqV2 request;
+ request.base_tablet_id = base_tablet_id;
+ request.new_tablet_id = new_tablet_id;
+ request.alter_version = 4; // stale on purpose: below the base tablet's latest version (10)
+ request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE);
+
+ CloudSchemaChangeJob sc_job(_engine, "test_refresh_base_tablet_before_sc",
9999999999);
+ auto status = sc_job.process_alter_tablet(request);
+
+ ASSERT_TRUE(status.ok()) << status.to_string();
+ ASSERT_EQ(base_sync_count, 2); // the second base sync must have run and delivered base_latest_rowset
+ ASSERT_EQ(prepared_alter_version, 10); // V1 follows the latest base rowset, not request.alter_version
+}
+
// Test: cross-V1 compaction detected → abort SC job → return SC_COMPACTION_CONFLICT
TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) {
int64_t base_tablet_id = 10001;
diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
new file mode 100644
index 00000000000..679385562b5
--- /dev/null
+++
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_stale_base_refresh.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: schema change retry refreshes base tablet rowsets before registering a fresh V1.
+//
+// Timeline:
+// 1. Block SC, insert data, and let new tablet compaction create a cross-V1 rowset.
+// 2. Force V1=6 once to prove the cross-V1 retry path is active.
+// 3. Disable the V1 override, but inject a stale local base max=6 only for query_version syncs.
+// 4. The retry must refresh the base tablet and finish with the latest V1.
+//
+// The stale-local-max debug point only fires when the caller sets SyncOptions.query_version.
+// The old code capped the SC base sync at request.alter_version, so V1 stayed stale and the job could not finish.
+// The fixed path leaves query_version unset, so it refreshes from the meta-service and succeeds.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_cross_v1_stale_base_refresh', 'docker') {
+
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.enableDebugPoints()
+ options.beConfigs += ["enable_java_support=false"]
+ options.beConfigs += ["enable_new_tablet_do_compaction=true"] // lets the new tablet compact while SC is blocked (timeline step 1)
+ options.beConfigs += ["alter_tablet_worker_count=1"]
+ options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+ options.beNum = 1
+ options.feConfigs += ["http_port=8030"]
+ options.feConfigs += ["rpc_port=9020"]
+ options.feConfigs += ["query_port=9030"]
+ options.feConfigs += ["edit_log_port=9010"]
+ options.feConfigs += ["enable_schema_change_retry=true"] // the fix under test is exercised on an SC retry
+ options.feConfigs += ["schema_change_max_retry_time=10"]
+
+ docker(options) {
+ def getJobState = { // state of the most recent column SC job on this table
+ def result = sql """
+ SHOW ALTER TABLE COLUMN
+ WHERE IndexName='sc_cross_v1_stale_base_refresh_test'
+ ORDER BY createtime DESC LIMIT 1
+ """
+ logger.info("getJobState: ${result}")
+ return result[0][9] // NOTE(review): assumes column 9 is State; fails if no job row is returned
+ }
+
+ sql "DROP TABLE IF EXISTS sc_cross_v1_stale_base_refresh_test"
+ sql """
+ CREATE TABLE sc_cross_v1_stale_base_refresh_test (
+ k1 int NOT NULL,
+ v1 varchar(100) NOT NULL,
+ v2 int NOT NULL
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ def tablets = sql_return_maparray "SHOW TABLETS FROM
sc_cross_v1_stale_base_refresh_test"
+ assertEquals(1, tablets.size()) // one bucket, one replica -> exactly one base tablet
+ def baseTabletId = tablets[0].TabletId.toString()
+ logger.info("base tablet id for stale refresh test: ${baseTabletId}")
+
+ for (int i = 0; i < 3; i++) { // 3 loads x 20 rows before the SC starts
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test VALUES
")
+ for (int j = 0; j < 20; j++) {
+ if (j > 0) {
+ sb.append(", ")
+ }
+ def key = i * 20 + j + 1
+ sb.append("(${key}, 'val_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+ assertEquals(60L, (sql "SELECT count(*) FROM
sc_cross_v1_stale_base_refresh_test")[0][0])
+
+ def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block'
+ def overrideDP =
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version'
+ def staleMaxDP =
'CloudTablet::sync_rowsets.stale_local_max_for_query_version'
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(scBlock) // hold the SC job (timeline step 1)
+ try {
+ sql "ALTER TABLE sc_cross_v1_stale_base_refresh_test MODIFY
COLUMN v2 bigint"
+ sleep(10000) // fixed wait for the SC job to be scheduled
+ assertEquals("RUNNING", getJobState())
+
+ for (int i = 0; i < 6; i++) { // 6 more loads x 10 rows while SC is blocked
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO sc_cross_v1_stale_base_refresh_test
VALUES ")
+ for (int j = 0; j < 10; j++) {
+ if (j > 0) {
+ sb.append(", ")
+ }
+ def key = 100 + i * 10 + j + 1
+ sb.append("(${key}, 'new_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+
+ sleep(30000) // fixed wait for new-tablet compaction to build the cross-V1 rowset
+ GetDebugPoint().enableDebugPointForAllBEs(overrideDP,
[version: 6]) // timeline step 2: force V1=6
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(scBlock)
+ }
+
+ sleep(10000)
+ assertEquals("RUNNING", getJobState(),
+ "SC should still be RUNNING while V1 is forced to cross
the compacted rowset")
+
+ GetDebugPoint().enableDebugPointForAllBEs(
+ staleMaxDP, [tablet_id: baseTabletId, version: 6]) // timeline step 3: stale local max, query_version syncs only
+ GetDebugPoint().disableDebugPointForAllBEs(overrideDP)
+
+ int maxTries = 180 // poll once per second, up to 180 tries
+ def finalState = ""
+ while (maxTries-- > 0) {
+ finalState = getJobState()
+ if (finalState == "FINISHED" || finalState == "CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ logger.info("SC final state after stale base refresh retry:
${finalState}")
+ assertEquals("FINISHED", finalState) // timeline step 4: the retry finishes with a fresh V1
+
+ assertEquals(120L,
+ (sql "SELECT count(*) FROM
sc_cross_v1_stale_base_refresh_test")[0][0]) // 60 rows before SC + 60 rows inserted during SC
+
+ def columns = sql "DESC sc_cross_v1_stale_base_refresh_test"
+ def v2Col = columns.find { it[0] == "v2" } // NOTE(review): null if v2 is absent; the next line would then throw
+ assertTrue(v2Col[1].toString().toLowerCase().contains("bigint"),
+ "v2 column should be bigint after schema change, got:
${v2Col[1]}")
+
+ def backends = sql_return_maparray("SHOW BACKENDS")
+ assertTrue(backends.every { it.Alive.toString() == "true" },
+ "BE should be alive after stale base refresh retry")
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs() // always drop the block/override/stale-max debug points
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]