This is an automated email from the ASF dual-hosted git repository.
jvanderzee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new f9b2b230ac Clean up the insertion of documents into the aggregate
write buffer (#11407)
f9b2b230ac is described below
commit f9b2b230ac359440b9a0ebff22293b4cd5b80e22
Author: JosiahWI <[email protected]>
AuthorDate: Thu Jun 13 07:26:25 2024 -0500
Clean up the insertion of documents into the aggregate write buffer (#11407)
* Add AggregateWriteBuffer::emplace
The new method creates a document in the buffer and updates the position
and bytes pending.
* Add AggregateWriteBuffer::add
The new method adds a document to the buffer and updates the position
and bytes pending.
* Move agg_copy to Stripe::agg_copy
* Do not touch buffer directly in agg_copy
This refactors agg_copy to use AggregateWriteBuffer::emplace and
AggregateWriteBuffer::add. This also gives control of updating
the buffer position to AggregateWriteBuffer, and removes the
AggregateWriteBuffer::add_buffer_pos method.
* Fix incorrect behavior of add
The add method was adding to the pending bytes instead of subtracting. This
was noticed by Masaori Koshiba in review and not caught by any tests.
* Rename Stripe::agg_copy to Stripe::_agg_copy
* Add unit tests for AggregateWriteBuffer pending bytes
---
src/iocore/cache/AggregateWriteBuffer.cc | 23 +++++++++-
.../iocore/cache/AggregateWriteBuffer.h | 53 +++++++++++++++++++---
src/iocore/cache/CMakeLists.txt | 1 +
src/iocore/cache/CacheWrite.cc | 47 +++++++++----------
src/iocore/cache/P_CacheVol.h | 3 +-
.../cache/unit_tests/test_AggregateWriteBuffer.cc | 52 +++++++++++++++++++++
6 files changed, 145 insertions(+), 34 deletions(-)
diff --git a/src/iocore/cache/AggregateWriteBuffer.cc b/src/iocore/cache/AggregateWriteBuffer.cc
index 486de3fa5e..11f233c1cd 100644
--- a/src/iocore/cache/AggregateWriteBuffer.cc
+++ b/src/iocore/cache/AggregateWriteBuffer.cc
@@ -21,7 +21,11 @@
limitations under the License.
*/
-#include "iocore/cache/AggregateWriteBuffer.h"
+#include "P_CacheInternal.h"
+#include "P_CacheDir.h"
+#include "P_CacheDoc.h"
+#include "AggregateWriteBuffer.h"
+#include "iocore/cache/CacheDefs.h"
#include "iocore/aio/AIO_fault_injection.h"
@@ -30,6 +34,23 @@
#include <cstring>
+void
+AggregateWriteBuffer::add(Doc const *doc, int approx_size)
+{
+ std::memcpy(this->_buffer + this->_buffer_pos, doc, doc->len);
+ this->_buffer_pos += approx_size;
+ this->add_bytes_pending_aggregation(-approx_size);
+}
+
+Doc *
+AggregateWriteBuffer::emplace(int approx_size)
+{
+ Doc *result{new (this->_buffer + this->_buffer_pos) Doc};
+ this->_buffer_pos += approx_size;
+ this->add_bytes_pending_aggregation(-approx_size);
+ return result;
+}
+
bool
AggregateWriteBuffer::flush(int fd, off_t write_pos) const
{
diff --git a/include/iocore/cache/AggregateWriteBuffer.h b/src/iocore/cache/AggregateWriteBuffer.h
similarity index 69%
rename from include/iocore/cache/AggregateWriteBuffer.h
rename to src/iocore/cache/AggregateWriteBuffer.h
index 7b3da69e5d..ad99b03ce0 100644
--- a/include/iocore/cache/AggregateWriteBuffer.h
+++ b/src/iocore/cache/AggregateWriteBuffer.h
@@ -23,6 +23,8 @@
#pragma once
+#include "P_CacheDoc.h"
+
#include "iocore/eventsystem/Continuation.h"
#include "tscore/ink_memory.h"
@@ -60,6 +62,50 @@ public:
*/
bool is_empty() const;
+ /**
+ * Add a new document to the buffer.
+ *
+ * This method copies the provided document into the buffer.
+ *
+ * This method may only be called if there is space at the current
+ * buffer position for the document, and the document has a correct len
+ * field. Use reset_buffer_pos to reset to the beginning of the buffer
+ * when it gets full. If this condition is not met, the new document may
+ * overrun the buffer.
+ *
+ * The buffer position will be updated to the end of the document's data
+ * and approx_size will be subtracted from the bytes pending aggregation.
+ *
+ * @param document: A pointer to the document to add to the buffer. It must
+ * have a correct len field, and its headers and data must follow it. This
+ * requires a pointer to a full document buffer - not just the Doc struct.
+ * @param approx_size The approximate size of all headers and data as
+ * determined by Stripe::round_to_approx_size. The document may not need
+ * this much space.
+ *
+ */
+ void add(Doc const *document, int approx_size);
+
+ /**
+ * Create a new document in the buffer.
+ *
+ * This method may only be called if there is space at the current
+ * buffer position for the document. Use reset_buffer_pos to reset
+ * to the beginning of the buffer when it gets full. If this
+ * condition is not met, the new document may overrun the buffer.
+ *
+ * The buffer position will be updated to the end of the document's data
+ * and approx_size will be subtracted from the bytes pending aggregation.
+ *
+ * The new document will be uninitialized.
+ *
+ * @param approx_size The approximate size of all headers and data as
+ * determined by Stripe::round_to_approx_size. The document may not need
+ * this much space.
+ * @return Returns a non-owning pointer to the new document.
+ */
+ Doc *emplace(int approx_size);
+
/**
* Flush the internal buffer to disk.
*
@@ -90,7 +136,6 @@ public:
Queue<CacheVC, Continuation::Link_link> &get_pending_writers();
char *get_buffer();
int get_buffer_pos() const;
- void add_buffer_pos(int size);
void seek(int offset);
void reset_buffer_pos();
int get_bytes_pending_aggregation() const;
@@ -121,12 +166,6 @@ AggregateWriteBuffer::get_buffer_pos() const
return this->_buffer_pos;
}
-inline void
-AggregateWriteBuffer::add_buffer_pos(int size)
-{
- this->_buffer_pos += size;
-}
-
inline void
AggregateWriteBuffer::seek(int offset)
{
diff --git a/src/iocore/cache/CMakeLists.txt b/src/iocore/cache/CMakeLists.txt
index 68d10ac594..a77b47b38d 100644
--- a/src/iocore/cache/CMakeLists.txt
+++ b/src/iocore/cache/CMakeLists.txt
@@ -86,6 +86,7 @@ if(BUILD_TESTING)
add_cache_test(Update_S_to_L unit_tests/test_Update_S_to_L.cc)
add_cache_test(Update_Header unit_tests/test_Update_header.cc)
add_cache_test(CacheStripe unit_tests/test_Stripe.cc)
+ add_cache_test(CacheAggregateWriteBuffer unit_tests/test_AggregateWriteBuffer.cc)
endif()
diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc
index 80c74f59e0..dec0da4687 100644
--- a/src/iocore/cache/CacheWrite.cc
+++ b/src/iocore/cache/CacheWrite.cc
@@ -23,7 +23,7 @@
#include "P_Cache.h"
#include "P_CacheDoc.h"
-#include "iocore/cache/AggregateWriteBuffer.h"
+#include "AggregateWriteBuffer.h"
#include "CacheEvacuateDocVC.h"
// These macros allow two incrementing unsigned values x and y to maintain
@@ -638,24 +638,23 @@ Stripe::evac_range(off_t low, off_t high, int evac_phase)
return 0;
}
-static int
-agg_copy(char *p, CacheVC *vc)
+int
+Stripe::_agg_copy(CacheVC *vc)
{
- Stripe *stripe = vc->stripe;
- off_t o = stripe->header->write_pos + stripe->get_agg_buf_pos();
+ off_t o = this->header->write_pos + this->get_agg_buf_pos();
if (!vc->f.evacuator) {
- Doc *doc = reinterpret_cast<Doc *>(p);
+ uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
+ Doc *doc = this->_write_buffer.emplace(this->round_to_approx_size(len));
IOBufferBlock *res_alt_blk = nullptr;
- uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
- ink_assert(stripe->round_to_approx_size(len) == vc->agg_len);
+ ink_assert(this->round_to_approx_size(len) == vc->agg_len);
// update copy of directory entry for this document
dir_set_approx_size(&vc->dir, vc->agg_len);
- dir_set_offset(&vc->dir, stripe->offset_to_vol_offset(o));
- ink_assert(stripe->vol_offset(&vc->dir) < (stripe->skip + stripe->len));
- dir_set_phase(&vc->dir, stripe->header->phase);
+ dir_set_offset(&vc->dir, this->offset_to_vol_offset(o));
+ ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len));
+ dir_set_phase(&vc->dir, this->header->phase);
// fill in document header
doc->magic = DOC_MAGIC;
@@ -667,8 +666,8 @@ agg_copy(char *p, CacheVC *vc)
doc->unused = 0; // force this for forward compatibility.
doc->total_len = vc->total_len;
doc->first_key = vc->first_key;
- doc->sync_serial = stripe->header->sync_serial;
- vc->write_serial = doc->write_serial = stripe->header->write_serial;
+ doc->sync_serial = this->header->sync_serial;
+ vc->write_serial = doc->write_serial = this->header->write_serial;
doc->checksum = DOC_NO_CHECKSUM;
if (vc->pin_in_cache) {
dir_set_pinned(&vc->dir, 1);
@@ -732,13 +731,13 @@ agg_copy(char *p, CacheVC *vc)
// move data
if (vc->write_len) {
{
- ProxyMutex *mutex ATS_UNUSED = vc->stripe->mutex.get();
+ ProxyMutex *mutex ATS_UNUSED = this->mutex.get();
ink_assert(mutex->thread_holding == this_ethread());
// ToDo: Why are these for debug only ?
#ifdef DEBUG
Metrics::Counter::increment(cache_rsb.write_backlog_failure);
- Metrics::Counter::increment(stripe->cache_vol->vol_rsb.write_backlog_failure);
+ Metrics::Counter::increment(this->cache_vol->vol_rsb.write_backlog_failure);
#endif
}
if (vc->f.rewrite_resident_alt) {
@@ -776,21 +775,21 @@ agg_copy(char *p, CacheVC *vc)
} else {
// for evacuated documents, copy the data, and update directory
Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
- int l = vc->stripe->round_to_approx_size(doc->len);
+ int l = this->round_to_approx_size(doc->len);
#ifdef DEBUG
Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
- Metrics::Counter::increment(stripe->cache_vol->vol_rsb.gc_frags_evacuated);
+ Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
#endif
- doc->sync_serial = vc->stripe->header->sync_serial;
- doc->write_serial = vc->stripe->header->write_serial;
+ doc->sync_serial = this->header->sync_serial;
+ doc->write_serial = this->header->write_serial;
- memcpy(p, doc, doc->len);
+ this->_write_buffer.add(doc, l);
vc->dir = vc->overwrite_dir;
- dir_set_offset(&vc->dir, vc->stripe->offset_to_vol_offset(o));
- dir_set_phase(&vc->dir, vc->stripe->header->phase);
+ dir_set_offset(&vc->dir, this->offset_to_vol_offset(o));
+ dir_set_phase(&vc->dir, this->header->phase);
return l;
}
}
@@ -990,10 +989,8 @@ Stripe::aggregate_pending_writes(Queue<CacheVC, Continuation::Link_link> &tocall
}
DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d",
this->_write_buffer.get_buffer_pos(),
this->header->write_pos + this->_write_buffer.get_buffer_pos(),
c->first_key.slice32(0));
- int wrotelen = agg_copy(this->_write_buffer.get_buffer() + this->_write_buffer.get_buffer_pos(), c);
+ [[maybe_unused]] int wrotelen = this->_agg_copy(c);
ink_assert(writelen == wrotelen);
- this->_write_buffer.add_bytes_pending_aggregation(-writelen);
- this->_write_buffer.add_buffer_pos(writelen);
CacheVC *n = (CacheVC *)c->link.next;
this->_write_buffer.get_pending_writers().dequeue();
if (c->f.sync && c->f.use_first_key) {
diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h
index 3b01a87f69..9ef1397be7 100644
--- a/src/iocore/cache/P_CacheVol.h
+++ b/src/iocore/cache/P_CacheVol.h
@@ -27,7 +27,7 @@
#include "P_CacheDoc.h"
#include "P_CacheStats.h"
#include "P_RamCache.h"
-#include "iocore/cache/AggregateWriteBuffer.h"
+#include "AggregateWriteBuffer.h"
#include "iocore/eventsystem/EThread.h"
@@ -328,6 +328,7 @@ private:
void _init_dir();
void _init_data_internal();
void _init_data();
+ int _agg_copy(CacheVC *vc);
bool flush_aggregate_write_buffer();
AggregateWriteBuffer _write_buffer;
diff --git a/src/iocore/cache/unit_tests/test_AggregateWriteBuffer.cc b/src/iocore/cache/unit_tests/test_AggregateWriteBuffer.cc
new file mode 100644
index 0000000000..7c97197d53
--- /dev/null
+++ b/src/iocore/cache/unit_tests/test_AggregateWriteBuffer.cc
@@ -0,0 +1,52 @@
+/** @file
+
+ Unit tests for AggregateWriteBuffer
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "main.h"
+
+int cache_vols = 1;
+bool reuse_existing_cache = false;
+
+// This is a regression test for a bug caught in review. The RegressionSM
+// suite did not catch it. Issues related to this would manifest only after
+// the cache wraps around, because add() is only used by evacuators.
+TEST_CASE("Given 10 bytes are pending to the buffer, "
+ "when we add a document with an approximate size of 10, "
+ "then there should be 0 bytes pending.")
+{
+ AggregateWriteBuffer write_buffer;
+ Doc doc;
+ doc.len = sizeof(Doc);
+ write_buffer.add_bytes_pending_aggregation(10);
+ write_buffer.add(&doc, 10);
+ CHECK(0 == write_buffer.get_bytes_pending_aggregation());
+}
+
+TEST_CASE("Given 10 bytes are pending to the buffer, "
+ "when we emplace a document with an approximate size of 10, "
+ "then there should be 0 bytes pending.")
+{
+ AggregateWriteBuffer write_buffer;
+ write_buffer.add_bytes_pending_aggregation(10);
+ write_buffer.emplace(10);
+ CHECK(0 == write_buffer.get_bytes_pending_aggregation());
+}