This is an automated email from the ASF dual-hosted git repository.
bneradt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 53916a16dc Split CacheVC into its own source file (#10599)
53916a16dc is described below
commit 53916a16dcba89827520ec1d28c6572401401e44
Author: JosiahWI <[email protected]>
AuthorDate: Thu Oct 12 20:58:39 2023 -0500
Split CacheVC into its own source file (#10599)
---
iocore/cache/CMakeLists.txt | 1 +
iocore/cache/Cache.cc | 544 ---------------------------------------
iocore/cache/CacheVC.cc | 609 ++++++++++++++++++++++++++++++++++++++++++++
iocore/cache/Makefile.am | 1 +
iocore/cache/P_CacheDisk.h | 2 +
5 files changed, 613 insertions(+), 544 deletions(-)
diff --git a/iocore/cache/CMakeLists.txt b/iocore/cache/CMakeLists.txt
index c283b90ff6..d9c12a9d91 100644
--- a/iocore/cache/CMakeLists.txt
+++ b/iocore/cache/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(inkcache STATIC
CachePages.cc
CachePagesInternal.cc
CacheRead.cc
+ CacheVC.cc
CacheVol.cc
CacheWrite.cc
RamCacheCLFUS.cc
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index 35f8842c13..516feab766 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -43,10 +43,6 @@
constexpr ts::VersionNumber CACHE_DB_VERSION(CACHE_DB_MAJOR_VERSION,
CACHE_DB_MINOR_VERSION);
-// Compilation Options
-#define USELESS_REENABLES // allow them for now
-// #define VERIFY_JTEST_DATA
-
static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10; // I.e. 10x 1MB per 1GB of
disk.
// This is the oldest version number that is still usable.
@@ -105,7 +101,6 @@ ClassAllocator<CacheVC>
cacheVConnectionAllocator("cacheVConnection");
ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
-int CacheVC::size_to_init = -1;
namespace
{
@@ -113,18 +108,8 @@ namespace
DbgCtl dbg_ctl_cache_init{"cache_init"};
DbgCtl dbg_ctl_cache_remove{"cache_remove"};
DbgCtl dbg_ctl_cache_hosting{"cache_hosting"};
-DbgCtl dbg_ctl_cache_bc{"cache_bc"};
-DbgCtl dbg_ctl_cache_read{"cache_read"};
-DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
DbgCtl dbg_ctl_ram_cache{"ram_cache"};
-#ifdef DEBUG
-
-DbgCtl dbg_ctl_cache_close{"cache_close"};
-DbgCtl dbg_ctl_cache_reenable{"cache_reenable"};
-
-#endif
-
} // end anonymous namespace
struct VolInitInfo {
@@ -313,210 +298,6 @@ update_cache_config(const char * /* name ATS_UNUSED */,
RecDataT /* data_type AT
return 0;
}
-CacheVC::CacheVC()
-{
- size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio;
- memset((void *)&vio, 0, size_to_init);
-}
-
-HTTPInfo::FragOffset *
-CacheVC::get_frag_table()
-{
- ink_assert(alternate.valid());
- return alternate.valid() ? alternate.get_frag_table() : nullptr;
-}
-
-VIO *
-CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
-{
- ink_assert(vio.op == VIO::READ);
- vio.buffer.writer_for(abuf);
- vio.set_continuation(c);
- vio.ndone = 0;
- vio.nbytes = nbytes;
- vio.vc_server = this;
-#ifdef DEBUG
- ink_assert(!c || c->mutex->thread_holding);
-#endif
- if (c && !trigger && !recursive) {
- trigger = c->mutex->thread_holding->schedule_imm_local(this);
- }
- return &vio;
-}
-
-VIO *
-CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t
offset)
-{
- ink_assert(vio.op == VIO::READ);
- vio.buffer.writer_for(abuf);
- vio.set_continuation(c);
- vio.ndone = 0;
- vio.nbytes = nbytes;
- vio.vc_server = this;
- seek_to = offset;
-#ifdef DEBUG
- ink_assert(c->mutex->thread_holding);
-#endif
- if (!trigger && !recursive) {
- trigger = c->mutex->thread_holding->schedule_imm_local(this);
- }
- return &vio;
-}
-
-VIO *
-CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf,
bool owner)
-{
- ink_assert(vio.op == VIO::WRITE);
- ink_assert(!owner);
- vio.buffer.reader_for(abuf);
- vio.set_continuation(c);
- vio.ndone = 0;
- vio.nbytes = nbytes;
- vio.vc_server = this;
-#ifdef DEBUG
- ink_assert(!c || c->mutex->thread_holding);
-#endif
- if (c && !trigger && !recursive) {
- trigger = c->mutex->thread_holding->schedule_imm_local(this);
- }
- return &vio;
-}
-
-void
-CacheVC::do_io_close(int alerrno)
-{
- ink_assert(mutex->thread_holding == this_ethread());
- int previous_closed = closed;
- closed = (alerrno == -1) ? 1 : -1; // Stupid default arguments
- DDbg(dbg_ctl_cache_close, "do_io_close %p %d %d", this, alerrno, closed);
- if (!previous_closed && !recursive) {
- die();
- }
-}
-
-void
-CacheVC::reenable(VIO *avio)
-{
- DDbg(dbg_ctl_cache_reenable, "reenable %p", this);
- (void)avio;
-#ifdef DEBUG
- ink_assert(avio->mutex->thread_holding);
-#endif
- if (!trigger) {
-#ifndef USELESS_REENABLES
- if (vio.op == VIO::READ) {
- if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
- ink_assert(!"useless reenable of cache read");
- } else if (!vio.buffer.reader()->read_avail())
- ink_assert(!"useless reenable of cache write");
-#endif
- trigger = avio->mutex->thread_holding->schedule_imm_local(this);
- }
-}
-
-void
-CacheVC::reenable_re(VIO *avio)
-{
- DDbg(dbg_ctl_cache_reenable, "reenable_re %p", this);
- (void)avio;
-#ifdef DEBUG
- ink_assert(avio->mutex->thread_holding);
-#endif
- if (!trigger) {
- if (!is_io_in_progress() && !recursive) {
- handleEvent(EVENT_NONE, (void *)nullptr);
- } else {
- trigger = avio->mutex->thread_holding->schedule_imm_local(this);
- }
- }
-}
-
-bool
-CacheVC::get_data(int i, void *data)
-{
- switch (i) {
- case CACHE_DATA_HTTP_INFO:
- *(static_cast<CacheHTTPInfo **>(data)) = &alternate;
- return true;
- case CACHE_DATA_RAM_CACHE_HIT_FLAG:
- *(static_cast<int *>(data)) = !f.not_from_ram_cache;
- return true;
- default:
- break;
- }
- return false;
-}
-
-int64_t
-CacheVC::get_object_size()
-{
- return (this)->doc_len;
-}
-
-bool
-CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */)
-{
- ink_assert(!"CacheVC::set_data should not be called!");
- return true;
-}
-
-void
-CacheVC::get_http_info(CacheHTTPInfo **ainfo)
-{
- *ainfo = &(this)->alternate;
-}
-
-// set_http_info must be called before do_io_write
-// cluster vc does an optimization where it calls do_io_write() before
-// calling set_http_info(), but it guarantees that the info will
-// be set before transferring any bytes
-void
-CacheVC::set_http_info(CacheHTTPInfo *ainfo)
-{
- ink_assert(!total_len);
- if (f.update) {
- ainfo->object_key_set(update_key);
- ainfo->object_size_set(update_len);
- } else {
- ainfo->object_key_set(earliest_key);
- // don't know the total len yet
- }
-
- MIMEField *field =
ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH,
MIME_LEN_CONTENT_LENGTH);
- if ((field && !field->value_get_int64()) ||
ainfo->m_alt->m_response_hdr.status_get() == HTTP_STATUS_NO_CONTENT) {
- f.allow_empty_doc = 1;
- // Set the object size here to zero in case this is a cache replace where
the new object
- // length is zero but the old object was not.
- ainfo->object_size_set(0);
- } else {
- f.allow_empty_doc = 0;
- }
-
- alternate.copy_shallow(ainfo);
- ainfo->clear();
-}
-
-bool
-CacheVC::set_pin_in_cache(time_t time_pin)
-{
- if (total_len) {
- ink_assert(!"should Pin the document before writing");
- return false;
- }
- if (vio.op != VIO::WRITE) {
- ink_assert(!"Pinning only allowed while writing objects to the cache");
- return false;
- }
- pin_in_cache = time_pin;
- return true;
-}
-
-time_t
-CacheVC::get_pin_in_cache()
-{
- return pin_in_cache;
-}
-
int
Vol::begin_read(CacheVC *cont) const
{
@@ -2125,248 +1906,6 @@ Cache::close()
return -1;
}
-int
-CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
-{
- ink_assert(0);
- return EVENT_DONE;
-}
-
-bool
-CacheVC::is_pread_capable()
-{
- return !f.read_from_writer_called;
-}
-
-#define STORE_COLLISION 1
-
-static void
-unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
-{
- using UnmarshalFunc = int(char *buf, int len, RefCountObj
*block_ref);
- UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal;
- ts::VersionNumber version(doc->v_major, doc->v_minor);
-
- // introduced by https://github.com/apache/trafficserver/pull/4874, this is
used to distinguish the doc version
- // before and after #4847
- if (version < CACHE_DB_VERSION) {
- unmarshal_func = &HTTPInfo::unmarshal_v24_1;
- }
-
- char *tmp = doc->hdr();
- int len = doc->hlen;
- while (len > 0) {
- int r = unmarshal_func(tmp, len, buf.get());
- if (r < 0) {
- ink_assert(!"CacheVC::handleReadDone unmarshal failed");
- okay = 0;
- break;
- }
- len -= r;
- tmp += r;
- }
-}
-
-// [amc] I think this is where all disk reads from cache funnel through here.
-int
-CacheVC::handleReadDone(int event, Event *e)
-{
- cancel_trigger();
- ink_assert(this_ethread() == mutex->thread_holding);
-
- Doc *doc = nullptr;
- if (event == AIO_EVENT_DONE) {
- set_io_not_in_progress();
- } else if (is_io_in_progress()) {
- return EVENT_CONT;
- }
- if (DISK_BAD(vol->disk)) {
- io.aio_result = -1;
- Warning("Canceling cache read: disk %s is bad.", vol->hash_text.get());
- goto Ldone;
- }
- {
- MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
- if (!lock.is_locked()) {
- VC_SCHED_LOCK_RETRY();
- }
- if ((!dir_valid(vol, &dir)) || (!io.ok())) {
- if (!io.ok()) {
- Dbg(dbg_ctl_cache_disk_error, "Read error on disk %s\n \
- read range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %"
PRIu64 " blocks] \n",
- vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset,
(uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
- (uint64_t)io.aiocb.aio_offset / 512,
(uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
- }
- goto Ldone;
- }
-
- doc = reinterpret_cast<Doc *>(buf->data());
- ink_assert(vol->mutex->nthread_holding < 1000);
- ink_assert(doc->magic == DOC_MAGIC);
-
- if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
- // future version, count as corrupted
- doc->magic = DOC_CORRUPT;
- Dbg(dbg_ctl_cache_bc, "Object is future version %d:%d - disk %s - doc id
= %" PRIx64 ":%" PRIx64 "", doc->v_major,
- doc->v_minor, vol->hash_text.get(), read_key->slice64(0),
read_key->slice64(1));
- goto Ldone;
- }
-
-#ifdef VERIFY_JTEST_DATA
- char xx[500];
- if (read_key && *read_key == doc->key && request.valid() &&
!dir_head(&dir) && !vio.ndone) {
- int ib = 0, xd = 0;
- request.url_get()->print(xx, 500, &ib, &xd);
- char *x = xx;
- for (int q = 0; q < 3; q++)
- x = strchr(x + 1, '/');
- ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
- }
-#endif
-
- if (dbg_ctl_cache_read.on()) {
- char xt[CRYPTO_HEX_SIZE];
- Dbg(dbg_ctl_cache_read,
- "Read complete on fragment %s. Length: data payload=%d this
fragment=%d total doc=%" PRId64 " prefix=%d",
- doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len,
doc->prefix_len());
- }
-
- // put into ram cache?
- if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key)
|| STORE_COLLISION) && doc->magic == DOC_MAGIC) {
- int okay = 1;
- if (!f.doc_from_ram_cache) {
- f.not_from_ram_cache = 1;
- }
- if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
- // verify that the checksum matches
- uint32_t checksum = 0;
- for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) +
doc->len; b++) {
- checksum += *b;
- }
- ink_assert(checksum == doc->checksum);
- if (checksum != doc->checksum) {
- Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d,
hlen %d, disk %s, offset %" PRIu64 " size %zu",
- doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen,
vol->path, (uint64_t)io.aiocb.aio_offset,
- (size_t)io.aiocb.aio_nbytes);
- doc->magic = DOC_CORRUPT;
- okay = 0;
- }
- }
- (void)e; // Avoid compiler warnings
- bool http_copy_hdr = false;
- http_copy_hdr =
- cache_config_ram_cache_compress && !f.doc_from_ram_cache &&
doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
- // If http doc we need to unmarshal the headers before putting in the
ram cache
- // unless it could be compressed
- if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen
&& okay) {
- unmarshal_helper(doc, buf, okay);
- }
- // Put the request in the ram cache only if its a open_read or lookup
- if (vio.op == VIO::READ && okay) {
- bool cutoff_check;
- // cutoff_check :
- // doc_len == 0 for the first fragment (it is set from the vector)
- // The decision on the first fragment is based on
- // doc->total_len
- // After that, the decision is based of doc_len (doc_len != 0)
- // (cache_config_ram_cache_cutoff == 0) : no cutoffs
- cutoff_check =
- ((!doc_len && static_cast<int64_t>(doc->total_len) <
cache_config_ram_cache_cutoff) ||
- (doc_len && static_cast<int64_t>(doc_len) <
cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
- if (cutoff_check && !f.doc_from_ram_cache) {
- uint64_t o = dir_offset(&dir);
- vol->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, o);
- }
- if (!doc_len) {
- // keep a pointer to it. In case the state machine decides to
- // update this document, we don't have to read it back in memory
- // again
- vol->first_fragment_key = *read_key;
- vol->first_fragment_offset = dir_offset(&dir);
- vol->first_fragment_data = buf;
- }
- } // end VIO::READ check
- // If it could be compressed, unmarshal after
- if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen
&& okay) {
- unmarshal_helper(doc, buf, okay);
- }
- } // end io.ok() check
- }
-Ldone:
- POP_HANDLER;
- return handleEvent(AIO_EVENT_DONE, nullptr);
-}
-
-int
-CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- cancel_trigger();
-
- f.doc_from_ram_cache = false;
-
- // check ram cache
- ink_assert(vol->mutex->thread_holding == this_ethread());
- int64_t o = dir_offset(&dir);
- int ram_hit_state = vol->ram_cache->get(read_key, &buf,
static_cast<uint64_t>(o));
- f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
- if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) {
- goto LramHit;
- }
-
- // check if it was read in the last open_read call
- if (*read_key == vol->first_fragment_key && dir_offset(&dir) ==
vol->first_fragment_offset) {
- buf = vol->first_fragment_data;
- goto LmemHit;
- }
- // see if its in the aggregation buffer
- if (dir_agg_buf_valid(vol, &dir)) {
- int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos;
- buf =
new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes,
MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
- ink_assert((agg_offset + io.aiocb.aio_nbytes) <=
(unsigned)vol->agg_buf_pos);
- char *doc = buf->data();
- char *agg = vol->agg_buffer + agg_offset;
- memcpy(doc, agg, io.aiocb.aio_nbytes);
- io.aio_result = io.aiocb.aio_nbytes;
- SET_HANDLER(&CacheVC::handleReadDone);
- return EVENT_RETURN;
- }
-
- io.aiocb.aio_fildes = vol->fd;
- io.aiocb.aio_offset = vol->vol_offset(&dir);
- if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) >
static_cast<off_t>(vol->skip + vol->len)) {
- io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
- }
- buf =
new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes,
MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
- io.aiocb.aio_buf = buf->data();
- io.action = this;
- io.thread = mutex->thread_holding->tt == DEDICATED ?
AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
- SET_HANDLER(&CacheVC::handleReadDone);
- ink_assert(ink_aio_read(&io) >= 0);
-
-// ToDo: Why are these for debug only ??
-#if DEBUG
- Metrics::increment(cache_rsb.pread_count);
- Metrics::increment(vol->cache_vol->vol_rsb.pread_count);
-#endif
-
- return EVENT_CONT;
-
-LramHit: {
- f.doc_from_ram_cache = true;
- io.aio_result = io.aiocb.aio_nbytes;
- Doc *doc = reinterpret_cast<Doc *>(buf->data());
- if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP
&& doc->hlen) {
- SET_HANDLER(&CacheVC::handleReadDone);
- return EVENT_RETURN;
- }
-}
-LmemHit:
- f.doc_from_ram_cache = true;
- io.aio_result = io.aiocb.aio_nbytes;
- POP_HANDLER;
- return EVENT_RETURN; // allow the caller to release the volume lock
-}
-
Action *
Cache::lookup(Continuation *cont, const CacheKey *key, CacheFragType type,
const char *hostname, int host_len)
{
@@ -2395,89 +1934,6 @@ Cache::lookup(Continuation *cont, const CacheKey *key,
CacheFragType type, const
}
}
-int
-CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- cancel_trigger();
- set_io_not_in_progress();
- {
- MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
- if (!lock.is_locked()) {
- VC_SCHED_LOCK_RETRY();
- }
- if (_action.cancelled) {
- if (od) {
- vol->close_write(this);
- od = nullptr;
- }
- goto Lfree;
- }
- if (!f.remove_aborted_writers) {
- if (vol->open_write(this, true, 1)) {
- // writer exists
- od = vol->open_read(&key);
- ink_release_assert(od);
- od->dont_update_directory = true;
- od = nullptr;
- } else {
- od->dont_update_directory = true;
- }
- f.remove_aborted_writers = 1;
- }
- Lread:
- SET_HANDLER(&CacheVC::removeEvent);
- if (!buf) {
- goto Lcollision;
- }
- if (!dir_valid(vol, &dir)) {
- last_collision = nullptr;
- goto Lcollision;
- }
- // check read completed correct FIXME: remove bad vols
- if (!io.ok()) {
- goto Ldone;
- }
- {
- // verify that this is our document
- Doc *doc = reinterpret_cast<Doc *>(buf->data());
- /* should be first_key not key..right?? */
- if (doc->first_key == key) {
- ink_assert(doc->magic == DOC_MAGIC);
- if (dir_delete(&key, vol, &dir) > 0) {
- if (od) {
- vol->close_write(this);
- }
- od = nullptr;
- goto Lremoved;
- }
- goto Ldone;
- }
- }
- Lcollision:
- // check for collision
- if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
- int ret = do_read_call(&key);
- if (ret == EVENT_RETURN) {
- goto Lread;
- }
- return ret;
- }
- Ldone:
-
Metrics::increment(cache_rsb.status[static_cast<int>(CacheOpType::Remove)].failure);
-
Metrics::increment(vol->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Remove)].failure);
- if (od) {
- vol->close_write(this);
- }
- }
- ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
- _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void
*)-ECACHE_NO_DOC);
- goto Lfree;
-Lremoved:
- _action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
-Lfree:
- return free_CacheVC(this);
-}
-
Action *
Cache::remove(Continuation *cont, const CacheKey *key, CacheFragType type,
const char *hostname, int host_len)
{
diff --git a/iocore/cache/CacheVC.cc b/iocore/cache/CacheVC.cc
new file mode 100644
index 0000000000..ef8d5f6609
--- /dev/null
+++ b/iocore/cache/CacheVC.cc
@@ -0,0 +1,609 @@
+/** @file
+
+  Implementation of CacheVC, the cache virtual connection used for cache reads, writes, and removes.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "I_CacheDefs.h"
+#include "P_CacheDisk.h"
+#include "P_CacheHttp.h"
+#include "P_CacheInternal.h"
+#include "P_CacheVol.h"
+
+// hdrs
+#include "HTTP.h"
+#include "MIME.h"
+
+// aio
+#include "I_AIO.h"
+
+// tsapi
+// Included unconditionally: removeEvent() calls Metrics::increment() in every
+// build, not only when DEBUG is set.
+#include "api/Metrics.h"
+
+// inkevent
+#include "I_Continuation.h"
+#include "I_EThread.h"
+#include "I_Event.h"
+#include "I_IOBuffer.h"
+#include "I_Lock.h"
+#include "I_VIO.h"
+
+// tscppapi
+#include "tscpp/api/HttpStatus.h"
+
+// tscore
+#include "tscore/I_Version.h"
+#include "tscore/ink_assert.h"
+#include "tscore/Ptr.h"
+
+// ts
+#include "ts/DbgCtl.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <ctime>
+
+namespace
+{
+// Debug controls are file-local, as they were in Cache.cc's anonymous
+// namespace before the split. Without internal linkage these generic names
+// would be exported from the library and could clash with same-named
+// definitions in other translation units.
+DbgCtl dbg_ctl_cache_bc{"cache_bc"};
+DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
+DbgCtl dbg_ctl_cache_read{"cache_read"};
+#ifdef DEBUG
+DbgCtl dbg_ctl_cache_close{"cache_close"};
+DbgCtl dbg_ctl_cache_reenable{"cache_reenable"};
+#endif
+
+} // end anonymous namespace
+
+// Compilation Options
+#define STORE_COLLISION 1 // makes the key-match test in handleReadDone always pass
+#define USELESS_REENABLES // allow them for now
+// #define VERIFY_JTEST_DATA
+
+// NOTE(review): presumably defined in Cache.cc; declaring it in a shared
+// header would avoid this local redeclaration.
+extern int cache_config_ram_cache_cutoff;
+
+// Number of bytes the constructor zeroes, from `vio` to the end of the
+// object; recomputed by every constructor call.
+int CacheVC::size_to_init = -1;
+
+// Zero every member from `vio` through the end of the object.
+// size_to_init is the object size minus the offset of `vio`, computed with the
+// classic null-pointer offsetof idiom. Members declared before `vio` are not
+// touched here; presumably they have their own initializers (TODO confirm in
+// the CacheVC declaration).
+CacheVC::CacheVC()
+{
+ size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio;
+ memset((void *)&vio, 0, size_to_init);
+}
+
+// Return the fragment offset table of the current alternate, or nullptr
+// (after an assertion) when the alternate is not valid.
+HTTPInfo::FragOffset *
+CacheVC::get_frag_table()
+{
+ ink_assert(alternate.valid());
+ return alternate.valid() ? alternate.get_frag_table() : nullptr;
+}
+
+/** Configure this VC's VIO for a sequential cache read.
+ *
+ *  Sets up the VIO to fill @a abuf with up to @a nbytes on behalf of @a c.
+ *  When @a c is non-null and neither `trigger` nor `recursive` is set, an
+ *  immediate event is scheduled on the thread holding @a c's mutex so the
+ *  read starts.
+ *
+ *  @return this VC's VIO.
+ */
+VIO *
+CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
+{
+  ink_assert(vio.op == VIO::READ);
+  vio.buffer.writer_for(abuf);
+  vio.set_continuation(c);
+  vio.ndone     = 0;
+  vio.nbytes    = nbytes;
+  vio.vc_server = this;
+#ifdef DEBUG
+  ink_assert(!c || c->mutex->thread_holding);
+#endif
+  if (c && !trigger && !recursive) {
+    trigger = c->mutex->thread_holding->schedule_imm_local(this);
+  }
+  return &vio;
+}
+
+/** Same as do_io_read(), but also records @a offset in `seek_to`
+ *  (presumably the starting offset used by the read state machine).
+ *
+ *  A null @a c is tolerated exactly as in do_io_read() and do_io_write():
+ *  nothing is scheduled.
+ *
+ *  @return this VC's VIO.
+ */
+VIO *
+CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
+{
+  ink_assert(vio.op == VIO::READ);
+  vio.buffer.writer_for(abuf);
+  vio.set_continuation(c);
+  vio.ndone     = 0;
+  vio.nbytes    = nbytes;
+  vio.vc_server = this;
+  seek_to       = offset;
+#ifdef DEBUG
+  ink_assert(!c || c->mutex->thread_holding);
+#endif
+  // Guard on c like the sibling do_io_* methods instead of dereferencing it unconditionally.
+  if (c && !trigger && !recursive) {
+    trigger = c->mutex->thread_holding->schedule_imm_local(this);
+  }
+  return &vio;
+}
+
+/** Configure this VC's VIO for a cache write of up to @a nbytes taken from
+ *  @a abuf on behalf of @a c.
+ *
+ *  @a owner must be false (asserted). Scheduling works as in do_io_read().
+ *
+ *  @return this VC's VIO.
+ */
+VIO *
+CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
+{
+  ink_assert(vio.op == VIO::WRITE);
+  ink_assert(!owner);
+  vio.buffer.reader_for(abuf);
+  vio.set_continuation(c);
+  vio.ndone     = 0;
+  vio.nbytes    = nbytes;
+  vio.vc_server = this;
+#ifdef DEBUG
+  ink_assert(!c || c->mutex->thread_holding);
+#endif
+  if (c && !trigger && !recursive) {
+    trigger = c->mutex->thread_holding->schedule_imm_local(this);
+  }
+  return &vio;
+}
+
+// Mark the VC closed and call die() unless it was already closed or the VC
+// is currently inside its own handler (`recursive`). Must be called on the
+// thread holding the VC's mutex (asserted).
+void
+CacheVC::do_io_close(int alerrno)
+{
+ ink_assert(mutex->thread_holding == this_ethread());
+ int previous_closed = closed;
+ closed = (alerrno == -1) ? 1 : -1; // 1 for alerrno == -1 (presumably the default "no error" argument), -1 otherwise
+ DDbg(dbg_ctl_cache_close, "do_io_close %p %d %d", this, alerrno, closed);
+ if (!previous_closed && !recursive) {
+ die();
+ }
+}
+
+// Schedule an immediate event on avio's mutex-holding thread unless one is
+// already pending (`trigger`). The "useless reenable" sanity checks are only
+// compiled when USELESS_REENABLES is not defined.
+void
+CacheVC::reenable(VIO *avio)
+{
+ DDbg(dbg_ctl_cache_reenable, "reenable %p", this);
+ (void)avio;
+#ifdef DEBUG
+ ink_assert(avio->mutex->thread_holding);
+#endif
+ if (!trigger) {
+#ifndef USELESS_REENABLES
+ if (vio.op == VIO::READ) {
+ if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
+ ink_assert(!"useless reenable of cache read");
+ } else if (!vio.buffer.reader()->read_avail())
+ ink_assert(!"useless reenable of cache write");
+#endif
+ trigger = avio->mutex->thread_holding->schedule_imm_local(this);
+ }
+}
+
+// Like reenable(), but when no I/O is in progress and the VC is not already
+// inside its handler, run the handler synchronously with EVENT_NONE instead
+// of scheduling an event.
+void
+CacheVC::reenable_re(VIO *avio)
+{
+ DDbg(dbg_ctl_cache_reenable, "reenable_re %p", this);
+ (void)avio;
+#ifdef DEBUG
+ ink_assert(avio->mutex->thread_holding);
+#endif
+ if (!trigger) {
+ if (!is_io_in_progress() && !recursive) {
+ handleEvent(EVENT_NONE, (void *)nullptr);
+ } else {
+ trigger = avio->mutex->thread_holding->schedule_imm_local(this);
+ }
+ }
+}
+
+/** Fetch VC data selected by @a i into @a data.
+ *
+ *  - CACHE_DATA_HTTP_INFO: stores a pointer to this VC's alternate
+ *    (CacheHTTPInfo *) in @a data.
+ *  - CACHE_DATA_RAM_CACHE_HIT_FLAG: stores !f.not_from_ram_cache (an int) in
+ *    @a data; handleReadDone sets not_from_ram_cache when a fragment did not
+ *    come from the RAM cache.
+ *
+ *  @return true if @a i is one of the ids above, false otherwise.
+ */
+bool
+CacheVC::get_data(int i, void *data)
+{
+  switch (i) {
+  case CACHE_DATA_HTTP_INFO:
+    *(static_cast<CacheHTTPInfo **>(data)) = &alternate;
+    return true;
+  case CACHE_DATA_RAM_CACHE_HIT_FLAG:
+    *(static_cast<int *>(data)) = !f.not_from_ram_cache;
+    return true;
+  default:
+    break;
+  }
+  return false;
+}
+
+/// Length of the document as recorded in this VC (doc_len).
+int64_t
+CacheVC::get_object_size()
+{
+  return doc_len;
+}
+
+/// Not supported for CacheVC: triggers an assertion and reports success.
+bool
+CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */)
+{
+  ink_assert(!"CacheVC::set_data should not be called!");
+  return true;
+}
+
+/// Store a pointer to this VC's alternate in @a ainfo.
+void
+CacheVC::get_http_info(CacheHTTPInfo **ainfo)
+{
+  *ainfo = &alternate;
+}
+
+// set_http_info must be called before do_io_write
+// cluster vc does an optimization where it calls do_io_write() before
+// calling set_http_info(), but it guarantees that the info will
+// be set before transferring any bytes
+//
+// Stamps ainfo with the object key (update_key/update_len for updates,
+// earliest_key otherwise), allows an empty document when the response has
+// Content-Length: 0 or status 204, then shallow-copies ainfo into
+// `alternate` and clears ainfo. Asserts that nothing was written yet.
+void
+CacheVC::set_http_info(CacheHTTPInfo *ainfo)
+{
+  ink_assert(!total_len);
+  if (f.update) {
+    ainfo->object_key_set(update_key);
+    ainfo->object_size_set(update_len);
+  } else {
+    ainfo->object_key_set(earliest_key);
+    // don't know the total len yet
+  }
+
+  MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
+  if ((field && !field->value_get_int64()) || ainfo->m_alt->m_response_hdr.status_get() == HTTP_STATUS_NO_CONTENT) {
+    f.allow_empty_doc = 1;
+    // Set the object size here to zero in case this is a cache replace where the new object
+    // length is zero but the old object was not.
+    ainfo->object_size_set(0);
+  } else {
+    f.allow_empty_doc = 0;
+  }
+
+  alternate.copy_shallow(ainfo);
+  ainfo->clear();
+}
+
+/** Record @a time_pin as this VC's pin_in_cache value.
+ *
+ *  Only accepted on a write VC before any data has been written.
+ *  @return false (after an assertion) if data was already written or the VC
+ *  is not a write VC; true otherwise.
+ */
+bool
+CacheVC::set_pin_in_cache(time_t time_pin)
+{
+  if (total_len) {
+    ink_assert(!"should Pin the document before writing");
+    return false;
+  }
+  if (vio.op != VIO::WRITE) {
+    ink_assert(!"Pinning only allowed while writing objects to the cache");
+    return false;
+  }
+  pin_in_cache = time_pin;
+  return true;
+}
+
+/// Value last recorded by set_pin_in_cache().
+time_t
+CacheVC::get_pin_in_cache()
+{
+  return pin_in_cache;
+}
+
+/// Any event delivered to this handler is a logic error: asserts and returns EVENT_DONE.
+int
+CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
+{
+  ink_assert(0);
+  return EVENT_DONE;
+}
+
+/// Positional reads are possible unless the VC has read from a writer (f.read_from_writer_called).
+bool
+CacheVC::is_pread_capable()
+{
+  return !f.read_from_writer_called;
+}
+
+/** Unmarshal the HTTP header area of @a doc (doc->hdr(), doc->hlen bytes).
+ *
+ *  Documents whose version is older than CACHE_DB_VERSION are decoded with
+ *  HTTPInfo::unmarshal_v24_1; others with HTTPInfo::unmarshal. @a buf is
+ *  passed as the block reference for the unmarshalled data.
+ *
+ *  @param[out] okay set to 0 if any unmarshal step fails; left unchanged
+ *              otherwise.
+ */
+static void
+unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
+{
+  using UnmarshalFunc           = int(char *buf, int len, RefCountObj *block_ref);
+  UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal;
+  ts::VersionNumber version(doc->v_major, doc->v_minor);
+
+  // introduced by https://github.com/apache/trafficserver/pull/4874, this is used to distinguish the doc version
+  // before and after #4847
+  // NOTE(review): the two PR numbers above differ (4874 vs 4847); one is probably a typo - confirm.
+  if (version < CACHE_DB_VERSION) {
+    unmarshal_func = &HTTPInfo::unmarshal_v24_1;
+  }
+
+  char *tmp = doc->hdr();
+  int len   = doc->hlen;
+  while (len > 0) {
+    int r = unmarshal_func(tmp, len, buf.get());
+    if (r < 0) {
+      ink_assert(!"CacheVC::handleReadDone unmarshal failed");
+      okay = 0;
+      break;
+    }
+    len -= r;
+    tmp += r;
+  }
+}
+
+// [amc] I think this is where all disk reads from cache funnel through here.
+/** Completion handler for fragment reads set up by handleRead().
+ *
+ *  Returns EVENT_CONT while AIO is still in progress for a non-AIO event. If
+ *  the disk is bad, the directory entry is no longer valid, or the read
+ *  failed, processing is skipped. Otherwise the Doc is validated (future
+ *  versions and checksum mismatches are marked DOC_CORRUPT), HTTP headers are
+ *  unmarshalled, and for reads the fragment is offered to the RAM cache
+ *  (subject to cache_config_ram_cache_cutoff) and remembered as the volume's
+ *  first fragment when doc_len is 0. Every path ends with POP_HANDLER followed
+ *  by handleEvent(AIO_EVENT_DONE, nullptr); if the volume lock is busy the
+ *  handler retries via VC_SCHED_LOCK_RETRY.
+ */
+int
+CacheVC::handleReadDone(int event, Event *e)
+{
+ cancel_trigger();
+ ink_assert(this_ethread() == mutex->thread_holding);
+
+ Doc *doc = nullptr;
+ if (event == AIO_EVENT_DONE) {
+ set_io_not_in_progress();
+ } else if (is_io_in_progress()) {
+ return EVENT_CONT;
+ }
+ if (DISK_BAD(vol->disk)) {
+ io.aio_result = -1;
+ Warning("Canceling cache read: disk %s is bad.", vol->hash_text.get());
+ goto Ldone;
+ }
+ {
+ MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+ if (!lock.is_locked()) {
+ VC_SCHED_LOCK_RETRY();
+ }
+ if ((!dir_valid(vol, &dir)) || (!io.ok())) {
+ if (!io.ok()) {
+ Dbg(dbg_ctl_cache_disk_error, "Read error on disk %s\n \
+ read range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %"
PRIu64 " blocks] \n",
+ vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset,
(uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
+ (uint64_t)io.aiocb.aio_offset / 512,
(uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
+ }
+ goto Ldone;
+ }
+
+ doc = reinterpret_cast<Doc *>(buf->data());
+ ink_assert(vol->mutex->nthread_holding < 1000);
+ ink_assert(doc->magic == DOC_MAGIC);
+
+ if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
+ // future version, count as corrupted
+ doc->magic = DOC_CORRUPT;
+ Dbg(dbg_ctl_cache_bc, "Object is future version %d:%d - disk %s - doc id
= %" PRIx64 ":%" PRIx64 "", doc->v_major,
+ doc->v_minor, vol->hash_text.get(), read_key->slice64(0),
read_key->slice64(1));
+ goto Ldone;
+ }
+
+#ifdef VERIFY_JTEST_DATA
+ char xx[500];
+ if (read_key && *read_key == doc->key && request.valid() &&
!dir_head(&dir) && !vio.ndone) {
+ int ib = 0, xd = 0;
+ request.url_get()->print(xx, 500, &ib, &xd);
+ char *x = xx;
+ for (int q = 0; q < 3; q++)
+ x = strchr(x + 1, '/');
+ ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
+ }
+#endif
+
+ if (dbg_ctl_cache_read.on()) {
+ char xt[CRYPTO_HEX_SIZE];
+ Dbg(dbg_ctl_cache_read,
+ "Read complete on fragment %s. Length: data payload=%d this
fragment=%d total doc=%" PRId64 " prefix=%d",
+ doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len,
doc->prefix_len());
+ }
+
+ // put into ram cache?
+ // STORE_COLLISION is defined as 1, so the key comparisons below never reject a fragment.
+ if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key)
|| STORE_COLLISION) && doc->magic == DOC_MAGIC) {
+ int okay = 1;
+ if (!f.doc_from_ram_cache) {
+ f.not_from_ram_cache = 1;
+ }
+ if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
+ // verify that the checksum matches
+ // NOTE(review): this sums plain `char` values, so bytes >= 0x80 are sign-extended where char is
+ // signed; it presumably must match the writer's checksum computation - confirm before changing.
+ uint32_t checksum = 0;
+ for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) +
doc->len; b++) {
+ checksum += *b;
+ }
+ ink_assert(checksum == doc->checksum);
+ if (checksum != doc->checksum) {
+ Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d,
hlen %d, disk %s, offset %" PRIu64 " size %zu",
+ doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen,
vol->path, (uint64_t)io.aiocb.aio_offset,
+ (size_t)io.aiocb.aio_nbytes);
+ doc->magic = DOC_CORRUPT;
+ okay = 0;
+ }
+ }
+ (void)e; // Avoid compiler warnings
+ bool http_copy_hdr = false;
+ http_copy_hdr =
+ cache_config_ram_cache_compress && !f.doc_from_ram_cache &&
doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
+ // If http doc we need to unmarshal the headers before putting in the
ram cache
+ // unless it could be compressed
+ if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen
&& okay) {
+ unmarshal_helper(doc, buf, okay);
+ }
+ // Put the request in the ram cache only if its a open_read or lookup
+ if (vio.op == VIO::READ && okay) {
+ bool cutoff_check;
+ // cutoff_check :
+ // doc_len == 0 for the first fragment (it is set from the vector)
+ // The decision on the first fragment is based on
+ // doc->total_len
+ // After that, the decision is based of doc_len (doc_len != 0)
+ // (cache_config_ram_cache_cutoff == 0) : no cutoffs
+ cutoff_check =
+ ((!doc_len && static_cast<int64_t>(doc->total_len) <
cache_config_ram_cache_cutoff) ||
+ (doc_len && static_cast<int64_t>(doc_len) <
cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
+ if (cutoff_check && !f.doc_from_ram_cache) {
+ uint64_t o = dir_offset(&dir);
+ vol->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, o);
+ }
+ if (!doc_len) {
+ // keep a pointer to it. In case the state machine decides to
+ // update this document, we don't have to read it back in memory
+ // again
+ vol->first_fragment_key = *read_key;
+ vol->first_fragment_offset = dir_offset(&dir);
+ vol->first_fragment_data = buf;
+ }
+ } // end VIO::READ check
+ // If it could be compressed, unmarshal after
+ if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen
&& okay) {
+ unmarshal_helper(doc, buf, okay);
+ }
+ } // end io.ok() check
+ }
+Ldone:
+ POP_HANDLER;
+ return handleEvent(AIO_EVENT_DONE, nullptr);
+}
+
+/** Locate the fragment described by `dir` and make its bytes available in `buf`.
+ *
+ *  Lookup order:
+ *  1. the RAM cache,
+ *  2. the volume's remembered first fragment from the last open_read,
+ *  3. the aggregation buffer,
+ *  4. an asynchronous disk read.
+ *  The volume mutex must be held by this thread (asserted).
+ *
+ *  @return EVENT_RETURN when the data is already in memory. In that case
+ *  handleReadDone is installed as the handler when the bytes still need
+ *  processing (aggregation-buffer copies, and HTTP RAM hits while RAM-cache
+ *  compression is configured); otherwise the handler is popped.
+ *  @return EVENT_CONT once an AIO read has been issued with handleReadDone
+ *  installed as the handler.
+ */
+int
+CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  cancel_trigger();
+
+  f.doc_from_ram_cache = false;
+
+  // check ram cache
+  ink_assert(vol->mutex->thread_holding == this_ethread());
+  int64_t o           = dir_offset(&dir);
+  int ram_hit_state   = vol->ram_cache->get(read_key, &buf, static_cast<uint64_t>(o));
+  f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
+  if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) {
+    goto LramHit;
+  }
+
+  // check if it was read in the last open_read call
+  if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) {
+    buf = vol->first_fragment_data;
+    goto LmemHit;
+  }
+  // see if its in the aggregation buffer
+  if (dir_agg_buf_valid(vol, &dir)) {
+    int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos;
+    buf            = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
+    ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos);
+    char *doc = buf->data();
+    char *agg = vol->agg_buffer + agg_offset;
+    memcpy(doc, agg, io.aiocb.aio_nbytes);
+    io.aio_result = io.aiocb.aio_nbytes;
+    SET_HANDLER(&CacheVC::handleReadDone);
+    return EVENT_RETURN;
+  }
+
+  io.aiocb.aio_fildes = vol->fd;
+  io.aiocb.aio_offset = vol->vol_offset(&dir);
+  // Clamp the read so it does not run past the end of the volume.
+  if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) {
+    io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
+  }
+  buf              = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
+  io.aiocb.aio_buf = buf->data();
+  io.action        = this;
+  io.thread        = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
+  SET_HANDLER(&CacheVC::handleReadDone);
+  {
+    // Issue the read outside the assertion macro so it happens no matter how
+    // ink_assert is configured.
+    [[maybe_unused]] const int aio_status = ink_aio_read(&io);
+    ink_assert(aio_status >= 0);
+  }
+
+// ToDo: Why are these for debug only ??
+#if DEBUG
+  Metrics::increment(cache_rsb.pread_count);
+  Metrics::increment(vol->cache_vol->vol_rsb.pread_count);
+#endif
+
+  return EVENT_CONT;
+
+LramHit: {
+  f.doc_from_ram_cache = true;
+  io.aio_result        = io.aiocb.aio_nbytes;
+  Doc *doc             = reinterpret_cast<Doc *>(buf->data());
+  if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
+    SET_HANDLER(&CacheVC::handleReadDone);
+    return EVENT_RETURN;
+  }
+}
+LmemHit:
+  f.doc_from_ram_cache = true;
+  io.aio_result        = io.aiocb.aio_nbytes;
+  POP_HANDLER;
+  return EVENT_RETURN; // allow the caller to release the volume lock
+}
+
+/** Event handler that removes the document keyed by `key` from the volume.
+ *
+ *  - Busy volume lock: retries later via VC_SCHED_LOCK_RETRY.
+ *  - Cancelled action: closes any open write and frees the VC without
+ *    calling the continuation.
+ *  - First pass (f.remove_aborted_writers clear): calls
+ *    vol->open_write(this, true, 1). If a writer already exists, the existing
+ *    open-directory entry is flagged dont_update_directory; otherwise its own
+ *    entry is. Presumably this keeps an in-progress writer from re-inserting
+ *    the entry - confirm in CacheWrite.cc.
+ *  - Then probes the directory for candidates (dir_probe), reading each with
+ *    do_read_call(), and deletes the first whose Doc::first_key equals `key`.
+ *
+ *  On success the continuation receives CACHE_EVENT_REMOVE. Otherwise the
+ *  remove-failure metrics are incremented and it receives
+ *  CACHE_EVENT_REMOVE_FAILED with -ECACHE_NO_DOC. The VC is freed on every
+ *  terminal path; while a read is pending, do_read_call()'s result is
+ *  returned instead.
+ */
+int
+CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  cancel_trigger();
+  set_io_not_in_progress();
+  {
+    MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+    if (!lock.is_locked()) {
+      VC_SCHED_LOCK_RETRY();
+    }
+    if (_action.cancelled) {
+      if (od) {
+        vol->close_write(this);
+        od = nullptr;
+      }
+      goto Lfree;
+    }
+    if (!f.remove_aborted_writers) {
+      if (vol->open_write(this, true, 1)) {
+        // writer exists
+        od = vol->open_read(&key);
+        ink_release_assert(od);
+        od->dont_update_directory = true;
+        od                        = nullptr;
+      } else {
+        od->dont_update_directory = true;
+      }
+      f.remove_aborted_writers = 1;
+    }
+  Lread:
+    SET_HANDLER(&CacheVC::removeEvent);
+    if (!buf) {
+      goto Lcollision;
+    }
+    if (!dir_valid(vol, &dir)) {
+      last_collision = nullptr;
+      goto Lcollision;
+    }
+    // check read completed correct FIXME: remove bad vols
+    if (!io.ok()) {
+      goto Ldone;
+    }
+    {
+      // verify that this is our document
+      Doc *doc = reinterpret_cast<Doc *>(buf->data());
+      /* should be first_key not key..right?? */
+      if (doc->first_key == key) {
+        ink_assert(doc->magic == DOC_MAGIC);
+        if (dir_delete(&key, vol, &dir) > 0) {
+          if (od) {
+            vol->close_write(this);
+          }
+          od = nullptr;
+          goto Lremoved;
+        }
+        goto Ldone;
+      }
+    }
+  Lcollision:
+    // check for collision
+    if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
+      int ret = do_read_call(&key);
+      if (ret == EVENT_RETURN) {
+        goto Lread;
+      }
+      return ret;
+    }
+  Ldone:
+    Metrics::increment(cache_rsb.status[static_cast<int>(CacheOpType::Remove)].failure);
+    Metrics::increment(vol->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Remove)].failure);
+    if (od) {
+      vol->close_write(this);
+    }
+  }
+  ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
+  _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *)-ECACHE_NO_DOC);
+  goto Lfree;
+Lremoved:
+  _action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
+Lfree:
+  return free_CacheVC(this);
+}
diff --git a/iocore/cache/Makefile.am b/iocore/cache/Makefile.am
index 79677ce36f..e62303bc3e 100644
--- a/iocore/cache/Makefile.am
+++ b/iocore/cache/Makefile.am
@@ -39,6 +39,7 @@ libinkcache_a_SOURCES = \
CachePages.cc \
CachePagesInternal.cc \
CacheRead.cc \
+ CacheVC.cc \
CacheVol.cc \
CacheWrite.cc \
I_Cache.h \
diff --git a/iocore/cache/P_CacheDisk.h b/iocore/cache/P_CacheDisk.h
index 30bea53adc..4113d0a148 100644
--- a/iocore/cache/P_CacheDisk.h
+++ b/iocore/cache/P_CacheDisk.h
@@ -25,6 +25,8 @@
#include "I_Cache.h"
+#include "P_AIO.h"
+
extern int cache_config_max_disk_errors;
#define DISK_BAD(_x) ((_x)->num_errors >=
cache_config_max_disk_errors)