This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 9ac86aae215 branch-4.1: [opt](cloud) Enable compaction on new tablets during schema change queuing #61089 (#61629)
9ac86aae215 is described below

commit 9ac86aae215fafd21309de2e498457d456334a03
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 16:38:55 2026 +0800

    branch-4.1: [opt](cloud) Enable compaction on new tablets during schema change queuing #61089 (#61629)
    
    Cherry-picked from #61089
    
    Co-authored-by: Jimmy <[email protected]>
---
 be/src/agent/agent_server.cpp                      |   3 +-
 be/src/agent/task_worker_pool.cpp                  |  50 +++++-
 be/src/agent/task_worker_pool.h                    |   6 +-
 be/src/cloud/cloud_cumulative_compaction.cpp       |  16 ++
 be/src/cloud/cloud_schema_change_job.cpp           |  37 +++++
 be/test/agent/task_worker_pool_test.cpp            |  49 ++++++
 .../test_sc_compaction_cross_v1_race.groovy        | 136 ++++++++++++++++
 .../test_sc_compaction_optimization.groovy         | 159 +++++++++++++++++++
 ...est_sc_compaction_optimization_with_load.groovy | 174 +++++++++++++++++++++
 9 files changed, 625 insertions(+), 5 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index c54390d16cf..c00613b5df6 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -211,7 +211,8 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& 
engine, ExecEnv* exec_
 
     _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
             "ALTER_TABLE", config::alter_tablet_worker_count,
-            [&engine](auto&& task) { return 
alter_cloud_tablet_callback(engine, task); });
+            [&engine](auto&& task) { return 
alter_cloud_tablet_callback(engine, task); },
+            [&engine](auto&& task) { set_alter_version_before_enqueue(engine, 
task); });
 
     _workers[TTaskType::CALCULATE_DELETE_BITMAP] = 
std::make_unique<TaskWorkerPool>(
             "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index ffdd110cf81..252f553dc55 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -520,9 +520,11 @@ bvar::Adder<uint64_t> report_index_policy_failed("report", 
"index_policy_failed"
 
 } // namespace
 
-TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count,
-                               std::function<void(const TAgentTaskRequest& 
task)> callback)
-        : _callback(std::move(callback)) {
+TaskWorkerPool::TaskWorkerPool(
+        std::string_view name, int worker_count,
+        std::function<void(const TAgentTaskRequest& task)> callback,
+        std::function<void(const TAgentTaskRequest& task)> pre_submit_callback)
+        : _callback(std::move(callback)), 
_pre_submit_callback(std::move(pre_submit_callback)) {
     auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name))
                       .set_min_threads(worker_count)
                       .set_max_threads(worker_count)
@@ -546,6 +548,9 @@ void TaskWorkerPool::stop() {
 
 Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
     return _submit_task(task, [this](auto&& task) {
+        if (_pre_submit_callback) {
+            _pre_submit_callback(task);
+        }
         add_task_count(task, 1);
         return _thread_pool->submit_func([this, task]() {
             _callback(task);
@@ -2239,9 +2244,48 @@ void alter_cloud_tablet_callback(CloudStorageEngine& 
engine, const TAgentTaskReq
                           std::chrono::system_clock::now().time_since_epoch())
                           .count();
     g_fragment_last_active_time.set_value(now);
+
+    // Clean up alter_version before remove_task_info to avoid race:
+    // remove_task_info allows same-signature re-submit, whose 
pre_submit_callback
+    // would set alter_version, then this cleanup would wipe it.
+    if (req.__isset.alter_tablet_req_v2) {
+        const auto& alter_req = req.alter_tablet_req_v2;
+        auto new_tablet = 
engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
+        auto base_tablet = 
engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
+        if (new_tablet.has_value()) {
+            new_tablet.value()->set_alter_version(-1);
+        }
+        if (base_tablet.has_value()) {
+            base_tablet.value()->set_alter_version(-1);
+        }
+    }
+
     remove_task_info(req.task_type, req.signature);
 }
 
+void set_alter_version_before_enqueue(CloudStorageEngine& engine, const 
TAgentTaskRequest& req) {
+    if (!req.__isset.alter_tablet_req_v2) {
+        return;
+    }
+    const auto& alter_req = req.alter_tablet_req_v2;
+    if (alter_req.alter_version <= 1) {
+        return;
+    }
+    auto new_tablet = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
+    if (!new_tablet.has_value() || new_tablet.value()->tablet_state() == 
TABLET_RUNNING) {
+        return;
+    }
+    auto base_tablet = 
engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
+    if (!base_tablet.has_value()) {
+        return;
+    }
+    new_tablet.value()->set_alter_version(alter_req.alter_version);
+    base_tablet.value()->set_alter_version(alter_req.alter_version);
+    LOG(INFO) << "set alter_version=" << alter_req.alter_version
+              << " before enqueue, base_tablet=" << alter_req.base_tablet_id
+              << ", new_tablet=" << alter_req.new_tablet_id;
+}
+
 void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
     std::unordered_map<int64_t, int64_t> gc_tablet_infos;
     if (!req.__isset.gc_binlog_req) {
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 300e1daa606..06e2f1f419c 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -50,7 +50,8 @@ public:
 class TaskWorkerPool : public TaskWorkerPoolIf {
 public:
     TaskWorkerPool(std::string_view name, int worker_count,
-                   std::function<void(const TAgentTaskRequest&)> callback);
+                   std::function<void(const TAgentTaskRequest&)> callback,
+                   std::function<void(const TAgentTaskRequest&)> 
pre_submit_callback = nullptr);
 
     ~TaskWorkerPool() override;
 
@@ -62,6 +63,7 @@ protected:
     std::atomic_bool _stopped {false};
     std::unique_ptr<ThreadPool> _thread_pool;
     std::function<void(const TAgentTaskRequest&)> _callback;
+    std::function<void(const TAgentTaskRequest&)> _pre_submit_callback;
 };
 
 class PublishVersionWorkerPool final : public TaskWorkerPool {
@@ -180,6 +182,8 @@ void alter_tablet_callback(StorageEngine& engine, const 
TAgentTaskRequest& req);
 
 void alter_cloud_tablet_callback(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
 
+void set_alter_version_before_enqueue(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
+
 void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
                     const TAgentTaskRequest& req);
 
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index a6ad435ce63..bc37133e271 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -292,6 +292,22 @@ Status CloudCumulativeCompaction::modify_rowsets() {
         LOG(INFO) << 
"CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
     });
 
+    // Block only NOTREADY tablets (SC new tablets) before compaction commit.
+    // RUNNING tablets (system tables, base tablets) are not affected.
+    
DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.block_notready", {
+        if (_tablet->tablet_state() == TABLET_NOTREADY) {
+            LOG(INFO) << "block NOTREADY tablet compaction before commit"
+                      << ", tablet_id=" << _tablet->tablet_id() << ", output=["
+                      << _input_rowsets.front()->start_version() << "-"
+                      << _input_rowsets.back()->end_version() << "]";
+            while (DebugPoints::instance()->is_enable(
+                    
"CloudCumulativeCompaction::modify_rowsets.block_notready")) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(50));
+            }
+            LOG(INFO) << "release NOTREADY tablet compaction, tablet_id=" << 
_tablet->tablet_id();
+        }
+    });
+
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     int64_t initiator = this->initiator();
     int64_t get_delete_bitmap_lock_start_time = 0;
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 807422c1fce..6e5b9b8b9c8 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -110,6 +110,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
     _output_cumulative_point = _base_tablet->cumulative_layer_point();
     std::vector<RowSetSplits> rs_splits;
     int64_t base_max_version = _base_tablet->max_version_unlocked();
+    
DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.override_base_max_version",
 {
+        auto v = dp->param<int64_t>("version", -1);
+        if (v > 0) {
+            LOG(INFO) << "override base_max_version from " << base_max_version 
<< " to " << v;
+            base_max_version = v;
+        }
+    });
     cloud::TabletJobInfoPB job;
     auto* idx = job.mutable_idx();
     idx->set_tablet_id(_base_tablet->tablet_id());
@@ -147,6 +154,32 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
         LOG(WARNING) << "inject error. res=" << res;
         return res;
     });
+
+    // Check for cross-V1 compaction rowsets on new tablet.
+    // During queue wait, compaction may have committed a rowset that crosses 
the
+    // alter_version boundary (V1). This happens when compaction commits before
+    // prepare_tablet_job registers the SC job in meta-service.
+    // If such a rowset exists, SC commit would create version overlap, so we
+    // fail early and let FE retry (with a higher V1 next time).
+    {
+        RETURN_IF_ERROR(_new_tablet->sync_rowsets());
+        std::shared_lock rlock(_new_tablet->get_header_lock());
+        for (auto& [v, rs] : _new_tablet->rowset_map()) {
+            if (v.first > 1 && v.first <= start_resp.alter_version() &&
+                v.second > start_resp.alter_version()) {
+                LOG(WARNING) << "cross-V1 compaction detected on new tablet"
+                             << ", tablet_id=" << _new_tablet->tablet_id() << 
", rowset=["
+                             << v.first << "-" << v.second << "]"
+                             << ", alter_version=" << 
start_resp.alter_version()
+                             << ", job_id=" << _job_id << ". Will retry with 
higher alter_version.";
+                return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                        "cross-V1 compaction detected on new tablet, 
tablet_id={}, "
+                        "rowset=[{}-{}], alter_version={}",
+                        _new_tablet->tablet_id(), v.first, v.second, 
start_resp.alter_version());
+            }
+        }
+    }
+
     if (request.alter_version > 1) {
         // [0-1] is a placeholder rowset, no need to convert
         RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, 
start_resp.alter_version()},
@@ -155,6 +188,10 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
                                                           
.enable_prefer_cached_rowset = false,
                                                           
.query_freshness_tolerance_ms = -1}));
     }
+    // Between prepare_tablet_job (SC job registered in meta-service) and
+    // set_alter_version (local alter_version update). Used to test cross-V1 
race.
+    
DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.after_prepare_job", 
DBUG_BLOCK);
+
     Defer defer2 {[&]() {
         _new_tablet->set_alter_version(-1);
         _base_tablet->set_alter_version(-1);
diff --git a/be/test/agent/task_worker_pool_test.cpp b/be/test/agent/task_worker_pool_test.cpp
index 3b5c8ff3df0..9cd7ddd640d 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -54,6 +54,55 @@ TEST(TaskWorkerPoolTest, TaskWorkerPool) {
     EXPECT_EQ(count.load(), 2);
 }
 
+TEST(TaskWorkerPoolTest, PreSubmitCallback) {
+    std::atomic_int callback_count {0};
+    std::atomic_int pre_submit_count {0};
+    TaskWorkerPool workers(
+            "test", 1,
+            [&](auto&& task) {
+                std::this_thread::sleep_for(200ms);
+                ++callback_count;
+            },
+            [&](auto&& task) { ++pre_submit_count; });
+
+    TAgentTaskRequest task;
+    task.__set_signature(-1);
+    auto _ = workers.submit_task(task);
+    _ = workers.submit_task(task);
+
+    // pre_submit_callback is called synchronously before enqueue
+    EXPECT_EQ(pre_submit_count.load(), 2);
+
+    std::this_thread::sleep_for(600ms);
+    workers.stop();
+    EXPECT_EQ(callback_count.load(), 2);
+    EXPECT_EQ(pre_submit_count.load(), 2);
+}
+
+TEST(TaskWorkerPoolTest, PreSubmitCallbackWithDedup) {
+    std::atomic_int pre_submit_count {0};
+    std::atomic_int callback_count {0};
+    TaskWorkerPool workers(
+            "test", 1,
+            [&](auto&& task) {
+                std::this_thread::sleep_for(500ms);
+                ++callback_count;
+            },
+            [&](auto&& task) { ++pre_submit_count; });
+
+    TAgentTaskRequest task;
+    task.__set_task_type(TTaskType::ALTER);
+    task.__set_signature(12345);
+    auto _ = workers.submit_task(task);
+    _ = workers.submit_task(task); // Should be deduped by register_task_info
+
+    EXPECT_EQ(pre_submit_count.load(), 1); // Only called once, second was 
deduped
+
+    std::this_thread::sleep_for(600ms);
+    workers.stop();
+    EXPECT_EQ(callback_count.load(), 1);
+}
+
 TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
     std::atomic_int normal_count {0};
     std::atomic_int high_prior_count {0};
diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_race.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_race.groovy
new file mode 100644
index 00000000000..a34dd18e590
--- /dev/null
+++ b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_race.groovy
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: Reproduce cross-V1 compaction race that causes BE crash.
+//
+// Timeline:
+//   SC blocked → compaction commits [5-10] on new tablet → SC runs with V1=6 
(override)
+//   → SC commit replaces [2,6] but [5-10] not deleted (crosses V1) → version 
overlap → BE crash
+//
+// This simulates the multi-BE scenario where the SC-executing BE has a stale
+// base tablet version, causing V1 to be lower than compaction output max.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_cross_v1_race', 'docker') {
+
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.enableDebugPoints()
+    options.beConfigs += ["enable_java_support=false"]
+    options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+    options.beConfigs += ["alter_tablet_worker_count=1"]
+    options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+    options.beNum = 1
+
+    docker(options) {
+        def tableName = "sc_cross_v1_test"
+
+        def getJobState = { tbl ->
+            def result = sql """SHOW ALTER TABLE COLUMN WHERE 
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1"""
+            logger.info("getJobState: ${result}")
+            return result[0][9]
+        }
+
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                k1 int NOT NULL,
+                v1 varchar(100) NOT NULL,
+                v2 int NOT NULL
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            )
+        """
+
+        // Phase 1: Insert initial data (versions 2, 3, 4)
+        for (int i = 0; i < 3; i++) {
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO ${tableName} VALUES ")
+            for (int j = 0; j < 20; j++) {
+                if (j > 0) sb.append(", ")
+                def key = i * 20 + j + 1
+                sb.append("(${key}, 'val_${key}', ${key * 10})")
+            }
+            sql sb.toString()
+        }
+        assertEquals(60L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+        // Phase 2: Block SC at entry, let compaction run freely during block
+        def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block'
+        GetDebugPoint().enableDebugPointForAllBEs(scBlock)
+
+        try {
+            sql "ALTER TABLE ${tableName} MODIFY COLUMN v2 bigint"
+            sleep(10000)
+            assertEquals("RUNNING", getJobState(tableName))
+
+            // Phase 3: Insert 6 batches (versions 5-10), compaction runs 
freely
+            for (int i = 0; i < 6; i++) {
+                StringBuilder sb = new StringBuilder()
+                sb.append("INSERT INTO ${tableName} VALUES ")
+                for (int j = 0; j < 10; j++) {
+                    if (j > 0) sb.append(", ")
+                    def key = 100 + i * 10 + j + 1
+                    sb.append("(${key}, 'new_${key}', ${key * 10})")
+                }
+                sql sb.toString()
+            }
+
+            // Phase 4: Wait for compaction to merge [5-10] on new tablet
+            sleep(30000)
+
+            // Phase 5: Override V1=6, then release SC
+            // Compaction [5-10] already committed to meta-service (no SC job 
yet → success)
+            // SC will run with V1=6 → SC commit replaces [2,6] → [5-10] not 
deleted → overlap
+            GetDebugPoint().enableDebugPointForAllBEs(
+                
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version',
+                [version: 6])
+
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(scBlock)
+        }
+
+        // Wait for SC to finish
+        int maxTries = 120
+        def finalState = ""
+        while (maxTries-- > 0) {
+            finalState = getJobState(tableName)
+            if (finalState == "FINISHED" || finalState == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Clean up debug point
+        GetDebugPoint().disableDebugPointForAllBEs(
+            
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version')
+
+        logger.info("SC final state: ${finalState}")
+
+        // Wait for potential BE crash from overlapping rowsets
+        sleep(15000)
+
+        // Verify BE is still alive
+        def backendsAfter = sql_return_maparray("show backends")
+        logger.info("BE alive status after SC: ${backendsAfter.collect { 
it.Alive }}")
+        assertTrue(backendsAfter.every { it.Alive.toString() == "true" },
+            "BE crashed after SC due to cross-V1 compaction rowset overlap")
+    }
+}
diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization.groovy
new file mode 100644
index 00000000000..a2953806515
--- /dev/null
+++ b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: Verify that pre_submit_callback correctly sets alter_version on new 
tablets
+// so that auto cumulative compaction can compact double-write rowsets during 
SC.
+// Without the optimization, alter_version=-1 and auto compaction skips 
NOTREADY tablets.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_optimization', 'docker') {
+
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.enableDebugPoints()
+    options.beConfigs += ["enable_java_support=false"]
+    options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+    options.beConfigs += ["alter_tablet_worker_count=1"]
+    options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+    options.beNum = 1
+
+    docker(options) {
+        def tableName = "sc_opt_test"
+
+        def getJobState = { tbl ->
+            def result = sql """SHOW ALTER TABLE COLUMN WHERE 
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1"""
+            logger.info("getJobState: ${result}")
+            return result[0][9]
+        }
+
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                k1 int NOT NULL,
+                v1 varchar(100) NOT NULL,
+                v2 int NOT NULL
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 2
+            PROPERTIES (
+                "replication_num" = "1"
+            )
+        """
+
+        // Insert initial data
+        for (int i = 0; i < 3; i++) {
+            StringBuilder sb = new StringBuilder()
+            sb.append("INSERT INTO ${tableName} VALUES ")
+            for (int j = 0; j < 20; j++) {
+                if (j > 0) sb.append(", ")
+                def key = i * 20 + j + 1
+                sb.append("(${key}, 'val_${key}', ${key * 10})")
+            }
+            sql sb.toString()
+        }
+
+        assertEquals(60L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+        def baseTablets = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+        assertEquals(2, baseTablets.size())
+        def baseTabletIds = baseTablets.collect { it.TabletId.toString() }
+
+        def backends = sql_return_maparray("show backends")
+        def be = backends[0]
+
+        // Block SC at the very beginning of process_alter_tablet, BEFORE 
prepare_tablet_job.
+        // This avoids meta-service tablet job lock which would block BE HTTP 
service.
+        def injectName = 'CloudSchemaChangeJob::process_alter_tablet.block'
+        GetDebugPoint().enableDebugPointForAllBEs(injectName)
+
+        try {
+            sql "ALTER TABLE ${tableName} MODIFY COLUMN v2 bigint"
+            sleep(10000)
+            assertEquals("RUNNING", getJobState(tableName))
+
+            def allTablets = sql_return_maparray("SHOW TABLETS FROM 
${tableName}")
+            assertEquals(4, allTablets.size())
+            def newTablets = allTablets.findAll { !(it.TabletId.toString() in 
baseTabletIds) }
+            assertEquals(2, newTablets.size())
+
+            // Insert 6 batches during SC -> creates double-write rowsets on 
new tablets
+            for (int i = 0; i < 6; i++) {
+                StringBuilder sb = new StringBuilder()
+                sb.append("INSERT INTO ${tableName} VALUES ")
+                for (int j = 0; j < 10; j++) {
+                    if (j > 0) sb.append(", ")
+                    def key = 100 + i * 10 + j + 1
+                    sb.append("(${key}, 'new_${key}', ${key * 10})")
+                }
+                sql sb.toString()
+            }
+
+            // Wait for auto compaction to sync double-write rowsets and 
compact them
+            sleep(30000)
+
+            // Verify compaction happened: check if any non-placeholder rowset 
spans multiple
+            // versions (e.g. [4-6]). Each INSERT creates a single-version 
rowset [v-v],
+            // so a multi-version rowset is direct proof of compaction.
+            boolean compactionHappened = false
+            for (def tablet : newTablets) {
+                def tabletId = tablet.TabletId.toString()
+                def (code, out, err) = curl("GET", tablet.CompactionStatus)
+                if (code == 0) {
+                    def status = parseJson(out.trim())
+                    if (status.rowsets instanceof List) {
+                        logger.info("New tablet ${tabletId} rowsets: 
${status.rowsets}")
+                        for (def rowset : status.rowsets) {
+                            def match = (rowset =~ /\[(\d+)-(\d+)\]/)
+                            if (match) {
+                                def start = match[0][1] as int
+                                def end = match[0][2] as int
+                                if (start > 1 && end > start) {
+                                    logger.info("New tablet ${tabletId} has 
merged rowset [${start}-${end}], compaction confirmed")
+                                    compactionHappened = true
+                                    break
+                                }
+                            }
+                        }
+                    }
+                }
+                if (compactionHappened) break
+            }
+            assertTrue(compactionHappened, "Expected auto compaction on new 
tablets during SC queue wait")
+
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(injectName)
+        }
+
+        // Wait for SC to finish
+        int maxTries = 300
+        def finalState = ""
+        while (maxTries-- > 0) {
+            finalState = getJobState(tableName)
+            if (finalState == "FINISHED" || finalState == "CANCELLED") {
+                sleep(10000)  // Wait 10s for BE to fully recover after SC
+                break
+            }
+            sleep(1000)
+        }
+        assertEquals("FINISHED", finalState)
+
+        // Verify data correctness after SC
+        assertEquals(120L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+        assertEquals(120L, (sql "SELECT count(distinct k1) FROM 
${tableName}")[0][0])
+
+    }
+}
diff --git a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization_with_load.groovy b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization_with_load.groovy
new file mode 100644
index 00000000000..31361fc1723
--- /dev/null
+++ b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization_with_load.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: End-to-end correctness with the schema change (SC) compaction optimization.
+// Verifies:
+// 1. SC completes successfully with concurrent writes and auto compaction on new tablets
+// 2. alter_version cleanup works — post-SC compaction runs normally
+// 3. Data consistency after SC + compaction + continued loading
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_optimization_with_load', 'docker') {
+
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.enableDebugPoints()  // the suite toggles a BE debug point via GetDebugPoint() further down
+    options.beConfigs += ["enable_java_support=false"]
+    options.beConfigs += ["enable_new_tablet_do_compaction=true"]  // presumably the BE switch for the behavior under test -- confirm
+    options.beConfigs += ["alter_tablet_worker_count=1"]  // NOTE(review): looks intended to make SC tasks queue behind a single worker -- confirm
+    options.beConfigs += ["cumulative_compaction_min_deltas=2"]  // presumably lowers the rowset count needed to trigger cumulative compaction -- confirm
+    options.beNum = 1
+
+    docker(options) {
+        def tableName = "sc_opt_load_test"
+
+        def getJobState = { tbl ->  // returns field 9 of the newest SHOW ALTER TABLE COLUMN row whose IndexName is tbl
+            def result = sql """SHOW ALTER TABLE COLUMN WHERE 
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1"""
+            logger.info("getJobState: ${result}")
+            return result[0][9]  // presumably the State column; callers compare it with RUNNING/FINISHED/CANCELLED -- confirm index
+        }
+
+        // Inserts `count` rows (k1 = startKey .. startKey+count-1) as one multi-row INSERT.
+        def insertBatch = { int startKey, int count, String prefix ->
+            def tuples = []
+            count.times { int offset ->
+                int key = startKey + offset
+                tuples << "(${key}, '${prefix}_k2_${key}', ${key}, '${prefix}_v2_${key}')"
+            }
+            String statement = "INSERT INTO ${tableName} VALUES " + tuples.join(", ")
+            sql statement
+        }
+
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql """
+            CREATE TABLE ${tableName} (
+                k1 int NOT NULL,
+                k2 varchar(50) NOT NULL,
+                v1 int NOT NULL,
+                v2 varchar(200) NOT NULL
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "1"
+            )
+        """
+
+        // Phase 1: Load initial data (5 INSERTs x 30 rows, k1 = 1..150)
+        for (int i = 0; i < 5; i++) {
+            insertBatch(i * 30 + 1, 30, "init_${i}")
+        }
+        assertEquals(150L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+        def baseTablets = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+        assertEquals(3, baseTablets.size())  // matches BUCKETS 3 in the CREATE TABLE above
+        def baseTabletIds = baseTablets.collect { it.TabletId.toString() }  // used later to pick out the tablets that appear once the SC starts
+
+        def backends = sql_return_maparray("show backends")
+        def be = backends[0]  // NOTE(review): `backends` and `be` are not referenced again in this suite
+
+        // Phase 2: Block SC at the very beginning of process_alter_tablet, BEFORE 
prepare_tablet_job.
+        // This avoids meta-service tablet job lock which would block BE HTTP 
service.
+        def injectName = 'CloudSchemaChangeJob::process_alter_tablet.block'
+        GetDebugPoint().enableDebugPointForAllBEs(injectName)  // disabled again in the finally block that follows
+
+        try {
+            sql "ALTER TABLE ${tableName} MODIFY COLUMN v1 bigint"
+            sleep(10000)  // fixed 10s wait before the state check; NOTE(review): polling would be less timing-sensitive
+            assertEquals("RUNNING", getJobState(tableName))
+
+            // Phase 3: Heavy loading during SC (8 INSERTs x 20 rows, k1 = 200..359)
+            for (int i = 0; i < 8; i++) {
+                insertBatch(200 + i * 20, 20, "sc_${i}")
+            }
+
+            def allTablets = sql_return_maparray("SHOW TABLETS FROM 
${tableName}")
+            assertEquals(6, allTablets.size())  // the 3 base tablets plus the 3 new ones
+            def newTablets = allTablets.findAll { !(it.TabletId.toString() in 
baseTabletIds) }
+            assertEquals(3, newTablets.size())
+
+            // Wait for auto compaction to sync double-write rowsets and 
compact them
+            sleep(30000)
+
+            // Verify compaction happened: check if any non-placeholder rowset 
spans multiple
+            // versions (e.g. [4-6]). Each INSERT creates a single-version 
rowset [v-v],
+            // so a multi-version rowset is direct proof of compaction.
+            boolean compactionHappened = false
+            for (def tablet : newTablets) {
+                def tabletId = tablet.TabletId.toString()
+                def (code, out, err) = curl("GET", tablet.CompactionStatus)
+                if (code == 0) {  // a non-zero curl result is skipped silently; that tablet just provides no evidence
+                    def status = parseJson(out.trim())
+                    if (status.rowsets instanceof List) {
+                        logger.info("New tablet ${tabletId} rowsets: 
${status.rowsets}")
+                        for (def rowset : status.rowsets) {
+                            def match = (rowset =~ /\[(\d+)-(\d+)\]/)
+                            if (match) {
+                                def start = match[0][1] as int
+                                def end = match[0][2] as int
+                                if (start > 1 && end > start) {  // start > 1 presumably excludes the placeholder rowset; end > start means it spans several versions
+                                    logger.info("New tablet ${tabletId} has 
merged rowset [${start}-${end}], compaction confirmed")
+                                    compactionHappened = true
+                                    break
+                                }
+                            }
+                        }
+                    }
+                }
+                if (compactionHappened) break
+            }
+            assertTrue(compactionHappened, "Expected auto compaction on new 
tablets during SC queue wait")
+
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(injectName)  // runs even when an assertion above fails
+        }
+
+        // Phase 4: Wait for SC completion (poll every second, at most 300 times)
+        def finalState = ""
+        for (int attempt = 0; attempt < 300; attempt++) {
+            finalState = getJobState(tableName)
+            if (finalState != "FINISHED" && finalState != "CANCELLED") {
+                sleep(1000)
+                continue
+            }
+            sleep(10000)  // terminal state reached: wait 10s for BE to fully recover
+            break
+        }
+        assertEquals("FINISHED", finalState)
+
+        // Phase 5: Verify row count and column type after SC (compaction itself is not checked here)
+        assertEquals(310L, (sql "SELECT count(*) FROM ${tableName}")[0][0])  // 150 initial rows + 160 loaded during SC
+
+        // Verify column type changed
+        def schema = sql "DESC ${tableName}"
+        def v1Type = schema.find { it[0] == "v1" }[1]  // second field of the DESC row whose first field is "v1"
+        assertEquals("bigint", v1Type.toLowerCase())
+
+        // Phase 6: Post-SC loading (verify alter_version cleanup): 3 INSERTs x 10 rows, k1 = 500..529
+        for (int i = 0; i < 3; i++) {
+            insertBatch(500 + i * 10, 10, "post_${i}")
+        }
+        // Phase 7: Verify data correctness and schema change
+        long expectedCount = 150L + 160L + 30L  // initial(5*30) + SC inserts(8*20) + post-SC(3*10); long to match the 150L/310L literals of the earlier count checks
+        assertEquals(expectedCount, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+        assertEquals(expectedCount, (sql "SELECT count(distinct k1) FROM ${tableName}")[0][0])  // every inserted k1 is unique, so both counts must agree
+
+        def desc = sql "DESC ${tableName}"
+        def v1Col = desc.find { it[0] == "v1" }
+        assertTrue(v1Col[1].toString().toLowerCase().contains("bigint"))  // column type must still be bigint after the post-SC inserts
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to