This is an automated email from the ASF dual-hosted git repository.
zwoop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 9e9520a448 Parallel dir entry sync options (#12639)
9e9520a448 is described below
commit 9e9520a448c1f7d2928b5a3b3ef8c60609aadfec
Author: Leif Hedstrom <[email protected]>
AuthorDate: Sat Jan 24 12:56:35 2026 -0700
Parallel dir entry sync options (#12639)
---
doc/admin-guide/files/records.yaml.en.rst | 16 +++++++
src/iocore/cache/Cache.cc | 5 +-
src/iocore/cache/CacheDir.cc | 77 ++++++++++++++++++++++++-------
src/iocore/cache/P_CacheDir.h | 20 +++++++-
src/iocore/cache/P_CacheInternal.h | 2 +-
src/iocore/cache/StripeSM.cc | 7 +--
src/iocore/cache/StripeSM.h | 15 +++---
src/records/RecordsConfig.cc | 2 +
8 files changed, 114 insertions(+), 30 deletions(-)
diff --git a/doc/admin-guide/files/records.yaml.en.rst
b/doc/admin-guide/files/records.yaml.en.rst
index 2221d4ceee..e7057658d3 100644
--- a/doc/admin-guide/files/records.yaml.en.rst
+++ b/doc/admin-guide/files/records.yaml.en.rst
@@ -2694,6 +2694,22 @@ Cache Control
:units: millisecond
How long to wait between each write cycle when syncing the cache directory
to disk.
+.. ts:cv:: CONFIG proxy.config.cache.dir.sync_parallel_tasks INT 1
+
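+ Number of parallel tasks to use for directory syncing. The tasks run on
+ ET_TASK threads. Stripes are grouped by physical drive and whole drives are
+ distributed round-robin across the tasks, so a task may sync several drives
+ but a drive is never split between tasks. Values below ``1`` other than
+ ``-1`` are treated as ``1``.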
+
+ ======= ==================================================================
+ Value Description
+ ======= ==================================================================
+ ``-1`` Unlimited - one task per drive (maximum parallelism)
+ ``1`` Sequential - one task for all drives (default, safe)
+ ``N`` Parallel - up to N tasks, each syncing one or more drives
+ ======= ==================================================================
+
+ Default is ``1`` (sequential). Set to ``-1`` for maximum parallelism on
+ high-end NVMe arrays, or to a value between ``4`` and ``8`` for balanced
+ performance on multi-drive systems.
.. ts:cv:: CONFIG proxy.config.cache.limits.http.max_alts INT 5
diff --git a/src/iocore/cache/Cache.cc b/src/iocore/cache/Cache.cc
index 295a4eadb8..512a5e7bf7 100644
--- a/src/iocore/cache/Cache.cc
+++ b/src/iocore/cache/Cache.cc
@@ -64,6 +64,7 @@ int cache_config_log_alternate_eviction = 0;
int cache_config_dir_sync_frequency = 60;
int cache_config_dir_sync_delay = 500;
int cache_config_dir_sync_max_write = (2 * 1024 * 1024);
+int cache_config_dir_sync_parallel_tasks = 1;
int cache_config_permit_pinning = 0;
int cache_config_select_alternate = 1;
int cache_config_max_doc_size = 0;
@@ -90,7 +91,6 @@ Cache *theCache = nullptr;
std::vector<std::unique_ptr<CacheDisk>> gdisks;
int gndisks = 0;
Cache *caches[NUM_CACHE_FRAG_TYPES] =
{nullptr};
-CacheSync *cacheDirSync =
nullptr;
Store theCacheStore;
StripeSM **gstripes = nullptr;
std::atomic<int> gnstripes = 0;
@@ -884,6 +884,9 @@ ink_cache_init(ts::ModuleVersion v)
cacheProcessor.wait_for_cache =
RecGetRecordInt("proxy.config.http.wait_for_cache").value_or(0);
+ RecEstablishStaticConfigInt32(cache_config_dir_sync_parallel_tasks,
"proxy.config.cache.dir.sync_parallel_tasks");
+ Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_parallel_tasks = %d",
cache_config_dir_sync_parallel_tasks);
+
RecEstablishStaticConfigInt32(cache_config_persist_bad_disks,
"proxy.config.cache.persist_bad_disks");
Dbg(dbg_ctl_cache_init, "proxy.config.cache.persist_bad_disks = %d",
cache_config_persist_bad_disks);
if (cache_config_persist_bad_disks) {
diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc
index 538fe2a53f..0cdb7251d1 100644
--- a/src/iocore/cache/CacheDir.cc
+++ b/src/iocore/cache/CacheDir.cc
@@ -31,6 +31,9 @@
#include "tscore/hugepages.h"
#include "tscore/Random.h"
#include "ts/ats_probe.h"
+#include "iocore/eventsystem/Tasks.h"
+
+#include <unordered_map>
#ifdef LOOP_CHECK_MODE
#define DIR_LOOP_THRESHOLD 1000
@@ -862,14 +865,56 @@ dir_lookaside_remove(const CacheKey *key, StripeSM
*stripe)
return;
}
-// Cache Sync
-//
+// Cache Dir Sync
+//
+// dir_sync_init(): create the CacheSync continuations that periodically write
+// stripe directories to disk. Stripes are grouped by CacheDisk, and whole disks
+// are handed out round-robin to the continuations; the number of continuations
+// comes from proxy.config.cache.dir.sync_parallel_tasks (-1 = one per disk).
+// Each continuation is scheduled on ET_TASK. Only the first call has any effect.
 void
 dir_sync_init()
 {
-  cacheDirSync = new CacheSync;
-  cacheDirSync->trigger = eventProcessor.schedule_in(cacheDirSync, HRTIME_SECONDS(cache_config_dir_sync_frequency));
+  // Deliberately heap-allocated and never freed (as cacheDirSync was): each sync
+  // continuation reschedules itself after every cycle, so a function-local static
+  // container would be destroyed at process exit while events for its elements
+  // may still be pending.
+  static auto *cache_syncs = new std::vector<std::unique_ptr<CacheSync>>();
+  static bool initialized  = false;
+  std::unordered_map<CacheDisk *, std::vector<int>> drive_stripe_map;
+
+  if (initialized) {
+    Warning("dir_sync_init() called multiple times - ignoring");
+    return;
+  }
+  initialized = true;
+
+  for (int i = 0; i < gnstripes; i++) {
+    drive_stripe_map[gstripes[i]->disk].push_back(i);
+  }
+
+  if (drive_stripe_map.empty()) {
+    Dbg(dbg_ctl_cache_dir_sync, "No stripes to sync - dir_sync_init complete");
+    return;
+  }
+
+  // -1 requests one task per drive; any other value below 1 means a single task.
+  // A drive is never split across tasks, so more tasks than drives would only
+  // create continuations with nothing to sync: cap the count at the drive count.
+  const int num_drives = static_cast<int>(drive_stripe_map.size());
+  const int requested  = (cache_config_dir_sync_parallel_tasks == -1) ? num_drives : cache_config_dir_sync_parallel_tasks;
+  const int num_tasks  = std::min(std::max(1, requested), num_drives);
+
+  cache_syncs->reserve(num_tasks);
+  for (int i = 0; i < num_tasks; i++) {
+    cache_syncs->push_back(std::make_unique<CacheSync>());
+  }
+
+  int task_idx = 0;
+
+  for (auto &[disk, indices] : drive_stripe_map) {
+    const int target_task = task_idx % num_tasks;
+    auto     &targets     = (*cache_syncs)[target_task]->stripe_indices;
+
+    Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), target_task);
+    targets.insert(targets.end(), indices.begin(), indices.end());
+    task_idx++;
+  }
+
+  for (int i = 0; i < num_tasks; i++) {
+    CacheSync *sync = (*cache_syncs)[i].get();
+
+    Dbg(dbg_ctl_cache_dir_sync, "Task %d: syncing %zu stripe(s)", i, sync->stripe_indices.size());
+    sync->current_index = 0;
+    sync->trigger       = eventProcessor.schedule_in(sync, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
+  }
 }
void
@@ -930,7 +975,7 @@ sync_cache_dir_on_shutdown()
}
int
-CacheSync::mainEvent(int event, Event *e)
+CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */)
 {
   if (trigger) {
     trigger->cancel_action();
@@ -938,26 +983,27 @@ CacheSync::mainEvent(int event, Event *e)
   }
 Lrestart:
-  if (stripe_index >= gnstripes) {
-    stripe_index = 0;
+  if (current_index >= static_cast<int>(stripe_indices.size())) {
+    current_index = 0;
+#if FREE_BUF_BETWEEN_CYCLES
+    // Free buffer between sync cycles to avoid holding large amounts of memory
     if (buf) {
       if (buf_huge) {
         ats_free_hugepage(buf, buflen);
       } else {
         ats_free(buf);
       }
-      buflen = 0;
       buf = nullptr;
+      buflen = 0;
       buf_huge = false;
     }
-    Dbg(dbg_ctl_cache_dir_sync, "sync done");
-    if (event == EVENT_INTERVAL) {
-      trigger = e->ethread->schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency));
-    } else {
-      trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency));
-    }
+#endif
+    Dbg(dbg_ctl_cache_dir_sync, "sync cycle done");
+    trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
     return EVENT_CONT;
   }
+  // Every call into mainEvent() passes through Lrestart, including re-entries such
+  // as the resume from StripeSM::aggWriteDone() after waiting for the agg buffer.
+  // Those must keep working on the same stripe, so current_index only advances at Ldone.
+  stripe_index = stripe_indices[current_index];
   StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work.
@@ -1007,6 +1053,7 @@ Lrestart:
   if (stripe->is_io_in_progress() || stripe->get_agg_buf_pos()) {
     Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", stripe->hash_text.get());
     stripe->dir_sync_waiting = true;
+    stripe->waiting_dir_sync = this;
     if (!stripe->is_io_in_progress()) {
       stripe->aggWrite(EVENT_IMMEDIATE, nullptr);
     }
@@ -1072,9 +1119,7 @@ Lrestart:
     return EVENT_CONT;
   }
 Ldone:
-  // done
   writepos = 0;
-  ++stripe_index;
+  // This stripe is finished; move on to the next stripe assigned to this task.
+  ++current_index;
   goto Lrestart;
 }
diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h
index caf9647db7..fbd2f0b25b 100644
--- a/src/iocore/cache/P_CacheDir.h
+++ b/src/iocore/cache/P_CacheDir.h
@@ -28,6 +28,7 @@
#include "iocore/eventsystem/Continuation.h"
#include "iocore/aio/AIO.h"
#include "tscore/Version.h"
+#include "tscore/hugepages.h"
#include <cstdint>
#include <ctime>
@@ -246,10 +247,25 @@ struct CacheSync : public Continuation {
AIOCallback io;
Event *trigger = nullptr;
ink_hrtime start_time = 0;
- int mainEvent(int event, Event *e);
- void aio_write(int fd, char *b, int n, off_t o);
+
+ std::vector<int> stripe_indices;
+ int current_index{0};
+
+ int mainEvent(int event, Event *e);
+ void aio_write(int fd, char *b, int n, off_t o);
CacheSync() : Continuation(new_ProxyMutex()) {
SET_HANDLER(&CacheSync::mainEvent); }
+
+ ~CacheSync()
+ {
+ if (buf) {
+ if (buf_huge) {
+ ats_free_hugepage(buf, buflen);
+ } else {
+ ats_free(buf);
+ }
+ }
+ }
};
struct StripeHeaderFooter {
diff --git a/src/iocore/cache/P_CacheInternal.h
b/src/iocore/cache/P_CacheInternal.h
index 8a8ee3c3aa..674dd98059 100644
--- a/src/iocore/cache/P_CacheInternal.h
+++ b/src/iocore/cache/P_CacheInternal.h
@@ -96,6 +96,7 @@ extern CacheStatsBlock cache_rsb;
extern int cache_config_dir_sync_frequency;
extern int cache_config_dir_sync_delay;
extern int cache_config_dir_sync_max_write;
+extern int cache_config_dir_sync_parallel_tasks;
extern int cache_config_http_max_alts;
extern int cache_config_log_alternate_eviction;
extern int cache_config_permit_pinning;
@@ -140,7 +141,6 @@ struct CacheRemoveCont : public Continuation {
// Global Data
extern ClassAllocator<CacheVC, false> cacheVConnectionAllocator;
extern ClassAllocator<CacheEvacuateDocVC, false>
cacheEvacuateDocVConnectionAllocator;
-extern CacheSync *cacheDirSync;
// Function Prototypes
int cache_write(CacheVC *, CacheHTTPInfoVector *);
int get_alternate_index(CacheHTTPInfoVector *cache_vector,
CacheKey key);
diff --git a/src/iocore/cache/StripeSM.cc b/src/iocore/cache/StripeSM.cc
index 3ec61df5e1..2e39fc3098 100644
--- a/src/iocore/cache/StripeSM.cc
+++ b/src/iocore/cache/StripeSM.cc
@@ -709,9 +709,9 @@ StripeSM::aggWriteDone(int event, Event *e)
{
cancel_trigger();
- // ensure we have the cacheDirSync lock if we intend to call it later
+ // ensure we have the waiting_dir_sync lock if we intend to call it later
// retaking the current mutex recursively is a NOOP
- CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex,
mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, dir_sync_waiting ? waiting_dir_sync->mutex : mutex,
mutex->thread_holding);
if (!lock.is_locked()) {
eventProcessor.schedule_in(this,
HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
@@ -759,7 +759,8 @@ StripeSM::aggWriteDone(int event, Event *e)
   }
   if (dir_sync_waiting) {
     dir_sync_waiting = false;
-    cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
+    // Detach the waiting CacheSync before resuming it: if the stripe is still busy,
+    // CacheSync::mainEvent() re-registers by setting dir_sync_waiting and
+    // waiting_dir_sync again, and that new registration must not be cleared here.
+    CacheSync *sync_cont = waiting_dir_sync;
+    waiting_dir_sync     = nullptr;
+    sync_cont->handleEvent(EVENT_IMMEDIATE, nullptr);
   }
   if (this->_write_buffer.get_pending_writers().head || sync.head) {
     return aggWrite(event, e);
diff --git a/src/iocore/cache/StripeSM.h b/src/iocore/cache/StripeSM.h
index 3ae476ac69..40bb972bab 100644
--- a/src/iocore/cache/StripeSM.h
+++ b/src/iocore/cache/StripeSM.h
@@ -85,13 +85,14 @@ public:
StripeInitInfo *init_info = nullptr;
- Cache *cache = nullptr;
- uint32_t last_sync_serial = 0;
- uint32_t last_write_serial = 0;
- bool recover_wrapped = false;
- bool dir_sync_waiting = false;
- bool dir_sync_in_progress = false;
- bool writing_end_marker = false;
+ Cache *cache = nullptr;
+ uint32_t last_sync_serial = 0;
+ uint32_t last_write_serial = 0;
+ bool recover_wrapped = false;
+ bool dir_sync_waiting = false;
+ bool dir_sync_in_progress = false;
+ CacheSync *waiting_dir_sync = nullptr;
+ bool writing_end_marker = false;
CacheKey first_fragment_key;
int64_t first_fragment_offset = 0;
diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc
index 096f522ca9..88526c08ad 100644
--- a/src/records/RecordsConfig.cc
+++ b/src/records/RecordsConfig.cc
@@ -863,6 +863,8 @@ static constexpr RecordElement RecordsConfig[] =
,
{RECT_CONFIG, "proxy.config.cache.dir.sync_max_write", RECD_INT, "2097152",
RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
+ {RECT_CONFIG, "proxy.config.cache.dir.sync_parallel_tasks", RECD_INT, "1",
RECU_RESTART_TS, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
+ ,
{RECT_CONFIG, "proxy.config.cache.hostdb.disable_reverse_lookup", RECD_INT,
"0", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
{RECT_CONFIG, "proxy.config.cache.select_alternate", RECD_INT, "1",
RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}