This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d27f052a745 branch-4.0: [fix][enhancement](filecache) filecache query limit feature bugfix and enhancement #55772 (#60224)
d27f052a745 is described below
commit d27f052a74504d46308ee93c927d17959ea25a1d
Author: Wen Zhenghu <[email protected]>
AuthorDate: Tue Jan 27 08:03:31 2026 +0800
branch-4.0: [fix][enhancement](filecache) filecache query limit feature bugfix and enhancement #55772 (#60224)
pick https://github.com/apache/doris/pull/55772
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [x] Regression test
- [x] Unit Test
- [x] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [x] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Co-authored-by: xuchenhao <[email protected]>
---
be/src/common/config.cpp | 2 +-
be/src/common/config.h | 2 +-
be/src/io/cache/block_file_cache.cpp | 19 +-
be/src/io/cache/block_file_cache.h | 6 +-
be/src/io/cache/block_file_cache_factory.cpp | 6 +-
be/src/io/cache/block_file_cache_factory.h | 2 +-
be/src/runtime/query_context.cpp | 12 +
be/src/runtime/query_context.h | 3 +
be/test/io/cache/block_file_cache_test.cpp | 17 +-
.../main/java/org/apache/doris/common/Config.java | 7 +
.../java/org/apache/doris/qe/SessionVariable.java | 26 ++
gensrc/thrift/PaloInternalService.thrift | 1 +
.../cache/test_file_cache_features.groovy | 105 +++---
.../cache/test_file_cache_query_limit.groovy | 395 +++++++++++++++++++++
.../test_file_cache_query_limit_config.groovy | 123 +++++++
.../cache/test_file_cache_statistics.groovy | 310 ++++++++--------
.../cache/test_hive_warmup_select.groovy | 2 +-
17 files changed, 828 insertions(+), 210 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ed0f411d780..b86b60ed14e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1118,7 +1118,7 @@ DEFINE_String(file_cache_path,
"[{\"path\":\"${DORIS_HOME}/file_cache\"}]");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB
DEFINE_Bool(clear_file_cache, "false");
-DEFINE_Bool(enable_file_cache_query_limit, "false");
+DEFINE_mBool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "88");
DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 194d41e51e1..47623a2b1a4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1158,7 +1158,7 @@ DECLARE_Bool(enable_file_cache);
DECLARE_String(file_cache_path);
DECLARE_Int64(file_cache_each_block_size);
DECLARE_Bool(clear_file_cache);
-DECLARE_Bool(enable_file_cache_query_limit);
+DECLARE_mBool(enable_file_cache_query_limit);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
DECLARE_mBool(enable_evict_file_cache_in_advance);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index ae93eefd427..bd55a2b76d0 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -139,8 +139,7 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                                const FileCacheSettings& cache_settings)
         : _cache_base_path(cache_base_path),
           _capacity(cache_settings.capacity),
-          _max_file_block_size(cache_settings.max_file_block_size),
-          _max_query_cache_size(cache_settings.max_query_cache_size) {
+          _max_file_block_size(cache_settings.max_file_block_size) {
     _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
                                                                      "file_cache_cache_size", 0);
_cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
@@ -379,7 +378,7 @@ UInt128Wrapper BlockFileCache::hash(const std::string& path) {
 }
 BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
-        const TUniqueId& query_id) {
+        const TUniqueId& query_id, int file_cache_query_limit_percent) {
SCOPED_CACHE_LOCK(_mutex, this);
if (!config::enable_file_cache_query_limit) {
return {};
@@ -387,7 +386,7 @@ BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context
     /// if enable_filesystem_query_cache_limit is true,
     /// we create context query for current query.
-    auto context = get_or_set_query_context(query_id, cache_lock);
+    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
     return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
 }
@@ -407,7 +406,8 @@ void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
 }
 BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
-        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
+        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
+        int file_cache_query_limit_percent) {
     if (query_id.lo == 0 && query_id.hi == 0) {
         return nullptr;
     }
@@ -417,7 +417,14 @@ BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_contex
         return context;
     }
-    auto query_context = std::make_shared<QueryFileCacheContext>(_max_query_cache_size);
+    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
+    if (file_cache_query_limit_size < 268435456) {
+        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
+                     << " bytes) is less than the 256MB recommended minimum. "
+                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
+                     << " from its current value " << file_cache_query_limit_percent << "%.";
+    }
+    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
     auto query_iter = _query_map.emplace(query_id, query_context).first;
     return query_iter->second;
 }
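For a concrete sense of what the new get_or_set_query_context() hunk computes, the sketch below reproduces the percent-to-bytes arithmetic in isolation. This is illustrative C++, not Doris code; the cache sizes are made-up examples, and 268435456 is simply 256 * 1024 * 1024.

    #include <cstddef>
    #include <cstdio>

    // Per-query limit as derived above: capacity * percent / 100.
    static size_t query_limit_bytes(size_t cache_capacity_bytes, int limit_percent) {
        return cache_capacity_bytes * static_cast<size_t>(limit_percent) / 100;
    }

    int main() {
        constexpr size_t kWarnThreshold = 268435456; // the 256MB floor that triggers the WARNING above
        // Hypothetical 100GB cache, 50% per-query limit -> 50GB, no warning.
        size_t big = query_limit_bytes(100ULL * 1024 * 1024 * 1024, 50);
        // Hypothetical 400MB cache, 50% per-query limit -> 200MB, warning fires.
        size_t small = query_limit_bytes(400ULL * 1024 * 1024, 50);
        std::printf("big=%zu warn=%d\n", big, big < kWarnThreshold);
        std::printf("small=%zu warn=%d\n", small, small < kWarnThreshold);
        return 0;
    }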
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 7527186c749..e284aed5c93 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -345,7 +345,8 @@ public:
void remove_query_context(const TUniqueId& query_id);
     QueryFileCacheContextPtr get_or_set_query_context(const TUniqueId& query_id,
-                                                       std::lock_guard<std::mutex>&);
+                                                       std::lock_guard<std::mutex>& cache_lock,
+                                                       int file_cache_query_limit_percent);
/// Save a query context information, and adopt different cache policies
/// for different queries through the context cache layer.
@@ -371,7 +372,8 @@ public:
QueryFileCacheContextPtr context;
};
     using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>;
-    QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id);
+    QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id,
+                                                             int file_cache_query_limit_percent);
int64_t approximate_available_cache_size() const {
return std::max<int64_t>(
diff --git a/be/src/io/cache/block_file_cache_factory.cpp
b/be/src/io/cache/block_file_cache_factory.cpp
index f99a9112d09..15b7880a718 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -196,10 +196,12 @@ BlockFileCache* FileCacheFactory::get_by_path(const std::string& cache_base_path
 }
 std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
-FileCacheFactory::get_query_context_holders(const TUniqueId& query_id) {
+FileCacheFactory::get_query_context_holders(const TUniqueId& query_id,
+                                            int file_cache_query_limit_percent) {
     std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> holders;
     for (const auto& cache : _caches) {
-        holders.push_back(cache->get_query_context_holder(query_id));
+        holders.push_back(
+                cache->get_query_context_holder(query_id, file_cache_query_limit_percent));
     }
     return holders;
 }
diff --git a/be/src/io/cache/block_file_cache_factory.h
b/be/src/io/cache/block_file_cache_factory.h
index 837feac7f68..a21b62d973f 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -74,7 +74,7 @@ public:
BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
     std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> get_query_context_holders(
- const TUniqueId& query_id);
+ const TUniqueId& query_id, int file_cache_query_limit_percent);
/**
* Clears data of all file cache instances
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a5755aa275d..424085ea1ac 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -110,6 +110,18 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
     _timeout_second = query_options.execution_timeout;
+    bool initialize_context_holder =
+            config::enable_file_cache && config::enable_file_cache_query_limit &&
+            query_options.__isset.enable_file_cache && query_options.enable_file_cache &&
+            query_options.__isset.file_cache_query_limit_percent &&
+            query_options.file_cache_query_limit_percent < 100;
+
+    // Init query context holders for file cache, if enable query limit feature
+    if (initialize_context_holder) {
+        _query_context_holders = io::FileCacheFactory::instance()->get_query_context_holders(
+                _query_id, query_options.file_cache_query_limit_percent);
+    }
+
     bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
                                query_options.query_type == TQueryType::LOAD ||
                                query_options.query_type == TQueryType::EXTERNAL;
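The condition added to the QueryContext constructor is the gate for the whole feature. Below is a minimal stand-alone sketch of that gating; the BE configs and TQueryOptions fields are reduced to plain bools/ints here, so it is not the actual Doris types.

    #include <cstdio>

    // Stand-in for the relevant TQueryOptions fields and their __isset flags.
    struct FakeOptions {
        bool has_enable_file_cache = false;
        bool enable_file_cache = false;
        bool has_limit_percent = false;
        int limit_percent = -1;
    };

    // Holders are created only when the BE switches are on, the session enables the
    // file cache, and the forwarded percent is actually below 100.
    static bool should_init_holders(bool be_enable_file_cache, bool be_enable_query_limit,
                                    const FakeOptions& opts) {
        return be_enable_file_cache && be_enable_query_limit &&
               opts.has_enable_file_cache && opts.enable_file_cache &&
               opts.has_limit_percent && opts.limit_percent < 100;
    }

    int main() {
        FakeOptions opts{true, true, true, 50};
        std::printf("init=%d\n", should_init_holders(true, true, opts)); // 1: a 50 percent cap applies
        opts.limit_percent = 100;
        std::printf("init=%d\n", should_init_holders(true, true, opts)); // 0: 100 percent means no per-query cap
        return 0;
    }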
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 30cc8fd6016..643925ad2d7 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -368,6 +368,9 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
+    // file cache context holders
+    std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> _query_context_holders;
+
 public:
     // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
     void add_fragment_profile(
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 5c159126ecf..a34b29b0050 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -969,6 +969,7 @@ TEST_F(BlockFileCacheTest, init) {
)");
cache_paths.clear();
EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
+ config::enable_file_cache_query_limit = false;
}
TEST_F(BlockFileCacheTest, normal) {
@@ -1246,7 +1247,7 @@ TEST_F(BlockFileCacheTest,
query_limit_heap_use_after_free) {
query_id.hi = 1;
query_id.lo = 1;
context.query_id = query_id;
- auto query_context_holder = cache.get_query_context_holder(query_id);
+ auto query_context_holder = cache.get_query_context_holder(query_id, 100);
{
auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9,
9]
auto blocks = fromHolder(holder);
@@ -1333,7 +1334,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) {
query_id.hi = 1;
query_id.lo = 1;
context.query_id = query_id;
- auto query_context_holder = cache.get_query_context_holder(query_id);
+ auto query_context_holder = cache.get_query_context_holder(query_id, 100);
{
auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9,
9]
auto blocks = fromHolder(holder);
@@ -3374,7 +3375,7 @@ TEST_F(BlockFileCacheTest, test_query_limit) {
}
ASSERT_LT(i, 1000);
         auto query_context_holder =
-                FileCacheFactory::instance()->get_query_context_holders(query_id);
+                FileCacheFactory::instance()->get_query_context_holders(query_id, 50);
for (int64_t offset = 0; offset < 60; offset += 5) {
auto holder = cache->get_or_set(key, offset, 5, context);
auto blocks = fromHolder(holder);
@@ -3555,7 +3556,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- EXPECT_EQ(cache.get_query_context_holder(id), nullptr);
+ EXPECT_EQ(cache.get_query_context_holder(id, 50), nullptr);
}
config::enable_file_cache_query_limit = true;
io::BlockFileCache cache(cache_base_path, settings);
@@ -3567,9 +3568,9 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
id.hi = id.lo = 0;
- EXPECT_EQ(cache.get_query_context_holder(id)->context, nullptr);
+ EXPECT_EQ(cache.get_query_context_holder(id, 50)->context, nullptr);
id.hi = id.lo = 1;
- auto query_ctx_1 = cache.get_query_context_holder(id);
+ auto query_ctx_1 = cache.get_query_context_holder(id, 50);
ASSERT_NE(query_ctx_1, nullptr);
for (int64_t offset = 0; offset < 60; offset += 5) {
auto holder = cache.get_or_set(key, offset, 5, context);
@@ -3582,7 +3583,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 4),
io::FileBlock::State::DOWNLOADED);
}
- auto query_ctx_2 = cache.get_query_context_holder(id);
+ auto query_ctx_2 = cache.get_query_context_holder(id, 50);
EXPECT_EQ(query_ctx_1->query_id, query_ctx_2->query_id);
std::lock_guard lock(cache._mutex);
EXPECT_EQ(query_ctx_1->context->get_cache_size(lock),
@@ -3625,7 +3626,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- auto query_ctx_1 = cache.get_query_context_holder(id);
+ auto query_ctx_1 = cache.get_query_context_holder(id, 50);
ASSERT_NE(query_ctx_1, nullptr);
{
auto holder = cache.get_or_set(key, 0, 5, context);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ea3301cabbc..1acbf5d5fab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3736,6 +3736,13 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static String aws_credentials_provider_version = "v2";
+    @ConfField(mutable = true, description = {
+            "用户的单个查询能使用的 FILE_CACHE 比例的上限(取值范围 1 到 100),100表示能够使用全量 FILE_CACHE",
+            "The upper limit of the FILE_CACHE percent that a single query of a user can use (range: 1 to 100).",
+            "100 indicates that the full FILE_CACHE capacity can be used. "
+    })
+    public static int file_cache_query_limit_max_percent = 100;
+
@ConfField(description = {
"AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享",
"The thread pool size used by the AWS SDK to schedule asynchronous
retries, timeout tasks, "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e9378840101..b2bc1447808 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -473,6 +473,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String DISABLE_FILE_CACHE = "disable_file_cache";
+    public static final String FILE_CACHE_QUERY_LIMIT_PERCENT = "file_cache_query_limit_percent";
+
public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path";
public static final String ENABLE_INVERTED_INDEX_QUERY =
"enable_inverted_index_query";
@@ -2692,6 +2694,23 @@ public class SessionVariable implements Serializable, Writable {
             "Make the READ_SLICE_SIZE variable configurable to reduce the impact caused by read amplification."})
     public int mergeReadSliceSizeBytes = 8388608;
+    @VariableMgr.VarAttr(name = FILE_CACHE_QUERY_LIMIT_PERCENT, needForward = true,
+            checker = "checkFileCacheQueryLimitPercent",
+            description = {"限制用户的单个查询能使用的 FILE_CACHE 比例 "
+                    + "(用户设置,取值范围 1 到 Config.file_cache_query_limit_max_percent)。",
+                    "Limit the FILE_CACHE percent that a single query of a user can use "
+                            + "(set by user via session variables, range: 1 to Config.file_cache_query_limit_max_percent)."})
+    public int fileCacheQueryLimitPercent = -1;
+
+    public void checkFileCacheQueryLimitPercent(String fileCacheQueryLimitPercentStr) {
+        int fileCacheQueryLimitPct = Integer.valueOf(fileCacheQueryLimitPercentStr);
+        if (fileCacheQueryLimitPct < 1 || fileCacheQueryLimitPct > Config.file_cache_query_limit_max_percent) {
+            throw new InvalidParameterException(
+                    String.format("file_cache_query_limit_percent should be between 1 and %d",
+                            Config.file_cache_query_limit_max_percent));
+        }
+    }
+
public void setAggPhase(int phase) {
aggPhase = phase;
}
@@ -4920,6 +4939,13 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setMergeReadSliceSize(mergeReadSliceSizeBytes);
         tResult.setEnableExtendedRegex(enableExtendedRegex);
+        if (fileCacheQueryLimitPercent > 0) {
+            tResult.setFileCacheQueryLimitPercent(Math.min(fileCacheQueryLimitPercent,
+                    Config.file_cache_query_limit_max_percent));
+        } else {
+            tResult.setFileCacheQueryLimitPercent(Config.file_cache_query_limit_max_percent);
+        }
+
         // Set Iceberg write target file size
         tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes);
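The toThrift() change above decides which percent the FE forwards to the BE. Rewritten as a tiny illustrative helper (C++ here only to keep the sketches in one language; fe_max_percent stands in for Config.file_cache_query_limit_max_percent, and -1 for an unset session variable):

    #include <algorithm>
    #include <cstdio>

    static int effective_limit_percent(int session_percent, int fe_max_percent) {
        if (session_percent > 0) {
            // A user-set value is clamped by the FE-wide cap.
            return std::min(session_percent, fe_max_percent);
        }
        // Unset (-1) falls back to the FE cap, i.e. no extra restriction beyond it.
        return fe_max_percent;
    }

    int main() {
        std::printf("%d\n", effective_limit_percent(30, 100)); // 30: within the cap
        std::printf("%d\n", effective_limit_percent(80, 50));  // 50: clamped to the cap
        std::printf("%d\n", effective_limit_percent(-1, 50));  // 50: default when unset
        return 0;
    }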
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 7181431c2b7..b15bfd8bd89 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -427,6 +427,7 @@ struct TQueryOptions {
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
1000: optional bool disable_file_cache = false
+ 1001: optional i32 file_cache_query_limit_percent = -1
}
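On the wire, the new field is optional with a default of -1, so the BE can distinguish "FE never sent a value" from a real percent. The struct below is a stripped-down stand-in for the Thrift-generated C++, not the generated code itself; the real generated type pairs the field with an __isset flag, which the QueryContext change checks.

    #include <cstdio>

    // Illustrative stand-in for the generated TQueryOptions field.
    struct MiniQueryOptions {
        int file_cache_query_limit_percent = -1; // -1: not forwarded by the FE
        struct {
            bool file_cache_query_limit_percent = false;
        } __isset;
    };

    int main() {
        MiniQueryOptions opts; // nothing set by the FE
        std::printf("set=%d value=%d\n", opts.__isset.file_cache_query_limit_percent,
                    opts.file_cache_query_limit_percent);

        opts.file_cache_query_limit_percent = 40; // FE forwarded a 40 percent session limit
        opts.__isset.file_cache_query_limit_percent = true;
        std::printf("set=%d value=%d\n", opts.__isset.file_cache_query_limit_percent,
                    opts.file_cache_query_limit_percent);
        return 0;
    }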
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
index 5951b9c79c5..a3485f5ff6b 100644
---
a/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_features.groovy
@@ -121,7 +121,17 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
assertTrue(false, INITIAL_VALUES_NOT_ZERO_CHECK_FAILED_MSG +
"disk_resource_limit_mode: ${initialDiskResourceLimitMode},
need_evict_cache_in_advance: ${initialNeedEvictCacheInAdvance}")
}
-
+
+    def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend config like 'file_cache_background_monitor_interval_ms';"""
+    logger.info("file_cache_background_monitor_interval_ms configuration: " + fileCacheBackgroundMonitorIntervalMsResult)
+    assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 || fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+        fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(), "file_cache_background_monitor_interval_ms is empty or not set to true")
+
+    // brpc metrics will be updated at most 5 seconds
+    def totalWaitTime = (fileCacheBackgroundMonitorIntervalMsResult[0][3].toInteger() / 1000) as int
+    def interval = 1
+    def iterations = totalWaitTime / interval
+
// Set backend configuration parameters for testing
boolean diskResourceLimitModeTestPassed = true
setBeConfigTemporary([
@@ -131,42 +141,47 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
// Execute test logic with modified configuration
logger.info("Backend configuration set -
file_cache_enter_disk_resource_limit_mode_percent: 2, " +
"file_cache_exit_disk_resource_limit_mode_percent: 1")
-
+
// Wait for disk_resource_limit_mode metric to change to 1
try {
- Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until {
- def updatedDiskResourceLimitModeResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
- where METRIC_NAME = 'disk_resource_limit_mode' limit 1;"""
- logger.info("Checking disk_resource_limit_mode result: " +
updatedDiskResourceLimitModeResult)
-
- if (updatedDiskResourceLimitModeResult.size() > 0) {
- double updatedDiskResourceLimitMode =
Double.valueOf(updatedDiskResourceLimitModeResult[0][0])
- logger.info("Current disk_resource_limit_mode value:
${updatedDiskResourceLimitMode}")
-
- if (updatedDiskResourceLimitMode == 1.0) {
- logger.info("Disk resource limit mode is now active
(value = 1)")
- return true
- } else {
- logger.info("Disk resource limit mode is not yet
active (value = ${updatedDiskResourceLimitMode}), waiting...")
- return false
- }
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ def updatedDiskResourceLimitModeResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
+ where METRIC_NAME = 'disk_resource_limit_mode' limit 1;"""
+ logger.info("Checking disk_resource_limit_mode result: " +
updatedDiskResourceLimitModeResult)
+
+ if (updatedDiskResourceLimitModeResult.size() > 0) {
+ double updatedDiskResourceLimitMode =
Double.valueOf(updatedDiskResourceLimitModeResult[0][0])
+ logger.info("Current disk_resource_limit_mode value:
${updatedDiskResourceLimitMode}")
+
+ if (updatedDiskResourceLimitMode == 1.0) {
+ logger.info("Disk resource limit mode is now active (value
= 1)")
+ return true
} else {
- logger.info("Failed to get disk_resource_limit_mode
metric, waiting...")
+ logger.info("Disk resource limit mode is not yet active
(value = ${updatedDiskResourceLimitMode}), waiting...")
return false
}
+ } else {
+ logger.info("Failed to get disk_resource_limit_mode metric,
waiting...")
+ return false
}
} catch (Exception e) {
logger.info(DISK_RESOURCE_LIMIT_MODE_TEST_FAILED_MSG +
e.getMessage())
diskResourceLimitModeTestPassed = false
}
}
-
+
// Check disk resource limit mode test result
if (!diskResourceLimitModeTestPassed) {
logger.info(DISK_RESOURCE_LIMIT_MODE_TEST_FAILED_MSG)
assertTrue(false, DISK_RESOURCE_LIMIT_MODE_TEST_FAILED_MSG)
}
-
+
// Set backend configuration parameters for need_evict_cache_in_advance
testing
boolean needEvictCacheInAdvanceTestPassed = true
setBeConfigTemporary([
@@ -179,36 +194,41 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
"enable_evict_file_cache_in_advance: true, " +
"file_cache_enter_need_evict_cache_in_advance_percent: 2, " +
"file_cache_exit_need_evict_cache_in_advance_percent: 1")
-
+
// Wait for need_evict_cache_in_advance metric to change to 1
try {
- Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until {
- def updatedNeedEvictCacheInAdvanceResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
- where METRIC_NAME = 'need_evict_cache_in_advance' limit
1;"""
- logger.info("Checking need_evict_cache_in_advance result: " +
updatedNeedEvictCacheInAdvanceResult)
-
- if (updatedNeedEvictCacheInAdvanceResult.size() > 0) {
- double updatedNeedEvictCacheInAdvance =
Double.valueOf(updatedNeedEvictCacheInAdvanceResult[0][0])
- logger.info("Current need_evict_cache_in_advance value:
${updatedNeedEvictCacheInAdvance}")
-
- if (updatedNeedEvictCacheInAdvance == 1.0) {
- logger.info("Need evict cache in advance mode is now
active (value = 1)")
- return true
- } else {
- logger.info("Need evict cache in advance mode is not
yet active (value = ${updatedNeedEvictCacheInAdvance}), waiting...")
- return false
- }
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ def updatedNeedEvictCacheInAdvanceResult = sql """select
METRIC_VALUE from information_schema.file_cache_statistics
+ where METRIC_NAME = 'need_evict_cache_in_advance' limit 1;"""
+ logger.info("Checking need_evict_cache_in_advance result: " +
updatedNeedEvictCacheInAdvanceResult)
+
+ if (updatedNeedEvictCacheInAdvanceResult.size() > 0) {
+ double updatedNeedEvictCacheInAdvance =
Double.valueOf(updatedNeedEvictCacheInAdvanceResult[0][0])
+ logger.info("Current need_evict_cache_in_advance value:
${updatedNeedEvictCacheInAdvance}")
+
+ if (updatedNeedEvictCacheInAdvance == 1.0) {
+ logger.info("Need evict cache in advance mode is now
active (value = 1)")
+ return true
} else {
- logger.info("Failed to get need_evict_cache_in_advance
metric, waiting...")
+ logger.info("Need evict cache in advance mode is not yet
active (value = ${updatedNeedEvictCacheInAdvance}), waiting...")
return false
}
+ } else {
+ logger.info("Failed to get need_evict_cache_in_advance metric,
waiting...")
+ return false
}
} catch (Exception e) {
logger.info(NEED_EVICT_CACHE_IN_ADVANCE_TEST_FAILED_MSG +
e.getMessage())
needEvictCacheInAdvanceTestPassed = false
- }
+ }
}
-
+
// Check need evict cache in advance test result
if (!needEvictCacheInAdvanceTestPassed) {
logger.info(NEED_EVICT_CACHE_IN_ADVANCE_TEST_FAILED_MSG)
@@ -218,5 +238,4 @@ suite("test_file_cache_features",
"external_docker,hive,external_docker_hive,p0,
sql """set global enable_file_cache=false"""
return true
-}
-
+}
\ No newline at end of file
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
new file mode 100644
index 00000000000..a6ccf96c38c
--- /dev/null
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit.groovy
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.dgmimpl.arrays.LongArrayGetAtMetaMethod
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+// Constants for backend configuration check
+final String BACKEND_CONFIG_CHECK_FAILED_PREFIX = "Backend configuration check
failed: "
+final String ENABLE_FILE_CACHE_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache is empty or not set to
true"
+final String FILE_CACHE_BACKGROUND_MONITOR_INTERVAL_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_background_monitor_interval_ms
is empty or not set to true"
+final String FILE_CACHE_PATH_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_path is empty or not
configured"
+final String WEB_SERVER_PORT_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "webserver_port is empty or not configured"
+final String BRPC_PORT_CHECK_FAILED_MSG = BACKEND_CONFIG_CHECK_FAILED_PREFIX +
"brpc_port is empty or not configured"
+final String ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_FALSE_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache_query_limit is empty or
not set to false"
+final String ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_TRUE_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "enable_file_cache_query_limit is empty or
not set to true"
+final String FILE_CACHE_QUERY_LIMIT_BYTES_CHECK_FAILED_MSG =
BACKEND_CONFIG_CHECK_FAILED_PREFIX + "file_cache_query_limit_bytes is empty or
not configured"
+// Constants for cache query features check
+final String FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX = "File cache features
check failed: "
+final String BASE_NORMAL_QUEUE_CURR_SIZE_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "base normal_queue_curr_size is 0"
+final String BASE_NORMAL_QUEUE_CURR_ELEMENTS_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "base normal_queue_curr_elements is 0"
+final String TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "total_read_counts did not increase
after cache operation"
+final String INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_curr_size is
not 0"
+final String INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_curr_elements
is not 0"
+final String INITIAL_NORMAL_QUEUE_MAX_SIZE_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_max_size is 0"
+final String INITIAL_NORMAL_QUEUE_MAX_ELEMENTS_IS_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "initial normal_queue_max_elements is
0"
+final String NORMAL_QUEUE_CURR_SIZE_NOT_GREATER_THAN_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_size is not
greater than 0 after cache operation"
+final String NORMAL_QUEUE_CURR_ELEMENTS_NOT_GREATER_THAN_ZERO_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_elements is not
greater than 0 after cache operation"
+final String NORMAL_QUEUE_CURR_SIZE_GREATER_THAN_QUERY_CACHE_CAPACITY_MSG =
FILE_CACHE_FEATURES_CHECK_FAILED_PREFIX + "normal_queue_curr_size is greater
than query cache capacity"
+
+suite("test_file_cache_query_limit",
"external_docker,hive,external_docker_hive,p0,external,nonConcurrent") {
+ String enableHiveTest = context.config.otherConfigs.get("enableHiveTest")
+ if (enableHiveTest == null || !enableHiveTest.equalsIgnoreCase("true")) {
+ logger.info("disable hive test.")
+ return
+ }
+
+ sql """set enable_file_cache=true"""
+
+ // Check backend configuration prerequisites
+ // Note: This test case assumes a single backend scenario. Testing with
single backend is logically equivalent
+ // to testing with multiple backends having identical configurations, but
simpler in logic.
+ def enableFileCacheResult = sql """show backend config like
'enable_file_cache';"""
+ logger.info("enable_file_cache configuration: " + enableFileCacheResult)
+ assertFalse(enableFileCacheResult.size() == 0 ||
!enableFileCacheResult[0][3].equalsIgnoreCase("true"),
+ ENABLE_FILE_CACHE_CHECK_FAILED_MSG)
+
+ def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend
config like 'file_cache_background_monitor_interval_ms';"""
+ logger.info("file_cache_background_monitor_interval_ms configuration: " +
fileCacheBackgroundMonitorIntervalMsResult)
+ assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 ||
fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+ fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(),
FILE_CACHE_BACKGROUND_MONITOR_INTERVAL_CHECK_FAILED_MSG)
+
+ String catalog_name = "test_file_cache_query_limit"
+ String ex_db_name = "tpch1_parquet"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+ int queryCacheCapacity
+
+ sql """drop catalog if exists ${catalog_name} """
+
+ sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+ 'hadoop.username' = 'hive'
+ );"""
+
+ String query_sql =
+ """select sum(l_quantity) as sum_qty,
+ sum(l_extendedprice) as sum_base_price,
+ sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+ sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as
sum_charge,
+ avg(l_quantity) as avg_qty,
+ avg(l_extendedprice) as avg_price,
+ avg(l_discount) as avg_disc,
+ count(*) as count_order
+ from ${catalog_name}.${ex_db_name}.lineitem
+ where l_shipdate <= date '1998-12-01' - interval '90' day
+ group by l_returnflag, l_linestatus
+ order by l_returnflag, l_linestatus;"""
+
+ def webserverPortResult = sql """SHOW BACKEND CONFIG LIKE
'webserver_port';"""
+ logger.info("webserver_port configuration: " + webserverPortResult)
+ assertFalse(webserverPortResult.size() == 0 || webserverPortResult[0][3]
== null || webserverPortResult[0][3].trim().isEmpty(),
+ WEB_SERVER_PORT_CHECK_FAILED_MSG)
+
+ String webserver_port = webserverPortResult[0][3]
+
+ def brpcPortResult = sql """SHOW BACKEND CONFIG LIKE 'brpc_port';"""
+ logger.info("brpcPortResult configuration: " + brpcPortResult)
+ assertFalse(brpcPortResult.size() == 0 || brpcPortResult[0][3] == null ||
brpcPortResult[0][3].trim().isEmpty(),
+ BRPC_PORT_CHECK_FAILED_MSG)
+
+ String brpc_port = brpcPortResult[0][3]
+
+ // Search file cache capacity
+ def command = ["curl", "-X", "POST", "${externalEnvIp}:${brpc_port}/vars"]
+ def stringCommand = command.collect{it.toString()}
+ def process = new ProcessBuilder(stringCommand as
String[]).redirectErrorStream(true).start()
+
+ def output = new StringBuilder()
+ def errorOutput = new StringBuilder()
+ process.inputStream.eachLine{line -> output.append(line).append("\n")}
+ process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+ // Wait for process completion and check exit status
+ def exitCode = process.waitFor()
+ def fileCacheCapacityResult = output.toString().split("\n").find {
it.contains("file_cache_capacity") }?.split(":")?.last()?.trim()
+
+ logger.info("File cache capacity: ${fileCacheCapacityResult}")
+ assertTrue(fileCacheCapacityResult != null, "Failed to find
file_cache_capacity in brpc metrics")
+ def fileCacheCapacity = Long.valueOf(fileCacheCapacityResult)
+
+ // Run file cache base test for setting the parameter
file_cache_query_limit_bytes
+ logger.info("========================= Start running file cache base test
========================")
+
+ // Clear file cache
+ command = ["curl", "-X", "POST",
"${externalEnvIp}:${webserver_port}/api/file_cache?op=clear&sync=true"]
+ stringCommand = command.collect{it.toString()}
+ process = new ProcessBuilder(stringCommand as
String[]).redirectErrorStream(true).start()
+
+ output = new StringBuilder()
+ errorOutput = new StringBuilder()
+ process.inputStream.eachLine{line -> output.append(line).append("\n")}
+ process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+ // Wait for process completion and check exit status
+ exitCode = process.waitFor()
+ logger.info("File cache clear command output: ${output.toString()}")
+ assertTrue(exitCode == 0, "File cache clear failed with exit code
${exitCode}. Error: ${errorOutput.toString()}")
+
+ // brpc metrics will be updated at most 5 seconds
+ def totalWaitTime =
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toLong() / 1000) as int
+ def interval = 1
+ def iterations = totalWaitTime / interval
+
+ // Waiting for file cache clearing
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache clearing ${elapsedSeconds} seconds,
${remainingSeconds} seconds remaining")
+ }
+
+ def initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
initialNormalQueueCurrSizeResult)
+ assertFalse(initialNormalQueueCurrSizeResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)
+
+ // Check normal queue current elements
+ def initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
initialNormalQueueCurrElementsResult)
+ assertFalse(initialNormalQueueCurrElementsResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrElementsResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG)
+
+ double initialNormalQueueCurrSize =
Double.valueOf(initialNormalQueueCurrSizeResult[0][0])
+ double initialNormalQueueCurrElements =
Double.valueOf(initialNormalQueueCurrElementsResult[0][0])
+
+ logger.info("Initial normal queue curr size and elements - size:
${initialNormalQueueCurrSize} , " +
+ "elements: ${initialNormalQueueCurrElements}")
+
+ setBeConfigTemporary([
+ "enable_file_cache_query_limit": "false"
+ ]) {
+ // Execute test logic with modified configuration for
file_cache_query_limit
+ logger.info("Backend configuration set -
enable_file_cache_query_limit: false")
+
+ // Waiting for backend configuration update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Check if the configuration is modified
+ def enableFileCacheQueryLimitResult = sql """SHOW BACKEND CONFIG LIKE
'enable_file_cache_query_limit';"""
+ logger.info("enable_file_cache_query_limit configuration: " +
enableFileCacheQueryLimitResult)
+ assertFalse(enableFileCacheQueryLimitResult.size() == 0 ||
enableFileCacheQueryLimitResult[0][3] == null ||
enableFileCacheQueryLimitResult[0][3] != "false",
+ ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_FALSE_FAILED_MSG)
+
+ sql """switch ${catalog_name}"""
+ // load the table into file cache
+ sql query_sql
+
+ // Waiting for file cache statistics update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ def baseNormalQueueCurrElementsResult = sql """select METRIC_VALUE
from information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
baseNormalQueueCurrElementsResult)
+ assertFalse(baseNormalQueueCurrElementsResult.size() == 0 ||
Double.valueOf(baseNormalQueueCurrElementsResult[0][0]) == 0.0,
+ BASE_NORMAL_QUEUE_CURR_ELEMENTS_IS_ZERO_MSG)
+
+ def baseNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
baseNormalQueueCurrSizeResult)
+ assertFalse(baseNormalQueueCurrSizeResult.size() == 0 ||
Double.valueOf(baseNormalQueueCurrSizeResult[0][0]) == 0.0,
+ BASE_NORMAL_QUEUE_CURR_SIZE_IS_ZERO_MSG)
+
+ int baseNormalQueueCurrElements =
Double.valueOf(baseNormalQueueCurrElementsResult[0][0]) as Long
+ queryCacheCapacity =
Double.valueOf(baseNormalQueueCurrSizeResult[0][0]) as Long
+ }
+
+ // The parameter file_cache_query_limit_percent must be set smaller than
the cache capacity required by the query
+ def fileCacheQueryLimitPercent = (queryCacheCapacity / fileCacheCapacity)
* 100
+ logger.info("file_cache_query_limit_percent: " +
fileCacheQueryLimitPercent)
+
+ logger.info("========================== End running file cache base test
=========================")
+
+ logger.info("==================== Start running file cache query limit
test 1 ====================")
+
+ def fileCacheQueryLimitPercentTest1 = (fileCacheQueryLimitPercent / 2) as
Long
+ logger.info("file_cache_query_limit_percent_test1: " +
fileCacheQueryLimitPercentTest1)
+
+ // Clear file cache
+ process = new ProcessBuilder(stringCommand as
String[]).redirectErrorStream(true).start()
+
+ output = new StringBuilder()
+ errorOutput = new StringBuilder()
+ process.inputStream.eachLine{line -> output.append(line).append("\n")}
+ process.errorStream.eachLine{line -> errorOutput.append(line).append("\n")}
+
+ // Wait for process completion and check exit status
+ exitCode = process.waitFor()
+ logger.info("File cache clear command output: ${output.toString()}")
+ assertTrue(exitCode == 0, "File cache clear failed with exit code
${exitCode}. Error: ${errorOutput.toString()}")
+
+ // Waiting for file cache clearing
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache clearing ${elapsedSeconds} seconds,
${remainingSeconds} seconds remaining")
+ }
+
+ // ===== Normal Queue Metrics Check =====
+ // Check normal queue current size
+ initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
initialNormalQueueCurrSizeResult)
+ assertFalse(initialNormalQueueCurrSizeResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)
+
+ // Check normal queue current elements
+ initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
initialNormalQueueCurrElementsResult)
+ assertFalse(initialNormalQueueCurrElementsResult.size() == 0 ||
Double.valueOf(initialNormalQueueCurrElementsResult[0][0]) != 0.0,
+ INITIAL_NORMAL_QUEUE_CURR_ELEMENTS_NOT_ZERO_MSG)
+
+ // Check normal queue max size
+ def initialNormalQueueMaxSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
+ logger.info("normal_queue_max_size result: " +
initialNormalQueueMaxSizeResult)
+ assertFalse(initialNormalQueueMaxSizeResult.size() == 0 ||
Double.valueOf(initialNormalQueueMaxSizeResult[0][0]) == 0.0,
+ INITIAL_NORMAL_QUEUE_MAX_SIZE_IS_ZERO_MSG)
+
+ // Check normal queue max elements
+ def initialNormalQueueMaxElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
+ logger.info("normal_queue_max_elements result: " +
initialNormalQueueMaxElementsResult)
+ assertFalse(initialNormalQueueMaxElementsResult.size() == 0 ||
Double.valueOf(initialNormalQueueMaxElementsResult[0][0]) == 0.0,
+ INITIAL_NORMAL_QUEUE_MAX_ELEMENTS_IS_ZERO_MSG)
+
+ initialNormalQueueCurrSize =
Double.valueOf(initialNormalQueueCurrSizeResult[0][0])
+ initialNormalQueueCurrElements =
Double.valueOf(initialNormalQueueCurrElementsResult[0][0])
+ double initialNormalQueueMaxSize =
Double.valueOf(initialNormalQueueMaxSizeResult[0][0])
+ double initialNormalQueueMaxElements =
Double.valueOf(initialNormalQueueMaxElementsResult[0][0])
+
+ logger.info("Initial normal queue curr size and elements - size:
${initialNormalQueueCurrSize} , " +
+ "elements: ${initialNormalQueueCurrElements}")
+
+ logger.info("Initial normal queue max size and elements - size:
${initialNormalQueueMaxSize} , " +
+ "elements: ${initialNormalQueueMaxElements}")
+
+ // ===== Hit And Read Counts Metrics Check =====
+ // Get initial values for hit and read counts
+ def initialTotalHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Initial total_hit_counts result: " +
initialTotalHitCountsResult)
+
+ def initialTotalReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Initial total_read_counts result: " +
initialTotalReadCountsResult)
+
+ // Store initial values
+ double initialTotalHitCounts =
Double.valueOf(initialTotalHitCountsResult[0][0])
+ double initialTotalReadCounts =
Double.valueOf(initialTotalReadCountsResult[0][0])
+
+ // Set backend configuration parameters for file_cache_query_limit test 1
+ setBeConfigTemporary([
+ "enable_file_cache_query_limit": "true"
+ ]) {
+ // Execute test logic with modified configuration for
file_cache_query_limit
+ logger.info("Backend configuration set -
enable_file_cache_query_limit: true")
+
+ sql """set file_cache_query_limit_percent =
${fileCacheQueryLimitPercentTest1}"""
+
+ // Waiting for backend configuration update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for backend configuration update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Check if the configuration is modified
+ def enableFileCacheQueryLimitResult = sql """SHOW BACKEND CONFIG LIKE
'enable_file_cache_query_limit';"""
+ logger.info("enable_file_cache_query_limit configuration: " +
enableFileCacheQueryLimitResult)
+ assertFalse(enableFileCacheQueryLimitResult.size() == 0 ||
enableFileCacheQueryLimitResult[0][3] == null ||
enableFileCacheQueryLimitResult[0][3] != "true",
+ ENABLE_FILE_CACHE_QUERY_LIMIT_CHECK_TRUE_FAILED_MSG)
+
+ sql """switch ${catalog_name}"""
+
+ // load the table into file cache
+ sql query_sql
+
+ // Waiting for file cache statistics update
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update
${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Get updated value of normal queue current elements and max elements
after cache operations
+ def updatedNormalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " +
updatedNormalQueueCurrSizeResult)
+
+ def updatedNormalQueueCurrElementsResult = sql """select METRIC_VALUE
from information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
updatedNormalQueueCurrElementsResult)
+
+ // Check if updated values are greater than initial values
+ double updatedNormalQueueCurrSize =
Double.valueOf(updatedNormalQueueCurrSizeResult[0][0])
+ double updatedNormalQueueCurrElements =
Double.valueOf(updatedNormalQueueCurrElementsResult[0][0])
+
+ logger.info("Updated normal queue curr size and elements - size:
${updatedNormalQueueCurrSize} , " +
+ "elements: ${updatedNormalQueueCurrElements}")
+
+ assertTrue(updatedNormalQueueCurrSize > 0.0,
NORMAL_QUEUE_CURR_SIZE_NOT_GREATER_THAN_ZERO_MSG)
+ assertTrue(updatedNormalQueueCurrElements > 0.0,
NORMAL_QUEUE_CURR_ELEMENTS_NOT_GREATER_THAN_ZERO_MSG)
+
+ logger.info("Normal queue curr size and query cache capacity
comparison - normal queue curr size: ${updatedNormalQueueCurrSize as Long} , " +
+ "query cache capacity: ${fileCacheCapacity}")
+
+ assertTrue((updatedNormalQueueCurrSize as Long) <= queryCacheCapacity,
+ NORMAL_QUEUE_CURR_SIZE_GREATER_THAN_QUERY_CACHE_CAPACITY_MSG)
+
+ // Get updated values for hit and read counts after cache operations
+ def updatedTotalHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Updated total_hit_counts result: " +
updatedTotalHitCountsResult)
+
+ def updatedTotalReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Updated total_read_counts result: " +
updatedTotalReadCountsResult)
+
+ // Check if updated values are greater than initial values
+ double updatedTotalHitCounts =
Double.valueOf(updatedTotalHitCountsResult[0][0])
+ double updatedTotalReadCounts =
Double.valueOf(updatedTotalReadCountsResult[0][0])
+
+ logger.info("Total hit and read counts comparison - hit counts:
${initialTotalHitCounts} -> " +
+ "${updatedTotalHitCounts} , read counts:
${initialTotalReadCounts} -> ${updatedTotalReadCounts}")
+
+ assertTrue(updatedTotalReadCounts > initialTotalReadCounts,
TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+ }
+
+ logger.info("===================== End running file cache query limit test
1 =====================")
+
+ return true;
+}
\ No newline at end of file
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
new file mode 100644
index 00000000000..9e83bd0af3f
--- /dev/null
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_query_limit_config.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.dgmimpl.arrays.LongArrayGetAtMetaMethod
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+final String ERROR_SQL_SUCCEED_MSG = "SQL should have failed but succeeded"
+final String SET_SESSION_VARIABLE_FAILED_MSG = "SQL set session variable
failed"
+
+suite("test_file_cache_query_limit_config",
"external_docker,hive,external_docker_hive,p0,external,nonConcurrent") {
+
+ sql """set file_cache_query_limit_percent = 1"""
+ def fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 1.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ sql """set file_cache_query_limit_percent = 20"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 20.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ sql """set file_cache_query_limit_percent = 51"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 51.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+
+ sql """set file_cache_query_limit_percent = 100"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 100.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ try {
+ sql """set file_cache_query_limit_percent = -1"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 0"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 101"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 1000000"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ // Set frontend configuration parameters for
file_cache_query_limit_max_percent
+ setFeConfigTemporary([
+ "file_cache_query_limit_max_percent": "50"
+ ]) {
+ // Execute test logic with modified configuration for
file_cache_query_limit_max_percent
+ logger.info("Backend configuration set -
file_cache_query_limit_max_percent: 50")
+
+ sql """set file_cache_query_limit_percent = 1"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 1.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ sql """set file_cache_query_limit_percent = 20"""
+ fileCacheQueryLimitPercentResult = sql """show variables like
'file_cache_query_limit_percent';"""
+ logger.info("file_cache_query_limit_percent configuration: " +
fileCacheQueryLimitPercentResult)
+ assertFalse(fileCacheQueryLimitPercentResult.size() == 0 ||
Double.valueOf(fileCacheQueryLimitPercentResult[0][1]) != 20.0,
+ SET_SESSION_VARIABLE_FAILED_MSG)
+
+ try {
+ sql """set file_cache_query_limit_percent = -1"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 0"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+
+ try {
+ sql """set file_cache_query_limit_percent = 51"""
+ assertTrue(false, ERROR_SQL_SUCCEED_MSG)
+ } catch (Exception e) {
+ logger.info("SQL failed as expected: ${e.message}")
+ }
+ }
+
+ return true;
+}
diff --git
a/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
b/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
index b0984445e5f..b8e2d3a164e 100644
---
a/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
+++
b/regression-test/suites/external_table_p0/cache/test_file_cache_statistics.groovy
@@ -89,150 +89,170 @@ suite("test_file_cache_statistics",
"external_docker,hive,external_docker_hive,p
// do it twice to make sure the table block could hit the cache
order_qt_1 """select * from
${catalog_name}.${ex_db_name}.parquet_partition_table where l_orderkey=1 and
l_partkey=1534 limit 1;"""
- // brpc metrics will be updated at most 20 seconds
- Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until{
- // ===== Hit Ratio Metrics Check =====
- // Check overall hit ratio hits_ratio
- def hitsRatioResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio' limit
1;"""
- logger.info("hits_ratio result: " + hitsRatioResult)
-
- // Check 1-hour hit ratio hits_ratio_1h
- def hitsRatio1hResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_1h'
limit 1;"""
- logger.info("hits_ratio_1h result: " + hitsRatio1hResult)
-
- // Check 5-minute hit ratio hits_ratio_5m
- def hitsRatio5mResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_5m'
limit 1;"""
- logger.info("hits_ratio_5m result: " + hitsRatio5mResult)
-
- // Check if all three metrics exist and are greater than 0
- boolean hasHitsRatio = hitsRatioResult.size() > 0 &&
Double.valueOf(hitsRatioResult[0][0]) > 0
- boolean hasHitsRatio1h = hitsRatio1hResult.size() > 0 &&
Double.valueOf(hitsRatio1hResult[0][0]) > 0
- boolean hasHitsRatio5m = hitsRatio5mResult.size() > 0 &&
Double.valueOf(hitsRatio5mResult[0][0]) > 0
-
- logger.info("Hit ratio metrics check result - hits_ratio:
${hasHitsRatio}, hits_ratio_1h: ${hasHitsRatio1h}, hits_ratio_5m:
${hasHitsRatio5m}")
-
- // Return false if any metric is false, otherwise return true
- if (!hasHitsRatio) {
- logger.info(HIT_RATIO_METRIC_FALSE_MSG)
- assertTrue(false, HIT_RATIO_METRIC_FALSE_MSG)
- }
- if (!hasHitsRatio1h) {
- logger.info(HIT_RATIO_1H_METRIC_FALSE_MSG)
- assertTrue(false, HIT_RATIO_1H_METRIC_FALSE_MSG)
- }
- if (!hasHitsRatio5m) {
- logger.info(HIT_RATIO_5M_METRIC_FALSE_MSG)
- assertTrue(false, HIT_RATIO_5M_METRIC_FALSE_MSG)
- }
- // ===== End Hit Ratio Metrics Check =====
-
- // ===== Normal Queue Metrics Check =====
- // Check normal queue current size and max size
- def normalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
- logger.info("normal_queue_curr_size result: " +
normalQueueCurrSizeResult)
-
- def normalQueueMaxSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
- logger.info("normal_queue_max_size result: " +
normalQueueMaxSizeResult)
-
- // Check normal queue current elements and max elements
- def normalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
- logger.info("normal_queue_curr_elements result: " +
normalQueueCurrElementsResult)
-
- def normalQueueMaxElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
- logger.info("normal_queue_max_elements result: " +
normalQueueMaxElementsResult)
-
- // Check normal queue size metrics
- boolean hasNormalQueueCurrSize = normalQueueCurrSizeResult.size() > 0
&&
- Double.valueOf(normalQueueCurrSizeResult[0][0]) > 0
- boolean hasNormalQueueMaxSize = normalQueueMaxSizeResult.size() > 0 &&
- Double.valueOf(normalQueueMaxSizeResult[0][0]) > 0
- boolean hasNormalQueueCurrElements =
normalQueueCurrElementsResult.size() > 0 &&
- Double.valueOf(normalQueueCurrElementsResult[0][0]) > 0
- boolean hasNormalQueueMaxElements =
normalQueueMaxElementsResult.size() > 0 &&
- Double.valueOf(normalQueueMaxElementsResult[0][0]) > 0
-
- // Check if current size is less than max size and current elements is
less than max elements
- boolean normalQueueSizeValid = hasNormalQueueCurrSize &&
hasNormalQueueMaxSize &&
- Double.valueOf(normalQueueCurrSizeResult[0][0]) <
Double.valueOf(normalQueueMaxSizeResult[0][0])
- boolean normalQueueElementsValid = hasNormalQueueCurrElements &&
hasNormalQueueMaxElements &&
- Double.valueOf(normalQueueCurrElementsResult[0][0]) <
Double.valueOf(normalQueueMaxElementsResult[0][0])
-
- logger.info("Normal queue metrics check result - size valid:
${normalQueueSizeValid}, " +
- "elements valid: ${normalQueueElementsValid}")
-
- if (!normalQueueSizeValid) {
- logger.info(NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
- assertTrue(false, NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
- }
- if (!normalQueueElementsValid) {
- logger.info(NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
- assertTrue(false, NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
- }
- // ===== End Normal Queue Metrics Check =====
-
- // ===== Hit and Read Counts Metrics Check =====
- // Get initial values for hit and read counts
- def initialHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_hit_counts' limit 1;"""
- logger.info("Initial total_hit_counts result: " +
initialHitCountsResult)
-
- def initialReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_read_counts' limit 1;"""
- logger.info("Initial total_read_counts result: " +
initialReadCountsResult)
-
- // Check if initial values exist and are greater than 0
- if (initialHitCountsResult.size() == 0 ||
Double.valueOf(initialHitCountsResult[0][0]) <= 0) {
- logger.info(INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
- assertTrue(false, INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
- }
- if (initialReadCountsResult.size() == 0 ||
Double.valueOf(initialReadCountsResult[0][0]) <= 0) {
- logger.info(INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
- assertTrue(false, INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
- }
-
- // Store initial values
- double initialHitCounts = Double.valueOf(initialHitCountsResult[0][0])
- double initialReadCounts =
Double.valueOf(initialReadCountsResult[0][0])
-
- // Execute the same query to trigger cache operations
- order_qt_2 """select * from
${catalog_name}.${ex_db_name}.parquet_partition_table
- where l_orderkey=1 and l_partkey=1534 limit 1;"""
-
- // Get updated values after cache operations
- def updatedHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_hit_counts' limit 1;"""
- logger.info("Updated total_hit_counts result: " +
updatedHitCountsResult)
-
- def updatedReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
- where METRIC_NAME = 'total_read_counts' limit 1;"""
- logger.info("Updated total_read_counts result: " +
updatedReadCountsResult)
-
- // Check if updated values are greater than initial values
- double updatedHitCounts = Double.valueOf(updatedHitCountsResult[0][0])
- double updatedReadCounts =
Double.valueOf(updatedReadCountsResult[0][0])
-
- boolean hitCountsIncreased = updatedHitCounts > initialHitCounts
- boolean readCountsIncreased = updatedReadCounts > initialReadCounts
-
- logger.info("Hit and read counts comparison - hit_counts:
${initialHitCounts} -> " +
- "${updatedHitCounts} (increased: ${hitCountsIncreased}),
read_counts: ${initialReadCounts} -> " +
- "${updatedReadCounts} (increased: ${readCountsIncreased})")
-
- if (!hitCountsIncreased) {
- logger.info(TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
- assertTrue(false, TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
- }
- if (!readCountsIncreased) {
- logger.info(TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
- assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
- }
- // ===== End Hit and Read Counts Metrics Check =====
- sql """set global enable_file_cache=false"""
- return true
+ def fileCacheBackgroundMonitorIntervalMsResult = sql """show backend
config like 'file_cache_background_monitor_interval_ms';"""
+ logger.info("file_cache_background_monitor_interval_ms configuration: " +
fileCacheBackgroundMonitorIntervalMsResult)
+ assertFalse(fileCacheBackgroundMonitorIntervalMsResult.size() == 0 ||
fileCacheBackgroundMonitorIntervalMsResult[0][3] == null ||
+ fileCacheBackgroundMonitorIntervalMsResult[0][3].trim().isEmpty(),
"file_cache_background_monitor_interval_ms is empty or not set to true")
+
+ // brpc metrics are refreshed once per file_cache_background_monitor_interval_ms, so wait one full interval
+ def totalWaitTime =
(fileCacheBackgroundMonitorIntervalMsResult[0][3].toInteger() / 1000) as int
+ def interval = 1
+ def iterations = totalWaitTime / interval
+
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update ${elapsedSeconds}
seconds, ${remainingSeconds} seconds remaining")
}
-}
+ // ===== Hit Ratio Metrics Check =====
+ // Check overall hit ratio hits_ratio
+ def hitsRatioResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio' limit
1;"""
+ logger.info("hits_ratio result: " + hitsRatioResult)
+
+ // Check 1-hour hit ratio hits_ratio_1h
+ def hitsRatio1hResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_1h'
limit 1;"""
+ logger.info("hits_ratio_1h result: " + hitsRatio1hResult)
+
+ // Check 5-minute hit ratio hits_ratio_5m
+ def hitsRatio5mResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics where METRIC_NAME = 'hits_ratio_5m'
limit 1;"""
+ logger.info("hits_ratio_5m result: " + hitsRatio5mResult)
+
+ // Check if all three metrics exist and are greater than 0
+ boolean hasHitsRatio = hitsRatioResult.size() > 0 &&
Double.valueOf(hitsRatioResult[0][0]) > 0
+ boolean hasHitsRatio1h = hitsRatio1hResult.size() > 0 &&
Double.valueOf(hitsRatio1hResult[0][0]) > 0
+ boolean hasHitsRatio5m = hitsRatio5mResult.size() > 0 &&
Double.valueOf(hitsRatio5mResult[0][0]) > 0
+
+ logger.info("Hit ratio metrics check result - hits_ratio: ${hasHitsRatio},
hits_ratio_1h: ${hasHitsRatio1h}, hits_ratio_5m: ${hasHitsRatio5m}")
+
+ // Fail the test if any of the three hit-ratio metrics is missing or not greater than 0
+ if (!hasHitsRatio) {
+ logger.info(HIT_RATIO_METRIC_FALSE_MSG)
+ assertTrue(false, HIT_RATIO_METRIC_FALSE_MSG)
+ }
+ if (!hasHitsRatio1h) {
+ logger.info(HIT_RATIO_1H_METRIC_FALSE_MSG)
+ assertTrue(false, HIT_RATIO_1H_METRIC_FALSE_MSG)
+ }
+ if (!hasHitsRatio5m) {
+ logger.info(HIT_RATIO_5M_METRIC_FALSE_MSG)
+ assertTrue(false, HIT_RATIO_5M_METRIC_FALSE_MSG)
+ }
+ // ===== End Hit Ratio Metrics Check =====
+
+ // ===== Normal Queue Metrics Check =====
+ // Check normal queue current size and max size
+ def normalQueueCurrSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
+ logger.info("normal_queue_curr_size result: " + normalQueueCurrSizeResult)
+
+ def normalQueueMaxSizeResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_size' limit 1;"""
+ logger.info("normal_queue_max_size result: " + normalQueueMaxSizeResult)
+
+ // Check normal queue current elements and max elements
+ def normalQueueCurrElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
+ logger.info("normal_queue_curr_elements result: " +
normalQueueCurrElementsResult)
+
+ def normalQueueMaxElementsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'normal_queue_max_elements' limit 1;"""
+ logger.info("normal_queue_max_elements result: " +
normalQueueMaxElementsResult)
+
+ // Check normal queue size metrics
+ boolean hasNormalQueueCurrSize = normalQueueCurrSizeResult.size() > 0 &&
+ Double.valueOf(normalQueueCurrSizeResult[0][0]) > 0
+ boolean hasNormalQueueMaxSize = normalQueueMaxSizeResult.size() > 0 &&
+ Double.valueOf(normalQueueMaxSizeResult[0][0]) > 0
+ boolean hasNormalQueueCurrElements = normalQueueCurrElementsResult.size()
> 0 &&
+ Double.valueOf(normalQueueCurrElementsResult[0][0]) > 0
+ boolean hasNormalQueueMaxElements = normalQueueMaxElementsResult.size() >
0 &&
+ Double.valueOf(normalQueueMaxElementsResult[0][0]) > 0
+
+ // Check if current size is less than max size and current elements is
less than max elements
+ boolean normalQueueSizeValid = hasNormalQueueCurrSize &&
hasNormalQueueMaxSize &&
+ Double.valueOf(normalQueueCurrSizeResult[0][0]) <
Double.valueOf(normalQueueMaxSizeResult[0][0])
+ boolean normalQueueElementsValid = hasNormalQueueCurrElements &&
hasNormalQueueMaxElements &&
+ Double.valueOf(normalQueueCurrElementsResult[0][0]) <
Double.valueOf(normalQueueMaxElementsResult[0][0])
+
+ logger.info("Normal queue metrics check result - size valid:
${normalQueueSizeValid}, " +
+ "elements valid: ${normalQueueElementsValid}")
+
+ if (!normalQueueSizeValid) {
+ logger.info(NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
+ assertTrue(false, NORMAL_QUEUE_SIZE_VALIDATION_FAILED_MSG)
+ }
+ if (!normalQueueElementsValid) {
+ logger.info(NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
+ assertTrue(false, NORMAL_QUEUE_ELEMENTS_VALIDATION_FAILED_MSG)
+ }
+ // ===== End Normal Queue Metrics Check =====
+
+ // ===== Hit and Read Counts Metrics Check =====
+ // Get initial values for hit and read counts
+ def initialHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Initial total_hit_counts result: " + initialHitCountsResult)
+
+ def initialReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Initial total_read_counts result: " + initialReadCountsResult)
+
+ // Check if initial values exist and are greater than 0
+ if (initialHitCountsResult.size() == 0 ||
Double.valueOf(initialHitCountsResult[0][0]) <= 0) {
+ logger.info(INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
+ assertTrue(false, INITIAL_TOTAL_HIT_COUNTS_NOT_GREATER_THAN_0_MSG)
+ }
+ if (initialReadCountsResult.size() == 0 ||
Double.valueOf(initialReadCountsResult[0][0]) <= 0) {
+ logger.info(INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
+ assertTrue(false, INITIAL_TOTAL_READ_COUNTS_NOT_GREATER_THAN_0_MSG)
+ }
+
+ // Store initial values
+ double initialHitCounts = Double.valueOf(initialHitCountsResult[0][0])
+ double initialReadCounts = Double.valueOf(initialReadCountsResult[0][0])
+
+ (1..iterations).each { count ->
+ Thread.sleep(interval * 1000)
+ def elapsedSeconds = count * interval
+ def remainingSeconds = totalWaitTime - elapsedSeconds
+ logger.info("Waited for file cache statistics update ${elapsedSeconds}
seconds, ${remainingSeconds} seconds remaining")
+ }
+
+ // Execute the same query to trigger cache operations
+ order_qt_2 """select * from
${catalog_name}.${ex_db_name}.parquet_partition_table
+ where l_orderkey=1 and l_partkey=1534 limit 1;"""
+
+ // Get updated values after cache operations
+ def updatedHitCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_hit_counts' limit 1;"""
+ logger.info("Updated total_hit_counts result: " + updatedHitCountsResult)
+
+ def updatedReadCountsResult = sql """select METRIC_VALUE from
information_schema.file_cache_statistics
+ where METRIC_NAME = 'total_read_counts' limit 1;"""
+ logger.info("Updated total_read_counts result: " + updatedReadCountsResult)
+
+ // Check if updated values are greater than initial values
+ double updatedHitCounts = Double.valueOf(updatedHitCountsResult[0][0])
+ double updatedReadCounts = Double.valueOf(updatedReadCountsResult[0][0])
+
+ boolean hitCountsIncreased = updatedHitCounts > initialHitCounts
+ boolean readCountsIncreased = updatedReadCounts > initialReadCounts
+
+ logger.info("Hit and read counts comparison - hit_counts:
${initialHitCounts} -> " +
+ "${updatedHitCounts} (increased: ${hitCountsIncreased}), read_counts:
${initialReadCounts} -> " +
+ "${updatedReadCounts} (increased: ${readCountsIncreased})")
+
+ if (!hitCountsIncreased) {
+ logger.info(TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
+ assertTrue(false, TOTAL_HIT_COUNTS_DID_NOT_INCREASE_MSG)
+ }
+ if (!readCountsIncreased) {
+ logger.info(TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+ assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
+ }
+ // ===== End Hit and Read Counts Metrics Check =====
+ sql """set global enable_file_cache=false"""
+ return true
+}
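The rewritten suite above replaces the fixed Awaitility timeout with a wait derived from the BE's own file_cache_background_monitor_interval_ms, since that interval is what drives the brpc metric refresh. A minimal standalone Groovy sketch of that waiting pattern, assuming a JDBC connection to the FE (endpoint and credentials are placeholders, not part of this change):

    // Illustrative sketch only; connection details are placeholders.
    import groovy.sql.Sql

    def db = Sql.newInstance('jdbc:mysql://127.0.0.1:9030/', 'root', '',
            'com.mysql.cj.jdbc.Driver')
    try {
        // Column 3 of `show backend config like ...` holds the value, as in the suite above.
        def cfg = db.firstRow("show backend config like 'file_cache_background_monitor_interval_ms'")
        assert cfg != null : 'file_cache_background_monitor_interval_ms not found'
        int totalWaitSeconds = ((cfg[3] as String).trim().toInteger() / 1000) as int

        // Sleep in one-second steps so the log shows progress while waiting.
        for (int s = 1; s <= totalWaitSeconds; s++) {
            Thread.sleep(1000L)
            println "waited ${s}s of ${totalWaitSeconds}s for file cache statistics to refresh"
        }

        def hits = db.firstRow('select METRIC_VALUE from information_schema.file_cache_statistics ' +
                "where METRIC_NAME = 'hits_ratio' limit 1")
        println "hits_ratio after one monitor interval: ${hits?.getAt(0)}"
    } finally {
        db.close()
    }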
diff --git
a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
index 7d270ffe3b8..d1adc2b6b84 100644
---
a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
+++
b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_hive_warmup_select",
"p0,external,hive,external_docker,external_docker_hive") {
+suite("test_hive_warmup_select",
"p0,external,hive,external_docker,external_docker_hive,nonConcurrent") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable Hive test.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]