This is an automated email from the ASF dual-hosted git repository.

jvanderzee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new d7b4543dd7 Move `StripeSM` methods to StripeSM.cc (#11617)
d7b4543dd7 is described below

commit d7b4543dd7653ef6796bd1b22a45ed388f508428
Author: JosiahWI <[email protected]>
AuthorDate: Mon Jul 29 10:16:04 2024 -0500

    Move `StripeSM` methods to StripeSM.cc (#11617)
    
    This also updates import lists.
---
 src/iocore/cache/CMakeLists.txt             |    1 +
 src/iocore/cache/CacheWrite.cc              |  574 --------------
 src/iocore/cache/P_CacheVol.h               |    1 -
 src/iocore/cache/Stripe.cc                  |  884 ---------------------
 src/iocore/cache/{Stripe.cc => StripeSM.cc} | 1117 +++++++++++++++++----------
 5 files changed, 729 insertions(+), 1848 deletions(-)

diff --git a/src/iocore/cache/CMakeLists.txt b/src/iocore/cache/CMakeLists.txt
index 3e0e83b9b1..6460b91e39 100644
--- a/src/iocore/cache/CMakeLists.txt
+++ b/src/iocore/cache/CMakeLists.txt
@@ -34,6 +34,7 @@ add_library(
   RamCacheLRU.cc
   Store.cc
   Stripe.cc
+  StripeSM.cc
 )
 add_library(ts::inkcache ALIAS inkcache)
 
diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc
index 032ed1092d..1c3c81c62a 100644
--- a/src/iocore/cache/CacheWrite.cc
+++ b/src/iocore/cache/CacheWrite.cc
@@ -23,30 +23,18 @@
 
 #include "P_Cache.h"
 #include "P_CacheDoc.h"
-#include "AggregateWriteBuffer.h"
-#include "CacheEvacuateDocVC.h"
-
-// These macros allow two incrementing unsigned values x and y to maintain
-// their ordering when one of them overflows, given that the values stay close 
to each other.
-#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX)  // exploit overflow
-#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX)  // exploit overflow
-#define UINT_WRAP_LT(_x, _y)  (((_x) - (_y)) >= INT_MAX) // exploit overflow
 
 namespace
 {
 
-DbgCtl dbg_ctl_cache_evac{"cache_evac"};
 DbgCtl dbg_ctl_cache_update{"cache_update"};
-DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
 DbgCtl dbg_ctl_cache_update_alt{"cache_update_alt"};
 
 #ifdef DEBUG
 
-DbgCtl dbg_ctl_cache_agg{"cache_agg"};
 DbgCtl dbg_ctl_cache_stats{"cache_stats"};
 DbgCtl dbg_ctl_cache_write{"cache_write"};
 DbgCtl dbg_ctl_cache_insert{"cache_insert"};
-DbgCtl dbg_ctl_agg_read{"agg_read"};
 
 #endif
 
@@ -273,568 +261,6 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED 
*/)
   return EVENT_CONT;
 }
 
-EvacuationBlock *
-StripeSM::force_evacuate_head(Dir const *evac_dir, int pinned)
-{
-  auto bucket = dir_evac_bucket(evac_dir);
-  if (!evac_bucket_valid(bucket)) {
-    DDbg(dbg_ctl_cache_evac, "dir_evac_bucket out of bounds, skipping 
evacuate: %" PRId64 "(%d), %d, %d", bucket, evacuate_size,
-         (int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
-    return nullptr;
-  }
-
-  // build an evacuation block for the object
-  EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
-  // if we have already started evacuating this document, its too late
-  // to evacuate the head...bad luck
-  if (b && b->f.done) {
-    return b;
-  }
-
-  if (!b) {
-    b      = new_EvacuationBlock(mutex->thread_holding);
-    b->dir = *evac_dir;
-    DDbg(dbg_ctl_cache_evac, "force: %d, %d", (int)dir_offset(evac_dir), 
(int)dir_phase(evac_dir));
-    evacuate[bucket].push(b);
-  }
-  b->f.pinned        = pinned;
-  b->f.evacuate_head = 1;
-  b->evac_frags.key.clear(); // ensure that the block gets evacuated no matter 
what
-  b->readers = 0;            // ensure that the block does not disappear
-  return b;
-}
-
-void
-StripeSM::scan_for_pinned_documents()
-{
-  if (cache_config_permit_pinning) {
-    // we can't evacuate anything between header->write_pos and
-    // header->write_pos + AGG_SIZE.
-    int ps                = this->offset_to_vol_offset(header->write_pos + 
AGG_SIZE);
-    int pe                = this->offset_to_vol_offset(header->write_pos + 2 * 
EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
-    int vol_end_offset    = this->offset_to_vol_offset(len + skip);
-    int before_end_of_vol = pe < vol_end_offset;
-    DDbg(dbg_ctl_cache_evac, "scan %d %d", ps, pe);
-    for (int i = 0; i < this->direntries(); i++) {
-      // is it a valid pinned object?
-      if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
-        // select objects only within this PIN_SCAN region
-        int o = dir_offset(&dir[i]);
-        if (dir_phase(&dir[i]) == header->phase) {
-          if (before_end_of_vol || o >= (pe - vol_end_offset)) {
-            continue;
-          }
-        } else {
-          if (o < ps || o >= pe) {
-            continue;
-          }
-        }
-        force_evacuate_head(&dir[i], 1);
-      }
-    }
-  }
-}
-
-/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
-   DON'T schedule any events on this thread using VC_SCHED_XXX or
-   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
-   eventProcessor.schedule_xxx().
-   */
-int
-StripeSM::aggWriteDone(int event, Event *e)
-{
-  cancel_trigger();
-
-  // ensure we have the cacheDirSync lock if we intend to call it later
-  // retaking the current mutex recursively is a NOOP
-  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, 
mutex->thread_holding);
-  if (!lock.is_locked()) {
-    eventProcessor.schedule_in(this, 
HRTIME_MSECONDS(cache_config_mutex_retry_delay));
-    return EVENT_CONT;
-  }
-  if (io.ok()) {
-    header->last_write_pos  = header->write_pos;
-    header->write_pos      += io.aiocb.aio_nbytes;
-    ink_assert(header->write_pos >= start);
-    DDbg(dbg_ctl_cache_agg, "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 
"", hash_text.get(), header->write_pos,
-         header->last_write_pos);
-    ink_assert(header->write_pos == header->agg_pos);
-    if (header->write_pos + EVACUATION_SIZE > scan_pos) {
-      periodic_scan();
-    }
-    this->_write_buffer.reset_buffer_pos();
-    header->write_serial++;
-  } else {
-    // delete all the directory entries that we inserted
-    // for fragments is this aggregation buffer
-    Dbg(dbg_ctl_cache_disk_error, "Write error on disk %s\n \
-            write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" 
PRIu64 " blocks] \n",
-        hash_text.get(), (uint64_t)io.aiocb.aio_offset, 
(uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
-        (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE, 
(uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
-    Dir del_dir;
-    dir_clear(&del_dir);
-    for (int done = 0; done < this->_write_buffer.get_buffer_pos();) {
-      Doc *doc = reinterpret_cast<Doc *>(this->_write_buffer.get_buffer() + 
done);
-      dir_set_offset(&del_dir, header->write_pos + done);
-      dir_delete(&doc->key, this, &del_dir);
-      done += round_to_approx_size(doc->len);
-    }
-    this->_write_buffer.reset_buffer_pos();
-  }
-  set_io_not_in_progress();
-  // callback ready sync CacheVCs
-  CacheVC *c = nullptr;
-  while ((c = sync.dequeue())) {
-    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
-      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
-    } else {
-      sync.push(c); // put it back on the front
-      break;
-    }
-  }
-  if (dir_sync_waiting) {
-    dir_sync_waiting = false;
-    cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
-  }
-  if (this->_write_buffer.get_pending_writers().head || sync.head) {
-    return aggWrite(event, e);
-  }
-  return EVENT_CONT;
-}
-
-CacheEvacuateDocVC *
-new_DocEvacuator(int nbytes, StripeSM *stripe)
-{
-  CacheEvacuateDocVC *c = new_CacheEvacuateDocVC(stripe);
-  c->op_type            = static_cast<int>(CacheOpType::Evacuate);
-  Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
-  
Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
-  c->buf         = new_IOBufferData(iobuffer_size_to_index(nbytes, 
MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
-  c->stripe      = stripe;
-  c->f.evacuator = 1;
-  c->earliest_key.clear();
-  SET_CONTINUATION_HANDLER(c, &CacheEvacuateDocVC::evacuateDocDone);
-  return c;
-}
-
-static int
-evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, StripeSM 
*stripe)
-{
-  Dir dir, *last_collision = nullptr;
-  int i = 0;
-  while (dir_probe(key, stripe, &dir, &last_collision)) {
-    // next fragment cannot be a head...if it is, it must have been a
-    // directory collision.
-    if (dir_head(&dir)) {
-      continue;
-    }
-    EvacuationBlock *b = evacuation_block_exists(&dir, stripe);
-    if (!b) {
-      b                          = 
new_EvacuationBlock(stripe->mutex->thread_holding);
-      b->dir                     = dir;
-      b->evac_frags.key          = *key;
-      b->evac_frags.earliest_key = *earliest_key;
-      stripe->evacuate[dir_evac_bucket(&dir)].push(b);
-      i++;
-    } else {
-      ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
-      ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
-      EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
-      evac_frag->key           = *key;
-      evac_frag->earliest_key  = *earliest_key;
-      evac_frag->link.next     = b->evac_frags.link.next;
-      b->evac_frags.link.next  = evac_frag;
-    }
-    if (force) {
-      b->readers = 0;
-    }
-    DDbg(dbg_ctl_cache_evac, "next fragment %X Earliest: %X offset %d phase %d 
force %d", (int)key->slice32(0),
-         (int)earliest_key->slice32(0), (int)dir_offset(&dir), 
(int)dir_phase(&dir), force);
-  }
-  return i;
-}
-
-int
-StripeSM::evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e)
-{
-  // push to front of aggregation write list, so it is written first
-
-  evacuator->agg_len = round_to_approx_size((reinterpret_cast<Doc 
*>(evacuator->buf->data()))->len);
-  this->_write_buffer.add_bytes_pending_aggregation(evacuator->agg_len);
-  /* insert the evacuator after all the other evacuators */
-  CacheVC *cur   = static_cast<CacheVC 
*>(this->_write_buffer.get_pending_writers().head);
-  CacheVC *after = nullptr;
-  for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) {
-    after = cur;
-  }
-  ink_assert(evacuator->agg_len <= AGG_SIZE);
-  this->_write_buffer.get_pending_writers().insert(evacuator, after);
-  return aggWrite(event, e);
-}
-
-int
-StripeSM::evacuateDocReadDone(int event, Event *e)
-{
-  cancel_trigger();
-  if (event != AIO_EVENT_DONE) {
-    return EVENT_DONE;
-  }
-  ink_assert(is_io_in_progress());
-  set_io_not_in_progress();
-  ink_assert(mutex->thread_holding == this_ethread());
-  Doc             *doc = reinterpret_cast<Doc *>(doc_evacuator->buf->data());
-  CacheKey         next_key;
-  EvacuationBlock *b      = nullptr;
-  auto             bucket = dir_evac_bucket(&doc_evacuator->overwrite_dir);
-  if (doc->magic != DOC_MAGIC) {
-    Dbg(dbg_ctl_cache_evac, "DOC magic: %X %d", 
(int)dir_tag(&doc_evacuator->overwrite_dir),
-        (int)dir_offset(&doc_evacuator->overwrite_dir));
-    ink_assert(doc->magic == DOC_MAGIC);
-    goto Ldone;
-  }
-  DDbg(dbg_ctl_cache_evac, "evacuateDocReadDone %X offset %d", 
(int)doc->key.slice32(0),
-       (int)dir_offset(&doc_evacuator->overwrite_dir));
-
-  if (evac_bucket_valid(bucket)) {
-    b = evacuate[bucket].head;
-  }
-  while (b) {
-    if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir)) {
-      break;
-    }
-    b = b->link.next;
-  }
-  if (!b) {
-    goto Ldone;
-  }
-  // coverity[Y2K38_SAFETY:FALSE]
-  if ((b->f.pinned && !b->readers) && doc->pinned < 
static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND)) {
-    goto Ldone;
-  }
-
-  if (dir_head(&b->dir) && b->f.evacuate_head) {
-    ink_assert(!b->evac_frags.key.fold());
-    // if its a head (vector), evacuation is real simple...we just
-    // need to write this vector down and overwrite the directory entry.
-    if (dir_compare_tag(&b->dir, &doc->first_key)) {
-      doc_evacuator->key = doc->first_key;
-      b->evac_frags.key  = doc->first_key;
-      DDbg(dbg_ctl_cache_evac, "evacuating vector %X offset %d", 
(int)doc->first_key.slice32(0),
-           (int)dir_offset(&doc_evacuator->overwrite_dir));
-      b->f.unused = 57;
-    } else {
-      // if its an earliest fragment (alternate) evacuation, things get
-      // a little tricky. We have to propagate the earliest key to the next
-      // fragments for this alternate. The last fragment to be evacuated
-      // fixes up the lookaside buffer.
-      doc_evacuator->key          = doc->key;
-      doc_evacuator->earliest_key = doc->key;
-      b->evac_frags.key           = doc->key;
-      b->evac_frags.earliest_key  = doc->key;
-      b->earliest_evacuator       = doc_evacuator;
-      DDbg(dbg_ctl_cache_evac, "evacuating earliest %X %X evac: %p offset: 
%d", (int)b->evac_frags.key.slice32(0),
-           (int)doc->key.slice32(0), doc_evacuator, 
(int)dir_offset(&doc_evacuator->overwrite_dir));
-      b->f.unused = 67;
-    }
-  } else {
-    // find which key matches the document
-    EvacuationKey *ek = &b->evac_frags;
-    for (; ek && !(ek->key == doc->key); ek = ek->link.next) {
-      ;
-    }
-    if (!ek) {
-      b->f.unused = 77;
-      goto Ldone;
-    }
-    doc_evacuator->key          = ek->key;
-    doc_evacuator->earliest_key = ek->earliest_key;
-    DDbg(dbg_ctl_cache_evac, "evacuateDocReadDone key: %X earliest: %X", 
(int)ek->key.slice32(0), (int)ek->earliest_key.slice32(0));
-    b->f.unused = 87;
-  }
-  // if the tag in the c->dir does match the first_key in the
-  // document, then it has to be the earliest fragment. We guarantee that
-  // the first_key and the earliest_key will never collide (see
-  // Cache::open_write).
-  if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
-    next_CacheKey(&next_key, &doc->key);
-    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, 
this);
-  }
-  return evacuateWrite(doc_evacuator, event, e);
-Ldone:
-  free_CacheEvacuateDocVC(doc_evacuator);
-  doc_evacuator = nullptr;
-  return aggWrite(event, e);
-}
-
-int
-StripeSM::evac_range(off_t low, off_t high, int evac_phase)
-{
-  off_t s  = this->offset_to_vol_offset(low);
-  off_t e  = this->offset_to_vol_offset(high);
-  int   si = dir_offset_evac_bucket(s);
-  int   ei = dir_offset_evac_bucket(e);
-
-  for (int i = si; i <= ei; i++) {
-    EvacuationBlock *b            = evacuate[i].head;
-    EvacuationBlock *first        = nullptr;
-    int64_t          first_offset = INT64_MAX;
-    for (; b; b = b->link.next) {
-      int64_t offset = dir_offset(&b->dir);
-      int     phase  = dir_phase(&b->dir);
-      if (offset >= s && offset < e && !b->f.done && phase == evac_phase) {
-        if (offset < first_offset) {
-          first        = b;
-          first_offset = offset;
-        }
-      }
-    }
-    if (first) {
-      first->f.done       = 1;
-      io.aiocb.aio_fildes = fd;
-      io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
-      io.aiocb.aio_offset = this->vol_offset(&first->dir);
-      if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > 
static_cast<off_t>(skip + len)) {
-        io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
-      }
-      doc_evacuator                = new_DocEvacuator(io.aiocb.aio_nbytes, 
this);
-      doc_evacuator->overwrite_dir = first->dir;
-
-      io.aiocb.aio_buf = doc_evacuator->buf->data();
-      io.action        = this;
-      io.thread        = AIO_CALLBACK_THREAD_ANY;
-      DDbg(dbg_ctl_cache_evac, "evac_range evacuating %X %d", 
(int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
-      SET_HANDLER(&StripeSM::evacuateDocReadDone);
-      ink_assert(ink_aio_read(&io) >= 0);
-      return -1;
-    }
-  }
-  return 0;
-}
-
-int
-StripeSM::_agg_copy(CacheVC *vc)
-{
-  if (vc->f.evacuator) {
-    return this->_copy_evacuator_to_aggregation(vc);
-  } else {
-    return this->_copy_writer_to_aggregation(vc);
-  }
-}
-
-inline void
-StripeSM::evacuate_cleanup_blocks(int i)
-{
-  EvacuationBlock *b = evac_bucket_valid(i) ? evacuate[i].head : nullptr;
-  while (b) {
-    if (b->f.done && ((header->phase != dir_phase(&b->dir) && 
header->write_pos > this->vol_offset(&b->dir)) ||
-                      (header->phase == dir_phase(&b->dir) && 
header->write_pos <= this->vol_offset(&b->dir)))) {
-      EvacuationBlock *x = b;
-      DDbg(dbg_ctl_cache_evac, "evacuate cleanup free %X offset %d", 
(int)b->evac_frags.key.slice32(0), (int)dir_offset(&b->dir));
-      b = b->link.next;
-      evacuate[i].remove(x);
-      free_EvacuationBlock(x, mutex->thread_holding);
-      continue;
-    }
-    b = b->link.next;
-  }
-}
-
-void
-StripeSM::evacuate_cleanup()
-{
-  int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
-  int64_t e  = dir_offset_evac_bucket(eo);
-  int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
-  int64_t s  = sx;
-  int     i;
-
-  if (e > evacuate_size) {
-    e = evacuate_size;
-  }
-  if (sx < 0) {
-    s = 0;
-  }
-  for (i = s; i < e; i++) {
-    evacuate_cleanup_blocks(i);
-  }
-
-  // if we have wrapped, handle the end bit
-  if (sx <= 0) {
-    s = evacuate_size + sx - 2;
-    if (s < 0) {
-      s = 0;
-    }
-    for (i = s; i < evacuate_size; i++) {
-      evacuate_cleanup_blocks(i);
-    }
-  }
-}
-
-void
-StripeSM::periodic_scan()
-{
-  evacuate_cleanup();
-  scan_for_pinned_documents();
-  if (header->write_pos == start) {
-    scan_pos = start;
-  }
-  scan_pos += len / PIN_SCAN_EVERY;
-}
-
-void
-StripeSM::agg_wrap()
-{
-  header->write_pos = start;
-  header->phase     = !header->phase;
-
-  header->cycle++;
-  header->agg_pos = header->write_pos;
-  dir_lookaside_cleanup(this);
-  dir_clean_vol(this);
-  {
-    StripeSM *stripe = this;
-    Metrics::Counter::increment(cache_rsb.directory_wrap);
-    Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_wrap);
-    Note("Cache volume %d on disk '%s' wraps around", 
stripe->cache_vol->vol_number, stripe->hash_text.get());
-  }
-  periodic_scan();
-}
-
-/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
-   DON'T schedule any events on this thread using VC_SCHED_XXX or
-   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
-   eventProcessor.schedule_xxx().
-   Also, make sure that any functions called by this also use
-   the eventProcessor to schedule events
-*/
-int
-StripeSM::aggWrite(int event, void * /* e ATS_UNUSED */)
-{
-  ink_assert(!is_io_in_progress());
-
-  Que(CacheVC, link) tocall;
-  CacheVC *c;
-
-  cancel_trigger();
-
-Lagain:
-  this->aggregate_pending_writes(tocall);
-
-  // if we got nothing...
-  if (this->_write_buffer.is_empty()) {
-    if (!this->_write_buffer.get_pending_writers().head && !sync.head) { // 
nothing to get
-      return EVENT_CONT;
-    }
-    if (header->write_pos == start) {
-      // write aggregation too long, bad bad, punt on everything.
-      Note("write aggregation exceeds vol size");
-      ink_assert(!tocall.head);
-      ink_assert(false);
-      while ((c = this->get_pending_writers().dequeue())) {
-        this->_write_buffer.add_bytes_pending_aggregation(-c->agg_len);
-        eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
-      }
-      return EVENT_CONT;
-    }
-    // start back
-    if (this->get_pending_writers().head) {
-      agg_wrap();
-      goto Lagain;
-    }
-  }
-
-  // evacuate space
-  off_t end = header->write_pos + this->_write_buffer.get_buffer_pos() + 
EVACUATION_SIZE;
-  if (evac_range(header->write_pos, end, !header->phase) < 0) {
-    goto Lwait;
-  }
-  if (end > skip + len) {
-    if (evac_range(start, start + (end - (skip + len)), header->phase) < 0) {
-      goto Lwait;
-    }
-  }
-
-  // if write_buffer.get_pending_writers.head, then we are near the end of the 
disk, so
-  // write down the aggregation in whatever size it is.
-  if (this->_write_buffer.get_buffer_pos() < AGG_HIGH_WATER && 
!this->_write_buffer.get_pending_writers().head && !sync.head &&
-      !dir_sync_waiting) {
-    goto Lwait;
-  }
-
-  // write sync marker
-  if (this->_write_buffer.is_empty()) {
-    ink_assert(sync.head);
-    int l = round_to_approx_size(sizeof(Doc));
-    this->_write_buffer.seek(l);
-    Doc *d = reinterpret_cast<Doc *>(this->_write_buffer.get_buffer());
-    memset(static_cast<void *>(d), 0, sizeof(Doc));
-    d->magic        = DOC_MAGIC;
-    d->len          = l;
-    d->sync_serial  = header->sync_serial;
-    d->write_serial = header->write_serial;
-  }
-
-  // set write limit
-  header->agg_pos = header->write_pos + this->_write_buffer.get_buffer_pos();
-
-  io.aiocb.aio_fildes = fd;
-  io.aiocb.aio_offset = header->write_pos;
-  io.aiocb.aio_buf    = this->_write_buffer.get_buffer();
-  io.aiocb.aio_nbytes = this->_write_buffer.get_buffer_pos();
-  io.action           = this;
-  /*
-    Callback on AIO thread so that we can issue a new write ASAP
-    as all writes are serialized in the volume.  This is not necessary
-    for reads proceed independently.
-   */
-  io.thread = AIO_CALLBACK_THREAD_AIO;
-  SET_HANDLER(&StripeSM::aggWriteDone);
-  ink_aio_write(&io);
-
-Lwait:
-  int ret = EVENT_CONT;
-  while ((c = tocall.dequeue())) {
-    if (event == EVENT_CALL && c->mutex->thread_holding == 
mutex->thread_holding) {
-      ret = EVENT_RETURN;
-    } else {
-      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
-    }
-  }
-  return ret;
-}
-
-void
-StripeSM::aggregate_pending_writes(Queue<CacheVC, Continuation::Link_link> 
&tocall)
-{
-  for (auto *c = static_cast<CacheVC 
*>(this->_write_buffer.get_pending_writers().head); c;) {
-    int writelen = c->agg_len;
-    // [amc] this is checked multiple places, on here was it strictly less.
-    ink_assert(writelen <= AGG_SIZE);
-    if (this->_write_buffer.get_buffer_pos() + writelen > AGG_SIZE ||
-        this->header->write_pos + this->_write_buffer.get_buffer_pos() + 
writelen > (this->skip + this->len)) {
-      break;
-    }
-    DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", 
this->_write_buffer.get_buffer_pos(),
-         this->header->write_pos + this->_write_buffer.get_buffer_pos(), 
c->first_key.slice32(0));
-    [[maybe_unused]] int wrotelen = this->_agg_copy(c);
-    ink_assert(writelen == wrotelen);
-    CacheVC *n = (CacheVC *)c->link.next;
-    this->_write_buffer.get_pending_writers().dequeue();
-    if (c->f.sync && c->f.use_first_key) {
-      CacheVC *last = this->sync.tail;
-      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) {
-        last = (CacheVC *)last->link.prev;
-      }
-      this->sync.insert(c, last);
-    } else if (c->f.evacuator) {
-      c->handleEvent(AIO_EVENT_DONE, nullptr);
-    } else {
-      tocall.enqueue(c);
-    }
-    c = n;
-  }
-}
-
 int
 CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED 
*/)
 {
diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h
index 2766db459c..a448b41f84 100644
--- a/src/iocore/cache/P_CacheVol.h
+++ b/src/iocore/cache/P_CacheVol.h
@@ -195,7 +195,6 @@ public:
 
   int evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e);
   int evacuateDocReadDone(int event, Event *e);
-  int evacuateDoc(int event, Event *e);
 
   int              evac_range(off_t start, off_t end, int evac_phase);
   void             periodic_scan();
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index c8ba5268af..b01cbfaf10 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -21,22 +21,10 @@
   limitations under the License.
  */
 
-#include "P_CacheDisk.h"
 #include "P_CacheDoc.h"
 #include "P_CacheInternal.h"
 #include "P_CacheVol.h"
 
-#include "proxy/hdrs/HTTP.h"
-
-#include "tsutil/Metrics.h"
-
-#include "iocore/eventsystem/EThread.h"
-#include "iocore/eventsystem/IOBuffer.h"
-#include "iocore/eventsystem/Lock.h"
-
-#include "tsutil/DbgCtl.h"
-
-#include "tscore/hugepages.h"
 #include "tscore/ink_assert.h"
 #include "tscore/ink_memory.h"
 
@@ -47,12 +35,6 @@ using CacheHTTPInfo = HTTPInfo;
 namespace
 {
 
-DbgCtl dbg_ctl_cache_dir_sync{"dir_sync"};
-DbgCtl dbg_ctl_cache_init{"cache_init"};
-
-// This is the oldest version number that is still usable.
-short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
-
 int
 compare_ushort(void const *a, void const *b)
 {
@@ -87,638 +69,6 @@ struct StripeInitInfo {
 // Stripe
 //
 
-int
-StripeSM::begin_read(CacheVC *cont) const
-{
-  ink_assert(cont->mutex->thread_holding == this_ethread());
-  ink_assert(mutex->thread_holding == this_ethread());
-  // no need for evacuation as the entire document is already in memory
-  if (cont->f.single_fragment) {
-    return 0;
-  }
-  int              i = dir_evac_bucket(&cont->earliest_dir);
-  EvacuationBlock *b;
-  for (b = evacuate[i].head; b; b = b->link.next) {
-    if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
-      continue;
-    }
-    if (b->readers) {
-      b->readers = b->readers + 1;
-    }
-    return 0;
-  }
-  // we don't actually need to preserve this block as it is already in
-  // memory, but this is easier, and evacuations are rare
-  EThread *t        = cont->mutex->thread_holding;
-  b                 = new_EvacuationBlock(t);
-  b->readers        = 1;
-  b->dir            = cont->earliest_dir;
-  b->evac_frags.key = cont->earliest_key;
-  evacuate[i].push(b);
-  return 1;
-}
-
-int
-StripeSM::close_read(CacheVC *cont) const
-{
-  EThread *t = cont->mutex->thread_holding;
-  ink_assert(t == this_ethread());
-  ink_assert(t == mutex->thread_holding);
-  if (dir_is_empty(&cont->earliest_dir)) {
-    return 1;
-  }
-  int              i = dir_evac_bucket(&cont->earliest_dir);
-  EvacuationBlock *b;
-  for (b = evacuate[i].head; b;) {
-    EvacuationBlock *next = b->link.next;
-    if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
-      b = next;
-      continue;
-    }
-    if (b->readers && !--b->readers) {
-      evacuate[i].remove(b);
-      free_EvacuationBlock(b, t);
-      break;
-    }
-    b = next;
-  }
-
-  return 1;
-}
-
-/**
-  Add AIO task to clear Dir.
- */
-int
-StripeSM::clear_dir_aio()
-{
-  size_t dir_len = this->dirlen();
-  this->_clear_init();
-
-  SET_HANDLER(&StripeSM::handle_dir_clear);
-
-  io.aiocb.aio_fildes = fd;
-  io.aiocb.aio_buf    = raw_dir;
-  io.aiocb.aio_nbytes = dir_len;
-  io.aiocb.aio_offset = skip;
-  io.action           = this;
-  io.thread           = AIO_CALLBACK_THREAD_ANY;
-  io.then             = nullptr;
-  ink_assert(ink_aio_write(&io));
-
-  return 0;
-}
-
-/**
-  Clear Dir directly. This is mainly used by unit tests. The clear_dir_aio() 
is the suitable function in most cases.
- */
-int
-StripeSM::clear_dir()
-{
-  size_t dir_len = this->dirlen();
-  this->_clear_init();
-
-  if (pwrite(this->fd, this->raw_dir, dir_len, this->skip) < 0) {
-    Warning("unable to clear cache directory '%s'", this->hash_text.get());
-    return -1;
-  }
-
-  return 0;
-}
-
-int
-StripeSM::init(char *s, off_t blocks, off_t dir_skip, bool clear)
-{
-  char        *seed_str       = disk->hash_base_string ? 
disk->hash_base_string : s;
-  const size_t hash_seed_size = strlen(seed_str);
-  const size_t hash_text_size = hash_seed_size + 32;
-
-  hash_text = static_cast<char *>(ats_malloc(hash_text_size));
-  ink_strlcpy(hash_text, seed_str, hash_text_size);
-  snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" 
PRIu64 ":%" PRIu64 "",
-           static_cast<uint64_t>(dir_skip), static_cast<uint64_t>(blocks));
-  CryptoContext().hash_immediate(hash_id, hash_text, strlen(hash_text));
-
-  dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : 
dir_skip));
-  path     = ats_strdup(s);
-  len      = blocks * STORE_BLOCK_SIZE;
-  ink_assert(len <= MAX_STRIPE_SIZE);
-  skip             = dir_skip;
-  prev_recover_pos = 0;
-
-  // successive approximation, directory/meta data eats up some storage
-  start = dir_skip;
-  this->_init_data();
-  data_blocks         = (len - (start - skip)) / STORE_BLOCK_SIZE;
-  hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 
100;
-
-  evacuate_size = static_cast<int>(len / EVACUATION_BUCKET_SIZE) + 2;
-  int evac_len  = evacuate_size * sizeof(DLL<EvacuationBlock>);
-  evacuate      = static_cast<DLL<EvacuationBlock> *>(ats_malloc(evac_len));
-  memset(static_cast<void *>(evacuate), 0, evac_len);
-
-  Dbg(dbg_ctl_cache_init, "Vol %s: allocating %zu directory bytes for a %lld 
byte volume (%lf%%)", hash_text.get(), dirlen(),
-      (long long)this->len, (double)dirlen() / (double)this->len * 100.0);
-
-  raw_dir = nullptr;
-  if (ats_hugepage_enabled()) {
-    raw_dir = static_cast<char *>(ats_alloc_hugepage(this->dirlen()));
-  }
-  if (raw_dir == nullptr) {
-    raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), 
this->dirlen()));
-  }
-
-  dir    = reinterpret_cast<Dir *>(raw_dir + this->headerlen());
-  header = reinterpret_cast<StripteHeaderFooter *>(raw_dir);
-  footer = reinterpret_cast<StripteHeaderFooter *>(raw_dir + this->dirlen() - 
ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter)));
-
-  if (clear) {
-    Note("clearing cache directory '%s'", hash_text.get());
-    return clear_dir_aio();
-  }
-
-  init_info           = new StripeInitInfo();
-  int   footerlen     = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter));
-  off_t footer_offset = this->dirlen() - footerlen;
-  // try A
-  off_t as = skip;
-
-  Dbg(dbg_ctl_cache_init, "reading directory '%s'", hash_text.get());
-  SET_HANDLER(&StripeSM::handle_header_read);
-  init_info->vol_aio[0].aiocb.aio_offset = as;
-  init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
-  off_t bs                               = skip + this->dirlen();
-  init_info->vol_aio[2].aiocb.aio_offset = bs;
-  init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;
-
-  for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
-    AIOCallback *aio      = &(init_info->vol_aio[i]);
-    aio->aiocb.aio_fildes = fd;
-    aio->aiocb.aio_buf    = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
-    aio->aiocb.aio_nbytes = footerlen;
-    aio->action           = this;
-    aio->thread           = AIO_CALLBACK_THREAD_ANY;
-    aio->then             = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
-  }
-  ink_assert(ink_aio_read(init_info->vol_aio));
-  return 0;
-}
-
-int
-StripeSM::handle_dir_clear(int event, void *data)
-{
-  size_t       dir_len = this->dirlen();
-  AIOCallback *op;
-
-  if (event == AIO_EVENT_DONE) {
-    op = static_cast<AIOCallback *>(data);
-    if (!op->ok()) {
-      Warning("unable to clear cache directory '%s'", hash_text.get());
-      disk->incrErrors(op);
-      fd = -1;
-    }
-
-    if (op->aiocb.aio_nbytes == dir_len) {
-      /* clear the header for directory B. We don't need to clear the
-         whole of directory B. The header for directory B starts at
-         skip + len */
-      op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter));
-      op->aiocb.aio_offset = skip + dir_len;
-      ink_assert(ink_aio_write(op));
-      return EVENT_DONE;
-    }
-    set_io_not_in_progress();
-    SET_HANDLER(&StripeSM::dir_init_done);
-    dir_init_done(EVENT_IMMEDIATE, nullptr);
-    /* mark the volume as bad */
-  }
-  return EVENT_DONE;
-}
-
-int
-StripeSM::handle_dir_read(int event, void *data)
-{
-  AIOCallback *op = static_cast<AIOCallback *>(data);
-
-  if (event == AIO_EVENT_DONE) {
-    if (!op->ok()) {
-      Note("Directory read failed: clearing cache directory %s", 
this->hash_text.get());
-      clear_dir_aio();
-      return EVENT_DONE;
-    }
-  }
-
-  if (!(header->magic == STRIPE_MAGIC && footer->magic == STRIPE_MAGIC &&
-        CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version._major && 
header->version._major <= CACHE_DB_MAJOR_VERSION)) {
-    Warning("bad footer in cache directory for '%s', clearing", 
hash_text.get());
-    Note("STRIPE_MAGIC %d\n header magic: %d\n footer_magic %d\n 
CACHE_DB_MAJOR_VERSION_COMPATIBLE %d\n major version %d\n"
-         "CACHE_DB_MAJOR_VERSION %d\n",
-         STRIPE_MAGIC, header->magic, footer->magic, 
CACHE_DB_MAJOR_VERSION_COMPATIBLE, header->version._major,
-         CACHE_DB_MAJOR_VERSION);
-    Note("clearing cache directory '%s'", hash_text.get());
-    clear_dir_aio();
-    return EVENT_DONE;
-  }
-  CHECK_DIR(this);
-
-  sector_size = header->sector_size;
-
-  return this->recover_data();
-}
-
-int
-StripeSM::recover_data()
-{
-  SET_HANDLER(&StripeSM::handle_recover_from_data);
-  return handle_recover_from_data(EVENT_IMMEDIATE, nullptr);
-}
-
-/*
-   Philosophy:  The idea is to find the region of disk that could be
-   inconsistent and remove all directory entries pointing to that potentially
-   inconsistent region.
-   Start from a consistent position (the write_pos of the last directory
-   synced to disk) and scan forward. Two invariants for docs that were
-   written to the disk after the directory was synced:
-
-   1. doc->magic == DOC_MAGIC
-
-   The following two cases happen only when the previous generation
-   documents are aligned with the current ones.
-
-   2. All the docs written to the disk
-   after the directory was synced will have their sync_serial <=
-   header->sync_serial + 1,  because the write aggregation can take
-   indeterminate amount of time to sync. The doc->sync_serial can be
-   equal to header->sync_serial + 1, because we increment the sync_serial
-   before we sync the directory to disk.
-
-   3. The doc->sync_serial will always increase. If doc->sync_serial
-   decreases, the document was written in the previous phase
-
-   If either of these conditions fail and we are not too close to the end
-   (see the next comment ) then we're done
-
-   We actually start from header->last_write_pos instead of header->write_pos
-   to make sure that we haven't wrapped around the whole disk without
-   syncing the directory.  Since the sync serial is 60 seconds, it is
-   entirely possible to write through the whole cache without
-   once syncing the directory. In this case, we need to clear the
-   cache.The documents written right before we synced the
-   directory to disk should have the write_serial <= header->sync_serial.
-
-      */
-
-int
-StripeSM::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
-{
-  uint32_t got_len         = 0;
-  uint32_t max_sync_serial = header->sync_serial;
-  char    *s, *e = nullptr;
-  if (event == EVENT_IMMEDIATE) {
-    if (header->sync_serial == 0) {
-      io.aiocb.aio_buf = nullptr;
-      SET_HANDLER(&StripeSM::handle_recover_write_dir);
-      return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
-    }
-    // initialize
-    recover_wrapped   = false;
-    last_sync_serial  = 0;
-    last_write_serial = 0;
-    recover_pos       = header->last_write_pos;
-    if (recover_pos >= skip + len) {
-      recover_wrapped = true;
-      recover_pos     = start;
-    }
-    io.aiocb.aio_buf    = static_cast<char *>(ats_memalign(ats_pagesize(), 
RECOVERY_SIZE));
-    io.aiocb.aio_nbytes = RECOVERY_SIZE;
-    if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > 
static_cast<off_t>(skip + len)) {
-      io.aiocb.aio_nbytes = (skip + len) - recover_pos;
-    }
-  } else if (event == AIO_EVENT_DONE) {
-    if (!io.ok()) {
-      Warning("disk read error on recover '%s', clearing", hash_text.get());
-      disk->incrErrors(&io);
-      goto Lclear;
-    }
-    if (io.aiocb.aio_offset == header->last_write_pos) {
-      /* check that we haven't wrapped around without syncing
-         the directory. Start from last_write_serial (write pos the documents
-         were written to just before syncing the directory) and make sure
-         that all documents have write_serial <= header->write_serial.
-       */
-      uint32_t to_check = header->write_pos - header->last_write_pos;
-      ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
-      uint32_t done = 0;
-      s             = static_cast<char *>(io.aiocb.aio_buf);
-      while (done < to_check) {
-        Doc *doc = reinterpret_cast<Doc *>(s + done);
-        if (doc->magic != DOC_MAGIC || doc->write_serial > 
header->write_serial) {
-          Warning("no valid directory found while recovering '%s', clearing", 
hash_text.get());
-          goto Lclear;
-        }
-        done += round_to_approx_size(doc->len);
-        if (doc->sync_serial > last_write_serial) {
-          last_sync_serial = doc->sync_serial;
-        }
-      }
-      ink_assert(done == to_check);
-
-      got_len      = io.aiocb.aio_nbytes - done;
-      recover_pos += io.aiocb.aio_nbytes;
-      s            = static_cast<char *>(io.aiocb.aio_buf) + done;
-      e            = s + got_len;
-    } else {
-      got_len      = io.aiocb.aio_nbytes;
-      recover_pos += io.aiocb.aio_nbytes;
-      s            = static_cast<char *>(io.aiocb.aio_buf);
-      e            = s + got_len;
-    }
-  }
-  // examine what we got
-  if (got_len) {
-    Doc *doc = nullptr;
-
-    if (recover_wrapped && start == io.aiocb.aio_offset) {
-      doc = reinterpret_cast<Doc *>(s);
-      if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
-        recover_pos = skip + len - EVACUATION_SIZE;
-        goto Ldone;
-      }
-    }
-
-    // If execution reaches here, then @c got_len > 0 and e == s + got_len 
therefore s < e
-    // clang analyzer can't figure this out, so be explicit.
-    ink_assert(s < e);
-    while (s < e) {
-      doc = reinterpret_cast<Doc *>(s);
-
-      if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
-        if (doc->magic == DOC_MAGIC) {
-          if (doc->sync_serial > header->sync_serial) {
-            max_sync_serial = doc->sync_serial;
-          }
-
-          /*
-             doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial
-             This might happen in the following situations
-             1. We are starting off recovery. In this case the
-             last_sync_serial == header->sync_serial, but the doc->sync_serial
-             can be anywhere in the range (0, header->sync_serial + 1]
-             If this is the case, update last_sync_serial and continue;
-
-             2. A dir sync started between writing documents to the
-             aggregation buffer and hence the doc->sync_serial went up.
-             If the doc->sync_serial is greater than the last
-             sync serial and less than (header->sync_serial + 2) then
-             continue;
-
-             3. If the position we are recovering from is within AGG_SIZE
-             from the disk end, then we can't trust this document. The
-             aggregation buffer might have been larger than the remaining space
-             at the end and we decided to wrap around instead of writing
-             anything at that point. In this case, wrap around and start
-             from the beginning.
-
-             If neither of these 3 cases happen, then we are indeed done.
-
-           */
-
-          // case 1
-          // case 2
-          if (doc->sync_serial > last_sync_serial && doc->sync_serial <= 
header->sync_serial + 1) {
-            last_sync_serial  = doc->sync_serial;
-            s                += round_to_approx_size(doc->len);
-            continue;
-          }
-          // case 3 - we have already recovered some data and
-          // (doc->sync_serial < last_sync_serial) ||
-          // (doc->sync_serial > header->sync_serial + 1).
-          // if we are too close to the end, wrap around
-          else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
-            recover_wrapped     = true;
-            recover_pos         = start;
-            io.aiocb.aio_nbytes = RECOVERY_SIZE;
-
-            break;
-          }
-          // we are done. This doc was written in the earlier phase
-          recover_pos -= e - s;
-          goto Ldone;
-        } else {
-          // doc->magic != DOC_MAGIC
-          // If we are in the danger zone - recover_pos is within AGG_SIZE
-          // from the end, then wrap around
-          recover_pos -= e - s;
-          if (recover_pos > (skip + len) - AGG_SIZE) {
-            recover_wrapped     = true;
-            recover_pos         = start;
-            io.aiocb.aio_nbytes = RECOVERY_SIZE;
-
-            break;
-          }
-          // we ar not in the danger zone
-          goto Ldone;
-        }
-      }
-      // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
-      last_write_serial  = doc->write_serial;
-      s                 += round_to_approx_size(doc->len);
-    }
-
-    /* if (s > e) then we gone through RECOVERY_SIZE; we need to
-       read more data off disk and continue recovering */
-    if (s >= e) {
-      /* In the last iteration, we increment s by doc->len...need to undo
-         that change */
-      if (s > e) {
-        s -= round_to_approx_size(doc->len);
-      }
-      recover_pos -= e - s;
-      if (recover_pos >= skip + len) {
-        recover_wrapped = true;
-        recover_pos     = start;
-      }
-      io.aiocb.aio_nbytes = RECOVERY_SIZE;
-      if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > 
static_cast<off_t>(skip + len)) {
-        io.aiocb.aio_nbytes = (skip + len) - recover_pos;
-      }
-    }
-  }
-  if (recover_pos == prev_recover_pos) { // this should never happen, but if 
it does break the loop
-    goto Lclear;
-  }
-  prev_recover_pos    = recover_pos;
-  io.aiocb.aio_offset = recover_pos;
-  ink_assert(ink_aio_read(&io));
-  return EVENT_CONT;
-
-Ldone: {
-  /* if we come back to the starting position, then we don't have to recover 
anything */
-  if (recover_pos == header->write_pos && recover_wrapped) {
-    SET_HANDLER(&StripeSM::handle_recover_write_dir);
-    if (dbg_ctl_cache_init.on()) {
-      Note("recovery wrapped around. nothing to clear\n");
-    }
-    return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
-  }
-
-  recover_pos += EVACUATION_SIZE; // safely cover the max write size
-  if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= 
header->write_pos)) {
-    Dbg(dbg_ctl_cache_init, "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", 
Wrapped:%d", header->write_pos, recover_pos,
-        recover_wrapped);
-    Warning("no valid directory found while recovering '%s', clearing", 
hash_text.get());
-    goto Lclear;
-  }
-
-  if (recover_pos > skip + len) {
-    recover_pos -= skip + len;
-  }
-  // bump sync number so it is different from that in the Doc structs
-  uint32_t next_sync_serial = max_sync_serial + 1;
-  // make that the next sync does not overwrite our good copy!
-  if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) {
-    next_sync_serial++;
-  }
-  // clear effected portion of the cache
-  off_t clear_start = this->offset_to_vol_offset(header->write_pos);
-  off_t clear_end   = this->offset_to_vol_offset(recover_pos);
-  if (clear_start <= clear_end) {
-    dir_clear_range(clear_start, clear_end, this);
-  } else {
-    dir_clear_range(clear_start, DIR_OFFSET_MAX, this);
-    dir_clear_range(1, clear_end, this);
-  }
-
-  Note("recovery clearing offsets of Stripe %s : [%" PRIu64 ", %" PRIu64 "] 
sync_serial %d next %d\n", hash_text.get(),
-       header->write_pos, recover_pos, header->sync_serial, next_sync_serial);
-
-  footer->sync_serial = header->sync_serial = next_sync_serial;
-
-  for (int i = 0; i < 3; i++) {
-    AIOCallback *aio      = &(init_info->vol_aio[i]);
-    aio->aiocb.aio_fildes = fd;
-    aio->action           = this;
-    aio->thread           = AIO_CALLBACK_THREAD_ANY;
-    aio->then             = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
-  }
-  int    footerlen = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter));
-  size_t dirlen    = this->dirlen();
-  int    B         = header->sync_serial & 1;
-  off_t  ss        = skip + (B ? dirlen : 0);
-
-  init_info->vol_aio[0].aiocb.aio_buf    = raw_dir;
-  init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
-  init_info->vol_aio[0].aiocb.aio_offset = ss;
-  init_info->vol_aio[1].aiocb.aio_buf    = raw_dir + footerlen;
-  init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
-  init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
-  init_info->vol_aio[2].aiocb.aio_buf    = raw_dir + dirlen - footerlen;
-  init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
-  init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
-
-  SET_HANDLER(&StripeSM::handle_recover_write_dir);
-  ink_assert(ink_aio_write(init_info->vol_aio));
-  return EVENT_CONT;
-}
-
-Lclear:
-  free(static_cast<char *>(io.aiocb.aio_buf));
-  delete init_info;
-  init_info = nullptr;
-  clear_dir_aio();
-  return EVENT_CONT;
-}
-
-int
-StripeSM::handle_recover_write_dir(int /* event ATS_UNUSED */, void * /* data 
ATS_UNUSED */)
-{
-  if (io.aiocb.aio_buf) {
-    free(static_cast<char *>(io.aiocb.aio_buf));
-  }
-  delete init_info;
-  init_info = nullptr;
-  set_io_not_in_progress();
-  scan_pos = header->write_pos;
-  periodic_scan();
-  SET_HANDLER(&StripeSM::dir_init_done);
-  return dir_init_done(EVENT_IMMEDIATE, nullptr);
-}
-
-int
-StripeSM::handle_header_read(int event, void *data)
-{
-  AIOCallback         *op;
-  StripteHeaderFooter *hf[4];
-  switch (event) {
-  case AIO_EVENT_DONE:
-    op = static_cast<AIOCallback *>(data);
-    for (auto &i : hf) {
-      ink_assert(op != nullptr);
-      i = static_cast<StripteHeaderFooter *>(op->aiocb.aio_buf);
-      if (!op->ok()) {
-        Note("Header read failed: clearing cache directory %s", 
this->hash_text.get());
-        clear_dir_aio();
-        return EVENT_DONE;
-      }
-      op = op->then;
-    }
-
-    io.aiocb.aio_fildes = fd;
-    io.aiocb.aio_nbytes = this->dirlen();
-    io.aiocb.aio_buf    = raw_dir;
-    io.action           = this;
-    io.thread           = AIO_CALLBACK_THREAD_ANY;
-    io.then             = nullptr;
-
-    if (hf[0]->sync_serial == hf[1]->sync_serial &&
-        (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != 
hf[3]->sync_serial)) {
-      SET_HANDLER(&StripeSM::handle_dir_read);
-      if (dbg_ctl_cache_init.on()) {
-        Note("using directory A for '%s'", hash_text.get());
-      }
-      io.aiocb.aio_offset = skip;
-      ink_assert(ink_aio_read(&io));
-    }
-    // try B
-    else if (hf[2]->sync_serial == hf[3]->sync_serial) {
-      SET_HANDLER(&StripeSM::handle_dir_read);
-      if (dbg_ctl_cache_init.on()) {
-        Note("using directory B for '%s'", hash_text.get());
-      }
-      io.aiocb.aio_offset = skip + this->dirlen();
-      ink_assert(ink_aio_read(&io));
-    } else {
-      Note("no good directory, clearing '%s' since sync_serials on both A and 
B copies are invalid", hash_text.get());
-      Note("Header A: %d\nFooter A: %d\n Header B: %d\n Footer B %d\n", 
hf[0]->sync_serial, hf[1]->sync_serial, hf[2]->sync_serial,
-           hf[3]->sync_serial);
-      clear_dir_aio();
-      delete init_info;
-      init_info = nullptr;
-    }
-    return EVENT_DONE;
-  default:
-    ink_assert(!"not reach here");
-  }
-  return EVENT_DONE;
-}
-
-int
-StripeSM::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED 
*/)
-{
-  if (!cache->cache_read_done) {
-    eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
-    return EVENT_CONT;
-  } else {
-    int i = gnstripes++;
-    ink_assert(!gstripes[i]);
-    gstripes[i] = this;
-    SET_HANDLER(&StripeSM::aggWrite);
-    cache->vol_initialized(fd != -1);
-    return EVENT_DONE;
-  }
-}
-
 int
 Stripe::dir_check()
 {
@@ -935,240 +285,6 @@ Stripe::_init_data()
   this->_init_data_internal();
 }
 
-bool
-StripeSM::add_writer(CacheVC *vc)
-{
-  ink_assert(vc);
-  this->_write_buffer.add_bytes_pending_aggregation(vc->agg_len);
-  // An extra AGG_SIZE is added to the backlog here, but not in
-  // open_write, at the time I'm writing this comment. I venture to
-  // guess that because the stripe lock may be released between
-  // open_write and add_writer (I have checked this), the number of
-  // bytes pending aggregation lags and is inaccurate. Therefore the
-  // check in open_write is too permissive, and once we get to add_writer
-  // and update our bytes pending, we may discover we have more backlog
-  // than we thought we did. The solution to the problem was to permit
-  // an aggregation buffer extra of backlog here. That's my analysis.
-  bool agg_error =
-    (vc->agg_len > AGG_SIZE || vc->header_len + sizeof(Doc) > MAX_FRAG_SIZE ||
-     (!vc->f.readers && (this->_write_buffer.get_bytes_pending_aggregation() > 
cache_config_agg_write_backlog + AGG_SIZE) &&
-      vc->write_len));
-#ifdef CACHE_AGG_FAIL_RATE
-  agg_error = agg_error || 
((uint32_t)vc->mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX 
* CACHE_AGG_FAIL_RATE));
-#endif
-
-  if (agg_error) {
-    this->_write_buffer.add_bytes_pending_aggregation(-vc->agg_len);
-  } else {
-    ink_assert(vc->agg_len <= AGG_SIZE);
-    if (vc->f.evac_vector) {
-      this->get_pending_writers().push(vc);
-    } else {
-      this->get_pending_writers().enqueue(vc);
-    }
-  }
-
-  return !agg_error;
-}
-
-void
-StripeSM::shutdown(EThread *shutdown_thread)
-{
-  // the process is going down, do a blocking call
-  // dont release the volume's lock, there could
-  // be another aggWrite in progress
-  MUTEX_TAKE_LOCK(this->mutex, shutdown_thread);
-
-  if (DISK_BAD(this->disk)) {
-    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- bad disk", 
this->hash_text.get());
-    return;
-  }
-  size_t dirlen = this->dirlen();
-  ink_assert(dirlen > 0); // make clang happy - if not > 0 the vol is 
seriously messed up
-  if (!this->header->dirty && !this->dir_sync_in_progress) {
-    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- not dirty", 
this->hash_text.get());
-    return;
-  }
-  // recompute hit_evacuate_window
-  this->hit_evacuate_window = (this->data_blocks * 
cache_config_hit_evacuate_percent) / 100;
-
-  // check if we have data in the agg buffer
-  // dont worry about the cachevc s in the agg queue
-  // directories have not been inserted for these writes
-  if (!this->_write_buffer.is_empty()) {
-    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", 
this->hash_text.get());
-    this->flush_aggregate_write_buffer();
-  }
-
-  // We already asserted that dirlen > 0.
-  if (!this->dir_sync_in_progress) {
-    this->header->sync_serial++;
-  } else {
-    Dbg(dbg_ctl_cache_dir_sync, "Periodic dir sync in progress -- 
overwriting");
-  }
-  this->footer->sync_serial = this->header->sync_serial;
-
-  CHECK_DIR(d);
-  size_t B     = this->header->sync_serial & 1;
-  off_t  start = this->skip + (B ? dirlen : 0);
-  B            = pwrite(this->fd, this->raw_dir, dirlen, start);
-  ink_assert(B == dirlen);
-  Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", 
this->hash_text.get());
-}
-
-static void
-init_document(CacheVC const *vc, Doc *doc, int const len)
-{
-  doc->magic     = DOC_MAGIC;
-  doc->len       = len;
-  doc->hlen      = vc->header_len;
-  doc->doc_type  = vc->frag_type;
-  doc->v_major   = CACHE_DB_MAJOR_VERSION;
-  doc->v_minor   = CACHE_DB_MINOR_VERSION;
-  doc->unused    = 0; // force this for forward compatibility.
-  doc->total_len = vc->total_len;
-  doc->first_key = vc->first_key;
-  doc->checksum  = DOC_NO_CHECKSUM;
-}
-
-static void
-update_header_info(CacheVC *vc, Doc *doc)
-{
-  if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
-    ink_assert(vc->write_vector->count() > 0);
-    if (!vc->f.update && !vc->f.evac_vector) {
-      ink_assert(!(vc->first_key.is_zero()));
-      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-      http_info->object_size_set(vc->total_len);
-    }
-    // update + data_written =>  Update case (b)
-    // need to change the old alternate's object length
-    if (vc->f.update && vc->total_len) {
-      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-      http_info->object_size_set(vc->total_len);
-    }
-    ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
-    ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), 
vc->header_len));
-  } else {
-    memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
-  }
-}
-
-static void
-update_document_key(CacheVC *vc, Doc *doc)
-{
-  if (vc->f.use_first_key) {
-    if (doc->data_len() || vc->f.allow_empty_doc) {
-      doc->key = vc->earliest_key;
-    } else { // the vector is being written by itself
-      if (vc->earliest_key.is_zero()) {
-        do {
-          rand_CacheKey(&doc->key);
-        } while (DIR_MASK_TAG(doc->key.slice32(2)) == 
DIR_MASK_TAG(vc->first_key.slice32(2)));
-      } else {
-        prev_CacheKey(&doc->key, &vc->earliest_key);
-      }
-    }
-    dir_set_head(&vc->dir, true);
-  } else {
-    doc->key = vc->key;
-    dir_set_head(&vc->dir, !vc->fragment);
-  }
-}
-
-int
-StripeSM::_copy_writer_to_aggregation(CacheVC *vc)
-{
-  off_t          doc_offset{this->header->write_pos + this->get_agg_buf_pos()};
-  uint32_t       len         = vc->write_len + vc->header_len + vc->frag_len + 
sizeof(Doc);
-  Doc           *doc         = 
this->_write_buffer.emplace(this->round_to_approx_size(len));
-  IOBufferBlock *res_alt_blk = nullptr;
-
-  ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
-  ink_assert(this->round_to_approx_size(len) == vc->agg_len);
-  // update copy of directory entry for this document
-  dir_set_approx_size(&vc->dir, vc->agg_len);
-  dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
-  ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len));
-  dir_set_phase(&vc->dir, this->header->phase);
-
-  // fill in document header
-  init_document(vc, doc, len);
-  doc->sync_serial = this->header->sync_serial;
-  vc->write_serial = doc->write_serial = this->header->write_serial;
-  if (vc->get_pin_in_cache()) {
-    dir_set_pinned(&vc->dir, 1);
-    doc->pin(vc->get_pin_in_cache());
-  } else {
-    dir_set_pinned(&vc->dir, 0);
-    doc->unpin();
-  }
-
-  update_document_key(vc, doc);
-
-  if (vc->f.rewrite_resident_alt) {
-    ink_assert(vc->f.use_first_key);
-    Doc *res_doc   = reinterpret_cast<Doc *>(vc->first_buf->data());
-    res_alt_blk    = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), 
sizeof(Doc) + res_doc->hlen);
-    doc->key       = res_doc->key;
-    doc->total_len = res_doc->data_len();
-  }
-  // update the new_info object_key, and total_len and dirinfo
-  if (vc->header_len) {
-    ink_assert(vc->f.use_first_key);
-    update_header_info(vc, doc);
-    // the single fragment flag is not used in the write call.
-    // putting it in for completeness.
-    vc->f.single_fragment = doc->single_fragment();
-  }
-  // move data
-  if (vc->write_len) {
-    ink_assert(this->mutex.get()->thread_holding == this_ethread());
-
-    Metrics::Counter::increment(cache_rsb.write_bytes);
-    Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes);
-
-    if (vc->f.rewrite_resident_alt) {
-      doc->set_data(vc->write_len, res_alt_blk, 0);
-    } else {
-      doc->set_data(vc->write_len, vc->blocks.get(), vc->offset);
-    }
-  }
-  if (cache_config_enable_checksum) {
-    doc->calculate_checksum();
-  }
-  if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
-    ink_assert(doc->hlen);
-  }
-
-  if (res_alt_blk) {
-    res_alt_blk->free();
-  }
-
-  return vc->agg_len;
-}
-
-int
-StripeSM::_copy_evacuator_to_aggregation(CacheVC *vc)
-{
-  Doc *doc         = reinterpret_cast<Doc *>(vc->buf->data());
-  int  approx_size = this->round_to_approx_size(doc->len);
-
-  Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
-  Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
-
-  doc->sync_serial  = this->header->sync_serial;
-  doc->write_serial = this->header->write_serial;
-
-  off_t doc_offset{this->header->write_pos + 
this->_write_buffer.get_buffer_pos()};
-  this->_write_buffer.add(doc, approx_size);
-
-  vc->dir = vc->overwrite_dir;
-  dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
-  dir_set_phase(&vc->dir, this->header->phase);
-  return approx_size;
-}
-
 bool
 Stripe::flush_aggregate_write_buffer()
 {
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/StripeSM.cc
similarity index 61%
copy from src/iocore/cache/Stripe.cc
copy to src/iocore/cache/StripeSM.cc
index c8ba5268af..2368c5283a 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/StripeSM.cc
@@ -24,43 +24,71 @@
 #include "P_CacheDisk.h"
 #include "P_CacheDoc.h"
 #include "P_CacheInternal.h"
+#include "P_CacheStats.h"
 #include "P_CacheVol.h"
+#include "P_CacheDir.h"
+
+#include "CacheEvacuateDocVC.h"
+#include "Stripe.h"
+
+#include "iocore/cache/CacheDefs.h"
+#include "iocore/cache/CacheVC.h"
 
 #include "proxy/hdrs/HTTP.h"
 
-#include "tsutil/Metrics.h"
+#include "iocore/aio/AIO.h"
 
+#include "iocore/eventsystem/Continuation.h"
 #include "iocore/eventsystem/EThread.h"
+#include "iocore/eventsystem/Event.h"
+#include "iocore/eventsystem/EventProcessor.h"
 #include "iocore/eventsystem/IOBuffer.h"
-#include "iocore/eventsystem/Lock.h"
 
 #include "tsutil/DbgCtl.h"
+#include "tsutil/Metrics.h"
 
+#include "tscore/Diags.h"
 #include "tscore/hugepages.h"
 #include "tscore/ink_assert.h"
-#include "tscore/ink_memory.h"
+#include "tscore/ink_hrtime.h"
+#include "tscore/ink_platform.h"
+#include "tscore/List.h"
 
+#include <cinttypes>
+#include <cstddef>
+#include <cstdlib>
 #include <cstring>
 
-using CacheHTTPInfo = HTTPInfo;
+// These macros allow two incrementing unsigned values x and y to maintain
+// their ordering when one of them overflows, given that the values stay close 
to each other.
+#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX)  // exploit overflow
+#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX)  // exploit overflow
+#define UINT_WRAP_LT(_x, _y)  (((_x) - (_y)) >= INT_MAX) // exploit overflow
 
 namespace
 {
 
+short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
+
 DbgCtl dbg_ctl_cache_dir_sync{"dir_sync"};
+DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
+DbgCtl dbg_ctl_cache_evac{"cache_evac"};
 DbgCtl dbg_ctl_cache_init{"cache_init"};
 
-// This is the oldest version number that is still usable.
-short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
+#ifdef DEBUG
 
-int
-compare_ushort(void const *a, void const *b)
-{
-  return *static_cast<unsigned short const *>(a) - *static_cast<unsigned short 
const *>(b);
-}
+DbgCtl dbg_ctl_agg_read{"agg_read"};
+DbgCtl dbg_ctl_cache_agg{"cache_agg"};
+
+#endif
 
 } // namespace
 
+static void init_document(CacheVC const *vc, Doc *doc, int const len);
+static void update_document_key(CacheVC *vc, Doc *doc);
+static void update_header_info(CacheVC *vc, Doc *doc);
+static int  evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int 
force, StripeSM *stripe);
+
 struct StripeInitInfo {
   off_t               recover_pos;
   AIOCallbackInternal vol_aio[4];
@@ -83,10 +111,6 @@ struct StripeInitInfo {
   }
 };
 
-////
-// Stripe
-//
-
 int
 StripeSM::begin_read(CacheVC *cont) const
 {
@@ -146,29 +170,6 @@ StripeSM::close_read(CacheVC *cont) const
   return 1;
 }
 
-/**
-  Add AIO task to clear Dir.
- */
-int
-StripeSM::clear_dir_aio()
-{
-  size_t dir_len = this->dirlen();
-  this->_clear_init();
-
-  SET_HANDLER(&StripeSM::handle_dir_clear);
-
-  io.aiocb.aio_fildes = fd;
-  io.aiocb.aio_buf    = raw_dir;
-  io.aiocb.aio_nbytes = dir_len;
-  io.aiocb.aio_offset = skip;
-  io.action           = this;
-  io.thread           = AIO_CALLBACK_THREAD_ANY;
-  io.then             = nullptr;
-  ink_assert(ink_aio_write(&io));
-
-  return 0;
-}
-
 /**
   Clear Dir directly. This is mainly used by unit tests. The clear_dir_aio() 
is the suitable function in most cases.
  */
@@ -326,6 +327,29 @@ StripeSM::handle_dir_read(int event, void *data)
   return this->recover_data();
 }
 
+/**
+  Add AIO task to clear Dir.
+ */
+int
+StripeSM::clear_dir_aio()
+{
+  size_t dir_len = this->dirlen();
+  this->_clear_init();
+
+  SET_HANDLER(&StripeSM::handle_dir_clear);
+
+  io.aiocb.aio_fildes = fd;
+  io.aiocb.aio_buf    = raw_dir;
+  io.aiocb.aio_nbytes = dir_len;
+  io.aiocb.aio_offset = skip;
+  io.action           = this;
+  io.thread           = AIO_CALLBACK_THREAD_ANY;
+  io.then             = nullptr;
+  ink_assert(ink_aio_write(&io));
+
+  return 0;
+}
+
 int
 StripeSM::recover_data()
 {
@@ -368,7 +392,6 @@ StripeSM::recover_data()
    directory to disk should have the write_serial <= header->sync_serial.
 
       */
-
 int
 StripeSM::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
 {
@@ -645,6 +668,143 @@ StripeSM::handle_recover_write_dir(int /* event 
ATS_UNUSED */, void * /* data AT
   return dir_init_done(EVENT_IMMEDIATE, nullptr);
 }
 
+void
+StripeSM::periodic_scan()
+{
+  evacuate_cleanup();
+  scan_for_pinned_documents();
+  if (header->write_pos == start) {
+    scan_pos = start;
+  }
+  scan_pos += len / PIN_SCAN_EVERY;
+}
+
+void
+StripeSM::evacuate_cleanup()
+{
+  int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
+  int64_t e  = dir_offset_evac_bucket(eo);
+  int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
+  int64_t s  = sx;
+  int     i;
+
+  if (e > evacuate_size) {
+    e = evacuate_size;
+  }
+  if (sx < 0) {
+    s = 0;
+  }
+  for (i = s; i < e; i++) {
+    evacuate_cleanup_blocks(i);
+  }
+
+  // if we have wrapped, handle the end bit
+  if (sx <= 0) {
+    s = evacuate_size + sx - 2;
+    if (s < 0) {
+      s = 0;
+    }
+    for (i = s; i < evacuate_size; i++) {
+      evacuate_cleanup_blocks(i);
+    }
+  }
+}
+
+inline void
+StripeSM::evacuate_cleanup_blocks(int i)
+{
+  EvacuationBlock *b = evac_bucket_valid(i) ? evacuate[i].head : nullptr;
+  while (b) {
+    if (b->f.done && ((header->phase != dir_phase(&b->dir) && 
header->write_pos > this->vol_offset(&b->dir)) ||
+                      (header->phase == dir_phase(&b->dir) && 
header->write_pos <= this->vol_offset(&b->dir)))) {
+      EvacuationBlock *x = b;
+      DDbg(dbg_ctl_cache_evac, "evacuate cleanup free %X offset %d", 
(int)b->evac_frags.key.slice32(0), (int)dir_offset(&b->dir));
+      b = b->link.next;
+      evacuate[i].remove(x);
+      free_EvacuationBlock(x, mutex->thread_holding);
+      continue;
+    }
+    b = b->link.next;
+  }
+}
+
+void
+StripeSM::scan_for_pinned_documents()
+{
+  if (cache_config_permit_pinning) {
+    // we can't evacuate anything between header->write_pos and
+    // header->write_pos + AGG_SIZE.
+    int ps                = this->offset_to_vol_offset(header->write_pos + 
AGG_SIZE);
+    int pe                = this->offset_to_vol_offset(header->write_pos + 2 * 
EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
+    int vol_end_offset    = this->offset_to_vol_offset(len + skip);
+    int before_end_of_vol = pe < vol_end_offset;
+    DDbg(dbg_ctl_cache_evac, "scan %d %d", ps, pe);
+    for (int i = 0; i < this->direntries(); i++) {
+      // is it a valid pinned object?
+      if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
+        // select objects only within this PIN_SCAN region
+        int o = dir_offset(&dir[i]);
+        if (dir_phase(&dir[i]) == header->phase) {
+          if (before_end_of_vol || o >= (pe - vol_end_offset)) {
+            continue;
+          }
+        } else {
+          if (o < ps || o >= pe) {
+            continue;
+          }
+        }
+        force_evacuate_head(&dir[i], 1);
+      }
+    }
+  }
+}
+
+EvacuationBlock *
+StripeSM::force_evacuate_head(Dir const *evac_dir, int pinned)
+{
+  auto bucket = dir_evac_bucket(evac_dir);
+  if (!evac_bucket_valid(bucket)) {
+    DDbg(dbg_ctl_cache_evac, "dir_evac_bucket out of bounds, skipping 
evacuate: %" PRId64 "(%d), %d, %d", bucket, evacuate_size,
+         (int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
+    return nullptr;
+  }
+
+  // build an evacuation block for the object
+  EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
+  // if we have already started evacuating this document, its too late
+  // to evacuate the head...bad luck
+  if (b && b->f.done) {
+    return b;
+  }
+
+  if (!b) {
+    b      = new_EvacuationBlock(mutex->thread_holding);
+    b->dir = *evac_dir;
+    DDbg(dbg_ctl_cache_evac, "force: %d, %d", (int)dir_offset(evac_dir), 
(int)dir_phase(evac_dir));
+    evacuate[bucket].push(b);
+  }
+  b->f.pinned        = pinned;
+  b->f.evacuate_head = 1;
+  b->evac_frags.key.clear(); // ensure that the block gets evacuated no matter 
what
+  b->readers = 0;            // ensure that the block does not disappear
+  return b;
+}
+
+CacheEvacuateDocVC *
+new_DocEvacuator(int nbytes, StripeSM *stripe)
+{
+  CacheEvacuateDocVC *c = new_CacheEvacuateDocVC(stripe);
+  c->op_type            = static_cast<int>(CacheOpType::Evacuate);
+  Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
+  
Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
+  c->buf         = new_IOBufferData(iobuffer_size_to_index(nbytes, 
MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
+  c->stripe      = stripe;
+  c->f.evacuator = 1;
+  c->earliest_key.clear();
+  SET_CONTINUATION_HANDLER(c, &CacheEvacuateDocVC::evacuateDocDone);
+  return c;
+}
+
 int
 StripeSM::handle_header_read(int event, void *data)
 {
@@ -719,363 +879,239 @@ StripeSM::dir_init_done(int /* event ATS_UNUSED */, 
void * /* data ATS_UNUSED */
   }
 }
 
+/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
+   DON'T schedule any events on this thread using VC_SCHED_XXX or
+   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
+   eventProcessor.schedule_xxx().
+   */
 int
-Stripe::dir_check()
+StripeSM::aggWriteDone(int event, Event *e)
 {
-  static int const SEGMENT_HISTOGRAM_WIDTH           = 16;
-  int              hist[SEGMENT_HISTOGRAM_WIDTH + 1] = {0};
-  unsigned short   chain_tag[MAX_ENTRIES_PER_SEGMENT];
-  int32_t          chain_mark[MAX_ENTRIES_PER_SEGMENT];
-  uint64_t         total_buckets = buckets * segments;
-  uint64_t         total_entries = total_buckets * DIR_DEPTH;
-  int              frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES];
-
-  int j;
-  int stale = 0, in_use = 0, empty = 0;
-  int free = 0, head = 0, buckets_in_use = 0;
-
-  int     max_chain_length = 0;
-  int64_t bytes_in_use     = 0;
-
-  ink_zero(frag_demographics);
-
-  printf("Stripe '[%s]'\n", hash_text.get());
-  printf("  Directory Bytes: %" PRIu64 "\n", total_buckets * SIZEOF_DIR);
-  printf("  Segments:  %d\n", segments);
-  printf("  Buckets per segment:   %" PRIu64 "\n", buckets);
-  printf("  Entries:   %" PRIu64 "\n", total_entries);
-
-  for (int s = 0; s < segments; s++) {
-    Dir *seg                = this->dir_segment(s);
-    int  seg_chain_max      = 0;
-    int  seg_empty          = 0;
-    int  seg_in_use         = 0;
-    int  seg_stale          = 0;
-    int  seg_bytes_in_use   = 0;
-    int  seg_dups           = 0;
-    int  seg_buckets_in_use = 0;
-
-    ink_zero(chain_tag);
-    memset(chain_mark, -1, sizeof(chain_mark));
-
-    for (int b = 0; b < buckets; b++) {
-      Dir *root = dir_bucket(b, seg);
-      int  h    = 0; // chain length starting in this bucket
-
-      // Walk the chain starting in this bucket
-      int chain_idx = 0;
-      int mark      = 0;
-      ++seg_buckets_in_use;
-      for (Dir *e = root; e; e = next_dir(e, seg)) {
-        if (!dir_offset(e)) {
-          ++seg_empty;
-          --seg_buckets_in_use;
-          // this should only happen on the first dir in a bucket
-          ink_assert(nullptr == next_dir(e, seg));
-          break;
-        } else {
-          int e_idx = e - seg;
-          ++h;
-          chain_tag[chain_idx++] = dir_tag(e);
-          if (chain_mark[e_idx] == mark) {
-            printf("    - Cycle of length %d detected for bucket %d\n", h, b);
-          } else if (chain_mark[e_idx] >= 0) {
-            printf("    - Entry %d is in chain %d and %d", e_idx, 
chain_mark[e_idx], mark);
-          } else {
-            chain_mark[e_idx] = mark;
-          }
-
-          if (!dir_valid(this, e)) {
-            ++seg_stale;
-          } else {
-            uint64_t size = dir_approx_size(e);
-            if (dir_head(e)) {
-              ++head;
-            }
-            ++seg_in_use;
-            seg_bytes_in_use += size;
-            ++frag_demographics[dir_size(e)][dir_big(e)];
-          }
-        }
-      }
+  cancel_trigger();
 
-      // Check for duplicates (identical tags in the same bucket).
-      if (h > 1) {
-        unsigned short last;
-        qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
-        last = chain_tag[0];
-        for (int k = 1; k < h; ++k) {
-          if (last == chain_tag[k]) {
-            ++seg_dups;
-          }
-          last = chain_tag[k];
-        }
-      }
-
-      ++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
-      seg_chain_max = std::max(seg_chain_max, h);
+  // ensure we have the cacheDirSync lock if we intend to call it later
+  // retaking the current mutex recursively is a NOOP
+  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, 
mutex->thread_holding);
+  if (!lock.is_locked()) {
+    eventProcessor.schedule_in(this, 
HRTIME_MSECONDS(cache_config_mutex_retry_delay));
+    return EVENT_CONT;
+  }
+  if (io.ok()) {
+    header->last_write_pos  = header->write_pos;
+    header->write_pos      += io.aiocb.aio_nbytes;
+    ink_assert(header->write_pos >= start);
+    DDbg(dbg_ctl_cache_agg, "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 
"", hash_text.get(), header->write_pos,
+         header->last_write_pos);
+    ink_assert(header->write_pos == header->agg_pos);
+    if (header->write_pos + EVACUATION_SIZE > scan_pos) {
+      periodic_scan();
     }
-    int fl_size       = dir_freelist_length(this, s);
-    in_use           += seg_in_use;
-    empty            += seg_empty;
-    stale            += seg_stale;
-    free             += fl_size;
-    buckets_in_use   += seg_buckets_in_use;
-    max_chain_length  = std::max(max_chain_length, seg_chain_max);
-    bytes_in_use     += seg_bytes_in_use;
-
-    printf("  - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%d 
Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
-           s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, 
seg_buckets_in_use, seg_empty, seg_chain_max,
-           seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / 
seg_buckets_in_use : 0.0, seg_dups);
-  }
-
-  printf("  - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d 
max=%d avg=%.2f\n", in_use, stale, free, empty,
-         max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) 
/ buckets_in_use : 0);
-
-  printf("    Chain lengths:  ");
-  for (j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
-    printf(" %d=%d ", j, hist[j]);
-  }
-  printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);
-
-  char tt[256];
-  printf("    Total Size:      %" PRIu64 "\n", static_cast<uint64_t>(len));
-  printf("    Bytes in Use:    %" PRIu64 " [%0.2f%%]\n", bytes_in_use, 100.0 * 
(static_cast<float>(bytes_in_use) / len));
-  printf("    Objects:         %d\n", head);
-  printf("    Average Size:    %" PRIu64 "\n", head ? (bytes_in_use / head) : 
0);
-  printf("    Average Frags:   %.2f\n", head ? static_cast<float>(in_use) / 
head : 0);
-  printf("    Write Position:  %" PRIu64 "\n", header->write_pos - start);
-  printf("    Wrap Count:      %d\n", header->cycle);
-  printf("    Phase:           %s\n", header->phase ? "true" : "false");
-  ink_ctime_r(&header->create_time, tt);
-  tt[strlen(tt) - 1] = 0;
-  printf("    Sync Serial:     %u\n", header->sync_serial);
-  printf("    Write Serial:    %u\n", header->write_serial);
-  printf("    Create Time:     %s\n", tt);
-  printf("\n");
-  printf("  Fragment size demographics\n");
-  for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
-    int block_size = DIR_BLOCK_SIZE(b);
-    int s          = 0;
-    while (s < 1 << DIR_SIZE_WIDTH) {
-      for (int j = 0; j < 8; ++j, ++s) {
-        // The size markings are redundant. Low values (less than 
DIR_SHIFT_WIDTH) for larger
-        // base block sizes should never be used. Such entries should use the 
next smaller base block size.
-        if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
-          ink_assert(frag_demographics[s][b] == 0);
-          continue;
-        }
-        printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, 
frag_demographics[s][b]);
-      }
-      printf("\n");
+    this->_write_buffer.reset_buffer_pos();
+    header->write_serial++;
+  } else {
+    // delete all the directory entries that we inserted
+    // for fragments is this aggregation buffer
+    Dbg(dbg_ctl_cache_disk_error, "Write error on disk %s\n \
+            write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" 
PRIu64 " blocks] \n",
+        hash_text.get(), (uint64_t)io.aiocb.aio_offset, 
(uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
+        (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE, 
(uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
+    Dir del_dir;
+    dir_clear(&del_dir);
+    for (int done = 0; done < this->_write_buffer.get_buffer_pos();) {
+      Doc *doc = reinterpret_cast<Doc *>(this->_write_buffer.get_buffer() + 
done);
+      dir_set_offset(&del_dir, header->write_pos + done);
+      dir_delete(&doc->key, this, &del_dir);
+      done += round_to_approx_size(doc->len);
     }
+    this->_write_buffer.reset_buffer_pos();
   }
-  printf("\n");
-
-  return 0;
-}
-
-void
-Stripe::_clear_init()
-{
-  size_t dir_len = this->dirlen();
-  memset(this->raw_dir, 0, dir_len);
-  this->_init_dir();
-  this->header->magic          = STRIPE_MAGIC;
-  this->header->version._major = CACHE_DB_MAJOR_VERSION;
-  this->header->version._minor = CACHE_DB_MINOR_VERSION;
-  this->scan_pos = this->header->agg_pos = this->header->write_pos = 
this->start;
-  this->header->last_write_pos                                     = 
this->header->write_pos;
-  this->header->phase                                              = 0;
-  this->header->cycle                                              = 0;
-  this->header->create_time                                        = 
time(nullptr);
-  this->header->dirty                                              = 0;
-  this->sector_size = this->header->sector_size = this->disk->hw_sector_size;
-  *this->footer                                 = *this->header;
-}
-
-void
-Stripe::_init_dir()
-{
-  int b, s, l;
-
-  for (s = 0; s < this->segments; s++) {
-    this->header->freelist[s] = 0;
-    Dir *seg                  = this->dir_segment(s);
-    for (l = 1; l < DIR_DEPTH; l++) {
-      for (b = 0; b < this->buckets; b++) {
-        Dir *bucket = dir_bucket(b, seg);
-        dir_free_entry(dir_bucket_row(bucket, l), s, this);
-      }
+  set_io_not_in_progress();
+  // callback ready sync CacheVCs
+  CacheVC *c = nullptr;
+  while ((c = sync.dequeue())) {
+    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
+      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
+    } else {
+      sync.push(c); // put it back on the front
+      break;
     }
   }
+  if (dir_sync_waiting) {
+    dir_sync_waiting = false;
+    cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
+  }
+  if (this->_write_buffer.get_pending_writers().head || sync.head) {
+    return aggWrite(event, e);
+  }
+  return EVENT_CONT;
 }
 
-void
-Stripe::_init_data_internal()
+/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
+   DON'T schedule any events on this thread using VC_SCHED_XXX or
+   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
+   eventProcessor.schedule_xxx().
+   Also, make sure that any functions called by this also use
+   the eventProcessor to schedule events
+*/
+int
+StripeSM::aggWrite(int event, void * /* e ATS_UNUSED */)
 {
-  // step1: calculate the number of entries.
-  off_t total_entries = (this->len - (this->start - this->skip)) / 
cache_config_min_average_object_size;
-  // step2: calculate the number of buckets
-  off_t total_buckets = total_entries / DIR_DEPTH;
-  // step3: calculate the number of segments, no segment has more than 16384 
buckets
-  this->segments = (total_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 
16) / DIR_DEPTH);
-  // step4: divide total_buckets into segments on average.
-  this->buckets = (total_buckets + this->segments - 1) / this->segments;
-  // step5: set the start pointer.
-  this->start = this->skip + 2 * this->dirlen();
-}
+  ink_assert(!is_io_in_progress());
 
-void
-Stripe::_init_data()
-{
-  // iteratively calculate start + buckets
-  this->_init_data_internal();
-  this->_init_data_internal();
-  this->_init_data_internal();
-}
+  Que(CacheVC, link) tocall;
+  CacheVC *c;
 
-bool
-StripeSM::add_writer(CacheVC *vc)
-{
-  ink_assert(vc);
-  this->_write_buffer.add_bytes_pending_aggregation(vc->agg_len);
-  // An extra AGG_SIZE is added to the backlog here, but not in
-  // open_write, at the time I'm writing this comment. I venture to
-  // guess that because the stripe lock may be released between
-  // open_write and add_writer (I have checked this), the number of
-  // bytes pending aggregation lags and is inaccurate. Therefore the
-  // check in open_write is too permissive, and once we get to add_writer
-  // and update our bytes pending, we may discover we have more backlog
-  // than we thought we did. The solution to the problem was to permit
-  // an aggregation buffer extra of backlog here. That's my analysis.
-  bool agg_error =
-    (vc->agg_len > AGG_SIZE || vc->header_len + sizeof(Doc) > MAX_FRAG_SIZE ||
-     (!vc->f.readers && (this->_write_buffer.get_bytes_pending_aggregation() > 
cache_config_agg_write_backlog + AGG_SIZE) &&
-      vc->write_len));
-#ifdef CACHE_AGG_FAIL_RATE
-  agg_error = agg_error || 
((uint32_t)vc->mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX 
* CACHE_AGG_FAIL_RATE));
-#endif
+  cancel_trigger();
 
-  if (agg_error) {
-    this->_write_buffer.add_bytes_pending_aggregation(-vc->agg_len);
-  } else {
-    ink_assert(vc->agg_len <= AGG_SIZE);
-    if (vc->f.evac_vector) {
-      this->get_pending_writers().push(vc);
-    } else {
-      this->get_pending_writers().enqueue(vc);
+Lagain:
+  this->aggregate_pending_writes(tocall);
+
+  // if we got nothing...
+  if (this->_write_buffer.is_empty()) {
+    if (!this->_write_buffer.get_pending_writers().head && !sync.head) { // 
nothing to get
+      return EVENT_CONT;
+    }
+    if (header->write_pos == start) {
+      // write aggregation too long, bad bad, punt on everything.
+      Note("write aggregation exceeds vol size");
+      ink_assert(!tocall.head);
+      ink_assert(false);
+      while ((c = this->get_pending_writers().dequeue())) {
+        this->_write_buffer.add_bytes_pending_aggregation(-c->agg_len);
+        eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
+      }
+      return EVENT_CONT;
+    }
+    // start back
+    if (this->get_pending_writers().head) {
+      agg_wrap();
+      goto Lagain;
     }
   }
 
-  return !agg_error;
-}
-
-void
-StripeSM::shutdown(EThread *shutdown_thread)
-{
-  // the process is going down, do a blocking call
-  // dont release the volume's lock, there could
-  // be another aggWrite in progress
-  MUTEX_TAKE_LOCK(this->mutex, shutdown_thread);
-
-  if (DISK_BAD(this->disk)) {
-    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- bad disk", 
this->hash_text.get());
-    return;
+  // evacuate space
+  off_t end = header->write_pos + this->_write_buffer.get_buffer_pos() + 
EVACUATION_SIZE;
+  if (evac_range(header->write_pos, end, !header->phase) < 0) {
+    goto Lwait;
   }
-  size_t dirlen = this->dirlen();
-  ink_assert(dirlen > 0); // make clang happy - if not > 0 the vol is 
seriously messed up
-  if (!this->header->dirty && !this->dir_sync_in_progress) {
-    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- not dirty", 
this->hash_text.get());
-    return;
+  if (end > skip + len) {
+    if (evac_range(start, start + (end - (skip + len)), header->phase) < 0) {
+      goto Lwait;
+    }
   }
-  // recompute hit_evacuate_window
-  this->hit_evacuate_window = (this->data_blocks * 
cache_config_hit_evacuate_percent) / 100;
 
-  // check if we have data in the agg buffer
-  // dont worry about the cachevc s in the agg queue
-  // directories have not been inserted for these writes
-  if (!this->_write_buffer.is_empty()) {
-    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", 
this->hash_text.get());
-    this->flush_aggregate_write_buffer();
+  // if write_buffer.get_pending_writers.head, then we are near the end of the 
disk, so
+  // write down the aggregation in whatever size it is.
+  if (this->_write_buffer.get_buffer_pos() < AGG_HIGH_WATER && 
!this->_write_buffer.get_pending_writers().head && !sync.head &&
+      !dir_sync_waiting) {
+    goto Lwait;
   }
 
-  // We already asserted that dirlen > 0.
-  if (!this->dir_sync_in_progress) {
-    this->header->sync_serial++;
-  } else {
-    Dbg(dbg_ctl_cache_dir_sync, "Periodic dir sync in progress -- 
overwriting");
+  // write sync marker
+  if (this->_write_buffer.is_empty()) {
+    ink_assert(sync.head);
+    int l = round_to_approx_size(sizeof(Doc));
+    this->_write_buffer.seek(l);
+    Doc *d = reinterpret_cast<Doc *>(this->_write_buffer.get_buffer());
+    memset(static_cast<void *>(d), 0, sizeof(Doc));
+    d->magic        = DOC_MAGIC;
+    d->len          = l;
+    d->sync_serial  = header->sync_serial;
+    d->write_serial = header->write_serial;
   }
-  this->footer->sync_serial = this->header->sync_serial;
 
-  CHECK_DIR(d);
-  size_t B     = this->header->sync_serial & 1;
-  off_t  start = this->skip + (B ? dirlen : 0);
-  B            = pwrite(this->fd, this->raw_dir, dirlen, start);
-  ink_assert(B == dirlen);
-  Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", 
this->hash_text.get());
-}
+  // set write limit
+  header->agg_pos = header->write_pos + this->_write_buffer.get_buffer_pos();
 
-static void
-init_document(CacheVC const *vc, Doc *doc, int const len)
-{
-  doc->magic     = DOC_MAGIC;
-  doc->len       = len;
-  doc->hlen      = vc->header_len;
-  doc->doc_type  = vc->frag_type;
-  doc->v_major   = CACHE_DB_MAJOR_VERSION;
-  doc->v_minor   = CACHE_DB_MINOR_VERSION;
-  doc->unused    = 0; // force this for forward compatibility.
-  doc->total_len = vc->total_len;
-  doc->first_key = vc->first_key;
-  doc->checksum  = DOC_NO_CHECKSUM;
+  io.aiocb.aio_fildes = fd;
+  io.aiocb.aio_offset = header->write_pos;
+  io.aiocb.aio_buf    = this->_write_buffer.get_buffer();
+  io.aiocb.aio_nbytes = this->_write_buffer.get_buffer_pos();
+  io.action           = this;
+  /*
+    Callback on AIO thread so that we can issue a new write ASAP
+    as all writes are serialized in the volume.  This is not necessary
+    for reads proceed independently.
+   */
+  io.thread = AIO_CALLBACK_THREAD_AIO;
+  SET_HANDLER(&StripeSM::aggWriteDone);
+  ink_aio_write(&io);
+
+Lwait:
+  int ret = EVENT_CONT;
+  while ((c = tocall.dequeue())) {
+    if (event == EVENT_CALL && c->mutex->thread_holding == 
mutex->thread_holding) {
+      ret = EVENT_RETURN;
+    } else {
+      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
+    }
+  }
+  return ret;
 }
 
-static void
-update_header_info(CacheVC *vc, Doc *doc)
+void
+StripeSM::aggregate_pending_writes(Queue<CacheVC, Continuation::Link_link> 
&tocall)
 {
-  if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
-    ink_assert(vc->write_vector->count() > 0);
-    if (!vc->f.update && !vc->f.evac_vector) {
-      ink_assert(!(vc->first_key.is_zero()));
-      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-      http_info->object_size_set(vc->total_len);
+  for (auto *c = static_cast<CacheVC 
*>(this->_write_buffer.get_pending_writers().head); c;) {
+    int writelen = c->agg_len;
+    // [amc] this is checked multiple places, on here was it strictly less.
+    ink_assert(writelen <= AGG_SIZE);
+    if (this->_write_buffer.get_buffer_pos() + writelen > AGG_SIZE ||
+        this->header->write_pos + this->_write_buffer.get_buffer_pos() + 
writelen > (this->skip + this->len)) {
+      break;
     }
-    // update + data_written =>  Update case (b)
-    // need to change the old alternate's object length
-    if (vc->f.update && vc->total_len) {
-      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-      http_info->object_size_set(vc->total_len);
+    DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", 
this->_write_buffer.get_buffer_pos(),
+         this->header->write_pos + this->_write_buffer.get_buffer_pos(), 
c->first_key.slice32(0));
+    [[maybe_unused]] int wrotelen = this->_agg_copy(c);
+    ink_assert(writelen == wrotelen);
+    CacheVC *n = (CacheVC *)c->link.next;
+    this->_write_buffer.get_pending_writers().dequeue();
+    if (c->f.sync && c->f.use_first_key) {
+      CacheVC *last = this->sync.tail;
+      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) {
+        last = (CacheVC *)last->link.prev;
+      }
+      this->sync.insert(c, last);
+    } else if (c->f.evacuator) {
+      c->handleEvent(AIO_EVENT_DONE, nullptr);
+    } else {
+      tocall.enqueue(c);
     }
-    ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
-    ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), 
vc->header_len));
-  } else {
-    memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
+    c = n;
   }
 }
 
-static void
-update_document_key(CacheVC *vc, Doc *doc)
+int
+StripeSM::_agg_copy(CacheVC *vc)
 {
-  if (vc->f.use_first_key) {
-    if (doc->data_len() || vc->f.allow_empty_doc) {
-      doc->key = vc->earliest_key;
-    } else { // the vector is being written by itself
-      if (vc->earliest_key.is_zero()) {
-        do {
-          rand_CacheKey(&doc->key);
-        } while (DIR_MASK_TAG(doc->key.slice32(2)) == 
DIR_MASK_TAG(vc->first_key.slice32(2)));
-      } else {
-        prev_CacheKey(&doc->key, &vc->earliest_key);
-      }
-    }
-    dir_set_head(&vc->dir, true);
+  if (vc->f.evacuator) {
+    return this->_copy_evacuator_to_aggregation(vc);
   } else {
-    doc->key = vc->key;
-    dir_set_head(&vc->dir, !vc->fragment);
+    return this->_copy_writer_to_aggregation(vc);
   }
 }
 
+int
+StripeSM::_copy_evacuator_to_aggregation(CacheVC *vc)
+{
+  Doc *doc         = reinterpret_cast<Doc *>(vc->buf->data());
+  int  approx_size = this->round_to_approx_size(doc->len);
+
+  Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
+  Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
+
+  doc->sync_serial  = this->header->sync_serial;
+  doc->write_serial = this->header->write_serial;
+
+  off_t doc_offset{this->header->write_pos + 
this->_write_buffer.get_buffer_pos()};
+  this->_write_buffer.add(doc, approx_size);
+
+  vc->dir = vc->overwrite_dir;
+  dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
+  dir_set_phase(&vc->dir, this->header->phase);
+  return approx_size;
+}
+
 int
 StripeSM::_copy_writer_to_aggregation(CacheVC *vc)
 {
@@ -1148,53 +1184,356 @@ StripeSM::_copy_writer_to_aggregation(CacheVC *vc)
   return vc->agg_len;
 }
 
+static void
+init_document(CacheVC const *vc, Doc *doc, int const len)
+{
+  doc->magic     = DOC_MAGIC;
+  doc->len       = len;
+  doc->hlen      = vc->header_len;
+  doc->doc_type  = vc->frag_type;
+  doc->v_major   = CACHE_DB_MAJOR_VERSION;
+  doc->v_minor   = CACHE_DB_MINOR_VERSION;
+  doc->unused    = 0; // force this for forward compatibility.
+  doc->total_len = vc->total_len;
+  doc->first_key = vc->first_key;
+  doc->checksum  = DOC_NO_CHECKSUM;
+}
+
+static void
+update_document_key(CacheVC *vc, Doc *doc)
+{
+  if (vc->f.use_first_key) {
+    if (doc->data_len() || vc->f.allow_empty_doc) {
+      doc->key = vc->earliest_key;
+    } else { // the vector is being written by itself
+      if (vc->earliest_key.is_zero()) {
+        do {
+          rand_CacheKey(&doc->key);
+        } while (DIR_MASK_TAG(doc->key.slice32(2)) == 
DIR_MASK_TAG(vc->first_key.slice32(2)));
+      } else {
+        prev_CacheKey(&doc->key, &vc->earliest_key);
+      }
+    }
+    dir_set_head(&vc->dir, true);
+  } else {
+    doc->key = vc->key;
+    dir_set_head(&vc->dir, !vc->fragment);
+  }
+}
+
+static void
+update_header_info(CacheVC *vc, Doc *doc)
+{
+  if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
+    ink_assert(vc->write_vector->count() > 0);
+    if (!vc->f.update && !vc->f.evac_vector) {
+      ink_assert(!(vc->first_key.is_zero()));
+      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+      http_info->object_size_set(vc->total_len);
+    }
+    // update + data_written =>  Update case (b)
+    // need to change the old alternate's object length
+    if (vc->f.update && vc->total_len) {
+      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+      http_info->object_size_set(vc->total_len);
+    }
+    ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
+    ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), 
vc->header_len));
+  } else {
+    memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
+  }
+}
+
+void
+StripeSM::agg_wrap()
+{
+  header->write_pos = start;
+  header->phase     = !header->phase;
+
+  header->cycle++;
+  header->agg_pos = header->write_pos;
+  dir_lookaside_cleanup(this);
+  dir_clean_vol(this);
+  {
+    StripeSM *stripe = this;
+    Metrics::Counter::increment(cache_rsb.directory_wrap);
+    Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_wrap);
+    Note("Cache volume %d on disk '%s' wraps around", 
stripe->cache_vol->vol_number, stripe->hash_text.get());
+  }
+  periodic_scan();
+}
+
 int
-StripeSM::_copy_evacuator_to_aggregation(CacheVC *vc)
+StripeSM::evac_range(off_t low, off_t high, int evac_phase)
 {
-  Doc *doc         = reinterpret_cast<Doc *>(vc->buf->data());
-  int  approx_size = this->round_to_approx_size(doc->len);
+  off_t s  = this->offset_to_vol_offset(low);
+  off_t e  = this->offset_to_vol_offset(high);
+  int   si = dir_offset_evac_bucket(s);
+  int   ei = dir_offset_evac_bucket(e);
+
+  for (int i = si; i <= ei; i++) {
+    EvacuationBlock *b            = evacuate[i].head;
+    EvacuationBlock *first        = nullptr;
+    int64_t          first_offset = INT64_MAX;
+    for (; b; b = b->link.next) {
+      int64_t offset = dir_offset(&b->dir);
+      int     phase  = dir_phase(&b->dir);
+      if (offset >= s && offset < e && !b->f.done && phase == evac_phase) {
+        if (offset < first_offset) {
+          first        = b;
+          first_offset = offset;
+        }
+      }
+    }
+    if (first) {
+      first->f.done       = 1;
+      io.aiocb.aio_fildes = fd;
+      io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
+      io.aiocb.aio_offset = this->vol_offset(&first->dir);
+      if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > 
static_cast<off_t>(skip + len)) {
+        io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
+      }
+      doc_evacuator                = new_DocEvacuator(io.aiocb.aio_nbytes, 
this);
+      doc_evacuator->overwrite_dir = first->dir;
+
+      io.aiocb.aio_buf = doc_evacuator->buf->data();
+      io.action        = this;
+      io.thread        = AIO_CALLBACK_THREAD_ANY;
+      DDbg(dbg_ctl_cache_evac, "evac_range evacuating %X %d", 
(int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
+      SET_HANDLER(&StripeSM::evacuateDocReadDone);
+      ink_assert(ink_aio_read(&io) >= 0);
+      return -1;
+    }
+  }
+  return 0;
+}
 
-  Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
-  Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
+int
+StripeSM::evacuateDocReadDone(int event, Event *e)
+{
+  cancel_trigger();
+  if (event != AIO_EVENT_DONE) {
+    return EVENT_DONE;
+  }
+  ink_assert(is_io_in_progress());
+  set_io_not_in_progress();
+  ink_assert(mutex->thread_holding == this_ethread());
+  Doc             *doc = reinterpret_cast<Doc *>(doc_evacuator->buf->data());
+  CacheKey         next_key;
+  EvacuationBlock *b      = nullptr;
+  auto             bucket = dir_evac_bucket(&doc_evacuator->overwrite_dir);
+  if (doc->magic != DOC_MAGIC) {
+    Dbg(dbg_ctl_cache_evac, "DOC magic: %X %d", 
(int)dir_tag(&doc_evacuator->overwrite_dir),
+        (int)dir_offset(&doc_evacuator->overwrite_dir));
+    ink_assert(doc->magic == DOC_MAGIC);
+    goto Ldone;
+  }
+  DDbg(dbg_ctl_cache_evac, "evacuateDocReadDone %X offset %d", 
(int)doc->key.slice32(0),
+       (int)dir_offset(&doc_evacuator->overwrite_dir));
 
-  doc->sync_serial  = this->header->sync_serial;
-  doc->write_serial = this->header->write_serial;
+  if (evac_bucket_valid(bucket)) {
+    b = evacuate[bucket].head;
+  }
+  while (b) {
+    if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir)) {
+      break;
+    }
+    b = b->link.next;
+  }
+  if (!b) {
+    goto Ldone;
+  }
+  // coverity[Y2K38_SAFETY:FALSE]
+  if ((b->f.pinned && !b->readers) && doc->pinned < 
static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND)) {
+    goto Ldone;
+  }
 
-  off_t doc_offset{this->header->write_pos + 
this->_write_buffer.get_buffer_pos()};
-  this->_write_buffer.add(doc, approx_size);
+  if (dir_head(&b->dir) && b->f.evacuate_head) {
+    ink_assert(!b->evac_frags.key.fold());
+    // if its a head (vector), evacuation is real simple...we just
+    // need to write this vector down and overwrite the directory entry.
+    if (dir_compare_tag(&b->dir, &doc->first_key)) {
+      doc_evacuator->key = doc->first_key;
+      b->evac_frags.key  = doc->first_key;
+      DDbg(dbg_ctl_cache_evac, "evacuating vector %X offset %d", 
(int)doc->first_key.slice32(0),
+           (int)dir_offset(&doc_evacuator->overwrite_dir));
+      b->f.unused = 57;
+    } else {
+      // if its an earliest fragment (alternate) evacuation, things get
+      // a little tricky. We have to propagate the earliest key to the next
+      // fragments for this alternate. The last fragment to be evacuated
+      // fixes up the lookaside buffer.
+      doc_evacuator->key          = doc->key;
+      doc_evacuator->earliest_key = doc->key;
+      b->evac_frags.key           = doc->key;
+      b->evac_frags.earliest_key  = doc->key;
+      b->earliest_evacuator       = doc_evacuator;
+      DDbg(dbg_ctl_cache_evac, "evacuating earliest %X %X evac: %p offset: 
%d", (int)b->evac_frags.key.slice32(0),
+           (int)doc->key.slice32(0), doc_evacuator, 
(int)dir_offset(&doc_evacuator->overwrite_dir));
+      b->f.unused = 67;
+    }
+  } else {
+    // find which key matches the document
+    EvacuationKey *ek = &b->evac_frags;
+    for (; ek && !(ek->key == doc->key); ek = ek->link.next) {
+      ;
+    }
+    if (!ek) {
+      b->f.unused = 77;
+      goto Ldone;
+    }
+    doc_evacuator->key          = ek->key;
+    doc_evacuator->earliest_key = ek->earliest_key;
+    DDbg(dbg_ctl_cache_evac, "evacuateDocReadDone key: %X earliest: %X", 
(int)ek->key.slice32(0), (int)ek->earliest_key.slice32(0));
+    b->f.unused = 87;
+  }
+  // if the tag in the c->dir does match the first_key in the
+  // document, then it has to be the earliest fragment. We guarantee that
+  // the first_key and the earliest_key will never collide (see
+  // Cache::open_write).
+  if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
+    next_CacheKey(&next_key, &doc->key);
+    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, 
this);
+  }
+  return evacuateWrite(doc_evacuator, event, e);
+Ldone:
+  free_CacheEvacuateDocVC(doc_evacuator);
+  doc_evacuator = nullptr;
+  return aggWrite(event, e);
+}
 
-  vc->dir = vc->overwrite_dir;
-  dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
-  dir_set_phase(&vc->dir, this->header->phase);
-  return approx_size;
// Schedule every non-head fragment of the document chain starting at
// `key` for evacuation from `stripe`.  Each successive fragment is
// located with dir_probe(); a fragment that already has an
// EvacuationBlock just gets this (key, earliest_key) pair chained onto
// that block's fragment list.  Passing a non-zero `force` zeroes the
// block's reader count, so the fragment is evacuated even if readers
// would otherwise hold it back.
//
// Returns the number of NEW evacuation blocks created; fragments merged
// into an existing block are not counted.
//
// NOTE(review): caller must hold the stripe lock — the code reads
// stripe->mutex->thread_holding and mutates stripe->evacuate without
// any locking of its own; confirm against call sites.
static int
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, StripeSM *stripe)
{
  Dir dir, *last_collision = nullptr;
  int i = 0;
  while (dir_probe(key, stripe, &dir, &last_collision)) {
    // next fragment cannot be a head...if it is, it must have been a
    // directory collision.
    if (dir_head(&dir)) {
      continue;
    }
    EvacuationBlock *b = evacuation_block_exists(&dir, stripe);
    if (!b) {
      // First request against this directory entry: create a block and
      // push it onto the evacuation bucket for this entry.
      b                          = new_EvacuationBlock(stripe->mutex->thread_holding);
      b->dir                     = dir;
      b->evac_frags.key          = *key;
      b->evac_frags.earliest_key = *earliest_key;
      stripe->evacuate[dir_evac_bucket(&dir)].push(b);
      i++;
    } else {
      // Entry already scheduled: it must refer to the same location and
      // phase, otherwise the bookkeeping has diverged from the directory.
      ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
      ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
      // Chain an additional (key, earliest_key) pair onto the block's
      // singly-linked fragment list so the rewrite can fix up all keys.
      EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
      evac_frag->key           = *key;
      evac_frag->earliest_key  = *earliest_key;
      evac_frag->link.next     = b->evac_frags.link.next;
      b->evac_frags.link.next  = evac_frag;
    }
    if (force) {
      // Forced evacuation: ignore any readers pinning this block.
      b->readers = 0;
    }
    DDbg(dbg_ctl_cache_evac, "next fragment %X Earliest: %X offset %d phase %d force %d", (int)key->slice32(0),
         (int)earliest_key->slice32(0), (int)dir_offset(&dir), (int)dir_phase(&dir), force);
  }
  return i;
}
+
+int
+StripeSM::evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e)
+{
+  // push to front of aggregation write list, so it is written first
+
+  evacuator->agg_len = round_to_approx_size((reinterpret_cast<Doc 
*>(evacuator->buf->data()))->len);
+  this->_write_buffer.add_bytes_pending_aggregation(evacuator->agg_len);
+  /* insert the evacuator after all the other evacuators */
+  CacheVC *cur   = static_cast<CacheVC 
*>(this->_write_buffer.get_pending_writers().head);
+  CacheVC *after = nullptr;
+  for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) {
+    after = cur;
+  }
+  ink_assert(evacuator->agg_len <= AGG_SIZE);
+  this->_write_buffer.get_pending_writers().insert(evacuator, after);
+  return aggWrite(event, e);
 }
 
 bool
-Stripe::flush_aggregate_write_buffer()
+StripeSM::add_writer(CacheVC *vc)
 {
-  // set write limit
-  this->header->agg_pos = this->header->write_pos + 
this->_write_buffer.get_buffer_pos();
+  ink_assert(vc);
+  this->_write_buffer.add_bytes_pending_aggregation(vc->agg_len);
+  // An extra AGG_SIZE is added to the backlog here, but not in
+  // open_write, at the time I'm writing this comment. I venture to
+  // guess that because the stripe lock may be released between
+  // open_write and add_writer (I have checked this), the number of
+  // bytes pending aggregation lags and is inaccurate. Therefore the
+  // check in open_write is too permissive, and once we get to add_writer
+  // and update our bytes pending, we may discover we have more backlog
+  // than we thought we did. The solution to the problem was to permit
+  // an aggregation buffer extra of backlog here. That's my analysis.
+  bool agg_error =
+    (vc->agg_len > AGG_SIZE || vc->header_len + sizeof(Doc) > MAX_FRAG_SIZE ||
+     (!vc->f.readers && (this->_write_buffer.get_bytes_pending_aggregation() > 
cache_config_agg_write_backlog + AGG_SIZE) &&
+      vc->write_len));
+#ifdef CACHE_AGG_FAIL_RATE
+  agg_error = agg_error || 
((uint32_t)vc->mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX 
* CACHE_AGG_FAIL_RATE));
+#endif
 
-  if (!this->_write_buffer.flush(this->fd, this->header->write_pos)) {
-    return false;
+  if (agg_error) {
+    this->_write_buffer.add_bytes_pending_aggregation(-vc->agg_len);
+  } else {
+    ink_assert(vc->agg_len <= AGG_SIZE);
+    if (vc->f.evac_vector) {
+      this->get_pending_writers().push(vc);
+    } else {
+      this->get_pending_writers().enqueue(vc);
+    }
   }
-  this->header->last_write_pos  = this->header->write_pos;
-  this->header->write_pos      += this->_write_buffer.get_buffer_pos();
-  ink_assert(this->header->write_pos == this->header->agg_pos);
-  this->_write_buffer.reset_buffer_pos();
-  this->header->write_serial++;
 
-  return true;
+  return !agg_error;
 }
 
// Final, synchronous flush of this stripe's directory as the process
// exits: flush any data still in the aggregation buffer, then write the
// whole in-memory directory image to one of the stripe's two on-disk
// directory copies (selected by sync-serial parity).
//
// Bails out early (without writing) when the disk is marked bad, or
// when the directory is clean and no periodic sync is in flight.
void
StripeSM::shutdown(EThread *shutdown_thread)
{
  // the process is going down, do a blocking call
  // dont release the volume's lock, there could
  // be another aggWrite in progress
  MUTEX_TAKE_LOCK(this->mutex, shutdown_thread);

  if (DISK_BAD(this->disk)) {
    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- bad disk", this->hash_text.get());
    return;
  }
  size_t dirlen = this->dirlen();
  ink_assert(dirlen > 0); // make clang happy - if not > 0 the vol is seriously messed up
  if (!this->header->dirty && !this->dir_sync_in_progress) {
    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- not dirty", this->hash_text.get());
    return;
  }
  // recompute hit_evacuate_window
  this->hit_evacuate_window = (this->data_blocks * cache_config_hit_evacuate_percent) / 100;

  // check if we have data in the agg buffer
  // dont worry about the cachevc s in the agg queue
  // directories have not been inserted for these writes
  if (!this->_write_buffer.is_empty()) {
    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", this->hash_text.get());
    this->flush_aggregate_write_buffer();
  }

  // We already asserted that dirlen > 0.
  if (!this->dir_sync_in_progress) {
    this->header->sync_serial++;
  } else {
    // A periodic sync was interrupted by shutdown; reuse its serial and
    // overwrite the copy it was writing.
    Dbg(dbg_ctl_cache_dir_sync, "Periodic dir sync in progress -- overwriting");
  }
  this->footer->sync_serial = this->header->sync_serial;

  // NOTE(review): CHECK_DIR takes `d`, which is not declared in this
  // function — presumably the macro expands to nothing in this build
  // configuration; confirm its definition.
  CHECK_DIR(d);
  // B first holds the serial parity (which of the two on-disk directory
  // copies to write), then is reused for pwrite's byte count.
  // NOTE(review): pwrite returns ssize_t; a -1 error wraps in size_t and
  // only trips the assert rather than being handled — verify intended.
  size_t B     = this->header->sync_serial & 1;
  off_t  start = this->skip + (B ? dirlen : 0);
  B            = pwrite(this->fd, this->raw_dir, dirlen, start);
  ink_assert(B == dirlen);
  Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", this->hash_text.get());
}

Reply via email to