This is an automated email from the ASF dual-hosted git repository.

jvanderzee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e08439c9e Clean up agg_copy (#11456)
6e08439c9e is described below

commit 6e08439c9ea79e9da3ed8b90c108348aeb85e890
Author: JosiahWI <[email protected]>
AuthorDate: Fri Jul 12 05:21:01 2024 -0500

    Clean up agg_copy (#11456)
    
    * Extract Stripe::_copy_evacuator_to_aggregation
    
    * Extract Stripe::_copy_writer_to_aggregation
    
    * Put positive condition first
    
    * Extract update_header_info
    
    * Extract Doc::set_data
    
    * Extract Doc::calculate_checksum
    
    * Extract update_document_key
    
    * Extract init_document
    
    * Extract Doc::pin and Doc::unpin
    
    * Constify block parameter to Doc::set_data
    
    * Remove unnecessary block
    
    * Fix rebase error
    
    * Fix rebase error
    
    * Implement suggestions by Masaori Koshiba
    
     * Elide temporary mutex variable for `ink_assert`
    
       It's obvious from the context of the method calls that the variable holds
       the raw stripe mutex. The temporary variable clarifies nothing.
    
     * Move the new `Doc` methods to a source file
    
       This adds the CacheDoc.cc file. If some of the methods such as `pin` and
       `unpin` cause a performance problem by not being inlined, we can easily
       move them back to the header. For now, keeping the implementation in a
       source file helps speed up compile times and limits dependency scope.
---
 src/iocore/cache/CMakeLists.txt |   1 +
 src/iocore/cache/CacheDoc.cc    |  90 ++++++++++++++++++++++
 src/iocore/cache/CacheWrite.cc  | 163 +---------------------------------------
 src/iocore/cache/P_CacheDoc.h   |   6 ++
 src/iocore/cache/P_CacheVol.h   |   2 +
 src/iocore/cache/Stripe.cc      | 161 ++++++++++++++++++++++++++++++++++++++-
 6 files changed, 262 insertions(+), 161 deletions(-)

diff --git a/src/iocore/cache/CMakeLists.txt b/src/iocore/cache/CMakeLists.txt
index a77b47b38d..3e0e83b9b1 100644
--- a/src/iocore/cache/CMakeLists.txt
+++ b/src/iocore/cache/CMakeLists.txt
@@ -21,6 +21,7 @@ add_library(
   Cache.cc
   CacheDir.cc
   CacheDisk.cc
+  CacheDoc.cc
   CacheEvacuateDocVC.cc
   CacheHosting.cc
   CacheHttp.cc
diff --git a/src/iocore/cache/CacheDoc.cc b/src/iocore/cache/CacheDoc.cc
new file mode 100644
index 0000000000..0a79651f24
--- /dev/null
+++ b/src/iocore/cache/CacheDoc.cc
@@ -0,0 +1,90 @@
+/** @file
+
+  Operations on cache documents (may also be called fragments).
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "P_CacheDoc.h"
+
+#include "iocore/eventsystem/IOBuffer.h"
+
+#include "tscore/ink_hrtime.h"
+
+#include <cstring>
+
+namespace
+{
+
+char * // returns p advanced past the last byte written
+iobufferblock_memcpy(char *p, int len, IOBufferBlock const *ab, int offset) // copy up to len bytes from chain ab, skipping its first offset bytes
+{
+  IOBufferBlock const *b = ab;
+  while (b && len > 0) { // stop once the request is satisfied rather than walking the rest of the chain
+    char *start      = b->_start;
+    char *end        = b->_end;
+    int   max_bytes  = end - start;
+    max_bytes       -= offset;
+    if (max_bytes <= 0) {
+      offset = -max_bytes; // whole block lies inside the skipped prefix; carry the remainder to the next block
+      b      = b->next.get();
+      continue;
+    }
+    int bytes = len;
+    if (bytes >= max_bytes) {
+      bytes = max_bytes;
+    }
+    ::memcpy(p, start + offset, bytes);
+    p      += bytes;
+    len    -= bytes;
+    b       = b->next.get();
+    offset  = 0; // only the first block copied from can start part-way in
+  }
+  return p;
+}
+
+} // namespace
+
+void
+Doc::set_data(int const len, IOBufferBlock const *block, int const offset) // fill data() from an IOBufferBlock chain
+{
+  iobufferblock_memcpy(this->data(), len, block, offset); // up to len bytes, starting offset bytes into the chain
+}
+
+void
+Doc::calculate_checksum() // recompute `checksum` from the document's current bytes
+{
+  this->checksum = 0; // restart the running sum
+  for (char *b = this->hdr(); b < reinterpret_cast<char *>(this) + this->len; 
b++) { // every byte from hdr() up to `len` bytes past the start of this Doc
+    this->checksum += *b;
+  }
+}
+
+void
+Doc::pin(std::uint32_t const pin_in_cache) // set `pinned` relative to the current hrtime
+{
+  // coverity[Y2K38_SAFETY:FALSE]
+  this->pinned = static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND) + 
pin_in_cache; // hrtime scaled by HRTIME_SECOND plus the requested duration (presumably seconds -- confirm)
+}
+
+void
+Doc::unpin() // counterpart of Doc::pin
+{
+  this->pinned = 0; // zero replaces whatever value pin() stored
+}
diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc
index bfd60326b3..f947bacbfd 100644
--- a/src/iocore/cache/CacheWrite.cc
+++ b/src/iocore/cache/CacheWrite.cc
@@ -273,33 +273,6 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */)
   return EVENT_CONT;
 }
 
-static char *
-iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
-{
-  IOBufferBlock *b = ab;
-  while (b && len >= 0) {
-    char *start      = b->_start;
-    char *end        = b->_end;
-    int   max_bytes  = end - start;
-    max_bytes       -= offset;
-    if (max_bytes <= 0) {
-      offset = -max_bytes;
-      b      = b->next.get();
-      continue;
-    }
-    int bytes = len;
-    if (bytes >= max_bytes) {
-      bytes = max_bytes;
-    }
-    ::memcpy(p, start + offset, bytes);
-    p      += bytes;
-    len    -= bytes;
-    b       = b->next.get();
-    offset  = 0;
-  }
-  return p;
-}
-
 EvacuationBlock *
 Stripe::force_evacuate_head(Dir const *evac_dir, int pinned)
 {
@@ -641,140 +614,10 @@ Stripe::evac_range(off_t low, off_t high, int evac_phase)
 int
 Stripe::_agg_copy(CacheVC *vc)
 {
-  off_t o = this->header->write_pos + this->get_agg_buf_pos();
-
-  if (!vc->f.evacuator) {
-    uint32_t       len         = vc->write_len + vc->header_len + vc->frag_len 
+ sizeof(Doc);
-    Doc           *doc         = 
this->_write_buffer.emplace(this->round_to_approx_size(len));
-    IOBufferBlock *res_alt_blk = nullptr;
-
-    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
-    ink_assert(this->round_to_approx_size(len) == vc->agg_len);
-    // update copy of directory entry for this document
-    dir_set_approx_size(&vc->dir, vc->agg_len);
-    dir_set_offset(&vc->dir, this->offset_to_vol_offset(o));
-    ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len));
-    dir_set_phase(&vc->dir, this->header->phase);
-
-    // fill in document header
-    doc->magic       = DOC_MAGIC;
-    doc->len         = len;
-    doc->hlen        = vc->header_len;
-    doc->doc_type    = vc->frag_type;
-    doc->v_major     = CACHE_DB_MAJOR_VERSION;
-    doc->v_minor     = CACHE_DB_MINOR_VERSION;
-    doc->unused      = 0; // force this for forward compatibility.
-    doc->total_len   = vc->total_len;
-    doc->first_key   = vc->first_key;
-    doc->sync_serial = this->header->sync_serial;
-    vc->write_serial = doc->write_serial = this->header->write_serial;
-    doc->checksum                        = DOC_NO_CHECKSUM;
-    if (vc->pin_in_cache) {
-      dir_set_pinned(&vc->dir, 1);
-      // coverity[Y2K38_SAFETY:FALSE]
-      doc->pinned = static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND) + 
vc->pin_in_cache;
-    } else {
-      dir_set_pinned(&vc->dir, 0);
-      doc->pinned = 0;
-    }
-
-    if (vc->f.use_first_key) {
-      if (doc->data_len() || vc->f.allow_empty_doc) {
-        doc->key = vc->earliest_key;
-      } else { // the vector is being written by itself
-        if (vc->earliest_key.is_zero()) {
-          do {
-            rand_CacheKey(&doc->key);
-          } while (DIR_MASK_TAG(doc->key.slice32(2)) == 
DIR_MASK_TAG(vc->first_key.slice32(2)));
-        } else {
-          prev_CacheKey(&doc->key, &vc->earliest_key);
-        }
-      }
-      dir_set_head(&vc->dir, true);
-    } else {
-      doc->key = vc->key;
-      dir_set_head(&vc->dir, !vc->fragment);
-    }
-
-    if (vc->f.rewrite_resident_alt) {
-      ink_assert(vc->f.use_first_key);
-      Doc *res_doc   = reinterpret_cast<Doc *>(vc->first_buf->data());
-      res_alt_blk    = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), 
sizeof(Doc) + res_doc->hlen);
-      doc->key       = res_doc->key;
-      doc->total_len = res_doc->data_len();
-    }
-    // update the new_info object_key, and total_len and dirinfo
-    if (vc->header_len) {
-      ink_assert(vc->f.use_first_key);
-      if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
-        ink_assert(vc->write_vector->count() > 0);
-        if (!vc->f.update && !vc->f.evac_vector) {
-          ink_assert(!(vc->first_key.is_zero()));
-          CacheHTTPInfo *http_info = 
vc->write_vector->get(vc->alternate_index);
-          http_info->object_size_set(vc->total_len);
-        }
-        // update + data_written =>  Update case (b)
-        // need to change the old alternate's object length
-        if (vc->f.update && vc->total_len) {
-          CacheHTTPInfo *http_info = 
vc->write_vector->get(vc->alternate_index);
-          http_info->object_size_set(vc->total_len);
-        }
-        ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
-        ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), 
vc->header_len));
-      } else {
-        memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
-      }
-      // the single fragment flag is not used in the write call.
-      // putting it in for completeness.
-      vc->f.single_fragment = doc->single_fragment();
-    }
-    // move data
-    if (vc->write_len) {
-      {
-        ProxyMutex *mutex ATS_UNUSED = this->mutex.get();
-        ink_assert(mutex->thread_holding == this_ethread());
-
-        Metrics::Counter::increment(cache_rsb.write_bytes, vc->write_len);
-        Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes, 
vc->write_len);
-      }
-      if (vc->f.rewrite_resident_alt) {
-        iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
-      } else {
-        iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), 
vc->offset);
-      }
-    }
-    if (cache_config_enable_checksum) {
-      doc->checksum = 0;
-      for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; 
b++) {
-        doc->checksum += *b;
-      }
-    }
-    if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
-      ink_assert(doc->hlen);
-    }
-
-    if (res_alt_blk) {
-      res_alt_blk->free();
-    }
-
-    return vc->agg_len;
+  if (vc->f.evacuator) {
+    return this->_copy_evacuator_to_aggregation(vc);
   } else {
-    // for evacuated documents, copy the data, and update directory
-    Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
-    int  l   = this->round_to_approx_size(doc->len);
-
-    Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
-    Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
-
-    doc->sync_serial  = this->header->sync_serial;
-    doc->write_serial = this->header->write_serial;
-
-    this->_write_buffer.add(doc, l);
-
-    vc->dir = vc->overwrite_dir;
-    dir_set_offset(&vc->dir, this->offset_to_vol_offset(o));
-    dir_set_phase(&vc->dir, this->header->phase);
-    return l;
+    return this->_copy_writer_to_aggregation(vc);
   }
 }
 
diff --git a/src/iocore/cache/P_CacheDoc.h b/src/iocore/cache/P_CacheDoc.h
index d1c35063a1..d501ca5308 100644
--- a/src/iocore/cache/P_CacheDoc.h
+++ b/src/iocore/cache/P_CacheDoc.h
@@ -23,6 +23,8 @@
 
 #pragma once
 
+#include "iocore/eventsystem/IOBuffer.h"
+
 #include "tscore/CryptoHash.h"
 
 #include <cstdint>
@@ -63,6 +65,10 @@ struct Doc {
   int      single_fragment() const;
   char    *hdr();
   char    *data();
+  void     set_data(int len, IOBufferBlock const *block, int offset);
+  void     calculate_checksum();
+  void     pin(std::uint32_t const pin_in_cache);
+  void     unpin();
 
   using self_type = Doc;
 };
diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h
index aa3a5a8614..2f1e8377a8 100644
--- a/src/iocore/cache/P_CacheVol.h
+++ b/src/iocore/cache/P_CacheVol.h
@@ -330,6 +330,8 @@ private:
   void _init_data_internal();
   void _init_data();
   int  _agg_copy(CacheVC *vc);
+  int  _copy_writer_to_aggregation(CacheVC *vc);
+  int  _copy_evacuator_to_aggregation(CacheVC *vc);
   bool flush_aggregate_write_buffer();
 
   AggregateWriteBuffer _write_buffer;
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index e9feed52b3..5c35c0d82a 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -21,13 +21,17 @@
   limitations under the License.
  */
 
-#include "iocore/cache/Cache.h"
 #include "P_CacheDisk.h"
 #include "P_CacheDoc.h"
 #include "P_CacheInternal.h"
 #include "P_CacheVol.h"
 
+#include "proxy/hdrs/HTTP.h"
+
+#include "tsutil/Metrics.h"
+
 #include "iocore/eventsystem/EThread.h"
+#include "iocore/eventsystem/IOBuffer.h"
 #include "iocore/eventsystem/Lock.h"
 
 #include "tsutil/DbgCtl.h"
@@ -38,6 +42,8 @@
 
 #include <cstring>
 
+using CacheHTTPInfo = HTTPInfo;
+
 namespace
 {
 
@@ -1010,6 +1016,159 @@ Stripe::shutdown(EThread *shutdown_thread)
   Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", 
this->hash_text.get());
 }
 
+static void // fills the Doc header fields that come from the writing VC or from fixed constants
+init_document(CacheVC const *vc, Doc *doc, int const len) // len is stored verbatim as doc->len
+{
+  doc->magic     = DOC_MAGIC;
+  doc->len       = len;
+  doc->hlen      = vc->header_len;
+  doc->doc_type  = vc->frag_type;
+  doc->v_major   = CACHE_DB_MAJOR_VERSION;
+  doc->v_minor   = CACHE_DB_MINOR_VERSION;
+  doc->unused    = 0; // force this for forward compatibility.
+  doc->total_len = vc->total_len;
+  doc->first_key = vc->first_key;
+  doc->checksum  = DOC_NO_CHECKSUM; // placeholder; serials, key and pin state are left for the caller to set
+}
+
+static void // puts the VC's header into doc->hdr(): marshalled for HTTP fragments, memcpy'd otherwise
+update_header_info(CacheVC *vc, Doc *doc)
+{
+  if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
+    ink_assert(vc->write_vector->count() > 0);
+    if (!vc->f.update && !vc->f.evac_vector) { // neither an update nor a vector evacuation
+      ink_assert(!(vc->first_key.is_zero()));
+      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+      http_info->object_size_set(vc->total_len);
+    }
+    // update + data_written =>  Update case (b)
+    // need to change the old alternate's object length
+    if (vc->f.update && vc->total_len) {
+      CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+      http_info->object_size_set(vc->total_len);
+    }
+    ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
+    ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), 
vc->header_len)); // NOTE(review): marshal() runs inside ink_assert; relies on the macro still evaluating its argument in release builds -- confirm
+  } else {
+    memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
+  }
+}
+
+static void // chooses doc->key and sets the head flag on vc->dir
+update_document_key(CacheVC *vc, Doc *doc)
+{
+  if (vc->f.use_first_key) {
+    if (doc->data_len() || vc->f.allow_empty_doc) {
+      doc->key = vc->earliest_key;
+    } else { // the vector is being written by itself
+      if (vc->earliest_key.is_zero()) {
+        do { // redraw until the key's masked tag differs from first_key's
+          rand_CacheKey(&doc->key);
+        } while (DIR_MASK_TAG(doc->key.slice32(2)) == 
DIR_MASK_TAG(vc->first_key.slice32(2)));
+      } else {
+        prev_CacheKey(&doc->key, &vc->earliest_key);
+      }
+    }
+    dir_set_head(&vc->dir, true);
+  } else {
+    doc->key = vc->key;
+    dir_set_head(&vc->dir, !vc->fragment); // head flag is set only when vc->fragment is zero
+  }
+}
+
+int // returns vc->agg_len, which is asserted to equal the rounded size passed to emplace()
+Stripe::_copy_writer_to_aggregation(CacheVC *vc) // builds vc's fragment as a Doc inside the aggregate write buffer
+{
+  off_t          doc_offset{this->header->write_pos + this->get_agg_buf_pos()}; // read before the emplace() call below
+  uint32_t       len         = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
+  Doc           *doc         = this->_write_buffer.emplace(this->round_to_approx_size(len));
+  IOBufferBlock *res_alt_blk = nullptr;
+
+  ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
+  ink_assert(this->round_to_approx_size(len) == vc->agg_len);
+  // update copy of directory entry for this document
+  dir_set_approx_size(&vc->dir, vc->agg_len);
+  dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
+  ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len));
+  dir_set_phase(&vc->dir, this->header->phase);
+
+  // fill in document header
+  init_document(vc, doc, len);
+  doc->sync_serial = this->header->sync_serial;
+  vc->write_serial = doc->write_serial = this->header->write_serial;
+  if (vc->get_pin_in_cache()) {
+    dir_set_pinned(&vc->dir, 1);
+    doc->pin(vc->get_pin_in_cache());
+  } else {
+    dir_set_pinned(&vc->dir, 0);
+    doc->unpin();
+  }
+
+  update_document_key(vc, doc);
+
+  if (vc->f.rewrite_resident_alt) {
+    ink_assert(vc->f.use_first_key);
+    Doc *res_doc   = reinterpret_cast<Doc *>(vc->first_buf->data());
+    res_alt_blk    = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen);
+    doc->key       = res_doc->key; // the resident alternate's key and length override the values set above
+    doc->total_len = res_doc->data_len();
+  }
+  // update the new_info object_key, and total_len and dirinfo
+  if (vc->header_len) {
+    ink_assert(vc->f.use_first_key);
+    update_header_info(vc, doc);
+    // the single fragment flag is not used in the write call.
+    // putting it in for completeness.
+    vc->f.single_fragment = doc->single_fragment();
+  }
+  // move data
+  if (vc->write_len) {
+    ink_assert(this->mutex.get()->thread_holding == this_ethread());
+    // write_bytes counts payload bytes: pass write_len, not the default increment
+    Metrics::Counter::increment(cache_rsb.write_bytes, vc->write_len);
+    Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes, vc->write_len);
+
+    if (vc->f.rewrite_resident_alt) {
+      doc->set_data(vc->write_len, res_alt_blk, 0);
+    } else {
+      doc->set_data(vc->write_len, vc->blocks.get(), vc->offset);
+    }
+  }
+  if (cache_config_enable_checksum) {
+    doc->calculate_checksum();
+  }
+  if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
+    ink_assert(doc->hlen);
+  }
+
+  if (res_alt_blk) {
+    res_alt_blk->free(); // release the temporary block created for the resident-alternate rewrite
+  }
+
+  return vc->agg_len;
+}
+
+int // returns the rounded size of the evacuated document
+Stripe::_copy_evacuator_to_aggregation(CacheVC *vc) // adds the already-built Doc held in vc->buf to the aggregate write buffer
+{
+  Doc *doc         = reinterpret_cast<Doc *>(vc->buf->data());
+  int  approx_size = this->round_to_approx_size(doc->len);
+
+  Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
+  Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated);
+
+  doc->sync_serial  = this->header->sync_serial;
+  doc->write_serial = this->header->write_serial;
+  // the buffer position is read before add(); presumably add() advances it -- confirm
+  off_t doc_offset{this->header->write_pos + 
this->_write_buffer.get_buffer_pos()};
+  this->_write_buffer.add(doc, approx_size);
+
+  vc->dir = vc->overwrite_dir; // start from the overwrite entry, then update offset and phase
+  dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset));
+  dir_set_phase(&vc->dir, this->header->phase);
+  return approx_size;
+}
+
 bool
 Stripe::flush_aggregate_write_buffer()
 {

Reply via email to