This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2864924bdce [feat](compaction) Make base compaction output rowset write filecache more adaptive (#54694)
2864924bdce is described below

commit 2864924bdcef4fa20edf01ec39c81a138bf850ba
Author: Lei Zhang <[email protected]>
AuthorDate: Fri Sep 12 09:33:13 2025 +0800

    [feat](compaction) Make base compaction output rowset write filecache more adaptive (#54694)
    
    * When the base compaction input rowsets' cache hit ratio (total_cache_size /
      total_data_size) exceeds file_cache_keep_base_compaction_output_min_hit_ratio,
      the output rowset is also written to the file cache.
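
    For illustration, a minimal standalone sketch of the decision this change
    introduces (names mirror the diff below; this is a simplified view, not the
    exact BE code path):

        // Hypothetical helper: decide whether base compaction output stays in the file cache.
        bool keep_base_compaction_output_in_cache(int64_t cached_bytes, int64_t total_bytes) {
            // Existing switch: when enabled, always keep base compaction output.
            if (config::enable_file_cache_keep_base_compaction_output) {
                return true;
            }
            // Adaptive rule added here: keep the output only when enough of the
            // input rowsets' bytes were already resident in the file cache.
            double hit_ratio =
                    total_bytes > 0 ? double(cached_bytes) / double(total_bytes) : 0.0;
            return hit_ratio > config::file_cache_keep_base_compaction_output_min_hit_ratio;
        }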
---
 be/src/cloud/cloud_base_compaction.cpp             |  10 +-
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/io/cache/block_file_cache_factory.cpp       |  13 +
 be/src/io/cache/block_file_cache_factory.h         |   1 +
 be/src/io/fs/file_writer.h                         |   6 +
 be/src/olap/compaction.cpp                         |  43 +-
 be/src/olap/compaction.h                           |   5 +
 be/src/olap/rowset/rowset.cpp                      |  31 +
 be/src/olap/rowset/rowset.h                        |   4 +
 be/test/cloud/cloud_compaction_test.cpp            |  45 ++
 docker/runtime/doris-compose/cluster.py            |   4 +-
 .../test_filecache_with_base_compaction.groovy     |   8 +-
 ...ilecache_with_base_compaction_thresthold.groovy | 740 +++++++++++++++++++++
 14 files changed, 904 insertions(+), 8 deletions(-)
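
For reference, the file cache knobs involved and their defaults as declared in the
config diff below (the first two already exist; the last one is added by this patch),
sketched as they might be set in be.conf:

    enable_file_cache_keep_base_compaction_output = false
    enable_file_cache_adaptive_write = true
    file_cache_keep_base_compaction_output_min_hit_ratio = 0.7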

diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp
index 79ebe1b3712..e3f7ae2f730 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -35,6 +35,8 @@ namespace doris {
 using namespace ErrorCode;
 
 bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");
+bvar::Adder<uint64_t> base_input_cached_size("base_compaction", "input_cached_size");
+bvar::Adder<uint64_t> base_input_size("base_compaction", "input_size");
 bvar::LatencyRecorder g_base_compaction_hold_delete_bitmap_lock_time_ms(
         "base_compaction_hold_delete_bitmap_lock_time_ms");
 
@@ -82,6 +84,8 @@ Status CloudBaseCompaction::prepare_compact() {
         _input_rowsets_data_size += rs->data_disk_size();
         _input_rowsets_index_size += rs->index_disk_size();
         _input_rowsets_total_size += rs->total_disk_size();
+        _input_rowsets_cached_data_size += rs->approximate_cached_data_size();
+        _input_rowsets_cached_index_size += rs->approximate_cache_index_size();
     }
     LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", 
_tablet->tablet_id(),
              _input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version())
@@ -91,7 +95,11 @@ Status CloudBaseCompaction::prepare_compact() {
             .tag("input_segments", _input_segments)
             .tag("input_rowsets_data_size", _input_rowsets_data_size)
             .tag("input_rowsets_index_size", _input_rowsets_index_size)
-            .tag("input_rowsets_total_size", _input_rowsets_total_size);
+            .tag("input_rowsets_total_size", _input_rowsets_total_size)
+            .tag("input_rowsets_cached_data_size", 
_input_rowsets_cached_data_size)
+            .tag("input_rowsets_cached_index_size", 
_input_rowsets_cached_index_size);
+    base_input_cached_size << (_input_rowsets_cached_data_size + 
_input_rowsets_cached_index_size);
+    base_input_size << _input_rowsets_total_size;
     return Status::OK();
 }
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f777d25c7f5..bbfa8b3ce12 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1141,6 +1141,7 @@ DEFINE_mInt64(cache_lock_held_long_tail_threshold_us, "30000000");
 // will write to file cache; satisfying any of the two conditions will write to file cache.
 DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
 DEFINE_mBool(enable_file_cache_adaptive_write, "true");
+DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
 
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 01ebefd833b..93b765d477a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1164,6 +1164,7 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
 // enable this option; otherwise, it is recommended to leave it disabled.
 DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
 DECLARE_mBool(enable_file_cache_adaptive_write);
+DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp
index 4112902aa2b..c9631e35dc4 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -133,6 +133,19 @@ std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const UInt128W
     return ret;
 }
 
+int64_t FileCacheFactory::get_cache_file_size_by_path(const UInt128Wrapper& hash) {
+    io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+    auto blocks = cache->get_blocks_by_key(hash);
+    if (blocks.empty()) {
+        return 0;
+    }
+    int64_t cache_size = 0;
+    for (auto& [_, fb] : blocks) {
+        cache_size += fb->range().size();
+    }
+    return cache_size;
+}
+
 BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
     // dont need lock mutex because _caches is immutable after create_file_cache
     return _caches[KeyHash()(key) % _caches.size()].get();
diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h
index b00bd7bdfcb..4366afc72eb 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -63,6 +63,7 @@ public:
     [[nodiscard]] size_t get_cache_instance_size() const { return _caches.size(); }
 
     std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper& hash);
+    int64_t get_cache_file_size_by_path(const UInt128Wrapper& hash);
 
     BlockFileCache* get_by_path(const UInt128Wrapper& hash);
     BlockFileCache* get_by_path(const std::string& cache_base_path);
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 6220d1c9210..74ab7b57880 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -101,6 +101,12 @@ protected:
                                            (file_cache_ptr->approximate_available_cache_size() >
                                             opts->approximate_bytes_to_write);
 
+        VLOG_DEBUG << "path:" << path.filename().native()
+                   << ", write_file_cache:" << opts->write_file_cache
+                   << ", has_enough_file_cache_space:" << 
has_enough_file_cache_space
+                   << ", approximate_bytes_to_write:" << 
opts->approximate_bytes_to_write
+                   << ", file_cache_available_size:"
+                   << file_cache_ptr->approximate_available_cache_size();
         if (opts->write_file_cache || has_enough_file_cache_space) {
             _cache_builder = std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
                     opts ? opts->is_cold_data : false, opts ? opts->file_cache_expiration : 0,
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9dd8dd5d2f8..0458be237f0 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -86,7 +86,6 @@ using std::vector;
 
 namespace doris {
 using namespace ErrorCode;
-
 namespace {
 #include "common/compile_check_begin.h"
 
@@ -1617,9 +1616,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
     // We presume that the data involved in cumulative compaction is sufficiently 'hot'
     // and should always be retained in the cache.
     // TODO(gavin): Ensure that the retention of hot data is implemented with precision.
-    ctx.write_file_cache = (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) ||
-                           (config::enable_file_cache_keep_base_compaction_output &&
-                            compaction_type() == ReaderType::READER_BASE_COMPACTION);
+
+    ctx.write_file_cache = should_cache_compaction_output();
     ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
     ctx.approximate_bytes_to_write = _input_rowsets_total_size;
 
@@ -1674,5 +1672,42 @@ int64_t CloudCompactionMixin::num_input_rowsets() const {
     return count;
 }
 
+bool CloudCompactionMixin::should_cache_compaction_output() {
+    if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
+        return true;
+    }
+
+    if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
+        double input_rowsets_hit_cache_ratio = 0.0;
+
+        int64_t _input_rowsets_cached_size =
+                _input_rowsets_cached_data_size + _input_rowsets_cached_index_size;
+        if (_input_rowsets_total_size > 0) {
+            input_rowsets_hit_cache_ratio =
+                    double(_input_rowsets_cached_size) / double(_input_rowsets_total_size);
+        }
+
+        LOG(INFO) << "CloudBaseCompaction should_cache_compaction_output"
+                  << ", tablet_id=" << _tablet->tablet_id()
+                  << ", input_rowsets_hit_cache_ratio=" << 
input_rowsets_hit_cache_ratio
+                  << ", _input_rowsets_cached_size=" << 
_input_rowsets_cached_size
+                  << ", _input_rowsets_total_size=" << 
_input_rowsets_total_size
+                  << ", enable_file_cache_keep_base_compaction_output="
+                  << config::enable_file_cache_keep_base_compaction_output
+                  << ", file_cache_keep_base_compaction_output_min_hit_ratio="
+                  << 
config::file_cache_keep_base_compaction_output_min_hit_ratio;
+
+        if (config::enable_file_cache_keep_base_compaction_output) {
+            return true;
+        }
+
+        if (input_rowsets_hit_cache_ratio >
+            config::file_cache_keep_base_compaction_output_min_hit_ratio) {
+            return true;
+        }
+    }
+    return false;
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 57ce98bef15..fa111590933 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -125,6 +125,9 @@ protected:
     int64_t _local_read_bytes_total {};
     int64_t _remote_read_bytes_total {};
 
+    int64_t _input_rowsets_cached_data_size {0};
+    int64_t _input_rowsets_cached_index_size {0};
+
     Merger::Statistics _stats;
 
     RowsetSharedPtr _output_rowset;
@@ -247,6 +250,8 @@ private:
     int64_t get_compaction_permits();
 
     void update_compaction_level();
+
+    bool should_cache_compaction_output();
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index a810a2ab98a..c318cce9a42 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -203,6 +203,37 @@ std::vector<std::string> Rowset::get_index_file_names() {
     return file_names;
 }
 
+int64_t Rowset::approximate_cached_data_size() {
+    if (!config::enable_file_cache) {
+        return 0;
+    }
+
+    int64_t total_cache_size = 0;
+    for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+        auto cache_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
+        int64_t cache_size =
+                io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key);
+        total_cache_size += cache_size;
+    }
+    return total_cache_size;
+}
+
+int64_t Rowset::approximate_cache_index_size() {
+    if (!config::enable_file_cache) {
+        return 0;
+    }
+
+    int64_t total_cache_size = 0;
+    auto file_names = get_index_file_names();
+    for (const auto& file_name : file_names) {
+        auto cache_key = io::BlockFileCache::hash(file_name);
+        int64_t cache_size =
+                io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key);
+        total_cache_size += cache_size;
+    }
+    return total_cache_size;
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 6b105ed37e8..ae68eb0bcf4 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -323,6 +323,10 @@ public:
     // set the rowset as a hole rowset
     void set_hole_rowset(bool is_hole_rowset) { _is_hole_rowset = is_hole_rowset; }
 
+    int64_t approximate_cached_data_size();
+
+    int64_t approximate_cache_index_size();
+
 protected:
     friend class RowsetFactory;
 
diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp
index 67ba84021ac..1da47c17195 100644
--- a/be/test/cloud/cloud_compaction_test.cpp
+++ b/be/test/cloud/cloud_compaction_test.cpp
@@ -23,6 +23,7 @@
 
 #include <memory>
 
+#include "cloud/cloud_base_compaction.h"
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud/cloud_tablet_mgr.h"
@@ -380,4 +381,48 @@ TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) {
         ASSERT_FALSE(ctx.storage_resource.has_value());
     }
 }
+TEST_F(CloudCompactionTest, should_cache_compaction_output) {
+    CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>());
+    CloudBaseCompaction cloud_base_compaction(_engine, tablet);
+    cloud_base_compaction._input_rowsets_total_size = 0;
+    cloud_base_compaction._input_rowsets_cached_data_size = 0;
+    cloud_base_compaction._input_rowsets_cached_index_size = 0;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 0;
+    cloud_base_compaction._input_rowsets_cached_index_size = 0;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 70;
+    cloud_base_compaction._input_rowsets_cached_index_size = 0;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 0;
+    cloud_base_compaction._input_rowsets_cached_index_size = 70;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 0;
+    cloud_base_compaction._input_rowsets_cached_index_size = 70;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 80;
+    cloud_base_compaction._input_rowsets_cached_index_size = 0;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 0;
+    cloud_base_compaction._input_rowsets_cached_index_size = 80;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true);
+
+    cloud_base_compaction._input_rowsets_total_size = 100;
+    cloud_base_compaction._input_rowsets_cached_data_size = 50;
+    cloud_base_compaction._input_rowsets_cached_index_size = 50;
+    ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true);
+    LOG(INFO) << "should_cache_compaction_output done";
+}
 } // namespace doris
diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index cf63f403240..d7f038f839b 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -612,8 +612,6 @@ class BE(Node):
         cfg += [
             'enable_java_support = false',
         ]
-        if self.cluster.be_config:
-            cfg += self.cluster.be_config
         if self.cluster.is_cloud:
             cfg += [
                 'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
@@ -636,6 +634,8 @@ class BE(Node):
                 cfg += [
                     "cloud_unique_id = " + self.cloud_unique_id(),
                 ]
+        if self.cluster.be_config:
+            cfg += self.cluster.be_config
         return cfg
 
     def init_disk(self, be_disks):
diff --git a/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy b/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy
index 7c067afd768..966da184792 100644
--- a/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy
+++ b/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy
@@ -29,7 +29,7 @@ suite("test_filecache_with_base_compaction", "docker") {
     options.beConfigs.add('enable_flush_file_cache_async=false')
     options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
     options.beConfigs.add('enable_evict_file_cache_in_advance=false')
-    options.beConfigs.add('')
+    options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
 
     def testTable = "test_filecache_with_base_compaction"
     def backendId_to_backendIP = [:]
@@ -190,6 +190,12 @@ suite("test_filecache_with_base_compaction", "docker") {
             def data = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
             logger.info("file cache data: ${data}")
             assertTrue(data.size() > 0)
+            def segments = data.stream()
+                .filter(item -> !item.endsWith("_idx"))
+                .count();
+            logger.info("segments: ${segments}")
+            assertTrue(segments > 0)
         }
+        // case4 in test_filecache_with_base_compaction_thresthold covers the case where the file cache does not have enough space
     }
 }
diff --git a/regression-test/suites/compaction/test_filecache_with_base_compaction_thresthold.groovy b/regression-test/suites/compaction/test_filecache_with_base_compaction_thresthold.groovy
new file mode 100644
index 00000000000..c30e08d1f4f
--- /dev/null
+++ b/regression-test/suites/compaction/test_filecache_with_base_compaction_thresthold.groovy
@@ -0,0 +1,740 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+ 
+suite("test_filecache_with_base_compaction_thresthold", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+
+    options.beConfigs.add('enable_flush_file_cache_async=false')
+    options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
+    options.beConfigs.add('enable_evict_file_cache_in_advance=false')
+    // 80MB file cache size
+    options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
+
+    def testTable = "test_filecache_with_base_compaction_thresthold"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+
+    def triggerCumulativeCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+        logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+        assertEquals(code_1, 0)
+        return out_1
+    }
+
+    def triggerBaseCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def (code_1, out_1, err_1) = be_run_base_compaction(be_host, be_http_port, tablet_id)
+        logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+        assertEquals(code_1, 0)
+        return out_1
+    }
+
+    def getTabletStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status:  =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(20000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.getText()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    docker(options) {
+        def fes = sql_return_maparray "show frontends"
+        logger.info("frontends: ${fes}")
+        def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+        logger.info("url: " + url)
+
+        def result = sql 'SELECT DATABASE()'
+
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+        // ================= case1: input_rowsets_hit_cache_ratio=0.703611, _input_rowsets_cached_size=79656542, _input_rowsets_total_size=113210974 =================
+        // real_ratio > file_cache_keep_base_compaction_output_min_hit_ratio = 0.7, so the output is written to the file cache
+        sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                ss_sold_date_sk bigint,
+                ss_sold_time_sk bigint,
+                ss_item_sk bigint,
+                ss_customer_sk bigint,
+                ss_cdemo_sk bigint,
+                ss_hdemo_sk bigint,
+                ss_addr_sk bigint,
+                ss_store_sk bigint,
+                ss_promo_sk bigint,
+                ss_ticket_number bigint,
+                ss_quantity integer,
+                ss_wholesale_cost decimal(7,2),
+                ss_list_price decimal(7,2),
+                ss_sales_price decimal(7,2),
+                ss_ext_discount_amt decimal(7,2),
+                ss_ext_sales_price decimal(7,2),
+                ss_ext_wholesale_cost decimal(7,2),
+                ss_ext_list_price decimal(7,2),
+                ss_ext_tax decimal(7,2),
+                ss_coupon_amt decimal(7,2),
+                ss_net_paid decimal(7,2),
+                ss_net_paid_inc_tax decimal(7,2),
+                ss_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+            DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            )
+
+        """
+
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+
+        streamLoad {
+            table testTable
+
+            // default label is UUID:
+            // set 'label' UUID.randomUUID().toString()
+
+            // the default column_separator is specified in the Doris FE config, usually '\t'.
+            // this load changes it to '|'
+            set 'column_separator', '|'
+            set 'compress_type', 'GZ'
+
+            file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+            time 10000 // limit inflight 10s
+            check { res, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${res}".toString())
+                def json = parseJson(res)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+
+        sql """ sync """
+        sql "select * from ${testTable}"
+        def tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerBaseCompaction(tablet)
+        waitForCompaction(tablet)
+
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+        def base_compaction_finished = false
+        Set<String> final_rowsets = new HashSet<>();
+        for (int i = 0; i < 60; i++) {
+            tablet_status = getTabletStatus(tablet)
+            if (tablet_status["rowsets"].size() == 2) {
+                base_compaction_finished = true
+                final_rowsets.addAll(tablet_status["rowsets"])
+                break
+            }
+            sleep(10000)
+        }
+        assertTrue(base_compaction_finished)
+
+        def be_host = backendId_to_backendIP[tablet.BackendId]
+        def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+            assertTrue(data.size() > 0)
+            def segments = data.stream()
+                .filter(item -> !item.endsWith("_idx"))
+                .count();
+            logger.info("segments: ${segments}")
+            assertTrue(segments > 0)
+        }
+
+        def (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size";)
+        logger.info("base_compaction_input_size: ${out_0}")
+        def size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 100 * 1024 * 1024)
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size";)
+        logger.info("base_compaction_input_cached_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 70 * 1024 * 1024)
+
+        // ================= case2: input_rowsets_hit_cache_ratio=0.703524, total_cached_size=62914560 (60MB), total_size=113177376 =================
+        // real_ratio < file_cache_keep_base_compaction_output_min_hit_ratio = 0.8, so the output is not written to the file cache
+        result = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=clear&sync=true";, 
true)
+        logger.info("clear file cache data: ${result}")
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+            assertTrue(data.size() == 0)
+        }
+
+        def (code, out, err) = curl("POST", 
String.format("http://%s:%s/api/update_config?%s=%s";, be_host, be_http_port, 
"file_cache_keep_base_compaction_output_min_hit_ratio", 0.8))
+        assertTrue(out.contains("OK"))
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                ss_sold_date_sk bigint,
+                ss_sold_time_sk bigint,
+                ss_item_sk bigint,
+                ss_customer_sk bigint,
+                ss_cdemo_sk bigint,
+                ss_hdemo_sk bigint,
+                ss_addr_sk bigint,
+                ss_store_sk bigint,
+                ss_promo_sk bigint,
+                ss_ticket_number bigint,
+                ss_quantity integer,
+                ss_wholesale_cost decimal(7,2),
+                ss_list_price decimal(7,2),
+                ss_sales_price decimal(7,2),
+                ss_ext_discount_amt decimal(7,2),
+                ss_ext_sales_price decimal(7,2),
+                ss_ext_wholesale_cost decimal(7,2),
+                ss_ext_list_price decimal(7,2),
+                ss_ext_tax decimal(7,2),
+                ss_coupon_amt decimal(7,2),
+                ss_net_paid decimal(7,2),
+                ss_net_paid_inc_tax decimal(7,2),
+                ss_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+            DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            )
+        """
+
+        tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        tablet = tablets[0]
+        tablet_id = tablet.TabletId
+
+        streamLoad {
+            table testTable
+
+            // default label is UUID:
+            // set 'label' UUID.randomUUID().toString()
+
+            // the default column_separator is specified in the Doris FE config, usually '\t'.
+            // this load changes it to '|'
+            set 'column_separator', '|'
+            set 'compress_type', 'GZ'
+
+            file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+            time 10000 // limit inflight 10s
+            check { res, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${res}".toString())
+                def json = parseJson(res)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+
+        sql """ sync """
+        sql "select * from ${testTable}"
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerBaseCompaction(tablet)
+        waitForCompaction(tablet)
+
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+        base_compaction_finished = false
+        final_rowsets = new HashSet<>();
+        for (int i = 0; i < 60; i++) {
+            tablet_status = getTabletStatus(tablet)
+            if (tablet_status["rowsets"].size() == 2) {
+                base_compaction_finished = true
+                final_rowsets.addAll(tablet_status["rowsets"])
+                break
+            }
+            sleep(10000)
+        }
+        assertTrue(base_compaction_finished)
+
+        be_host = backendId_to_backendIP[tablet.BackendId]
+        be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+
+            def segments = data.stream()
+                .filter(item -> !item.endsWith("_idx"))
+                .count();
+            logger.info("segments: ${segments}")
+            assertTrue(segments == 0)
+        }
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size";)
+        logger.info("base_compaction_input_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 2 * 100 * 1024 * 1024)
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size";)
+        logger.info("base_compaction_input_cached_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 70 * 1024 * 1024)
+
+        // ================= case3: file_cache_keep_base_compaction_output_min_hit_ratio = 0 =================
+        // real_ratio > file_cache_keep_base_compaction_output_min_hit_ratio = 0, so the output is written to the file cache
+        result = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=clear&sync=true";, 
true)
+        logger.info("clear file cache data: ${result}")
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+            assertTrue(data.size() == 0)
+        }
+
+        (code, out, err) = curl("POST", 
String.format("http://%s:%s/api/update_config?%s=%s";, be_host, be_http_port, 
"file_cache_keep_base_compaction_output_min_hit_ratio", 0))
+        assertTrue(out.contains("OK"))
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                ss_sold_date_sk bigint,
+                ss_sold_time_sk bigint,
+                ss_item_sk bigint,
+                ss_customer_sk bigint,
+                ss_cdemo_sk bigint,
+                ss_hdemo_sk bigint,
+                ss_addr_sk bigint,
+                ss_store_sk bigint,
+                ss_promo_sk bigint,
+                ss_ticket_number bigint,
+                ss_quantity integer,
+                ss_wholesale_cost decimal(7,2),
+                ss_list_price decimal(7,2),
+                ss_sales_price decimal(7,2),
+                ss_ext_discount_amt decimal(7,2),
+                ss_ext_sales_price decimal(7,2),
+                ss_ext_wholesale_cost decimal(7,2),
+                ss_ext_list_price decimal(7,2),
+                ss_ext_tax decimal(7,2),
+                ss_coupon_amt decimal(7,2),
+                ss_net_paid decimal(7,2),
+                ss_net_paid_inc_tax decimal(7,2),
+                ss_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+            DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            )
+        """
+
+        tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        tablet = tablets[0]
+        tablet_id = tablet.TabletId
+
+        streamLoad {
+            table testTable
+
+            // default label is UUID:
+            // set 'label' UUID.randomUUID().toString()
+
+            // the default column_separator is specified in the Doris FE config, usually '\t'.
+            // this load changes it to '|'
+            set 'column_separator', '|'
+            set 'compress_type', 'GZ'
+
+            file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+            time 10000 // limit inflight 10s
+            check { res, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${res}".toString())
+                def json = parseJson(res)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+
+        sql """ sync """
+        sql "select * from ${testTable}"
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerBaseCompaction(tablet)
+        waitForCompaction(tablet)
+
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+        base_compaction_finished = false
+        final_rowsets = new HashSet<>();
+        for (int i = 0; i < 60; i++) {
+            tablet_status = getTabletStatus(tablet)
+            if (tablet_status["rowsets"].size() == 2) {
+                base_compaction_finished = true
+                final_rowsets.addAll(tablet_status["rowsets"])
+                break
+            }
+            sleep(10000)
+        }
+        assertTrue(base_compaction_finished)
+
+        be_host = backendId_to_backendIP[tablet.BackendId]
+        be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+
+            def segments = data.stream()
+                .filter(item -> !item.endsWith("_idx"))
+                .count();
+            logger.info("segments: ${segments}")
+            assertTrue(segments > 0)
+            assertTrue(data.size() > 0)
+        }
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size";)
+        logger.info("base_compaction_input_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 3 * 100 * 1024 * 1024)
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size";)
+        logger.info("base_compaction_input_cached_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 70 * 1024 * 1024)
+
+        // ================= case4: file_cache_keep_base_compaction_output_min_hit_ratio = 1 =================
+        // real_ratio < file_cache_keep_base_compaction_output_min_hit_ratio = 1, so the output is not written to the file cache
+        result = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=clear&sync=true";, 
true)
+        logger.info("clear file cache data: ${result}")
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+            assertTrue(data.size() == 0)
+        }
+
+        (code, out, err) = curl("POST", 
String.format("http://%s:%s/api/update_config?%s=%s";, be_host, be_http_port, 
"file_cache_keep_base_compaction_output_min_hit_ratio", 1))
+        assertTrue(out.contains("OK"))
+        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                ss_sold_date_sk bigint,
+                ss_sold_time_sk bigint,
+                ss_item_sk bigint,
+                ss_customer_sk bigint,
+                ss_cdemo_sk bigint,
+                ss_hdemo_sk bigint,
+                ss_addr_sk bigint,
+                ss_store_sk bigint,
+                ss_promo_sk bigint,
+                ss_ticket_number bigint,
+                ss_quantity integer,
+                ss_wholesale_cost decimal(7,2),
+                ss_list_price decimal(7,2),
+                ss_sales_price decimal(7,2),
+                ss_ext_discount_amt decimal(7,2),
+                ss_ext_sales_price decimal(7,2),
+                ss_ext_wholesale_cost decimal(7,2),
+                ss_ext_list_price decimal(7,2),
+                ss_ext_tax decimal(7,2),
+                ss_coupon_amt decimal(7,2),
+                ss_net_paid decimal(7,2),
+                ss_net_paid_inc_tax decimal(7,2),
+                ss_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+            DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            )
+        """
+
+        tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        tablet = tablets[0]
+        tablet_id = tablet.TabletId
+
+        streamLoad {
+            table testTable
+
+            // default label is UUID:
+            // set 'label' UUID.randomUUID().toString()
+
+            // the default column_separator is specified in the Doris FE config, usually '\t'.
+            // this load changes it to '|'
+            set 'column_separator', '|'
+            set 'compress_type', 'GZ'
+
+            file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+            time 10000 // limit inflight 10s
+            check { res, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${res}".toString())
+                def json = parseJson(res)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+        sql """ delete from ${testTable} where ss_item_sk=2; """
+
+        sql """ sync """
+
+        sql "select * from ${testTable}"
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+        triggerCumulativeCompaction(tablet)
+        waitForCompaction(tablet)
+
+        sql "select * from ${testTable}"
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+
+        triggerBaseCompaction(tablet)
+        waitForCompaction(tablet)
+
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status: ${tablet_status}")
+        base_compaction_finished = false
+        final_rowsets = new HashSet<>();
+        for (int i = 0; i < 60; i++) {
+            tablet_status = getTabletStatus(tablet)
+            if (tablet_status["rowsets"].size() == 2) {
+                base_compaction_finished = true
+                final_rowsets.addAll(tablet_status["rowsets"])
+                break
+            }
+            sleep(10000)
+        }
+        assertTrue(base_compaction_finished)
+
+        be_host = backendId_to_backendIP[tablet.BackendId]
+        be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+        for (int i = 0; i < final_rowsets.size(); i++) {
+            def rowsetStr = final_rowsets[i]
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+            if (start_version == 0) {
+                continue
+            }
+
+            logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+            logger.info("file cache data: ${data}")
+
+            def segments = data.stream()
+                .filter(item -> !item.endsWith("_idx"))
+                .count();
+            logger.info("segments: ${segments}")
+            assertTrue(segments == 0)
+        }
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size";)
+        logger.info("base_compaction_input_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 4 * 100 * 1024 * 1024)
+
+        (code_0, out_0, err_0) = curl("GET", 
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size";)
+        logger.info("base_compaction_input_cached_size: ${out_0}")
+        size = out_0.trim().split(":")[1].trim().toInteger()
+        assertTrue(size > 70 * 1024 * 1024)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
