This is an automated email from the ASF dual-hosted git repository.
jvanderzee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new f840387340 Refactor sync_cache_dir_on_shutdown (#10993)
f840387340 is described below
commit f84038734069a2a2562329fecabd575cd4f59009
Author: JosiahWI <[email protected]>
AuthorDate: Mon Jan 29 07:54:39 2024 -0600
Refactor sync_cache_dir_on_shutdown (#10993)
This moves the logic for syncing the stripe buffer to disk on shutdown into
the Stripe class, since that class manages the buffer. It also does away with
the temporary buffer used to write to disk, instead writing to disk directly
from the stripe's buffer. The temporary buffer may once have offered a
performance advantage, but I don't see a reason for it now.
This also adds an is_empty() method to AggregateWriteBuffer, and uses it in
the new Stripe::shutdown method as well as a few other places where appropriate.
---
include/iocore/cache/AggregateWriteBuffer.h | 13 +++++
src/iocore/cache/CacheDir.cc | 74 +----------------------------
src/iocore/cache/CacheWrite.cc | 4 +-
src/iocore/cache/P_CacheVol.h | 17 ++++++-
src/iocore/cache/Stripe.cc | 52 ++++++++++++++++++++
5 files changed, 84 insertions(+), 76 deletions(-)
diff --git a/include/iocore/cache/AggregateWriteBuffer.h
b/include/iocore/cache/AggregateWriteBuffer.h
index 42febcf5f0..ebf36b79b1 100644
--- a/include/iocore/cache/AggregateWriteBuffer.h
+++ b/include/iocore/cache/AggregateWriteBuffer.h
@@ -53,6 +53,13 @@ public:
AggregateWriteBuffer(AggregateWriteBuffer &&other) = delete;
AggregateWriteBuffer &operator=(AggregateWriteBuffer &&other) = delete;
+ /**
+ * Check whether the internal buffer is empty.
+ *
+ * @return Returns true if the buffer is empty, otherwise false.
+ */
+ bool is_empty() const;
+
/**
* Flush the internal buffer to disk.
*
@@ -143,3 +150,9 @@ AggregateWriteBuffer::add_bytes_pending_aggregation(int
size)
{
this->_bytes_pending_aggregation += size;
}
+
+/**
+ * Report whether the internal buffer currently holds no data.
+ *
+ * The buffer is treated as empty exactly when _buffer_pos is zero.
+ */
+inline bool
+AggregateWriteBuffer::is_empty() const
+{
+  return 0 == this->_buffer_pos;
+}
diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc
index 63556cb7bf..4daa794f82 100644
--- a/src/iocore/cache/CacheDir.cc
+++ b/src/iocore/cache/CacheDir.cc
@@ -968,83 +968,11 @@ void
sync_cache_dir_on_shutdown()
{
Dbg(dbg_ctl_cache_dir_sync, "sync started");
- char *buf = nullptr;
- size_t buflen = 0;
- bool buf_huge = false;
-
EThread *t = (EThread *)0xdeadbeef;
for (int i = 0; i < gnstripes; i++) {
- // the process is going down, do a blocking call
- // dont release the volume's lock, there could
- // be another aggWrite in progress
- MUTEX_TAKE_LOCK(gstripes[i]->mutex, t);
- Stripe *stripe = gstripes[i];
-
- if (DISK_BAD(stripe->disk)) {
- Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- bad disk",
stripe->hash_text.get());
- continue;
- }
- size_t dirlen = stripe->dirlen();
- ink_assert(dirlen > 0); // make clang happy - if not > 0 the vol is
seriously messed up
- if (!stripe->header->dirty && !stripe->dir_sync_in_progress) {
- Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- not dirty",
stripe->hash_text.get());
- continue;
- }
- // recompute hit_evacuate_window
- stripe->hit_evacuate_window = (stripe->data_blocks *
cache_config_hit_evacuate_percent) / 100;
-
- // check if we have data in the agg buffer
- // dont worry about the cachevc s in the agg queue
- // directories have not been inserted for these writes
- if (stripe->get_agg_buf_pos()) {
- Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first",
stripe->hash_text.get());
- stripe->flush_aggregate_write_buffer();
- }
-
- if (buflen < dirlen) {
- if (buf) {
- if (buf_huge) {
- ats_free_hugepage(buf, buflen);
- } else {
- ats_free(buf);
- }
- buf = nullptr;
- }
- buflen = dirlen;
- if (ats_hugepage_enabled()) {
- buf = static_cast<char *>(ats_alloc_hugepage(buflen));
- buf_huge = true;
- }
- if (buf == nullptr) {
- buf = static_cast<char *>(ats_memalign(ats_pagesize(), buflen));
- buf_huge = false;
- }
- }
-
- if (!stripe->dir_sync_in_progress) {
- stripe->header->sync_serial++;
- } else {
- Dbg(dbg_ctl_cache_dir_sync, "Periodic dir sync in progress --
overwriting");
- }
- stripe->footer->sync_serial = stripe->header->sync_serial;
-
- CHECK_DIR(d);
- memcpy(buf, stripe->raw_dir, dirlen);
- size_t B = stripe->header->sync_serial & 1;
- off_t start = stripe->skip + (B ? dirlen : 0);
- B = pwrite(stripe->fd, buf, dirlen, start);
- ink_assert(B == dirlen);
- Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s",
stripe->hash_text.get());
+ gstripes[i]->shutdown(t);
}
Dbg(dbg_ctl_cache_dir_sync, "sync done");
- if (buf) {
- if (buf_huge) {
- ats_free_hugepage(buf, buflen);
- } else {
- ats_free(buf);
- }
- buf = nullptr;
- }
}
int
diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc
index a52080982b..2e27e0f8eb 100644
--- a/src/iocore/cache/CacheWrite.cc
+++ b/src/iocore/cache/CacheWrite.cc
@@ -891,7 +891,7 @@ Lagain:
this->aggregate_pending_writes(tocall);
// if we got nothing...
- if (!this->_write_buffer.get_buffer_pos()) {
+ if (this->_write_buffer.is_empty()) {
if (!this->_write_buffer.get_pending_writers().head && !sync.head) { //
nothing to get
return EVENT_CONT;
}
@@ -932,7 +932,7 @@ Lagain:
}
// write sync marker
- if (!this->_write_buffer.get_buffer_pos()) {
+ if (this->_write_buffer.is_empty()) {
ink_assert(sync.head);
int l = round_to_approx_size(sizeof(Doc));
this->_write_buffer.seek(l);
diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h
index 53155aacb3..ed0da0d5e6 100644
--- a/src/iocore/cache/P_CacheVol.h
+++ b/src/iocore/cache/P_CacheVol.h
@@ -28,6 +28,8 @@
#include "P_RamCache.h"
#include "iocore/cache/AggregateWriteBuffer.h"
+#include "iocore/eventsystem/EThread.h"
+
#include "tscore/CryptoHash.h"
#include <atomic>
@@ -298,7 +300,19 @@ public:
* @return: Returns true if the operation was successful, otherwise false.
*/
bool add_writer(CacheVC *vc);
- bool flush_aggregate_write_buffer();
+
+ /**
+ * Sync the stripe metadata to disk for shutdown.
+ *
+ * This method MUST NOT be called during regular operation. The stripe
+ * will be locked for this operation, and will not be unlocked afterwards.
+ *
+ * The aggregate write buffer will be flushed before copying the stripe to
+ * disk. Pending writes will be ignored.
+ *
+ * @param shutdown_thread The EThread to lock the stripe on.
+ */
+ void shutdown(EThread *shutdown_thread);
/**
* Retrieve a document from the aggregate write buffer.
@@ -319,6 +333,7 @@ private:
void _init_dir();
void _init_data_internal();
void _init_data();
+ bool flush_aggregate_write_buffer();
AggregateWriteBuffer _write_buffer;
};
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index df56307c2d..da099e865b 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -26,14 +26,21 @@
#include "P_CacheInternal.h"
#include "P_CacheVol.h"
+#include "iocore/eventsystem/EThread.h"
+#include "iocore/eventsystem/Lock.h"
+
+#include "tsutil/DbgCtl.h"
+
#include "tscore/hugepages.h"
#include "tscore/ink_assert.h"
+#include "tscore/ink_memory.h"
#include <cstring>
namespace
{
+DbgCtl dbg_ctl_cache_dir_sync{"dir_sync"};
DbgCtl dbg_ctl_cache_init{"cache_init"};
// This is the oldest version number that is still usable.
@@ -948,6 +955,51 @@ Stripe::add_writer(CacheVC *vc)
return !agg_error;
}
+/**
+ * Flush the aggregate write buffer and write this stripe's directory to disk
+ * as part of process shutdown.
+ *
+ * The stripe mutex is taken on @a shutdown_thread and is deliberately never
+ * released, including on the early-return paths: the process is going down
+ * and another aggregate write could otherwise be in progress.
+ *
+ * Nothing is written when the disk is marked bad, or when the header is not
+ * dirty and no periodic directory sync is in progress.
+ *
+ * The low bit of header->sync_serial selects the write offset: `skip` when it
+ * is 0, `skip + dirlen()` when it is 1.
+ *
+ * @param shutdown_thread The EThread to lock the stripe on.
+ */
+void
+Stripe::shutdown(EThread *shutdown_thread)
+{
+  // The process is going down, so do a blocking call. Do not release the
+  // stripe's lock afterwards; there could be another aggWrite in progress.
+  MUTEX_TAKE_LOCK(this->mutex, shutdown_thread);
+
+  if (DISK_BAD(this->disk)) {
+    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- bad disk", this->hash_text.get());
+    return;
+  }
+
+  const size_t dir_bytes = this->dirlen();
+  ink_assert(dir_bytes > 0); // make clang happy - if not > 0 the vol is seriously messed up
+
+  if (!this->header->dirty && !this->dir_sync_in_progress) {
+    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- not dirty", this->hash_text.get());
+    return;
+  }
+
+  // recompute hit_evacuate_window
+  this->hit_evacuate_window = (this->data_blocks * cache_config_hit_evacuate_percent) / 100;
+
+  // Check if we have data in the agg buffer. Don't worry about the CacheVCs
+  // in the agg queue; directories have not been inserted for these writes.
+  if (!this->_write_buffer.is_empty()) {
+    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", this->hash_text.get());
+    // NOTE(review): the bool result of the flush is ignored here, as before;
+    // confirm whether a failed flush should abort the directory sync.
+    this->flush_aggregate_write_buffer();
+  }
+
+  // Only advance the serial if no periodic sync already did so; otherwise we
+  // overwrite the copy that sync was in the middle of writing.
+  if (!this->dir_sync_in_progress) {
+    this->header->sync_serial++;
+  } else {
+    Dbg(dbg_ctl_cache_dir_sync, "Periodic dir sync in progress -- overwriting");
+  }
+  this->footer->sync_serial = this->header->sync_serial;
+
+  // NOTE(review): no `d` is declared in this scope, so this only compiles if
+  // CHECK_DIR discards its argument -- confirm.
+  CHECK_DIR(d);
+
+  const bool  second_copy = (this->header->sync_serial & 1) != 0;
+  const off_t start       = this->skip + (second_copy ? dir_bytes : 0);
+
+  // pwrite returns ssize_t (-1 on error). Keep it signed so that a failure is
+  // not silently converted into a huge unsigned byte count.
+  const ssize_t written  = pwrite(this->fd, this->raw_dir, dir_bytes, start);
+  const bool    complete = written >= 0 && static_cast<size_t>(written) == dir_bytes;
+  ink_assert(complete);
+  if (!complete) {
+    // Reached only where ink_assert does not abort; do not claim success.
+    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: directory write failed or was short", this->hash_text.get());
+    return;
+  }
+  Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", this->hash_text.get());
+}
+
bool
Stripe::flush_aggregate_write_buffer()
{