This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 098b66ee8cc [cherry-pick][enhancement](cloud) support BE http action: 
list_cache and clear (#41037) (#43412)
098b66ee8cc is described below

commit 098b66ee8cc1be6c1d1c4530ea2bcfcbc64dfbfc
Author: yagagagaga <[email protected]>
AuthorDate: Thu Nov 14 16:23:23 2024 +0800

    [cherry-pick][enhancement](cloud) support BE http action: list_cache and 
clear (#41037) (#43412)
    
    ## Proposed changes
    
    Add an HTTP action, which is useful when debugging.
    
    ### API
    ```http
    GET /api/file_cache
    ```
    
    ### request parameter
    
    #### request parameter1
    
    |param|type|desc|require|
    |:---|:---|:---|:---|
    |op|string|the value must be `list_cache`; for other values, refer to #40831 and #37484|yes|
    |value|string|the segment file name|yes|
    
    #### request parameter2
    
    |param|type|desc|require|
    |:---|:---|:---|:---|
    |op|string|the value must be `clear`; for other values, refer to #40831 and #37484|yes|
    |value|string|the segment file name|yes|
    |sync|bool|whether to clear the local cache synchronously|no|
    
    ### response
    
    #### response1
    
    If successful:
    |param|type|desc|
    |:---|:---|:---|
    ||array|return the segment file cache in local path|
    
    If it fails:
    |param|type|desc|
    |:---|:---|:---|
    ||array|empty array|
    
    #### response2
    
    If successful:
    |param|type|desc|
    |:---|:---|:---|
    |status|string||
    |msg|string||
    
    ### example
    
    #### case 1
    
    ```bash
    curl  
'172.100.0.4:8040/api/file_cache?op=list_cache&value=0200000000000001bf42c14374fff491ffb7c89a1a65c5bb_0.dat'
    ```
    
    return
    ```json
    ["/opt/doris/be/file_cache/c6a/c6a599f453f67f0949f80ad9990fa3dd/0"]
    ```
    
    #### case 2
    
    ```bash
    curl 
'127.0.0.1:8040/api/file_cache?op=clear&sync=true&value=0200000000000001284b68fea3dcfe8a83e65cd88426b081_0.dat'
    ```
    
    return
    ```json
    {
        "status": "OK",
        "msg": "OK"
    }
    ```
    
    (cherry picked from commit 99d07480d3ae685f57b6cf61866343863b481489)
---
 be/src/http/action/file_cache_action.cpp           |  33 +++++-
 be/src/io/cache/block_file_cache_factory.cpp       |  17 +++
 be/src/io/cache/block_file_cache_factory.h         |   2 +
 be/src/io/cache/file_block.cpp                     |   4 +
 be/src/io/cache/file_block.h                       |   2 +
 be/src/io/cache/file_cache_storage.h               |   2 +
 be/src/io/cache/fs_file_cache_storage.cpp          |   5 +
 be/src/io/cache/fs_file_cache_storage.h            |   1 +
 be/src/io/cache/mem_file_cache_storage.cpp         |   4 +
 be/src/io/cache/mem_file_cache_storage.h           |   1 +
 .../cache/http/test_list_cache_file.groovy         | 117 +++++++++++++++++++++
 11 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/be/src/http/action/file_cache_action.cpp 
b/be/src/http/action/file_cache_action.cpp
index f31c040c5cf..740bac46edf 100644
--- a/be/src/http/action/file_cache_action.cpp
+++ b/be/src/http/action/file_cache_action.cpp
@@ -17,10 +17,15 @@
 
 #include "file_cache_action.h"
 
+#include <glog/logging.h>
+
+#include <algorithm>
 #include <memory>
 #include <shared_mutex>
 #include <sstream>
 #include <string>
+#include <string_view>
+#include <vector>
 
 #include "common/status.h"
 #include "http/http_channel.h"
@@ -30,6 +35,7 @@
 #include "io/cache/block_file_cache.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_cache_common.h"
+#include "io/cache/fs_file_cache_storage.h"
 #include "olap/olap_define.h"
 #include "olap/tablet_meta.h"
 #include "util/easy_json.h"
@@ -43,6 +49,7 @@ constexpr static std::string_view PATH = "path";
 constexpr static std::string_view CLEAR = "clear";
 constexpr static std::string_view RESET = "reset";
 constexpr static std::string_view HASH = "hash";
+constexpr static std::string_view LIST_CACHE = "list_cache";
 constexpr static std::string_view CAPACITY = "capacity";
 constexpr static std::string_view RELEASE = "release";
 constexpr static std::string_view BASE_PATH = "base_path";
@@ -66,7 +73,14 @@ Status FileCacheAction::_handle_header(HttpRequest* req, 
std::string* json_metri
         *json_metrics = json.ToString();
     } else if (operation == CLEAR) {
         const std::string& sync = req->param(SYNC.data());
-        auto ret = 
io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
+        const std::string& segment_path = req->param(VALUE.data());
+        if (segment_path.empty()) {
+            io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) 
== "true");
+        } else {
+            io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
+            io::BlockFileCache* cache = 
io::FileCacheFactory::instance()->get_by_path(hash);
+            cache->remove_if_cached(hash);
+        }
     } else if (operation == RESET) {
         std::string capacity = req->param(CAPACITY.data());
         int64_t new_capacity = 0;
@@ -96,6 +110,23 @@ Status FileCacheAction::_handle_header(HttpRequest* req, 
std::string* json_metri
             json[HASH.data()] = ret.to_string();
             *json_metrics = json.ToString();
         }
+    } else if (operation == LIST_CACHE) {
+        const std::string& segment_path = req->param(VALUE.data());
+        if (segment_path.empty()) {
+            st = Status::InvalidArgument("missing parameter: {} is required", 
VALUE.data());
+        } else {
+            io::UInt128Wrapper cache_hash = 
io::BlockFileCache::hash(segment_path);
+            std::vector<std::string> cache_files =
+                    
io::FileCacheFactory::instance()->get_cache_file_by_path(cache_hash);
+            if (cache_files.empty()) {
+                *json_metrics = "[]";
+            } else {
+                EasyJson json;
+                std::for_each(cache_files.begin(), cache_files.end(),
+                              [&json](auto& x) { json.PushBack(x); });
+                *json_metrics = json.ToString();
+            }
+        }
     } else {
         st = Status::InternalError("invalid operation: {}", operation);
     }
diff --git a/be/src/io/cache/block_file_cache_factory.cpp 
b/be/src/io/cache/block_file_cache_factory.cpp
index 8370962ddd5..2d0d25735fe 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -21,6 +21,9 @@
 #include "io/cache/block_file_cache_factory.h"
 
 #include <glog/logging.h>
+
+#include <string>
+#include <vector>
 #if defined(__APPLE__)
 #include <sys/mount.h>
 #else
@@ -118,6 +121,20 @@ Status FileCacheFactory::create_file_cache(const 
std::string& cache_base_path,
     return Status::OK();
 }
 
+std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const 
UInt128Wrapper& hash) {
+    io::BlockFileCache* cache = 
io::FileCacheFactory::instance()->get_by_path(hash);
+    auto blocks = cache->get_blocks_by_key(hash);
+    std::vector<std::string> ret;
+    if (blocks.empty()) {
+        return ret;
+    } else {
+        for (auto& [_, fb] : blocks) {
+            ret.emplace_back(fb->get_cache_file());
+        }
+    }
+    return ret;
+}
+
 BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
     // dont need lock mutex because _caches is immutable after 
create_file_cache
     return _caches[KeyHash()(key) % _caches.size()].get();
diff --git a/be/src/io/cache/block_file_cache_factory.h 
b/be/src/io/cache/block_file_cache_factory.h
index 12714fd2087..b00bd7bdfcb 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -62,6 +62,8 @@ public:
 
     [[nodiscard]] size_t get_cache_instance_size() const { return 
_caches.size(); }
 
+    std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper& 
hash);
+
     BlockFileCache* get_by_path(const UInt128Wrapper& hash);
     BlockFileCache* get_by_path(const std::string& cache_base_path);
     std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> 
get_query_context_holders(
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 4576b9dbba8..44cad5520ea 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -272,6 +272,10 @@ std::string FileBlock::state_to_string(FileBlock::State 
state) {
     }
 }
 
+std::string FileBlock::get_cache_file() const {
+    return _mgr->_storage->get_local_file(this->_key);
+}
+
 FileBlocksHolder::~FileBlocksHolder() {
     for (auto file_block_it = file_blocks.begin(); file_block_it != 
file_blocks.end();) {
         auto current_file_block_it = file_block_it;
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 6e49a597b7b..3a4490d67a3 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -123,6 +123,8 @@ public:
 
     uint64_t expiration_time() const { return _key.meta.expiration_time; }
 
+    std::string get_cache_file() const;
+
     State state_unlock(std::lock_guard<std::mutex>&) const;
 
     FileBlock& operator=(const FileBlock&) = delete;
diff --git a/be/src/io/cache/file_cache_storage.h 
b/be/src/io/cache/file_cache_storage.h
index 642c4711cf6..024e701c6fa 100644
--- a/be/src/io/cache/file_cache_storage.h
+++ b/be/src/io/cache/file_cache_storage.h
@@ -65,6 +65,8 @@ public:
     // force clear all current data in the cache
     virtual Status clear(std::string& msg) = 0;
     virtual FileCacheStorageType get_type() = 0;
+    // get local cached file
+    virtual std::string get_local_file(const FileCacheKey& key) = 0;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp 
b/be/src/io/cache/fs_file_cache_storage.cpp
index 8471f6e12b4..cf1cd41a537 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -677,6 +677,11 @@ Status FSFileCacheStorage::clear(std::string& msg) {
     return Status::OK();
 }
 
+std::string FSFileCacheStorage::get_local_file(const FileCacheKey& key) {
+    return get_path_in_local_cache(get_path_in_local_cache(key.hash, 
key.meta.expiration_time),
+                                   key.offset, key.meta.type, false);
+}
+
 FSFileCacheStorage::~FSFileCacheStorage() {
     if (_cache_background_load_thread.joinable()) {
         _cache_background_load_thread.join();
diff --git a/be/src/io/cache/fs_file_cache_storage.h 
b/be/src/io/cache/fs_file_cache_storage.h
index ac5d10d0430..8a97aa109ad 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -70,6 +70,7 @@ public:
     void load_blocks_directly_unlocked(BlockFileCache* _mgr, const 
FileCacheKey& key,
                                        std::lock_guard<std::mutex>& 
cache_lock) override;
     Status clear(std::string& msg) override;
+    std::string get_local_file(const FileCacheKey& key) override;
 
     [[nodiscard]] static std::string get_path_in_local_cache(const 
std::string& dir, size_t offset,
                                                              FileCacheType 
type,
diff --git a/be/src/io/cache/mem_file_cache_storage.cpp 
b/be/src/io/cache/mem_file_cache_storage.cpp
index bffa75ae305..7e76dd5f88c 100644
--- a/be/src/io/cache/mem_file_cache_storage.cpp
+++ b/be/src/io/cache/mem_file_cache_storage.cpp
@@ -128,4 +128,8 @@ Status MemFileCacheStorage::clear(std::string& msg) {
     return Status::OK();
 }
 
+std::string MemFileCacheStorage::get_local_file(const FileCacheKey& key) {
+    return "";
+}
+
 } // namespace doris::io
diff --git a/be/src/io/cache/mem_file_cache_storage.h 
b/be/src/io/cache/mem_file_cache_storage.h
index 20fdd8ce9f6..82064c6e9ed 100644
--- a/be/src/io/cache/mem_file_cache_storage.h
+++ b/be/src/io/cache/mem_file_cache_storage.h
@@ -44,6 +44,7 @@ public:
     void load_blocks_directly_unlocked(BlockFileCache* _mgr, const 
FileCacheKey& key,
                                        std::lock_guard<std::mutex>& 
cache_lock) override;
     Status clear(std::string& msg) override;
+    std::string get_local_file(const FileCacheKey& key) override;
 
     FileCacheStorageType get_type() override { return MEMORY; }
 
diff --git 
a/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy 
b/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
new file mode 100644
index 00000000000..acd33a22f0c
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_list_cache_file") {
+    sql """ use @regression_cluster_name1 """
+    String[][] backends = sql """ show backends """
+    String backendId;
+    def backendIdToBackendIP = [:]
+    def backendIdToBackendHttpPort = [:]
+    def backendIdToBackendBrpcPort = [:]
+    for (String[] backend in backends) {
+        if (backend[9].equals("true") && 
backend[19].contains("regression_cluster_name1")) {
+            backendIdToBackendIP.put(backend[0], backend[1])
+            backendIdToBackendHttpPort.put(backend[0], backend[4])
+            backendIdToBackendBrpcPort.put(backend[0], backend[5])
+        }
+    }
+    assertEquals(backendIdToBackendIP.size(), 1)
+
+    backendId = backendIdToBackendIP.keySet()[0]
+    def socket = backendIdToBackendIP.get(backendId) + ":" + 
backendIdToBackendHttpPort.get(backendId)
+
+    sql "drop table IF EXISTS `user`"
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `user` (
+            `id` int NULL,
+            `name` string NULL
+        )
+        UNIQUE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "file_cache_ttl_seconds" = "2884"
+        )
+    """
+
+    sql "insert into user select number, cast(rand() as varchar(32)) from 
numbers(\"number\"=\"1000000\")"
+
+    def get_tablets = { String tbl_name ->
+        def res = sql "show tablets from ${tbl_name}"
+        List<Integer> tablets = new ArrayList<>()
+        for (final def line in res) {
+            tablets.add(Integer.valueOf(line[0].toString()))
+        }
+        return tablets
+    }
+
+    def get_rowsets = { int tablet_id ->
+        var ret = []
+        httpTest {
+            endpoint ""
+            uri socket + "/api/compaction/show?tablet_id=" + tablet_id
+            op "get"
+            check {respCode, body ->
+                assertEquals(respCode, 200)
+                var map = parseJson(body)
+                for (final def line in map.get("rowsets")) {
+                    var tokens = line.toString().split(" ")
+                    ret.add(tokens[4])
+                }
+            }
+        }
+        return ret
+    }
+
+    var tablets = get_tablets("user")
+    var rowsets = get_rowsets(tablets.get(0))
+    var segment_file = rowsets[rowsets.size() - 1] + "_0.dat"
+
+    httpTest {
+        endpoint ""
+        uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
+        op "get"
+        check {respCode, body ->
+            assertEquals(respCode, 200)
+            var arr = parseJson(body)
+            assertTrue(arr.size() > 0, "There shouldn't be no cache file at 
all, maybe you need to check disk capacity and modify 
file_cache_enter_disk_resource_limit_mode_percent in be.conf")
+        }
+    }
+
+    // clear single segment file cache
+    httpTest {
+        endpoint ""
+        uri socket + "/api/file_cache?op=clear&value=" + segment_file
+        op "get"
+        check {respCode, body ->
+            assertEquals(respCode, 200, "clear local cache fail, maybe you can 
find something in respond: " + parseJson(body))
+        }
+    }
+
+    httpTest {
+        endpoint ""
+        uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
+        op "get"
+        check {respCode, body ->
+            assertEquals(respCode, 200)
+            var arr = parseJson(body)
+            assertTrue(arr.size() == 0, "local cache files should not greater 
than 0, because it has already clear")
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to