This is an automated email from the ASF dual-hosted git repository.
jvanderzee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new a74f2b25f3 Move some logic from CacheVC to Stripe and
AggregateWriteBuffer (#10987)
a74f2b25f3 is described below
commit a74f2b25f3abed1f4745fab3ad11545f05f52b53
Author: JosiahWI <[email protected]>
AuthorDate: Wed Jan 17 17:27:44 2024 -0600
Move some logic from CacheVC to Stripe and AggregateWriteBuffer (#10987)
This breaks up some of handleRead into smaller methods and moves the logic
for copying out of the aggregate write buffer to the classes responsible for
that buffer.
There was a check to make sure that data was only copied from the aggregate
write buffer if the requested data was available. I left the old check because
it avoids allocating a new buffer if the data is unavailable, and I also
duplicated it in Stripe::copy_from_aggregate_write_buffer for safety. I thought
it would be better to have it in and take it out if it negatively impacts
performance than not to have it and find out it would have been useful. I have
not benchmarked this.
The order of the parameters of Stripe::copy_from_aggregate_write_buffer and
AggregateWriteBuffer::copy_from was chosen to reflect the order of the memcpy
parameters. This does not mean I like the way memcpy chose to do it, I just
thought it might help consistency. :)
This is part of a chain of PRs to remove all the getters I added to Stripe,
in order to encapsulate the aggregate write buffer properly. This PR removes
Stripe::get_agg_buffer.
* Extract CacheVC::check_ram_cache from handleRead
* Extract check_last_open_read_call from handleRead
* Extract check_aggregation_buffer from handleRead
* Invert check_aggregation_buffer conditional logic
* Reorder buffer creation and agg_offset calculation
* Extract Stripe::copy_from_aggregate_write_buffer
* Extract AggregateWriteBuffer::copy_from
* Fix naming: check -> load_from
This fixes the names of the check_ CacheVC methods to reflect what they
actually do, which is to load from various caches into the CacheVC
buffer.
* Fix assertion: assert -> ink_assert
* Mark methods as const where appropriate
---
include/iocore/cache/AggregateWriteBuffer.h | 11 ++++++
include/iocore/cache/CacheVC.h | 3 ++
src/iocore/cache/AggregateWriteBuffer.cc | 9 +++++
src/iocore/cache/CacheVC.cc | 56 +++++++++++++++++++----------
src/iocore/cache/P_CacheVol.h | 21 +++++++----
src/iocore/cache/Stripe.cc | 15 ++++++++
6 files changed, 89 insertions(+), 26 deletions(-)
diff --git a/include/iocore/cache/AggregateWriteBuffer.h
b/include/iocore/cache/AggregateWriteBuffer.h
index 1e0456b204..42febcf5f0 100644
--- a/include/iocore/cache/AggregateWriteBuffer.h
+++ b/include/iocore/cache/AggregateWriteBuffer.h
@@ -69,6 +69,17 @@ public:
*/
bool flush(int fd, off_t write_pos) const;
+ /**
+ * Copy part of the buffer.
+ *
+ * The range of bytes to copy must fit within the written buffer.
+ *
+ * @param dest: The destination buffer.
+ * @param offset: Byte offset to begin copying at.
+ * @param nbytes: Number of bytes to copy.
+ */
+ void copy_from(char *dest, int offset, size_t nbytes) const;
+
Queue<CacheVC, Continuation::Link_link> &get_pending_writers();
char *get_buffer();
int get_buffer_pos() const;
diff --git a/include/iocore/cache/CacheVC.h b/include/iocore/cache/CacheVC.h
index 35bc895f3e..540f027c14 100644
--- a/include/iocore/cache/CacheVC.h
+++ b/include/iocore/cache/CacheVC.h
@@ -141,6 +141,9 @@ struct CacheVC : public CacheVConnection {
int handleReadDone(int event, Event *e);
int handleRead(int event, Event *e);
+ bool load_from_ram_cache();
+ bool load_from_last_open_read_call();
+ bool load_from_aggregation_buffer();
int do_read_call(CacheKey *akey);
int handleWrite(int event, Event *e);
int handleWriteLock(int event, Event *e);
diff --git a/src/iocore/cache/AggregateWriteBuffer.cc
b/src/iocore/cache/AggregateWriteBuffer.cc
index 52e96c667d..486de3fa5e 100644
--- a/src/iocore/cache/AggregateWriteBuffer.cc
+++ b/src/iocore/cache/AggregateWriteBuffer.cc
@@ -28,6 +28,8 @@
#include "tscore/ink_assert.h"
#include "tscore/ink_platform.h"
+#include <cstring>
+
bool
AggregateWriteBuffer::flush(int fd, off_t write_pos) const
{
@@ -38,3 +40,10 @@ AggregateWriteBuffer::flush(int fd, off_t write_pos) const
}
return true;
}
+
+void
+AggregateWriteBuffer::copy_from(char *dest, int offset, size_t nbytes) const
+{
+ ink_assert((offset + nbytes) <= (unsigned)this->_buffer_pos);
+ memcpy(dest, this->_buffer + offset, nbytes);
+}
diff --git a/src/iocore/cache/CacheVC.cc b/src/iocore/cache/CacheVC.cc
index 23c579e0ba..5f2b5fba45 100644
--- a/src/iocore/cache/CacheVC.cc
+++ b/src/iocore/cache/CacheVC.cc
@@ -472,28 +472,12 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event *
/* e ATS_UNUSED */)
f.doc_from_ram_cache = false;
- // check ram cache
ink_assert(stripe->mutex->thread_holding == this_ethread());
- int64_t o = dir_offset(&dir);
- int ram_hit_state = stripe->ram_cache->get(read_key, &buf,
static_cast<uint64_t>(o));
- f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
- if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) {
+ if (load_from_ram_cache()) {
goto LramHit;
- }
-
- // check if it was read in the last open_read call
- if (*read_key == stripe->first_fragment_key && dir_offset(&dir) ==
stripe->first_fragment_offset) {
- buf = stripe->first_fragment_data;
+ } else if (load_from_last_open_read_call()) {
goto LmemHit;
- }
- // see if its in the aggregation buffer
- if (dir_agg_buf_valid(stripe, &dir)) {
- int agg_offset = stripe->vol_offset(&dir) - stripe->header->write_pos;
- buf =
new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes,
MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
- ink_assert((agg_offset + io.aiocb.aio_nbytes) <=
(unsigned)stripe->get_agg_buf_pos());
- char *doc = buf->data();
- char *agg = stripe->get_agg_buffer() + agg_offset;
- memcpy(doc, agg, io.aiocb.aio_nbytes);
+ } else if (load_from_aggregation_buffer()) {
io.aio_result = io.aiocb.aio_nbytes;
SET_HANDLER(&CacheVC::handleReadDone);
return EVENT_RETURN;
@@ -535,6 +519,40 @@ LmemHit:
return EVENT_RETURN; // allow the caller to release the volume lock
}
+bool
+CacheVC::load_from_ram_cache()
+{
+ int64_t o = dir_offset(&this->dir);
+ int ram_hit_state = this->stripe->ram_cache->get(read_key, &this->buf,
static_cast<uint64_t>(o));
+ f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
+ return ram_hit_state >= RAM_HIT_COMPRESS_NONE;
+}
+
+bool
+CacheVC::load_from_last_open_read_call()
+{
+ if (*this->read_key == this->stripe->first_fragment_key &&
dir_offset(&this->dir) == this->stripe->first_fragment_offset) {
+ this->buf = this->stripe->first_fragment_data;
+ return true;
+ }
+ return false;
+}
+
+bool
+CacheVC::load_from_aggregation_buffer()
+{
+ if (!dir_agg_buf_valid(this->stripe, &this->dir)) {
+ return false;
+ }
+
+ this->buf =
new_IOBufferData(iobuffer_size_to_index(this->io.aiocb.aio_nbytes,
MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
+ char *doc = this->buf->data();
+ [[maybe_unused]] bool success =
this->stripe->copy_from_aggregate_write_buffer(doc, dir,
this->io.aiocb.aio_nbytes);
+ // We already confirmed that the copy was valid, so it should not fail.
+ ink_assert(success);
+ return true;
+}
+
int
CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h
index 3d4d487c27..53155aacb3 100644
--- a/src/iocore/cache/P_CacheVol.h
+++ b/src/iocore/cache/P_CacheVol.h
@@ -277,7 +277,6 @@ public:
}
Queue<CacheVC, Continuation::Link_link> &get_pending_writers();
- char *get_agg_buffer();
int get_agg_buf_pos() const;
int get_agg_todo_size() const;
@@ -301,6 +300,20 @@ public:
bool add_writer(CacheVC *vc);
bool flush_aggregate_write_buffer();
+ /**
+ * Retrieve a document from the aggregate write buffer.
+ *
+ * This is used to speed up reads by copying from the in-memory write buffer
+ * instead of reading from disk. If the document is not in the write buffer,
+ * nothing will be copied.
+ *
+ * @param dir: The directory entry for the desired document.
+ * @param dest: The destination buffer where the document will be copied to.
+ * @param nbytes: The size of the document (number of bytes to copy).
+ * @return Returns true if the document was copied, false otherwise.
+ */
+ bool copy_from_aggregate_write_buffer(char *dest, Dir &dir, size_t nbytes)
const;
+
private:
void _clear_init();
void _init_dir();
@@ -585,12 +598,6 @@ Stripe::get_pending_writers()
return this->_write_buffer.get_pending_writers();
}
-inline char *
-Stripe::get_agg_buffer()
-{
- return this->_write_buffer.get_buffer();
-}
-
inline int
Stripe::get_agg_buf_pos() const
{
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index 85ab5da3e2..df56307c2d 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -27,6 +27,9 @@
#include "P_CacheVol.h"
#include "tscore/hugepages.h"
+#include "tscore/ink_assert.h"
+
+#include <cstring>
namespace
{
@@ -962,3 +965,15 @@ Stripe::flush_aggregate_write_buffer()
return true;
}
+
+bool
+Stripe::copy_from_aggregate_write_buffer(char *dest, Dir &dir, size_t nbytes)
const
+{
+ if (!dir_agg_buf_valid(this, &dir)) {
+ return false;
+ }
+
+ int agg_offset = this->vol_offset(&dir) - this->header->write_pos;
+ this->_write_buffer.copy_from(dest, agg_offset, nbytes);
+ return true;
+}