This is an automated email from the ASF dual-hosted git repository. cmcfarlen pushed a commit to branch 10.2.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit be5b8383e7551b05a78815587e266c48504408b3 Author: Leif Hedstrom <[email protected]> AuthorDate: Thu May 28 10:34:14 2026 -0600 Slice: Add prefetch deduplication and freelist (#12949) * Slice: Add prefetch deduplication and freelist Track in-flight prefetches per remap instance using a mutex-protected set to prevent duplicate upstream requests. Recycle BgBlockFetch objects via a freelist under the same mutex. Fire the initial prefetch burst at header parse time so faster clients benefit sooner. Co-Authored-By: Craig Taylor (and research) * Address Copilot review comments Guard prefetchCleanup() with m_prefetch_mutex to prevent data race with concurrent prefetchRelease() calls. Add unit tests for prefetchAcquire() dedup semantics. Add missing #include <string_view> to prefetch.h. Null-guard the result of TSUrlStringGet() in schedule_prefetch() before constructing std::string_view. (cherry picked from commit cfae05c35ec88b7bfcbbe19f5c7f855ed7d2c7d7) --- plugins/slice/Config.cc | 34 +++++++++++++++++++ plugins/slice/Config.h | 13 ++++++++ plugins/slice/prefetch.cc | 59 +++++++++++++++++++++++++++++---- plugins/slice/prefetch.h | 16 +++++---- plugins/slice/server.cc | 2 ++ plugins/slice/unit-tests/test_config.cc | 20 +++++++++++ plugins/slice/util.cc | 54 +++++++++++++++++++++--------- plugins/slice/util.h | 2 ++ 8 files changed, 172 insertions(+), 28 deletions(-) diff --git a/plugins/slice/Config.cc b/plugins/slice/Config.cc index e60d894b7a..b95ace3bab 100644 --- a/plugins/slice/Config.cc +++ b/plugins/slice/Config.cc @@ -36,6 +36,7 @@ Config::~Config() if (nullptr != m_regex) { delete m_regex; } + prefetchCleanup(); } int64_t @@ -387,3 +388,36 @@ Config::sizeCacheRemove(std::string_view url) m_oscache->remove(url); } } + +std::pair<bool, BgBlockFetch *> +Config::prefetchAcquire(const std::string &key) +{ + std::lock_guard<std::mutex> const guard(m_prefetch_mutex); + auto [it, inserted] = m_prefetch_active.insert(key); + + if (!inserted) { + return {false, 
nullptr}; + } + + BgBlockFetch *bg = nullptr; + + if (!m_prefetch_freelist.empty()) { + bg = m_prefetch_freelist.back(); + m_prefetch_freelist.pop_back(); + } + + return {true, bg}; +} + +#if defined(UNITTEST) +// Stubs for unit tests that don't link prefetch.cc +void +Config::prefetchRelease(BgBlockFetch *) +{ +} + +void +Config::prefetchCleanup() +{ +} +#endif diff --git a/plugins/slice/Config.h b/plugins/slice/Config.h index 4d57ea10cc..4fd15a2506 100644 --- a/plugins/slice/Config.h +++ b/plugins/slice/Config.h @@ -25,6 +25,10 @@ #include <string> #include <mutex> +#include <unordered_set> +#include <vector> + +struct BgBlockFetch; // Data Structures and Classes struct Config { @@ -79,6 +83,10 @@ struct Config { // Did we cache this internally as a small object? bool isKnownLargeObj(std::string_view url); + // Prefetch dedup and freelist + std::pair<bool, BgBlockFetch *> prefetchAcquire(const std::string &key); + void prefetchRelease(BgBlockFetch *bg); + // Metadata cache stats std::string stat_prefix{}; int stat_TP{0}, stat_TN{0}, stat_FP{0}, stat_FN{0}, stat_no_cl{0}, stat_bad_cl{0}, stat_no_url{0}; @@ -89,4 +97,9 @@ private: std::mutex m_mutex; std::optional<ObjectSizeCache> m_oscache; void setCacheSize(size_t entries); + + std::mutex m_prefetch_mutex; + std::unordered_set<std::string> m_prefetch_active; + std::vector<BgBlockFetch *> m_prefetch_freelist; + void prefetchCleanup(); }; diff --git a/plugins/slice/prefetch.cc b/plugins/slice/prefetch.cc index 5e5f229566..50f7875c73 100644 --- a/plugins/slice/prefetch.cc +++ b/plugins/slice/prefetch.cc @@ -25,16 +25,61 @@ #include "prefetch.h" bool -BgBlockFetch::schedule(Data *const data, int blocknum) +BgBlockFetch::schedule(Data *const data, int blocknum, std::string_view url) { - bool ret = false; - BgBlockFetch *bg = new BgBlockFetch(blocknum); + std::string key = std::string(url) + ':' + std::to_string(blocknum); + auto [acquired, bg] = data->m_config->prefetchAcquire(key); + + if (!acquired) { + 
DEBUG_LOG("Prefetch already in flight for block %d, skipping", blocknum); + return false; + } + + // Nothing on the freelist, so make a new object + if (!bg) { + bg = new BgBlockFetch(); + } + + bg->m_blocknum = blocknum; + bg->m_config = data->m_config; + bg->m_key = std::move(key); + if (bg->fetch(data)) { - ret = true; + return true; } else { + bg->m_config->prefetchRelease(bg); + return false; + } +} + +void +BgBlockFetch::clear() +{ + m_blocknum = 0; + m_cont = nullptr; + m_config = nullptr; + m_key.clear(); +} + +void +Config::prefetchRelease(BgBlockFetch *bg) +{ + std::lock_guard<std::mutex> const guard(m_prefetch_mutex); + + m_prefetch_active.erase(bg->m_key); + bg->clear(); + m_prefetch_freelist.push_back(bg); +} + +void +Config::prefetchCleanup() +{ + std::lock_guard<std::mutex> const guard(m_prefetch_mutex); + + for (auto *bg : m_prefetch_freelist) { delete bg; } - return ret; + m_prefetch_freelist.clear(); } /** @@ -132,15 +177,15 @@ BgBlockFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */ case TS_EVENT_ERROR: bg->m_stream.abort(); TSContDataSet(contp, nullptr); - delete bg; TSContDestroy(contp); + bg->m_config->prefetchRelease(bg); break; case TS_EVENT_VCONN_READ_COMPLETE: case TS_EVENT_VCONN_EOS: bg->m_stream.close(); TSContDataSet(contp, nullptr); - delete bg; TSContDestroy(contp); + bg->m_config->prefetchRelease(bg); break; default: DEBUG_LOG("Unhandled bg fetch event:%s (%d)", TSHttpEventNameLookup(event), event); diff --git a/plugins/slice/prefetch.h b/plugins/slice/prefetch.h index 261d1db700..4a0ce972b6 100644 --- a/plugins/slice/prefetch.h +++ b/plugins/slice/prefetch.h @@ -23,7 +23,8 @@ #pragma once -#include <map> +#include <string> +#include <string_view> #include "ts/ts.h" #include "Data.h" @@ -33,15 +34,18 @@ * @brief Represents a single background fetch. 
*/ struct BgBlockFetch { - static bool schedule(Data *const data, int blocknum); + static bool schedule(Data *const data, int blocknum, std::string_view url); - explicit BgBlockFetch(int blocknum) : m_blocknum(blocknum) {} + BgBlockFetch() = default; bool fetch(Data *const data); static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */); + void clear(); /* This is for the actual background fetch / NetVC */ - Stage m_stream; - int m_blocknum; - TSCont m_cont = nullptr; + Stage m_stream; + int m_blocknum{0}; + TSCont m_cont{nullptr}; + Config *m_config{nullptr}; + std::string m_key; }; diff --git a/plugins/slice/server.cc b/plugins/slice/server.cc index 3f2ca4f7c9..ffcf653de9 100644 --- a/plugins/slice/server.cc +++ b/plugins/slice/server.cc @@ -606,6 +606,8 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data) data->m_blockskip = data->m_req_range.skipBytesForBlock(data->m_config->m_blockbytes, data->m_blocknum); } break; } + + schedule_prefetch(data); } transfer_content_bytes(data); diff --git a/plugins/slice/unit-tests/test_config.cc b/plugins/slice/unit-tests/test_config.cc index 4610f40907..ac56582472 100644 --- a/plugins/slice/unit-tests/test_config.cc +++ b/plugins/slice/unit-tests/test_config.cc @@ -80,6 +80,26 @@ TEST_CASE("config bytesfrom invalid parsing", "[AWS][slice][utility]") } } +TEST_CASE("prefetchAcquire deduplication", "[AWS][slice][utility]") +{ + Config config; + + // Acquiring a new key should succeed with no freelist item. + auto [acquired1, bg1] = config.prefetchAcquire("http://example.com/file:0"); + CHECK(acquired1 == true); + CHECK(bg1 == nullptr); + + // Acquiring the same key again should fail (dedup). + auto [acquired2, bg2] = config.prefetchAcquire("http://example.com/file:0"); + CHECK(acquired2 == false); + CHECK(bg2 == nullptr); + + // A distinct key should succeed independently. 
+ auto [acquired3, bg3] = config.prefetchAcquire("http://example.com/file:1"); + CHECK(acquired3 == true); + CHECK(bg3 == nullptr); +} + TEST_CASE("config fromargs validate sizes", "[AWS][slice][utility]") { char const *const appname = "slice.so"; diff --git a/plugins/slice/util.cc b/plugins/slice/util.cc index 579aac447a..6792a1969b 100644 --- a/plugins/slice/util.cc +++ b/plugins/slice/util.cc @@ -45,6 +45,41 @@ abort(TSCont const contp, Data *const data) TSContDestroy(contp); } +void +schedule_prefetch(Data *const data) +{ + if (!data->m_prefetchable || data->m_config->m_prefetchcount <= 0) { + return; + } + + int urllen = 0; + char *const urlstr = TSUrlStringGet(data->m_urlbuf, data->m_urlloc, &urllen); + + if (urlstr == nullptr || urllen <= 0) { + TSfree(urlstr); + return; + } + + std::string_view const url(urlstr, urllen); + int nextblocknum = data->m_blocknum + 1; + + if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) { + nextblocknum = data->m_blocknum + data->m_config->m_prefetchcount; + } + + for (int i = nextblocknum; i <= data->m_blocknum + data->m_config->m_prefetchcount; i++) { + if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, i)) { + if (BgBlockFetch::schedule(data, i, url)) { + DEBUG_LOG("Background fetch requested"); + } else { + DEBUG_LOG("Background fetch not requested"); + } + } + } + + TSfree(urlstr); +} + // create and issue a block request bool request_block(TSCont contp, Data *const data) @@ -151,22 +186,11 @@ request_block(TSCont contp, Data *const data) DEBUG_LOG("Headers\n%s", headerstr.c_str()); } - // if prefetch config set, schedule next block requests in background - if (data->m_prefetchable && data->m_config->m_prefetchcount > 0) { - int nextblocknum = data->m_blocknum + 1; - if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) { - nextblocknum = data->m_blocknum + data->m_config->m_prefetchcount; - } - for (int i = nextblocknum; i <= 
data->m_blocknum + data->m_config->m_prefetchcount; i++) { - if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, i)) { - if (BgBlockFetch::schedule(data, i)) { - DEBUG_LOG("Background fetch requested"); - } else { - DEBUG_LOG("Background fetch not requested"); - } - } - } + // Extend prefetch sliding window past the initial burst + if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) { + schedule_prefetch(data); } + // get ready for data back from the server data->m_upstream.setupVioRead(contp, INT64_MAX); diff --git a/plugins/slice/util.h b/plugins/slice/util.h index 9da6f368a3..7c0c57e5cd 100644 --- a/plugins/slice/util.h +++ b/plugins/slice/util.h @@ -33,4 +33,6 @@ void abort(TSCont const contp, Data *const data); bool request_block(TSCont contp, Data *const data); +void schedule_prefetch(Data *const data); + bool reader_avail_more_than(TSIOBufferReader const reader, int64_t bytes);
