This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e9520a448 Parallel dir entry sync options (#12639)
9e9520a448 is described below

commit 9e9520a448c1f7d2928b5a3b3ef8c60609aadfec
Author: Leif Hedstrom <[email protected]>
AuthorDate: Sat Jan 24 12:56:35 2026 -0700

    Parallel dir entry sync options (#12639)
---
 doc/admin-guide/files/records.yaml.en.rst | 16 +++++++
 src/iocore/cache/Cache.cc                 |  5 +-
 src/iocore/cache/CacheDir.cc              | 77 ++++++++++++++++++++++++-------
 src/iocore/cache/P_CacheDir.h             | 20 +++++++-
 src/iocore/cache/P_CacheInternal.h        |  2 +-
 src/iocore/cache/StripeSM.cc              |  7 +--
 src/iocore/cache/StripeSM.h               | 15 +++---
 src/records/RecordsConfig.cc              |  2 +
 8 files changed, 114 insertions(+), 30 deletions(-)

diff --git a/doc/admin-guide/files/records.yaml.en.rst 
b/doc/admin-guide/files/records.yaml.en.rst
index 2221d4ceee..e7057658d3 100644
--- a/doc/admin-guide/files/records.yaml.en.rst
+++ b/doc/admin-guide/files/records.yaml.en.rst
@@ -2694,6 +2694,22 @@ Cache Control
    :units: millisecond
 
    How long to wait between each write cycle when syncing the cache directory 
to disk.
+
+.. ts:cv:: CONFIG proxy.config.cache.dir.sync_parallel_tasks INT 1
+
+   Number of parallel tasks used to sync the cache directory to disk. Stripes are grouped by
+   physical drive and drives are distributed round-robin across tasks, which run on ET_TASK threads.
+
+   ======= ==================================================================
+   Value   Description
+   ======= ==================================================================
+   ``-1``  Unlimited - one task per drive (maximum parallelism)
+   ``1``   Sequential - one task for all drives (default, safe)
+   ``N``   Parallel - up to N tasks (drives) sync concurrently
+   ======= ==================================================================
+
+   ``0`` and values below ``-1`` are treated as ``1``. Use ``-1`` for maximum parallelism on
+   high-end NVMe arrays, or a value between 4 and 8 for balanced multi-drive systems.
 
 .. ts:cv:: CONFIG proxy.config.cache.limits.http.max_alts INT 5
 
diff --git a/src/iocore/cache/Cache.cc b/src/iocore/cache/Cache.cc
index 295a4eadb8..512a5e7bf7 100644
--- a/src/iocore/cache/Cache.cc
+++ b/src/iocore/cache/Cache.cc
@@ -64,6 +64,7 @@ int     cache_config_log_alternate_eviction        = 0;
 int     cache_config_dir_sync_frequency            = 60;
 int     cache_config_dir_sync_delay                = 500;
 int     cache_config_dir_sync_max_write            = (2 * 1024 * 1024);
+int     cache_config_dir_sync_parallel_tasks       = 1;
 int     cache_config_permit_pinning                = 0;
 int     cache_config_select_alternate              = 1;
 int     cache_config_max_doc_size                  = 0;
@@ -90,7 +91,6 @@ Cache                                    *theCache = nullptr;
 std::vector<std::unique_ptr<CacheDisk>>   gdisks;
 int                                       gndisks                      = 0;
 Cache                                    *caches[NUM_CACHE_FRAG_TYPES] = 
{nullptr};
-CacheSync                                *cacheDirSync                 = 
nullptr;
 Store                                     theCacheStore;
 StripeSM                                **gstripes  = nullptr;
 std::atomic<int>                          gnstripes = 0;
@@ -884,6 +884,9 @@ ink_cache_init(ts::ModuleVersion v)
 
   cacheProcessor.wait_for_cache = 
RecGetRecordInt("proxy.config.http.wait_for_cache").value_or(0);
 
+  RecEstablishStaticConfigInt32(cache_config_dir_sync_parallel_tasks, 
"proxy.config.cache.dir.sync_parallel_tasks");
+  Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_parallel_tasks = %d", 
cache_config_dir_sync_parallel_tasks);
+
   RecEstablishStaticConfigInt32(cache_config_persist_bad_disks, 
"proxy.config.cache.persist_bad_disks");
   Dbg(dbg_ctl_cache_init, "proxy.config.cache.persist_bad_disks = %d", 
cache_config_persist_bad_disks);
   if (cache_config_persist_bad_disks) {
diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc
index 538fe2a53f..0cdb7251d1 100644
--- a/src/iocore/cache/CacheDir.cc
+++ b/src/iocore/cache/CacheDir.cc
@@ -31,6 +31,9 @@
 #include "tscore/hugepages.h"
 #include "tscore/Random.h"
 #include "ts/ats_probe.h"
+#include "iocore/eventsystem/Tasks.h"
+
+#include <unordered_map>
 
 #ifdef LOOP_CHECK_MODE
 #define DIR_LOOP_THRESHOLD 1000
@@ -862,14 +865,56 @@ dir_lookaside_remove(const CacheKey *key, StripeSM *stripe)
   return;
 }

-// Cache Sync
-//
+// Cache Dir Sync

 void
 dir_sync_init()
 {
-  cacheDirSync          = new CacheSync;
-  cacheDirSync->trigger = eventProcessor.schedule_in(cacheDirSync, HRTIME_SECONDS(cache_config_dir_sync_frequency));
+  static std::vector<std::unique_ptr<CacheSync>> cache_syncs;
+  static bool                                    initialized = false;
+
+  if (initialized) {
+    Warning("dir_sync_init() called multiple times - ignoring");
+    return;
+  }
+  initialized = true;
+  // Group stripe indices by drive; drives keep first-seen stripe order so the drive -> task mapping is deterministic.
+  std::vector<std::pair<CacheDisk *, std::vector<int>>> drives;
+  std::unordered_map<CacheDisk *, size_t>              drive_slot;
+  for (int i = 0; i < gnstripes; i++) {
+    auto [it, inserted] = drive_slot.try_emplace(gstripes[i]->disk, drives.size());
+    if (inserted) {
+      drives.emplace_back(gstripes[i]->disk, std::vector<int>{});
+    }
+    drives[it->second].second.push_back(i);
+  }
+
+  if (drives.empty()) {
+    Dbg(dbg_ctl_cache_dir_sync, "No stripes to sync - dir_sync_init complete");
+    return;
+  }
+
+  // -1 means one task per drive; values below 1 mean sequential. Cap at the drive count: extra tasks would own no stripes.
+  const int ndrives   = static_cast<int>(drives.size());
+  const int requested = (cache_config_dir_sync_parallel_tasks == -1) ? ndrives : std::max(1, cache_config_dir_sync_parallel_tasks);
+  const int num_tasks = std::min(requested, ndrives);
+  cache_syncs.resize(num_tasks);
+  for (auto &task : cache_syncs) {
+    task = std::make_unique<CacheSync>();
+  }
+
+  // Round-robin drives over tasks; every stripe on a given drive is synced by the same task.
+  for (int d = 0; d < ndrives; d++) {
+    auto &[disk, indices] = drives[d];
+    auto &task_indices    = cache_syncs[d % num_tasks]->stripe_indices;
+    Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), d % num_tasks);
+    task_indices.insert(task_indices.end(), indices.begin(), indices.end());
+  }
+
+  for (int i = 0; i < num_tasks; i++) {
+    Dbg(dbg_ctl_cache_dir_sync, "Task %d: syncing %zu stripe(s)", i, cache_syncs[i]->stripe_indices.size());
+    cache_syncs[i]->trigger =
+      eventProcessor.schedule_in(cache_syncs[i].get(), HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
+  }
 }
 
 void
@@ -930,7 +975,7 @@ sync_cache_dir_on_shutdown()
 }
 
 int
-CacheSync::mainEvent(int event, Event *e)
+CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */)
 {
   if (trigger) {
     trigger->cancel_action();
@@ -938,26 +983,27 @@ CacheSync::mainEvent(int event, Event *e)
   }
 
 Lrestart:
-  if (stripe_index >= gnstripes) {
-    stripe_index = 0;
+  if (current_index >= static_cast<int>(stripe_indices.size())) {
+    current_index = 0;
+#if FREE_BUF_BETWEEN_CYCLES
+    // Free buffer between sync cycles to avoid holding large amounts of memory
     if (buf) {
       if (buf_huge) {
         ats_free_hugepage(buf, buflen);
       } else {
         ats_free(buf);
       }
-      buflen   = 0;
       buf      = nullptr;
+      buflen   = 0;
       buf_huge = false;
     }
-    Dbg(dbg_ctl_cache_dir_sync, "sync done");
-    if (event == EVENT_INTERVAL) {
-      trigger = e->ethread->schedule_in(this, 
HRTIME_SECONDS(cache_config_dir_sync_frequency));
-    } else {
-      trigger = eventProcessor.schedule_in(this, 
HRTIME_SECONDS(cache_config_dir_sync_frequency));
-    }
+#endif
+    Dbg(dbg_ctl_cache_dir_sync, "sync cycle done");
+    trigger = eventProcessor.schedule_in(this, 
HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
     return EVENT_CONT;
   }
+  stripe_index = stripe_indices[current_index];
+  current_index++;
 
   StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make 
STAT macros work.
 
@@ -1007,6 +1053,7 @@ Lrestart:
       if (stripe->is_io_in_progress() || stripe->get_agg_buf_pos()) {
         Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", 
stripe->hash_text.get());
         stripe->dir_sync_waiting = true;
+        stripe->waiting_dir_sync = this;
         if (!stripe->is_io_in_progress()) {
           stripe->aggWrite(EVENT_IMMEDIATE, nullptr);
         }
@@ -1072,9 +1119,7 @@ Lrestart:
     return EVENT_CONT;
   }
 Ldone:
-  // done
   writepos = 0;
-  ++stripe_index;
   goto Lrestart;
 }
 
diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h
index caf9647db7..fbd2f0b25b 100644
--- a/src/iocore/cache/P_CacheDir.h
+++ b/src/iocore/cache/P_CacheDir.h
@@ -28,6 +28,7 @@
 #include "iocore/eventsystem/Continuation.h"
 #include "iocore/aio/AIO.h"
 #include "tscore/Version.h"
+#include "tscore/hugepages.h"
 
 #include <cstdint>
 #include <ctime>
@@ -246,10 +247,25 @@ struct CacheSync : public Continuation {
   AIOCallback io;
   Event      *trigger    = nullptr;
   ink_hrtime  start_time = 0;
-  int         mainEvent(int event, Event *e);
-  void        aio_write(int fd, char *b, int n, off_t o);
+
+  std::vector<int> stripe_indices;
+  int              current_index{0};
+
+  int  mainEvent(int event, Event *e);
+  void aio_write(int fd, char *b, int n, off_t o);
 
   CacheSync() : Continuation(new_ProxyMutex()) { 
SET_HANDLER(&CacheSync::mainEvent); }
+
+  ~CacheSync()
+  {
+    if (buf) {
+      if (buf_huge) {
+        ats_free_hugepage(buf, buflen);
+      } else {
+        ats_free(buf);
+      }
+    }
+  }
 };
 
 struct StripeHeaderFooter {
diff --git a/src/iocore/cache/P_CacheInternal.h 
b/src/iocore/cache/P_CacheInternal.h
index 8a8ee3c3aa..674dd98059 100644
--- a/src/iocore/cache/P_CacheInternal.h
+++ b/src/iocore/cache/P_CacheInternal.h
@@ -96,6 +96,7 @@ extern CacheStatsBlock cache_rsb;
 extern int cache_config_dir_sync_frequency;
 extern int cache_config_dir_sync_delay;
 extern int cache_config_dir_sync_max_write;
+extern int cache_config_dir_sync_parallel_tasks;
 extern int cache_config_http_max_alts;
 extern int cache_config_log_alternate_eviction;
 extern int cache_config_permit_pinning;
@@ -140,7 +141,6 @@ struct CacheRemoveCont : public Continuation {
 // Global Data
 extern ClassAllocator<CacheVC, false>            cacheVConnectionAllocator;
 extern ClassAllocator<CacheEvacuateDocVC, false> 
cacheEvacuateDocVConnectionAllocator;
-extern CacheSync                                *cacheDirSync;
 // Function Prototypes
 int                 cache_write(CacheVC *, CacheHTTPInfoVector *);
 int                 get_alternate_index(CacheHTTPInfoVector *cache_vector, 
CacheKey key);
diff --git a/src/iocore/cache/StripeSM.cc b/src/iocore/cache/StripeSM.cc
index 3ec61df5e1..2e39fc3098 100644
--- a/src/iocore/cache/StripeSM.cc
+++ b/src/iocore/cache/StripeSM.cc
@@ -709,9 +709,9 @@ StripeSM::aggWriteDone(int event, Event *e)
 {
   cancel_trigger();
 
-  // ensure we have the cacheDirSync lock if we intend to call it later
+  // ensure we have the waiting_dir_sync lock if we intend to call it later
   // retaking the current mutex recursively is a NOOP
-  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, 
mutex->thread_holding);
+  CACHE_TRY_LOCK(lock, dir_sync_waiting ? waiting_dir_sync->mutex : mutex, 
mutex->thread_holding);
   if (!lock.is_locked()) {
     eventProcessor.schedule_in(this, 
HRTIME_MSECONDS(cache_config_mutex_retry_delay));
     return EVENT_CONT;
@@ -759,7 +759,8 @@ StripeSM::aggWriteDone(int event, Event *e)
   }
   if (dir_sync_waiting) {
     dir_sync_waiting = false;
-    cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
+    waiting_dir_sync->handleEvent(EVENT_IMMEDIATE, nullptr);
+    waiting_dir_sync = nullptr;
   }
   if (this->_write_buffer.get_pending_writers().head || sync.head) {
     return aggWrite(event, e);
diff --git a/src/iocore/cache/StripeSM.h b/src/iocore/cache/StripeSM.h
index 3ae476ac69..40bb972bab 100644
--- a/src/iocore/cache/StripeSM.h
+++ b/src/iocore/cache/StripeSM.h
@@ -85,13 +85,14 @@ public:
 
   StripeInitInfo *init_info = nullptr;
 
-  Cache   *cache                = nullptr;
-  uint32_t last_sync_serial     = 0;
-  uint32_t last_write_serial    = 0;
-  bool     recover_wrapped      = false;
-  bool     dir_sync_waiting     = false;
-  bool     dir_sync_in_progress = false;
-  bool     writing_end_marker   = false;
+  Cache     *cache                = nullptr;
+  uint32_t   last_sync_serial     = 0;
+  uint32_t   last_write_serial    = 0;
+  bool       recover_wrapped      = false;
+  bool       dir_sync_waiting     = false;
+  bool       dir_sync_in_progress = false;
+  CacheSync *waiting_dir_sync     = nullptr;
+  bool       writing_end_marker   = false;
 
   CacheKey          first_fragment_key;
   int64_t           first_fragment_offset = 0;
diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc
index 096f522ca9..88526c08ad 100644
--- a/src/records/RecordsConfig.cc
+++ b/src/records/RecordsConfig.cc
@@ -863,6 +863,8 @@ static constexpr RecordElement RecordsConfig[] =
   ,
   {RECT_CONFIG, "proxy.config.cache.dir.sync_max_write", RECD_INT, "2097152", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
   ,
+  {RECT_CONFIG, "proxy.config.cache.dir.sync_parallel_tasks", RECD_INT, "1", 
RECU_RESTART_TS, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
+  ,
   {RECT_CONFIG, "proxy.config.cache.hostdb.disable_reverse_lookup", RECD_INT, 
"0", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
   ,
   {RECT_CONFIG, "proxy.config.cache.select_alternate", RECD_INT, "1", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}

Reply via email to