This is an automated email from the ASF dual-hosted git repository.
jvanderzee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 6e08439c9e Clean up agg_copy (#11456)
6e08439c9e is described below
commit 6e08439c9ea79e9da3ed8b90c108348aeb85e890
Author: JosiahWI <[email protected]>
AuthorDate: Fri Jul 12 05:21:01 2024 -0500
Clean up agg_copy (#11456)
* Extract Stripe::_copy_evacuator_to_aggregation
* Extract Stripe::_copy_writer_to_aggregation
* Put positive condition first
* Extract update_header_info
* Extract Doc::set_data
* Extract Doc::calculate_checksum
* Extract update_document_key
* Extract init_document
* Extract Doc::pin and Doc::unpin
* Constify block parameter to Doc::set_data
* Remove unnecessary block
* Fix rebase error
* Fix rebase error
* Implement suggestions by Masaori Koshiba
* Elide temporary mutex variable for `ink_assert`
It's obvious from the context of the method calls that the variable holds
the raw stripe mutex. The temporary variable clarifies nothing.
* Move the new `Doc` methods to a source file
This adds the CacheDoc.cc file. If some of the methods such as `pin` and
`unpin` cause a performance problem by not being inlined, we can easily
move them back to the header. For now, keeping the implementation in a
source file helps speed up compile times and limits dependency scope.
---
src/iocore/cache/CMakeLists.txt | 1 +
src/iocore/cache/CacheDoc.cc | 90 ++++++++++++++++++++++
src/iocore/cache/CacheWrite.cc | 163 +---------------------------------------
src/iocore/cache/P_CacheDoc.h | 6 ++
src/iocore/cache/P_CacheVol.h | 2 +
src/iocore/cache/Stripe.cc | 161 ++++++++++++++++++++++++++++++++++++++-
6 files changed, 262 insertions(+), 161 deletions(-)
diff --git a/src/iocore/cache/CMakeLists.txt b/src/iocore/cache/CMakeLists.txt
index a77b47b38d..3e0e83b9b1 100644
--- a/src/iocore/cache/CMakeLists.txt
+++ b/src/iocore/cache/CMakeLists.txt
@@ -21,6 +21,7 @@ add_library(
Cache.cc
CacheDir.cc
CacheDisk.cc
+ CacheDoc.cc
CacheEvacuateDocVC.cc
CacheHosting.cc
CacheHttp.cc
diff --git a/src/iocore/cache/CacheDoc.cc b/src/iocore/cache/CacheDoc.cc
new file mode 100644
index 0000000000..0a79651f24
--- /dev/null
+++ b/src/iocore/cache/CacheDoc.cc
@@ -0,0 +1,90 @@
+/** @file
+
+ Operations on cache documents (may also be called fragments).
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "P_CacheDoc.h"
+
+#include "iocore/eventsystem/IOBuffer.h"
+
+#include "tscore/ink_hrtime.h"
+
+#include <cstring>
+
+namespace
+{
+
+char *
+iobufferblock_memcpy(char *p, int len, IOBufferBlock const *ab, int offset)
+{
+ IOBufferBlock const *b = ab;
+ while (b && len >= 0) {
+ char *start = b->_start;
+ char *end = b->_end;
+ int max_bytes = end - start;
+ max_bytes -= offset;
+ if (max_bytes <= 0) {
+ offset = -max_bytes;
+ b = b->next.get();
+ continue;
+ }
+ int bytes = len;
+ if (bytes >= max_bytes) {
+ bytes = max_bytes;
+ }
+ ::memcpy(p, start + offset, bytes);
+ p += bytes;
+ len -= bytes;
+ b = b->next.get();
+ offset = 0;
+ }
+ return p;
+}
+
+} // namespace
+
+void
+Doc::set_data(int const len, IOBufferBlock const *block, int const offset)
+{
+ iobufferblock_memcpy(this->data(), len, block, offset);
+}
+
+void
+Doc::calculate_checksum()
+{
+ this->checksum = 0;
+ for (char *b = this->hdr(); b < reinterpret_cast<char *>(this) + this->len;
b++) {
+ this->checksum += *b;
+ }
+}
+
+void
+Doc::pin(std::uint32_t const pin_in_cache)
+{
+ // coverity[Y2K38_SAFETY:FALSE]
+ this->pinned = static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND) +
pin_in_cache;
+}
+
+void
+Doc::unpin()
+{
+ this->pinned = 0;
+}
diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc
index bfd60326b3..f947bacbfd 100644
--- a/src/iocore/cache/CacheWrite.cc
+++ b/src/iocore/cache/CacheWrite.cc
@@ -273,33 +273,6 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */)
return EVENT_CONT;
}
-static char *
-iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
-{
- IOBufferBlock *b = ab;
- while (b && len >= 0) {
- char *start = b->_start;
- char *end = b->_end;
- int max_bytes = end - start;
- max_bytes -= offset;
- if (max_bytes <= 0) {
- offset = -max_bytes;
- b = b->next.get();
- continue;
- }
- int bytes = len;
- if (bytes >= max_bytes) {
- bytes = max_bytes;
- }
- ::memcpy(p, start + offset, bytes);
- p += bytes;
- len -= bytes;
- b = b->next.get();
- offset = 0;
- }
- return p;
-}
-
EvacuationBlock *
Stripe::force_evacuate_head(Dir const *evac_dir, int pinned)
{
@@ -641,140 +614,10 @@ Stripe::evac_range(off_t low, off_t high, int evac_phase)
int
Stripe::_agg_copy(CacheVC *vc)
{
- off_t o = this->header->write_pos + this->get_agg_buf_pos();
-
- if (!vc->f.evacuator) {
- uint32_t len = vc->write_len + vc->header_len + vc->frag_len
+ sizeof(Doc);
- Doc *doc =
this->_write_buffer.emplace(this->round_to_approx_size(len));
- IOBufferBlock *res_alt_blk = nullptr;
-
- ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
- ink_assert(this->round_to_approx_size(len) == vc->agg_len);
- // update copy of directory entry for this document
- dir_set_approx_size(&vc->dir, vc->agg_len);
- dir_set_offset(&vc->dir, this->offset_to_vol_offset(o));
- ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len));
- dir_set_phase(&vc->dir, this->header->phase);
-
- // fill in document header
- doc->magic = DOC_MAGIC;
- doc->len = len;
- doc->hlen = vc->header_len;
- doc->doc_type = vc->frag_type;
- doc->v_major = CACHE_DB_MAJOR_VERSION;
- doc->v_minor = CACHE_DB_MINOR_VERSION;
- doc->unused = 0; // force this for forward compatibility.
- doc->total_len = vc->total_len;
- doc->first_key = vc->first_key;
- doc->sync_serial = this->header->sync_serial;
- vc->write_serial = doc->write_serial = this->header->write_serial;
- doc->checksum = DOC_NO_CHECKSUM;
- if (vc->pin_in_cache) {
- dir_set_pinned(&vc->dir, 1);
- // coverity[Y2K38_SAFETY:FALSE]
- doc->pinned = static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND) +
vc->pin_in_cache;
- } else {
- dir_set_pinned(&vc->dir, 0);
- doc->pinned = 0;
- }
-
- if (vc->f.use_first_key) {
- if (doc->data_len() || vc->f.allow_empty_doc) {
- doc->key = vc->earliest_key;
- } else { // the vector is being written by itself
- if (vc->earliest_key.is_zero()) {
- do {
- rand_CacheKey(&doc->key);
- } while (DIR_MASK_TAG(doc->key.slice32(2)) ==
DIR_MASK_TAG(vc->first_key.slice32(2)));
- } else {
- prev_CacheKey(&doc->key, &vc->earliest_key);
- }
- }
- dir_set_head(&vc->dir, true);
- } else {
- doc->key = vc->key;
- dir_set_head(&vc->dir, !vc->fragment);
- }
-
- if (vc->f.rewrite_resident_alt) {
- ink_assert(vc->f.use_first_key);
- Doc *res_doc = reinterpret_cast<Doc *>(vc->first_buf->data());
- res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(),
sizeof(Doc) + res_doc->hlen);
- doc->key = res_doc->key;
- doc->total_len = res_doc->data_len();
- }
- // update the new_info object_key, and total_len and dirinfo
- if (vc->header_len) {
- ink_assert(vc->f.use_first_key);
- if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
- ink_assert(vc->write_vector->count() > 0);
- if (!vc->f.update && !vc->f.evac_vector) {
- ink_assert(!(vc->first_key.is_zero()));
- CacheHTTPInfo *http_info =
vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
- }
- // update + data_written => Update case (b)
- // need to change the old alternate's object length
- if (vc->f.update && vc->total_len) {
- CacheHTTPInfo *http_info =
vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
- }
- ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
- ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(),
vc->header_len));
- } else {
- memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
- }
- // the single fragment flag is not used in the write call.
- // putting it in for completeness.
- vc->f.single_fragment = doc->single_fragment();
- }
- // move data
- if (vc->write_len) {
- {
- ProxyMutex *mutex ATS_UNUSED = this->mutex.get();
- ink_assert(mutex->thread_holding == this_ethread());
-
- Metrics::Counter::increment(cache_rsb.write_bytes, vc->write_len);
- Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes,
vc->write_len);
- }
- if (vc->f.rewrite_resident_alt) {
- iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
- } else {
- iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(),
vc->offset);
- }
- }
- if (cache_config_enable_checksum) {
- doc->checksum = 0;
- for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len;
b++) {
- doc->checksum += *b;
- }
- }
- if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
- ink_assert(doc->hlen);
- }
-
- if (res_alt_blk) {
- res_alt_blk->free();
- }
-
- return vc->agg_len;
+ if (vc->f.evacuator) {
+ return this->_copy_evacuator_to_aggregation(vc);
} else {
- // for evacuated documents, copy the data, and update directory
- Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
- int l = this->round_to_approx_size(doc->len);
-
- Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
- Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
-
- doc->sync_serial = this->header->sync_serial;
- doc->write_serial = this->header->write_serial;
-
- this->_write_buffer.add(doc, l);
-
- vc->dir = vc->overwrite_dir;
- dir_set_offset(&vc->dir, this->offset_to_vol_offset(o));
- dir_set_phase(&vc->dir, this->header->phase);
- return l;
+ return this->_copy_writer_to_aggregation(vc);
}
}
diff --git a/src/iocore/cache/P_CacheDoc.h b/src/iocore/cache/P_CacheDoc.h
index d1c35063a1..d501ca5308 100644
--- a/src/iocore/cache/P_CacheDoc.h
+++ b/src/iocore/cache/P_CacheDoc.h
@@ -23,6 +23,8 @@
#pragma once
+#include "iocore/eventsystem/IOBuffer.h"
+
#include "tscore/CryptoHash.h"
#include <cstdint>
@@ -63,6 +65,10 @@ struct Doc {
int single_fragment() const;
char *hdr();
char *data();
+ void set_data(int len, IOBufferBlock const *block, int offset);
+ void calculate_checksum();
+ void pin(std::uint32_t const pin_in_cache);
+ void unpin();
using self_type = Doc;
};
diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h
index aa3a5a8614..2f1e8377a8 100644
--- a/src/iocore/cache/P_CacheVol.h
+++ b/src/iocore/cache/P_CacheVol.h
@@ -330,6 +330,8 @@ private:
void _init_data_internal();
void _init_data();
int _agg_copy(CacheVC *vc);
+ int _copy_writer_to_aggregation(CacheVC *vc);
+ int _copy_evacuator_to_aggregation(CacheVC *vc);
bool flush_aggregate_write_buffer();
AggregateWriteBuffer _write_buffer;
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index e9feed52b3..5c35c0d82a 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -21,13 +21,17 @@
limitations under the License.
*/
-#include "iocore/cache/Cache.h"
#include "P_CacheDisk.h"
#include "P_CacheDoc.h"
#include "P_CacheInternal.h"
#include "P_CacheVol.h"
+#include "proxy/hdrs/HTTP.h"
+
+#include "tsutil/Metrics.h"
+
#include "iocore/eventsystem/EThread.h"
+#include "iocore/eventsystem/IOBuffer.h"
#include "iocore/eventsystem/Lock.h"
#include "tsutil/DbgCtl.h"
@@ -38,6 +42,8 @@
#include <cstring>
+using CacheHTTPInfo = HTTPInfo;
+
namespace
{
@@ -1010,6 +1016,159 @@ Stripe::shutdown(EThread *shutdown_thread)
Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s",
this->hash_text.get());
}
+static void
+init_document(CacheVC const *vc, Doc *doc, int const len)
+{
+ doc->magic = DOC_MAGIC;
+ doc->len = len;
+ doc->hlen = vc->header_len;
+ doc->doc_type = vc->frag_type;
+ doc->v_major = CACHE_DB_MAJOR_VERSION;
+ doc->v_minor = CACHE_DB_MINOR_VERSION;
+ doc->unused = 0; // force this for forward compatibility.
+ doc->total_len = vc->total_len;
+ doc->first_key = vc->first_key;
+ doc->checksum = DOC_NO_CHECKSUM;
+}
+
+static void
+update_header_info(CacheVC *vc, Doc *doc)
+{
+ if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
+ ink_assert(vc->write_vector->count() > 0);
+ if (!vc->f.update && !vc->f.evac_vector) {
+ ink_assert(!(vc->first_key.is_zero()));
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
+ // update + data_written => Update case (b)
+ // need to change the old alternate's object length
+ if (vc->f.update && vc->total_len) {
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
+ ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
+ ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(),
vc->header_len));
+ } else {
+ memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
+ }
+}
+
+static void
+update_document_key(CacheVC *vc, Doc *doc)
+{
+ if (vc->f.use_first_key) {
+ if (doc->data_len() || vc->f.allow_empty_doc) {
+ doc->key = vc->earliest_key;
+ } else { // the vector is being written by itself
+ if (vc->earliest_key.is_zero()) {
+ do {
+ rand_CacheKey(&doc->key);
+ } while (DIR_MASK_TAG(doc->key.slice32(2)) ==
DIR_MASK_TAG(vc->first_key.slice32(2)));
+ } else {
+ prev_CacheKey(&doc->key, &vc->earliest_key);
+ }
+ }
+ dir_set_head(&vc->dir, true);
+ } else {
+ doc->key = vc->key;
+ dir_set_head(&vc->dir, !vc->fragment);
+ }
+}
+
+int
+Stripe::_copy_writer_to_aggregation(CacheVC *vc)
+{
+ off_t doc_offset{this->header->write_pos + this->get_agg_buf_pos()};
+ uint32_t len = vc->write_len + vc->header_len + vc->frag_len +
sizeof(Doc);
+ Doc *doc =
this->_write_buffer.emplace(this->round_to_approx_size(len));
+ IOBufferBlock *res_alt_blk = nullptr;
+
+ ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
+ ink_assert(this->round_to_approx_size(len) == vc->agg_len);
+ // update copy of directory entry for this document
+ dir_set_approx_size(&vc->dir, vc->agg_len);
+ dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
+ ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len));
+ dir_set_phase(&vc->dir, this->header->phase);
+
+ // fill in document header
+ init_document(vc, doc, len);
+ doc->sync_serial = this->header->sync_serial;
+ vc->write_serial = doc->write_serial = this->header->write_serial;
+ if (vc->get_pin_in_cache()) {
+ dir_set_pinned(&vc->dir, 1);
+ doc->pin(vc->get_pin_in_cache());
+ } else {
+ dir_set_pinned(&vc->dir, 0);
+ doc->unpin();
+ }
+
+ update_document_key(vc, doc);
+
+ if (vc->f.rewrite_resident_alt) {
+ ink_assert(vc->f.use_first_key);
+ Doc *res_doc = reinterpret_cast<Doc *>(vc->first_buf->data());
+ res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(),
sizeof(Doc) + res_doc->hlen);
+ doc->key = res_doc->key;
+ doc->total_len = res_doc->data_len();
+ }
+ // update the new_info object_key, and total_len and dirinfo
+ if (vc->header_len) {
+ ink_assert(vc->f.use_first_key);
+ update_header_info(vc, doc);
+ // the single fragment flag is not used in the write call.
+ // putting it in for completeness.
+ vc->f.single_fragment = doc->single_fragment();
+ }
+ // move data
+ if (vc->write_len) {
+ ink_assert(this->mutex.get()->thread_holding == this_ethread());
+
+ Metrics::Counter::increment(cache_rsb.write_bytes);
+ Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes);
+
+ if (vc->f.rewrite_resident_alt) {
+ doc->set_data(vc->write_len, res_alt_blk, 0);
+ } else {
+ doc->set_data(vc->write_len, vc->blocks.get(), vc->offset);
+ }
+ }
+ if (cache_config_enable_checksum) {
+ doc->calculate_checksum();
+ }
+ if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
+ ink_assert(doc->hlen);
+ }
+
+ if (res_alt_blk) {
+ res_alt_blk->free();
+ }
+
+ return vc->agg_len;
+}
+
+int
+Stripe::_copy_evacuator_to_aggregation(CacheVC *vc)
+{
+ Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
+ int approx_size = this->round_to_approx_size(doc->len);
+
+ Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
+ Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
+
+ doc->sync_serial = this->header->sync_serial;
+ doc->write_serial = this->header->write_serial;
+
+ off_t doc_offset{this->header->write_pos +
this->_write_buffer.get_buffer_pos()};
+ this->_write_buffer.add(doc, approx_size);
+
+ vc->dir = vc->overwrite_dir;
+ dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
+ dir_set_phase(&vc->dir, this->header->phase);
+ return approx_size;
+}
+
bool
Stripe::flush_aggregate_write_buffer()
{