This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 098b66ee8cc [cherry-pick][enhancement](cloud) support BE http action:
list_cache and clear (#41037) (#43412)
098b66ee8cc is described below
commit 098b66ee8cc1be6c1d1c4530ea2bcfcbc64dfbfc
Author: yagagagaga <[email protected]>
AuthorDate: Thu Nov 14 16:23:23 2024 +0800
[cherry-pick][enhancement](cloud) support BE http action: list_cache and
clear (#41037) (#43412)
## Proposed changes
Add an HTTP action that is useful for debugging.
### API
```http
GET /api/file_cache
```
### request parameter
#### request parameter1
|param|type|desc|require|
|:---|:---|:---|:---|
|op|string|the value must be `list_cache`; for other supported values, refer to
#40831 #37484 |yes|
|value|string|the segment file name |yes|
#### request parameter2
|param|type|desc|require|
|:---|:---|:---|:---|
|op|string|the value must be `clear`; for other supported values, refer to
#40831 #37484 |yes|
|value|string|the segment file name |yes|
|sync|bool|clear the local cache synchronously|no|
### response
#### response1
if successful
|param|type|desc|
|:---|:---|:---|
||array|return the segment file cache in local path|
if it fails
|param|type|desc|
|:---|:---|:---|
||array|empty array|
#### response2
if successful
|param|type|desc|
|:---|:---|:---|
|status|string||
|msg|string||
### example
#### case 1
```bash
curl
'172.100.0.4:8040/api/file_cache?op=list_cache&value=0200000000000001bf42c14374fff491ffb7c89a1a65c5bb_0.dat'
```
return
```json
["/opt/doris/be/file_cache/c6a/c6a599f453f67f0949f80ad9990fa3dd/0"]
```
#### case 2
```bash
curl
'127.0.0.1:8040/api/file_cache?op=clear&sync=true&value=0200000000000001284b68fea3dcfe8a83e65cd88426b081_0.dat'
```
return
```json
{
"status": "OK",
"msg": "OK"
}
```
(cherry picked from commit 99d07480d3ae685f57b6cf61866343863b481489)
---
be/src/http/action/file_cache_action.cpp | 33 +++++-
be/src/io/cache/block_file_cache_factory.cpp | 17 +++
be/src/io/cache/block_file_cache_factory.h | 2 +
be/src/io/cache/file_block.cpp | 4 +
be/src/io/cache/file_block.h | 2 +
be/src/io/cache/file_cache_storage.h | 2 +
be/src/io/cache/fs_file_cache_storage.cpp | 5 +
be/src/io/cache/fs_file_cache_storage.h | 1 +
be/src/io/cache/mem_file_cache_storage.cpp | 4 +
be/src/io/cache/mem_file_cache_storage.h | 1 +
.../cache/http/test_list_cache_file.groovy | 117 +++++++++++++++++++++
11 files changed, 187 insertions(+), 1 deletion(-)
diff --git a/be/src/http/action/file_cache_action.cpp
b/be/src/http/action/file_cache_action.cpp
index f31c040c5cf..740bac46edf 100644
--- a/be/src/http/action/file_cache_action.cpp
+++ b/be/src/http/action/file_cache_action.cpp
@@ -17,10 +17,15 @@
#include "file_cache_action.h"
+#include <glog/logging.h>
+
+#include <algorithm>
#include <memory>
#include <shared_mutex>
#include <sstream>
#include <string>
+#include <string_view>
+#include <vector>
#include "common/status.h"
#include "http/http_channel.h"
@@ -30,6 +35,7 @@
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
+#include "io/cache/fs_file_cache_storage.h"
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "util/easy_json.h"
@@ -43,6 +49,7 @@ constexpr static std::string_view PATH = "path";
constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view HASH = "hash";
+constexpr static std::string_view LIST_CACHE = "list_cache";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
@@ -66,7 +73,14 @@ Status FileCacheAction::_handle_header(HttpRequest* req,
std::string* json_metri
*json_metrics = json.ToString();
} else if (operation == CLEAR) {
const std::string& sync = req->param(SYNC.data());
- auto ret =
io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
+ const std::string& segment_path = req->param(VALUE.data());
+ if (segment_path.empty()) {
+ io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync)
== "true");
+ } else {
+ io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
+ io::BlockFileCache* cache =
io::FileCacheFactory::instance()->get_by_path(hash);
+ cache->remove_if_cached(hash);
+ }
} else if (operation == RESET) {
std::string capacity = req->param(CAPACITY.data());
int64_t new_capacity = 0;
@@ -96,6 +110,23 @@ Status FileCacheAction::_handle_header(HttpRequest* req,
std::string* json_metri
json[HASH.data()] = ret.to_string();
*json_metrics = json.ToString();
}
+ } else if (operation == LIST_CACHE) {
+ const std::string& segment_path = req->param(VALUE.data());
+ if (segment_path.empty()) {
+ st = Status::InvalidArgument("missing parameter: {} is required",
VALUE.data());
+ } else {
+ io::UInt128Wrapper cache_hash =
io::BlockFileCache::hash(segment_path);
+ std::vector<std::string> cache_files =
+
io::FileCacheFactory::instance()->get_cache_file_by_path(cache_hash);
+ if (cache_files.empty()) {
+ *json_metrics = "[]";
+ } else {
+ EasyJson json;
+ std::for_each(cache_files.begin(), cache_files.end(),
+ [&json](auto& x) { json.PushBack(x); });
+ *json_metrics = json.ToString();
+ }
+ }
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
diff --git a/be/src/io/cache/block_file_cache_factory.cpp
b/be/src/io/cache/block_file_cache_factory.cpp
index 8370962ddd5..2d0d25735fe 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -21,6 +21,9 @@
#include "io/cache/block_file_cache_factory.h"
#include <glog/logging.h>
+
+#include <string>
+#include <vector>
#if defined(__APPLE__)
#include <sys/mount.h>
#else
@@ -118,6 +121,20 @@ Status FileCacheFactory::create_file_cache(const
std::string& cache_base_path,
return Status::OK();
}
+std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const
UInt128Wrapper& hash) {
+ io::BlockFileCache* cache =
io::FileCacheFactory::instance()->get_by_path(hash);
+ auto blocks = cache->get_blocks_by_key(hash);
+ std::vector<std::string> ret;
+ if (blocks.empty()) {
+ return ret;
+ } else {
+ for (auto& [_, fb] : blocks) {
+ ret.emplace_back(fb->get_cache_file());
+ }
+ }
+ return ret;
+}
+
BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
// dont need lock mutex because _caches is immutable after
create_file_cache
return _caches[KeyHash()(key) % _caches.size()].get();
diff --git a/be/src/io/cache/block_file_cache_factory.h
b/be/src/io/cache/block_file_cache_factory.h
index 12714fd2087..b00bd7bdfcb 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -62,6 +62,8 @@ public:
[[nodiscard]] size_t get_cache_instance_size() const { return
_caches.size(); }
+ std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper&
hash);
+
BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr>
get_query_context_holders(
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 4576b9dbba8..44cad5520ea 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -272,6 +272,10 @@ std::string FileBlock::state_to_string(FileBlock::State
state) {
}
}
+std::string FileBlock::get_cache_file() const {
+ return _mgr->_storage->get_local_file(this->_key);
+}
+
FileBlocksHolder::~FileBlocksHolder() {
for (auto file_block_it = file_blocks.begin(); file_block_it !=
file_blocks.end();) {
auto current_file_block_it = file_block_it;
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 6e49a597b7b..3a4490d67a3 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -123,6 +123,8 @@ public:
uint64_t expiration_time() const { return _key.meta.expiration_time; }
+ std::string get_cache_file() const;
+
State state_unlock(std::lock_guard<std::mutex>&) const;
FileBlock& operator=(const FileBlock&) = delete;
diff --git a/be/src/io/cache/file_cache_storage.h
b/be/src/io/cache/file_cache_storage.h
index 642c4711cf6..024e701c6fa 100644
--- a/be/src/io/cache/file_cache_storage.h
+++ b/be/src/io/cache/file_cache_storage.h
@@ -65,6 +65,8 @@ public:
// force clear all current data in the cache
virtual Status clear(std::string& msg) = 0;
virtual FileCacheStorageType get_type() = 0;
+ // get local cached file
+ virtual std::string get_local_file(const FileCacheKey& key) = 0;
};
} // namespace doris::io
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index 8471f6e12b4..cf1cd41a537 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -677,6 +677,11 @@ Status FSFileCacheStorage::clear(std::string& msg) {
return Status::OK();
}
+std::string FSFileCacheStorage::get_local_file(const FileCacheKey& key) {
+ return get_path_in_local_cache(get_path_in_local_cache(key.hash,
key.meta.expiration_time),
+ key.offset, key.meta.type, false);
+}
+
FSFileCacheStorage::~FSFileCacheStorage() {
if (_cache_background_load_thread.joinable()) {
_cache_background_load_thread.join();
diff --git a/be/src/io/cache/fs_file_cache_storage.h
b/be/src/io/cache/fs_file_cache_storage.h
index ac5d10d0430..8a97aa109ad 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -70,6 +70,7 @@ public:
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const
FileCacheKey& key,
std::lock_guard<std::mutex>&
cache_lock) override;
Status clear(std::string& msg) override;
+ std::string get_local_file(const FileCacheKey& key) override;
[[nodiscard]] static std::string get_path_in_local_cache(const
std::string& dir, size_t offset,
FileCacheType
type,
diff --git a/be/src/io/cache/mem_file_cache_storage.cpp
b/be/src/io/cache/mem_file_cache_storage.cpp
index bffa75ae305..7e76dd5f88c 100644
--- a/be/src/io/cache/mem_file_cache_storage.cpp
+++ b/be/src/io/cache/mem_file_cache_storage.cpp
@@ -128,4 +128,8 @@ Status MemFileCacheStorage::clear(std::string& msg) {
return Status::OK();
}
+std::string MemFileCacheStorage::get_local_file(const FileCacheKey& key) {
+ return "";
+}
+
} // namespace doris::io
diff --git a/be/src/io/cache/mem_file_cache_storage.h
b/be/src/io/cache/mem_file_cache_storage.h
index 20fdd8ce9f6..82064c6e9ed 100644
--- a/be/src/io/cache/mem_file_cache_storage.h
+++ b/be/src/io/cache/mem_file_cache_storage.h
@@ -44,6 +44,7 @@ public:
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const
FileCacheKey& key,
std::lock_guard<std::mutex>&
cache_lock) override;
Status clear(std::string& msg) override;
+ std::string get_local_file(const FileCacheKey& key) override;
FileCacheStorageType get_type() override { return MEMORY; }
diff --git
a/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
b/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
new file mode 100644
index 00000000000..acd33a22f0c
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_list_cache_file") {
+ sql """ use @regression_cluster_name1 """
+ String[][] backends = sql """ show backends """
+ String backendId;
+ def backendIdToBackendIP = [:]
+ def backendIdToBackendHttpPort = [:]
+ def backendIdToBackendBrpcPort = [:]
+ for (String[] backend in backends) {
+ if (backend[9].equals("true") &&
backend[19].contains("regression_cluster_name1")) {
+ backendIdToBackendIP.put(backend[0], backend[1])
+ backendIdToBackendHttpPort.put(backend[0], backend[4])
+ backendIdToBackendBrpcPort.put(backend[0], backend[5])
+ }
+ }
+ assertEquals(backendIdToBackendIP.size(), 1)
+
+ backendId = backendIdToBackendIP.keySet()[0]
+ def socket = backendIdToBackendIP.get(backendId) + ":" +
backendIdToBackendHttpPort.get(backendId)
+
+ sql "drop table IF EXISTS `user`"
+
+ sql """
+ CREATE TABLE IF NOT EXISTS `user` (
+ `id` int NULL,
+ `name` string NULL
+ )
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "file_cache_ttl_seconds" = "2884"
+ )
+ """
+
+ sql "insert into user select number, cast(rand() as varchar(32)) from
numbers(\"number\"=\"1000000\")"
+
+ def get_tablets = { String tbl_name ->
+ def res = sql "show tablets from ${tbl_name}"
+ List<Integer> tablets = new ArrayList<>()
+ for (final def line in res) {
+ tablets.add(Integer.valueOf(line[0].toString()))
+ }
+ return tablets
+ }
+
+ def get_rowsets = { int tablet_id ->
+ var ret = []
+ httpTest {
+ endpoint ""
+ uri socket + "/api/compaction/show?tablet_id=" + tablet_id
+ op "get"
+ check {respCode, body ->
+ assertEquals(respCode, 200)
+ var map = parseJson(body)
+ for (final def line in map.get("rowsets")) {
+ var tokens = line.toString().split(" ")
+ ret.add(tokens[4])
+ }
+ }
+ }
+ return ret
+ }
+
+ var tablets = get_tablets("user")
+ var rowsets = get_rowsets(tablets.get(0))
+ var segment_file = rowsets[rowsets.size() - 1] + "_0.dat"
+
+ httpTest {
+ endpoint ""
+ uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
+ op "get"
+ check {respCode, body ->
+ assertEquals(respCode, 200)
+ var arr = parseJson(body)
+ assertTrue(arr.size() > 0, "There shouldn't be no cache file at
all, maybe you need to check disk capacity and modify
file_cache_enter_disk_resource_limit_mode_percent in be.conf")
+ }
+ }
+
+ // clear single segment file cache
+ httpTest {
+ endpoint ""
+ uri socket + "/api/file_cache?op=clear&value=" + segment_file
+ op "get"
+ check {respCode, body ->
+ assertEquals(respCode, 200, "clear local cache fail, maybe you can
find something in respond: " + parseJson(body))
+ }
+ }
+
+ httpTest {
+ endpoint ""
+ uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
+ op "get"
+ check {respCode, body ->
+ assertEquals(respCode, 200)
+ var arr = parseJson(body)
+ assertTrue(arr.size() == 0, "local cache files should not greater
than 0, because it has already clear")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]