This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 0f22ae9b27c [opt](filecache) schema change adaptive to file cache 
(#58620)
0f22ae9b27c is described below

commit 0f22ae9b27c75d2cfee56a781cfb4162c982f586
Author: Haolin Guan <[email protected]>
AuthorDate: Fri Dec 12 09:27:30 2025 +0800

    [opt](filecache) schema change adaptive to file cache (#58620)
    
    Cherry-pick from #57470
---
 be/src/cloud/cloud_cumulative_compaction.cpp       |   2 +-
 be/src/cloud/cloud_schema_change_job.cpp           |  48 ++++-
 be/src/cloud/cloud_schema_change_job.h             |   3 +-
 be/src/cloud/cloud_tablet.cpp                      |   2 +-
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   2 +
 be/src/olap/schema_change.h                        |   1 +
 .../test_filecache_with_alter_table.groovy         | 234 +++++++++++++++++++++
 8 files changed, 289 insertions(+), 7 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp 
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 4c587ea3bca..0398f24a5ad 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,7 +375,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
         if (_input_rowsets.size() == 1) {
             DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
             // MUST NOT move input rowset to stale path
-            cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
+            cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);
         } else {
             cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
             cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index df59a78a583..c6ac0b6cb98 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -32,6 +32,7 @@
 #include "olap/delete_handler.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/storage_engine.h"
@@ -217,6 +218,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const 
TAlterTabletReqV2& reque
 
     SchemaChangeParams sc_params;
 
+    // cache schema change output to file cache
+    std::vector<RowsetSharedPtr> rowsets;
+    rowsets.resize(rs_splits.size());
+    std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
+                   [](RowSetSplits& split) { return split.rs_reader->rowset(); 
});
+    sc_params.output_to_file_cache = _should_cache_sc_output(rowsets);
+
     RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, 
&sc_params.desc_tbl));
     sc_params.ref_rowset_readers.reserve(rs_splits.size());
     for (RowSetSplits& split : rs_splits) {
@@ -309,6 +317,8 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         context.tablet_schema = _new_tablet->tablet_schema();
         context.newest_write_timestamp = rs_reader->newest_write_timestamp();
         context.storage_resource = 
_cloud_storage_engine.get_storage_resource(sc_params.vault_id);
+        context.write_file_cache = sc_params.output_to_file_cache;
+        context.tablet = _new_tablet;
         if (!context.storage_resource) {
             return Status::InternalError("vault id not found, maybe not sync, 
vault id {}",
                                          sc_params.vault_id);
@@ -467,7 +477,7 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in 
another thread
         std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
         std::unique_lock wlock(_new_tablet->get_header_lock());
-        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
+        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, 
false);
         _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
         _new_tablet->reset_approximate_stats(stats.num_rowsets(), 
stats.num_segments(),
                                              stats.num_rows(), 
stats.data_size());
@@ -503,7 +513,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t 
alter_version,
             std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
         // Set alter version to let the tmp_tablet can fill hole rowset 
greater than alter_version
         tmp_tablet->set_alter_version(alter_version);
     }
@@ -521,7 +531,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t 
alter_version,
                         DBUG_BLOCK);
         {
             std::unique_lock wlock(tmp_tablet->get_header_lock());
-            tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
         }
         for (auto rowset : ret.rowsets) {
             
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, 
rowset));
@@ -544,7 +554,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t 
alter_version,
                 {max_version + 1, new_max_version}, CaptureRowsetOps {}));
         {
             std::unique_lock wlock(tmp_tablet->get_header_lock());
-            tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
         }
         for (auto rowset : ret.rowsets) {
             
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, 
rowset));
@@ -595,4 +605,34 @@ void CloudSchemaChangeJob::clean_up_on_failure() {
     }
 }
 
+// Decides whether the rowsets produced by this schema change should be written to the file
+// cache. Sums total_disk_size() and the approximate cached index/data sizes over
+// `input_rowsets`, logs the resulting ratio, and returns true only when that ratio is strictly
+// greater than config::file_cache_keep_schema_change_output_min_hit_ratio.
+bool CloudSchemaChangeJob::_should_cache_sc_output(
+        const std::vector<RowsetSharedPtr>& input_rowsets) {
+    int64_t total_size = 0;
+    int64_t cached_index_size = 0;
+    int64_t cached_data_size = 0;
+
+    for (const auto& rs : input_rowsets) {
+        total_size += rs->rowset_meta()->total_disk_size();
+        cached_index_size += rs->approximate_cache_index_size();
+        cached_data_size += rs->approximate_cached_data_size();
+    }
+
+    // With no input rowsets (or only empty ones) total_size is 0 and the division would be a
+    // division by zero (NaN/inf); report a hit rate of 0 for that case instead.
+    const double input_hit_rate =
+            total_size > 0
+                    ? static_cast<double>(cached_index_size + cached_data_size) /
+                              static_cast<double>(total_size)
+                    : 0.0;
+    // Evaluate the decision once so the logged value and the returned value cannot diverge.
+    const bool should_cache =
+            input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio;
+
+    LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. "
+              << "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size()
+              << ", total_size=" << total_size << ", cached_index_size=" << cached_index_size
+              << ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate
+              << ", min_hit_ratio_threshold="
+              << config::file_cache_keep_schema_change_output_min_hit_ratio
+              << ", should_cache=" << should_cache;
+
+    return should_cache;
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h 
b/be/src/cloud/cloud_schema_change_job.h
index 7d84279dd1d..0132d1f506a 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -39,13 +39,14 @@ public:
     void clean_up_on_failure();
 
 private:
+    bool _should_cache_sc_output(const std::vector<RowsetSharedPtr>& 
input_rowsets);
+
     Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
                                        cloud::TabletJobInfoPB& job);
 
     Status _process_delete_bitmap(int64_t alter_version, int64_t 
start_calc_delete_bitmap_version,
                                   int64_t initiator, const std::string& 
vault_id);
 
-private:
     CloudStorageEngine& _cloud_storage_engine;
     std::shared_ptr<CloudTablet> _base_tablet;
     std::shared_ptr<CloudTablet> _new_tablet;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 913f8127bd0..ea5f52366ab 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -384,7 +384,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> 
to_add, bool version_
 
     auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& 
rowsets) {
         for (auto& rs : rowsets) {
-            if (version_overlap || warmup_delta_data) {
+            if (warmup_delta_data) {
 #ifndef BE_TEST
                 bool warm_up_state_updated = false;
                 // Warmup rowset data in background
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b38c05e5cab..939b1faf856 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1136,6 +1136,10 @@ 
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
 DEFINE_mBool(enable_file_cache_adaptive_write, "true");
 DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
 
+// if difference below this threshold, we consider cache's progressive 
upgrading (2.0->3.0) successful
+DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
+DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");
+
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
 DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 25c95391968..7133adcca37 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1175,6 +1175,8 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
 DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
 DECLARE_mBool(enable_file_cache_adaptive_write);
 DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
+DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
+DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 175ea70c81d..3bc0fb0901d 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -280,6 +280,7 @@ struct SchemaChangeParams {
     ObjectPool pool;
     int32_t be_exec_version;
     std::string vault_id;
+    bool output_to_file_cache;
 };
 
 class SchemaChangeJob {
diff --git 
a/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy 
b/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy
new file mode 100644
index 00000000000..87c33562dc6
--- /dev/null
+++ 
b/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.OutputUtils
+
[email protected]
+class RowsetInfo {
+    int startVersion
+    int endVersion
+    String id
+    String originalString
+}
+
+suite("test_filecache_with_alter_table", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+
+    options.beConfigs.add('enable_flush_file_cache_async=false')
+    
options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
+    options.beConfigs.add('enable_evict_file_cache_in_advance=false')
+    
options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
+
+    def baseTestTable = "test_filecache_with_alter_table"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+    def csvPathPrefix = "/tmp/temp_csv_data"
+    def loadBatchNum = 20
+
+    // Writes loadBatchNum CSV files (data_01.csv, data_02.csv, ...) under csvPathPrefix. Each
+    // file holds a header line followed by 32768 rows of 4 comma-separated integers taken from
+    // a single counter that keeps increasing across rows and files.
+    def generateCsvData = {
+        final int linesPerCsv = 32768
+        final int valuesPerLine = 4
+        final String headerLine = 'col1,col2,col3,col4'
+
+        // Create the output directory if it is missing; otherwise delete the files in it.
+        File outputDir = new File(csvPathPrefix)
+        if (outputDir.exists()) {
+            outputDir.eachFile { stale -> stale.delete() }
+        } else {
+            outputDir.mkdirs()
+        }
+
+        long nextValue = 1L
+        for (fileIndex in 1..loadBatchNum) {
+            File target = new File(String.format("${csvPathPrefix}/data_%02d.csv", fileIndex))
+            target.withWriter('UTF-8') { out ->
+                out.writeLine(headerLine)
+                for (rowIndex in 1..linesPerCsv) {
+                    def values = []
+                    for (columnIndex in 1..valuesPerLine) {
+                        values.add(nextValue++)
+                    }
+                    out.writeLine(values.join(','))
+                }
+            }
+        }
+        logger.info("Successfully generated ${loadBatchNum} CSV files in ${csvPathPrefix}")
+    }
+
+    // Queries the /api/compaction/show endpoint of the backend hosting `tablet` via curl and
+    // returns the parsed JSON response. Fails the test when curl exits with a non-zero code.
+    def getTabletStatus = { tablet ->
+        String tabletId = tablet.TabletId
+        String backendId = tablet.BackendId
+        def beHost = backendId_to_backendIP[backendId]
+        def beHttpPort = backendId_to_backendHttpPort[backendId]
+
+        String command = "curl -s -X GET http://${beHost}:${beHttpPort}/api/compaction/show?tablet_id=${tabletId}"
+
+        logger.info("Executing: ${command}")
+        def process = command.execute()
+        // Drain stdout before waiting for the exit code: waiting first can block forever once
+        // the child has filled the pipe buffer with a large response.
+        def output = process.getText()
+        def exitCode = process.waitFor()
+
+        logger.info("Get tablet status response: code=${exitCode}, out=${output}")
+        assertEquals(0, exitCode, "Failed to get tablet status.")
+
+        return parseJson(output.trim())
+    }
+
+    // Polls SHOW ALTER TABLE COLUMN once per second until the most recent job on `tableName`
+    // reports FINISHED. Fails the test if that job is CANCELLED or if `timeoutMillis` of
+    // polling sleep elapses first.
+    def waitForAlterJobToFinish = { tableName, timeoutMillis ->
+        def pollInterval = 1000
+        def timeElapsed = 0
+        while (timeElapsed <= timeoutMillis) {
+            def alterResult = sql_return_maparray """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
+            logger.info("Checking ALTER status for table '${tableName}': ${alterResult}")
+            def state = alterResult ? alterResult[0].State : null
+            if (state == "FINISHED") {
+                // NOTE(review): presumably a grace period so the backend settles before the
+                // caller inspects the file cache - confirm.
+                sleep(3000)
+                logger.info("ALTER job on table '${tableName}' finished. Details: ${alterResult[0]}")
+                return
+            }
+            if (state == "CANCELLED") {
+                // A cancelled job never reaches FINISHED; fail now instead of polling until
+                // the timeout expires.
+                fail("ALTER job on table '${tableName}' was cancelled. Details: ${alterResult[0]}")
+            }
+            sleep(pollInterval)
+            timeElapsed += pollInterval
+        }
+        fail("Wait for ALTER job on table '${tableName}' to finish timed out after ${timeoutMillis}ms.")
+    }
+
+    def runSchemaChangeCacheTest = { String testTable, double inputCacheRatio, 
boolean expectOutputCached ->
+        
logger.info("==================================================================================")
+        logger.info("Running Test Case on Table '${testTable}': Input Cache 
Ratio = ${inputCacheRatio * 100}%, Expect Output Cached = 
${expectOutputCached}")
+        
logger.info("==================================================================================")
+
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                col1 bigint,
+                col2 bigint,
+                col3 bigint,
+                col4 bigint
+            )
+            UNIQUE KEY(col1)
+            DISTRIBUTED BY HASH(col1) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            )
+        """
+        
+        (1..loadBatchNum).each { fileIndex ->
+            def fileName = String.format("${csvPathPrefix}/data_%02d.csv", 
fileIndex)
+            streamLoad {
+                logger.info("Stream loading file index ${fileIndex} into table 
${testTable}")
+                set "column_separator", ","
+                table testTable
+                file fileName
+                time 3000
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) throw exception
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                }
+            }
+        }
+        sql """ SELECT COUNT(col1) from ${testTable} """
+
+        def tablets = sql_return_maparray "show tablets from ${testTable};"
+        assertEquals(1, tablets.size(), "Expected to find exactly one tablet.")
+        def tablet = tablets[0]
+        def beHost = backendId_to_backendIP[tablet.BackendId]
+        def beHttpPort = backendId_to_backendHttpPort[tablet.BackendId]
+
+        def tabletStatus = getTabletStatus(tablet)
+        List<RowsetInfo> originalRowsetInfos = tabletStatus["rowsets"].collect 
{ rowsetStr ->
+            def parts = rowsetStr.split(" ")
+            def versionParts = parts[0].replace('[', '').replace(']', 
'').split("-")
+            new RowsetInfo(
+                startVersion: versionParts[0].toInteger(),
+                endVersion: versionParts[1].toInteger(),
+                id: parts[4],
+                originalString: rowsetStr
+            )
+        }.findAll { it.startVersion != 0 }.sort { it.startVersion }
+
+        int numToClear = Math.round(originalRowsetInfos.size() * (1 - 
inputCacheRatio)).toInteger()
+        logger.info("Total data rowsets: ${originalRowsetInfos.size()}. 
Clearing cache for ${numToClear} rowsets to achieve ~${inputCacheRatio * 100}% 
hit ratio.")
+
+        originalRowsetInfos.take(numToClear).each { rowset ->
+            
Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=clear&sync=true&value=${rowset.id}_0.dat", true)
+        }
+
+        def cachedInputRowsets = originalRowsetInfos.findAll { rowset ->
+            def data = 
Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
+            data.any { item -> !item.endsWith("_idx") && 
!item.endsWith("_disposable") }
+        }
+        
+        def actualCachedRatio = cachedInputRowsets.size() / 
(double)originalRowsetInfos.size()
+        logger.info("Verification: Cached input rowsets: 
${cachedInputRowsets.size()}. Actual cache ratio: ${actualCachedRatio * 100}%")
+        assertTrue(Math.abs(inputCacheRatio - actualCachedRatio) < 0.01, 
"Actual cache ratio does not match expected ratio.")
+
+        logger.info("Triggering ALTER TABLE on ${testTable}")
+        sql """ALTER TABLE ${testTable} MODIFY COLUMN col2 VARCHAR(255)"""
+        waitForAlterJobToFinish(testTable, 60000)
+
+        tablets = sql_return_maparray "show tablets from ${testTable};"
+        tablet = tablets[0]
+        tabletStatus = getTabletStatus(tablet)
+
+        def newRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
+            def parts = rowsetStr.split(" ")
+            def version_pair = parts[0].replace('[', '').replace(']', 
'').split('-')
+            new RowsetInfo(
+                startVersion: version_pair[0].toInteger(),
+                endVersion: version_pair[1].toInteger(),
+                id: parts[4],
+                originalString: rowsetStr
+            )
+        }.findAll { it.startVersion != 0 }.sort { it.startVersion }
+
+        def cachedOutputRowsets = newRowsetInfos.findAll { rowset ->
+            def data = 
Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
+            data.any { item -> !item.endsWith("_idx") && 
!item.endsWith("_disposable") }
+        }
+
+        logger.info("After ALTER, found ${cachedOutputRowsets.size()} cached 
output rowsets out of ${newRowsetInfos.size()}.")
+
+        if (expectOutputCached) {
+            assertTrue(cachedOutputRowsets.size() > 0, "Expected output 
rowsets to be cached, but none were found.")
+        } else {
+            assertEquals(0, cachedOutputRowsets.size(), "Expected output 
rowsets NOT to be cached, but some were found.")
+        }
+        logger.info("Test Case Passed: Input Ratio ${inputCacheRatio * 100}%, 
Output Cached Check: ${expectOutputCached}")
+        
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+    }
+    
+    docker(options) {
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, 
backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+        
+        sql """ set global enable_auto_analyze = false;"""
+        
+        generateCsvData()
+
+        runSchemaChangeCacheTest("${baseTestTable}_0", 0.0, false)
+        runSchemaChangeCacheTest("${baseTestTable}_65", 0.65, false)
+        runSchemaChangeCacheTest("${baseTestTable}_75", 0.75, true)
+        runSchemaChangeCacheTest("${baseTestTable}_100", 1.0, true)
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to