This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2864924bdce [feat](compaction) Make base compaction output rowset write filecache more adaptive (#54694)
2864924bdce is described below
commit 2864924bdcef4fa20edf01ec39c81a138bf850ba
Author: Lei Zhang <[email protected]>
AuthorDate: Fri Sep 12 09:33:13 2025 +0800
[feat](compaction) Make base compaction output rowset write filecache more adaptive (#54694)
* When the base compaction input rowsets' ratio of (total_cache_size / total_data_size) exceeds file_cache_keep_base_compaction_output_min_hit_ratio, the output rowsets will be written to the file cache
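
For illustration, the base-compaction branch of the new decision can be sketched as a standalone C++ fragment (a minimal sketch, not the actual CloudCompactionMixin::should_cache_compaction_output member; the free function name and the static stand-ins for the two BE config knobs are hypothetical, and note that cumulative compaction output is always cached regardless):

    #include <cstdint>

    // Hypothetical stand-ins for the two BE config knobs this commit touches.
    static bool enable_file_cache_keep_base_compaction_output = false;
    static double file_cache_keep_base_compaction_output_min_hit_ratio = 0.7;

    // Decide whether a base compaction's output rowset should be written to
    // the file cache, based on how much of its input was already cached.
    bool keep_base_compaction_output_in_cache(int64_t cached_data_size,
                                              int64_t cached_index_size,
                                              int64_t total_input_size) {
        if (enable_file_cache_keep_base_compaction_output) {
            return true; // explicit override: always keep the output
        }
        if (total_input_size <= 0) {
            return false; // nothing to measure, skip the file cache
        }
        double hit_ratio = double(cached_data_size + cached_index_size) /
                           double(total_input_size);
        // Adaptive rule: keep the output only when the input rowsets were
        // mostly resident in the file cache, i.e. the data is actually hot.
        return hit_ratio > file_cache_keep_base_compaction_output_min_hit_ratio;
    }

With the default threshold of 0.7, an input set that is 80% cached keeps its compaction output in the file cache, while one exactly 70% cached does not (the comparison is strict), matching the expectations in the new cloud_compaction_test.cpp cases.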
---
be/src/cloud/cloud_base_compaction.cpp | 10 +-
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/io/cache/block_file_cache_factory.cpp | 13 +
be/src/io/cache/block_file_cache_factory.h | 1 +
be/src/io/fs/file_writer.h | 6 +
be/src/olap/compaction.cpp | 43 +-
be/src/olap/compaction.h | 5 +
be/src/olap/rowset/rowset.cpp | 31 +
be/src/olap/rowset/rowset.h | 4 +
be/test/cloud/cloud_compaction_test.cpp | 45 ++
docker/runtime/doris-compose/cluster.py | 4 +-
.../test_filecache_with_base_compaction.groovy | 8 +-
...ilecache_with_base_compaction_thresthold.groovy | 740 +++++++++++++++++++++
14 files changed, 904 insertions(+), 8 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp
index 79ebe1b3712..e3f7ae2f730 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -35,6 +35,8 @@ namespace doris {
using namespace ErrorCode;
bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");
+bvar::Adder<uint64_t> base_input_cached_size("base_compaction", "input_cached_size");
+bvar::Adder<uint64_t> base_input_size("base_compaction", "input_size");
bvar::LatencyRecorder g_base_compaction_hold_delete_bitmap_lock_time_ms(
"base_compaction_hold_delete_bitmap_lock_time_ms");
@@ -82,6 +84,8 @@ Status CloudBaseCompaction::prepare_compact() {
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
+ _input_rowsets_cached_data_size += rs->approximate_cached_data_size();
+ _input_rowsets_cached_index_size += rs->approximate_cache_index_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(),
_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
@@ -91,7 +95,11 @@ Status CloudBaseCompaction::prepare_compact() {
.tag("input_segments", _input_segments)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
- .tag("input_rowsets_total_size", _input_rowsets_total_size);
+ .tag("input_rowsets_total_size", _input_rowsets_total_size)
+ .tag("input_rowsets_cached_data_size",
_input_rowsets_cached_data_size)
+ .tag("input_rowsets_cached_index_size",
_input_rowsets_cached_index_size);
+ base_input_cached_size << (_input_rowsets_cached_data_size +
_input_rowsets_cached_index_size);
+ base_input_size << _input_rowsets_total_size;
return Status::OK();
}
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f777d25c7f5..bbfa8b3ce12 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1141,6 +1141,7 @@ DEFINE_mInt64(cache_lock_held_long_tail_threshold_us, "30000000");
// will write to file cache; satisfying any of the two conditions will write to file cache.
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
DEFINE_mBool(enable_file_cache_adaptive_write, "true");
+DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 01ebefd833b..93b765d477a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1164,6 +1164,7 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
// enable this option; otherwise, it is recommended to leave it disabled.
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
DECLARE_mBool(enable_file_cache_adaptive_write);
+DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp
index 4112902aa2b..c9631e35dc4 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -133,6 +133,19 @@ std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const UInt128W
return ret;
}
+int64_t FileCacheFactory::get_cache_file_size_by_path(const UInt128Wrapper& hash) {
+ io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+ auto blocks = cache->get_blocks_by_key(hash);
+ if (blocks.empty()) {
+ return 0;
+ }
+ int64_t cache_size = 0;
+ for (auto& [_, fb] : blocks) {
+ cache_size += fb->range().size();
+ }
+ return cache_size;
+}
+
BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
// dont need lock mutex because _caches is immutable after create_file_cache
return _caches[KeyHash()(key) % _caches.size()].get();
diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h
index b00bd7bdfcb..4366afc72eb 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -63,6 +63,7 @@ public:
[[nodiscard]] size_t get_cache_instance_size() const { return _caches.size(); }
std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper& hash);
+ int64_t get_cache_file_size_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 6220d1c9210..74ab7b57880 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -101,6 +101,12 @@ protected:
(file_cache_ptr->approximate_available_cache_size() > opts->approximate_bytes_to_write);
+ VLOG_DEBUG << "path:" << path.filename().native()
+ << ", write_file_cache:" << opts->write_file_cache
+ << ", has_enough_file_cache_space:" <<
has_enough_file_cache_space
+ << ", approximate_bytes_to_write:" <<
opts->approximate_bytes_to_write
+ << ", file_cache_available_size:"
+ << file_cache_ptr->approximate_available_cache_size();
if (opts->write_file_cache || has_enough_file_cache_space) {
_cache_builder = std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
opts ? opts->is_cold_data : false, opts ? opts->file_cache_expiration : 0,
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9dd8dd5d2f8..0458be237f0 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -86,7 +86,6 @@ using std::vector;
namespace doris {
using namespace ErrorCode;
-
namespace {
#include "common/compile_check_begin.h"
@@ -1617,9 +1616,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
// We presume that the data involved in cumulative compaction is sufficiently 'hot'
// and should always be retained in the cache.
// TODO(gavin): Ensure that the retention of hot data is implemented with precision.
- ctx.write_file_cache = (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) ||
- (config::enable_file_cache_keep_base_compaction_output &&
- compaction_type() == ReaderType::READER_BASE_COMPACTION);
+
+ ctx.write_file_cache = should_cache_compaction_output();
ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
ctx.approximate_bytes_to_write = _input_rowsets_total_size;
@@ -1674,5 +1672,42 @@ int64_t CloudCompactionMixin::num_input_rowsets() const {
return count;
}
+bool CloudCompactionMixin::should_cache_compaction_output() {
+ if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
+ return true;
+ }
+
+ if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
+ double input_rowsets_hit_cache_ratio = 0.0;
+
+ int64_t _input_rowsets_cached_size =
+ _input_rowsets_cached_data_size + _input_rowsets_cached_index_size;
+ if (_input_rowsets_total_size > 0) {
+ input_rowsets_hit_cache_ratio =
+ double(_input_rowsets_cached_size) / double(_input_rowsets_total_size);
+ }
+
+ LOG(INFO) << "CloudBaseCompaction should_cache_compaction_output"
+ << ", tablet_id=" << _tablet->tablet_id()
+ << ", input_rowsets_hit_cache_ratio=" <<
input_rowsets_hit_cache_ratio
+ << ", _input_rowsets_cached_size=" <<
_input_rowsets_cached_size
+ << ", _input_rowsets_total_size=" <<
_input_rowsets_total_size
+ << ", enable_file_cache_keep_base_compaction_output="
+ << config::enable_file_cache_keep_base_compaction_output
+ << ", file_cache_keep_base_compaction_output_min_hit_ratio="
+ << config::file_cache_keep_base_compaction_output_min_hit_ratio;
+
+ if (config::enable_file_cache_keep_base_compaction_output) {
+ return true;
+ }
+
+ if (input_rowsets_hit_cache_ratio >
+ config::file_cache_keep_base_compaction_output_min_hit_ratio) {
+ return true;
+ }
+ }
+ return false;
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 57ce98bef15..fa111590933 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -125,6 +125,9 @@ protected:
int64_t _local_read_bytes_total {};
int64_t _remote_read_bytes_total {};
+ int64_t _input_rowsets_cached_data_size {0};
+ int64_t _input_rowsets_cached_index_size {0};
+
Merger::Statistics _stats;
RowsetSharedPtr _output_rowset;
@@ -247,6 +250,8 @@ private:
int64_t get_compaction_permits();
void update_compaction_level();
+
+ bool should_cache_compaction_output();
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index a810a2ab98a..c318cce9a42 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -203,6 +203,37 @@ std::vector<std::string> Rowset::get_index_file_names() {
return file_names;
}
+int64_t Rowset::approximate_cached_data_size() {
+ if (!config::enable_file_cache) {
+ return 0;
+ }
+
+ int64_t total_cache_size = 0;
+ for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+ auto cache_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
+ int64_t cache_size =
+ io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key);
+ total_cache_size += cache_size;
+ }
+ return total_cache_size;
+}
+
+int64_t Rowset::approximate_cache_index_size() {
+ if (!config::enable_file_cache) {
+ return 0;
+ }
+
+ int64_t total_cache_size = 0;
+ auto file_names = get_index_file_names();
+ for (const auto& file_name : file_names) {
+ auto cache_key = io::BlockFileCache::hash(file_name);
+ int64_t cache_size =
+ io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key);
+ total_cache_size += cache_size;
+ }
+ return total_cache_size;
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 6b105ed37e8..ae68eb0bcf4 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -323,6 +323,10 @@ public:
// set the rowset as a hole rowset
void set_hole_rowset(bool is_hole_rowset) { _is_hole_rowset = is_hole_rowset; }
+ int64_t approximate_cached_data_size();
+
+ int64_t approximate_cache_index_size();
+
protected:
friend class RowsetFactory;
diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp
index 67ba84021ac..1da47c17195 100644
--- a/be/test/cloud/cloud_compaction_test.cpp
+++ b/be/test/cloud/cloud_compaction_test.cpp
@@ -23,6 +23,7 @@
#include <memory>
+#include "cloud/cloud_base_compaction.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
@@ -380,4 +381,48 @@ TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) {
ASSERT_FALSE(ctx.storage_resource.has_value());
}
}
+TEST_F(CloudCompactionTest, should_cache_compaction_output) {
+ CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>());
+ CloudBaseCompaction cloud_base_compaction(_engine, tablet);
+ cloud_base_compaction._input_rowsets_total_size = 0;
+ cloud_base_compaction._input_rowsets_cached_data_size = 0;
+ cloud_base_compaction._input_rowsets_cached_index_size = 0;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 0;
+ cloud_base_compaction._input_rowsets_cached_index_size = 0;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 70;
+ cloud_base_compaction._input_rowsets_cached_index_size = 0;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 0;
+ cloud_base_compaction._input_rowsets_cached_index_size = 70;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 0;
+ cloud_base_compaction._input_rowsets_cached_index_size = 70;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 80;
+ cloud_base_compaction._input_rowsets_cached_index_size = 0;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 0;
+ cloud_base_compaction._input_rowsets_cached_index_size = 80;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true);
+
+ cloud_base_compaction._input_rowsets_total_size = 100;
+ cloud_base_compaction._input_rowsets_cached_data_size = 50;
+ cloud_base_compaction._input_rowsets_cached_index_size = 50;
+ ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true);
+ LOG(INFO) << "should_cache_compaction_output done";
+}
} // namespace doris
diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index cf63f403240..d7f038f839b 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -612,8 +612,6 @@ class BE(Node):
cfg += [
'enable_java_support = false',
]
- if self.cluster.be_config:
- cfg += self.cluster.be_config
if self.cluster.is_cloud:
cfg += [
'tmp_file_dirs = [{"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
@@ -636,6 +634,8 @@ class BE(Node):
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
+ if self.cluster.be_config:
+ cfg += self.cluster.be_config
return cfg
def init_disk(self, be_disks):
diff --git a/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy b/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy
index 7c067afd768..966da184792 100644
--- a/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy
+++ b/regression-test/suites/compaction/test_filecache_with_base_compaction.groovy
@@ -29,7 +29,7 @@ suite("test_filecache_with_base_compaction", "docker") {
options.beConfigs.add('enable_flush_file_cache_async=false')
options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
options.beConfigs.add('enable_evict_file_cache_in_advance=false')
- options.beConfigs.add('')
+ options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
def testTable = "test_filecache_with_base_compaction"
def backendId_to_backendIP = [:]
@@ -190,6 +190,12 @@ suite("test_filecache_with_base_compaction", "docker") {
def data = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
logger.info("file cache data: ${data}")
assertTrue(data.size() > 0)
+ def segments = data.stream()
+ .filter(item -> !item.endsWith("_idx"))
+ .count();
+ logger.info("segments: ${segments}")
+ assertTrue(segments > 0)
}
+ // test_filecache_with_base_compaction_thresthold case4 covers the case where file cache space is not enough
}
}
diff --git a/regression-test/suites/compaction/test_filecache_with_base_compaction_thresthold.groovy b/regression-test/suites/compaction/test_filecache_with_base_compaction_thresthold.groovy
new file mode 100644
index 00000000000..c30e08d1f4f
--- /dev/null
+++ b/regression-test/suites/compaction/test_filecache_with_base_compaction_thresthold.groovy
@@ -0,0 +1,740 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+
+suite("test_filecache_with_base_compaction_thresthold", "docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.setFeNum(1)
+ options.setBeNum(1)
+
+ options.beConfigs.add('enable_flush_file_cache_async=false')
+ options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
+ options.beConfigs.add('enable_evict_file_cache_in_advance=false')
+ // 80MB file cache size
+ options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
+
+ def testTable = "test_filecache_with_base_compaction_thresthold"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_backendBrpcPort = [:]
+
+ def triggerCumulativeCompaction = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+ assertEquals(code_1, 0)
+ return out_1
+ }
+
+ def triggerBaseCompaction = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ def (code_1, out_1, err_1) = be_run_base_compaction(be_host, be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+ assertEquals(code_1, 0)
+ return out_1
+ }
+
+ def getTabletStatus = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+
+ def waitForCompaction = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ def running = true
+ do {
+ Thread.sleep(20000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run_status?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get compaction status: code=" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ docker(options) {
+ def fes = sql_return_maparray "show frontends"
+ logger.info("frontends: ${fes}")
+ def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+ logger.info("url: " + url)
+
+ def result = sql 'SELECT DATABASE()'
+
+ sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+ // ================= case1: input_rowsets_hit_cache_ratio=0.703611, _input_rowsets_cached_size=79656542, _input_rowsets_total_size=113210974 =================
+ // real_ratio > file_cache_keep_base_compaction_output_min_hit_ratio = 0.7, so it will write to the file cache
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ ss_sold_date_sk bigint,
+ ss_sold_time_sk bigint,
+ ss_item_sk bigint,
+ ss_customer_sk bigint,
+ ss_cdemo_sk bigint,
+ ss_hdemo_sk bigint,
+ ss_addr_sk bigint,
+ ss_store_sk bigint,
+ ss_promo_sk bigint,
+ ss_ticket_number bigint,
+ ss_quantity integer,
+ ss_wholesale_cost decimal(7,2),
+ ss_list_price decimal(7,2),
+ ss_sales_price decimal(7,2),
+ ss_ext_discount_amt decimal(7,2),
+ ss_ext_sales_price decimal(7,2),
+ ss_ext_wholesale_cost decimal(7,2),
+ ss_ext_list_price decimal(7,2),
+ ss_ext_tax decimal(7,2),
+ ss_coupon_amt decimal(7,2),
+ ss_net_paid decimal(7,2),
+ ss_net_paid_inc_tax decimal(7,2),
+ ss_net_profit decimal(7,2)
+ )
+ DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+ DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ )
+
+ """
+
+ getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+ def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+
+ streamLoad {
+ table testTable
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specified in doris fe config, usually '\t'.
+ // this line changes it to '|'
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+ time 10000 // limit inflight 10s
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${res}".toString())
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+
+ sql """ sync """
+ sql "select * from ${testTable}"
+ def tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerBaseCompaction(tablet)
+ waitForCompaction(tablet)
+
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+ def base_compaction_finished = false
+ Set<String> final_rowsets = new HashSet<>();
+ for (int i = 0; i < 60; i++) {
+ tablet_status = getTabletStatus(tablet)
+ if (tablet_status["rowsets"].size() == 2) {
+ base_compaction_finished = true
+ final_rowsets.addAll(tablet_status["rowsets"])
+ break
+ }
+ sleep(10000)
+ }
+ assertTrue(base_compaction_finished)
+
+ def be_host = backendId_to_backendIP[tablet.BackendId]
+ def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ assertTrue(data.size() > 0)
+ def segments = data.stream()
+ .filter(item -> !item.endsWith("_idx"))
+ .count();
+ logger.info("segments: ${segments}")
+ assertTrue(segments > 0)
+ }
+
+ def (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size")
+ logger.info("base_compaction_input_size: ${out_0}")
+ def size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 100 * 1024 * 1024)
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size")
+ logger.info("base_compaction_input_cached_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 70 * 1024 * 1024)
+
+ // =================case2: input_rowsets_hit_cache_ratio=0.703524, total_cached_size=62914560 (60MB), total_size=113177376 =================
+ // real_ratio < file_cache_keep_base_compaction_output_min_hit_ratio = 0.8, so it will not write to the file cache
+ result = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=clear&sync=true", true)
+ logger.info("clear file cache data: ${result}")
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ assertTrue(data.size() == 0)
+ }
+
+ def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", be_host, be_http_port, "file_cache_keep_base_compaction_output_min_hit_ratio", 0.8))
+ assertTrue(out.contains("OK"))
+ sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ ss_sold_date_sk bigint,
+ ss_sold_time_sk bigint,
+ ss_item_sk bigint,
+ ss_customer_sk bigint,
+ ss_cdemo_sk bigint,
+ ss_hdemo_sk bigint,
+ ss_addr_sk bigint,
+ ss_store_sk bigint,
+ ss_promo_sk bigint,
+ ss_ticket_number bigint,
+ ss_quantity integer,
+ ss_wholesale_cost decimal(7,2),
+ ss_list_price decimal(7,2),
+ ss_sales_price decimal(7,2),
+ ss_ext_discount_amt decimal(7,2),
+ ss_ext_sales_price decimal(7,2),
+ ss_ext_wholesale_cost decimal(7,2),
+ ss_ext_list_price decimal(7,2),
+ ss_ext_tax decimal(7,2),
+ ss_coupon_amt decimal(7,2),
+ ss_net_paid decimal(7,2),
+ ss_net_paid_inc_tax decimal(7,2),
+ ss_net_profit decimal(7,2)
+ )
+ DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+ DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ )
+ """
+
+ tablets = sql_return_maparray """ show tablets from ${testTable}; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ tablet = tablets[0]
+ tablet_id = tablet.TabletId
+
+ streamLoad {
+ table testTable
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specified in doris fe config, usually '\t'.
+ // this line changes it to '|'
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+ time 10000 // limit inflight 10s
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${res}".toString())
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+
+ sql """ sync """
+ sql "select * from ${testTable}"
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerBaseCompaction(tablet)
+ waitForCompaction(tablet)
+
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+ base_compaction_finished = false
+ final_rowsets = new HashSet<>();
+ for (int i = 0; i < 60; i++) {
+ tablet_status = getTabletStatus(tablet)
+ if (tablet_status["rowsets"].size() == 2) {
+ base_compaction_finished = true
+ final_rowsets.addAll(tablet_status["rowsets"])
+ break
+ }
+ sleep(10000)
+ }
+ assertTrue(base_compaction_finished)
+
+ be_host = backendId_to_backendIP[tablet.BackendId]
+ be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+
+ def segments = data.stream()
+ .filter(item -> !item.endsWith("_idx"))
+ .count();
+ logger.info("segments: ${segments}")
+ assertTrue(segments == 0)
+ }
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size")
+ logger.info("base_compaction_input_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 2 * 100 * 1024 * 1024)
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size")
+ logger.info("base_compaction_input_cached_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 70 * 1024 * 1024)
+
+ // =================case3: file_cache_keep_base_compaction_output_min_hit_ratio = 0 =================
+ // real_ratio > file_cache_keep_base_compaction_output_min_hit_ratio = 0, so it will write to the file cache
+ result = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=clear&sync=true", true)
+ logger.info("clear file cache data: ${result}")
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ assertTrue(data.size() == 0)
+ }
+
+ (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", be_host, be_http_port, "file_cache_keep_base_compaction_output_min_hit_ratio", 0))
+ assertTrue(out.contains("OK"))
+ sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ ss_sold_date_sk bigint,
+ ss_sold_time_sk bigint,
+ ss_item_sk bigint,
+ ss_customer_sk bigint,
+ ss_cdemo_sk bigint,
+ ss_hdemo_sk bigint,
+ ss_addr_sk bigint,
+ ss_store_sk bigint,
+ ss_promo_sk bigint,
+ ss_ticket_number bigint,
+ ss_quantity integer,
+ ss_wholesale_cost decimal(7,2),
+ ss_list_price decimal(7,2),
+ ss_sales_price decimal(7,2),
+ ss_ext_discount_amt decimal(7,2),
+ ss_ext_sales_price decimal(7,2),
+ ss_ext_wholesale_cost decimal(7,2),
+ ss_ext_list_price decimal(7,2),
+ ss_ext_tax decimal(7,2),
+ ss_coupon_amt decimal(7,2),
+ ss_net_paid decimal(7,2),
+ ss_net_paid_inc_tax decimal(7,2),
+ ss_net_profit decimal(7,2)
+ )
+ DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+ DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ )
+ """
+
+ tablets = sql_return_maparray """ show tablets from ${testTable}; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ tablet = tablets[0]
+ tablet_id = tablet.TabletId
+
+ streamLoad {
+ table testTable
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specified in doris fe config, usually '\t'.
+ // this line changes it to '|'
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+ time 10000 // limit inflight 10s
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${res}".toString())
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+
+ sql """ sync """
+ sql "select * from ${testTable}"
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerBaseCompaction(tablet)
+ waitForCompaction(tablet)
+
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+ base_compaction_finished = false
+ final_rowsets = new HashSet<>();
+ for (int i = 0; i < 60; i++) {
+ tablet_status = getTabletStatus(tablet)
+ if (tablet_status["rowsets"].size() == 2) {
+ base_compaction_finished = true
+ final_rowsets.addAll(tablet_status["rowsets"])
+ break
+ }
+ sleep(10000)
+ }
+ assertTrue(base_compaction_finished)
+
+ be_host = backendId_to_backendIP[tablet.BackendId]
+ be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+
+ def segments = data.stream()
+ .filter(item -> !item.endsWith("_idx"))
+ .count();
+ logger.info("segments: ${segments}")
+ assertTrue(segments > 0)
+ assertTrue(data.size() > 0)
+ }
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size")
+ logger.info("base_compaction_input_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 3 * 100 * 1024 * 1024)
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size")
+ logger.info("base_compaction_input_cached_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 70 * 1024 * 1024)
+
+ // =================case4: file_cache_keep_base_compaction_output_min_hit_ratio = 1 =================
+ // real_ratio < file_cache_keep_base_compaction_output_min_hit_ratio = 1, so it will not write to the file cache
+ result = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=clear&sync=true", true)
+ logger.info("clear file cache data: ${result}")
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ assertTrue(data.size() == 0)
+ }
+
+ (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", be_host, be_http_port, "file_cache_keep_base_compaction_output_min_hit_ratio", 1))
+ assertTrue(out.contains("OK"))
+ sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ ss_sold_date_sk bigint,
+ ss_sold_time_sk bigint,
+ ss_item_sk bigint,
+ ss_customer_sk bigint,
+ ss_cdemo_sk bigint,
+ ss_hdemo_sk bigint,
+ ss_addr_sk bigint,
+ ss_store_sk bigint,
+ ss_promo_sk bigint,
+ ss_ticket_number bigint,
+ ss_quantity integer,
+ ss_wholesale_cost decimal(7,2),
+ ss_list_price decimal(7,2),
+ ss_sales_price decimal(7,2),
+ ss_ext_discount_amt decimal(7,2),
+ ss_ext_sales_price decimal(7,2),
+ ss_ext_wholesale_cost decimal(7,2),
+ ss_ext_list_price decimal(7,2),
+ ss_ext_tax decimal(7,2),
+ ss_coupon_amt decimal(7,2),
+ ss_net_paid decimal(7,2),
+ ss_net_paid_inc_tax decimal(7,2),
+ ss_net_profit decimal(7,2)
+ )
+ DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk)
+ DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ )
+ """
+
+ tablets = sql_return_maparray """ show tablets from ${testTable}; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ tablet = tablets[0]
+ tablet_id = tablet.TabletId
+
+ streamLoad {
+ table testTable
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specified in doris fe config, usually '\t'.
+ // this line changes it to '|'
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz"""
+
+ time 10000 // limit inflight 10s
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${res}".toString())
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+ sql """ delete from ${testTable} where ss_item_sk=2; """
+
+ sql """ sync """
+
+ sql "select * from ${testTable}"
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+ triggerCumulativeCompaction(tablet)
+ waitForCompaction(tablet)
+
+ sql "select * from ${testTable}"
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+
+ triggerBaseCompaction(tablet)
+ waitForCompaction(tablet)
+
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status: ${tablet_status}")
+ base_compaction_finished = false
+ final_rowsets = new HashSet<>();
+ for (int i = 0; i < 60; i++) {
+ tablet_status = getTabletStatus(tablet)
+ if (tablet_status["rowsets"].size() == 2) {
+ base_compaction_finished = true
+ final_rowsets.addAll(tablet_status["rowsets"])
+ break
+ }
+ sleep(10000)
+ }
+ assertTrue(base_compaction_finished)
+
+ be_host = backendId_to_backendIP[tablet.BackendId]
+ be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+
+ def segments = data.stream()
+ .filter(item -> !item.endsWith("_idx"))
+ .count();
+ logger.info("segments: ${segments}")
+ assertTrue(segments == 0)
+ }
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_size")
+ logger.info("base_compaction_input_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 4 * 100 * 1024 * 1024)
+
+ (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/base_compaction_input_cached_size")
+ logger.info("base_compaction_input_cached_size: ${out_0}")
+ size = out_0.trim().split(":")[1].trim().toInteger()
+ assertTrue(size > 70 * 1024 * 1024)
+ }
+}