This is an automated email from the ASF dual-hosted git repository.
bneradt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new fdf2aa0b89 Move CacheVC struct and methods from Vol (#10601)
fdf2aa0b89 is described below
commit fdf2aa0b8955d2996c531e7825f506807d95bace
Author: JosiahWI <[email protected]>
AuthorDate: Fri Oct 13 11:18:58 2023 -0500
Move CacheVC struct and methods from Vol (#10601)
This is part of the follow-up requested by @masaori335 in #10599. The
remaining work is to deal with the methods defined in CacheRead.cc and
CacheWrite.cc.
---
iocore/cache/CacheVC.cc | 621 ++++++++++++++++++++++++++++++++++++-----
iocore/cache/CacheVC.h | 337 ++++++++++++++++++++++
iocore/cache/CacheVol.cc | 474 +------------------------------
iocore/cache/P_CacheDir.h | 3 +
iocore/cache/P_CacheInternal.h | 289 +------------------
5 files changed, 894 insertions(+), 830 deletions(-)
diff --git a/iocore/cache/CacheVC.cc b/iocore/cache/CacheVC.cc
index ef8d5f6609..4852d268dd 100644
--- a/iocore/cache/CacheVC.cc
+++ b/iocore/cache/CacheVC.cc
@@ -21,12 +21,16 @@
limitations under the License.
*/
+#include "I_Cache.h"
#include "I_CacheDefs.h"
#include "P_CacheDisk.h"
#include "P_CacheHttp.h"
#include "P_CacheInternal.h"
#include "P_CacheVol.h"
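+// must be included after the others: CacheVC.h uses Vol and Doc members in inline functions
+// without including their definitions (it only forward-declares Vol)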
+#include "CacheVC.h"
+
// hdrs
#include "HTTP.h"
#include "MIME.h"
@@ -53,6 +57,7 @@
// tscore
#include "tscore/I_Version.h"
#include "tscore/ink_assert.h"
+#include "tscore/ink_hrtime.h"
#include "tscore/Ptr.h"
// ts
@@ -63,21 +68,96 @@
#include <cstring>
#include <ctime>
+namespace
+{
+// Debug controls used by this file, one per debug tag.
DbgCtl dbg_ctl_cache_bc{"cache_bc"};
DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
DbgCtl dbg_ctl_cache_read{"cache_read"};
+DbgCtl dbg_ctl_cache_scan{"cache_scan"};
+DbgCtl dbg_ctl_cache_scan_truss{"cache_scan_truss"};
#ifdef DEBUG
DbgCtl dbg_ctl_cache_close{"cache_close"};
DbgCtl dbg_ctl_cache_reenable{"cache_reenable"};
#endif
+} // end anonymous namespace
// Compilation Options
-#define STORE_COLLISION 1
-#define USELESS_REENABLES // allow them for now
+// SCAN_BUF_SIZE: bytes read per scan I/O, and the granularity of the scan volume map
+// built by make_vol_map() (one map byte per SCAN_BUF_SIZE bytes of volume).
+#define SCAN_BUF_SIZE RECOVERY_SIZE
+// SCAN_WRITER_LOCK_MAX_RETRY: writer-lock attempts made by scanOpenWrite() before the
+// scan client is asked (CACHE_EVENT_SCAN_OPERATION_BLOCKED) what to do.
+#define SCAN_WRITER_LOCK_MAX_RETRY 5
+#define STORE_COLLISION 1
+#define USELESS_REENABLES // allow them for now
// #define VERIFY_JTEST_DATA
extern int cache_config_ram_cache_cutoff;
+/* Find the next SCAN_BUF_SIZE chunk, at or after @a offset, that the scan map marks as
+ * holding data.
+ *
+ * vol     - volume being scanned
+ * vol_map - map precalculated by make_vol_map() for @a vol
+ * offset  - absolute offset to start looking at (data at this location has not been read yet)
+ *
+ * Returns the absolute offset of that chunk, or the absolute end of the volume when no
+ * marked chunk remains. */
+static off_t
+next_in_map(Vol *vol, char *vol_map, off_t offset)
+{
+  const off_t base = vol->vol_offset_to_offset(0);
+  const off_t span = vol->vol_relative_length(base);
+
+  // Walk the map one chunk at a time, stopping at the first chunk flagged as in use.
+  for (off_t rel = offset - base; rel < span; rel += SCAN_BUF_SIZE) {
+    if (vol_map[rel / SCAN_BUF_SIZE]) {
+      return base + rel;
+    }
+  }
+  return base + span;
+}
+
+// Function in CacheDir.cc that we need for make_vol_map().
+int dir_bucket_loop_fix(Dir *start_dir, int s, Vol *vol);
+
+// TODO: If we used a bit vector, we could make a smaller map structure.
+// TODO: If we saved a high water mark we could have a smaller buf, and avoid searching it
+// when we are asked about the highest interesting offset.
+/* Build a map of which SCAN_BUF_SIZE-sized chunks of a volume are pointed into by a valid
+ * head directory entry.
+ *
+ * vol - volume to make a map of.
+ *
+ * Returns one byte per SCAN_BUF_SIZE bytes of the volume (rounded up), 1 where a chunk holds
+ * data and 0 elsewhere. The buffer comes from ats_malloc(); the caller owns it and must
+ * release it with ats_free(). */
+static char *
+make_vol_map(Vol *vol)
+{
+  // Map will be one byte for each SCAN_BUF_SIZE bytes.
+  off_t start_offset = vol->vol_offset_to_offset(0);
+  off_t vol_len      = vol->vol_relative_length(start_offset);
+  size_t map_len     = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE;
+  char *vol_map      = static_cast<char *>(ats_malloc(map_len));
+
+  memset(vol_map, 0, map_len);
+
+  // Scan directories.
+  // Copied from dir_entries_used() and modified to fill in the map instead.
+  for (int s = 0; s < vol->segments; s++) {
+    Dir *seg = vol->dir_segment(s);
+    for (int b = 0; b < vol->buckets; b++) {
+      Dir *e = dir_bucket(b, seg);
+      if (dir_bucket_loop_fix(e, s, vol)) {
+        break;
+      }
+      for (; e != nullptr; e = next_dir(e, seg)) {
+        if (dir_offset(e) && dir_valid(vol, e) && dir_agg_valid(vol, e) && dir_head(e)) {
+          off_t offset = vol->vol_offset(e) - start_offset;
+          // Only offsets inside [0, vol_len) have a slot in the map. The previous check
+          // (offset <= vol_len) wrote vol_map[map_len], one past the end, whenever vol_len
+          // was an exact multiple of SCAN_BUF_SIZE.
+          if (offset >= 0 && offset < vol_len) {
+            vol_map[offset / SCAN_BUF_SIZE] = 1;
+          }
+        }
+      }
+    }
+  }
+  return vol_map;
+}
+
int CacheVC::size_to_init = -1;
CacheVC::CacheVC()
@@ -86,13 +166,6 @@ CacheVC::CacheVC()
memset((void *)&vio, 0, size_to_init);
}
-HTTPInfo::FragOffset *
-CacheVC::get_frag_table()
-{
- ink_assert(alternate.valid());
- return alternate.valid() ? alternate.get_frag_table() : nullptr;
-}
-
VIO *
CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
{
@@ -227,63 +300,6 @@ CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data
*/)
return true;
}
-void
-CacheVC::get_http_info(CacheHTTPInfo **ainfo)
-{
- *ainfo = &(this)->alternate;
-}
-
-// set_http_info must be called before do_io_write
-// cluster vc does an optimization where it calls do_io_write() before
-// calling set_http_info(), but it guarantees that the info will
-// be set before transferring any bytes
-void
-CacheVC::set_http_info(CacheHTTPInfo *ainfo)
-{
- ink_assert(!total_len);
- if (f.update) {
- ainfo->object_key_set(update_key);
- ainfo->object_size_set(update_len);
- } else {
- ainfo->object_key_set(earliest_key);
- // don't know the total len yet
- }
-
- MIMEField *field =
ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH,
MIME_LEN_CONTENT_LENGTH);
- if ((field && !field->value_get_int64()) ||
ainfo->m_alt->m_response_hdr.status_get() == HTTP_STATUS_NO_CONTENT) {
- f.allow_empty_doc = 1;
- // Set the object size here to zero in case this is a cache replace where
the new object
- // length is zero but the old object was not.
- ainfo->object_size_set(0);
- } else {
- f.allow_empty_doc = 0;
- }
-
- alternate.copy_shallow(ainfo);
- ainfo->clear();
-}
-
-bool
-CacheVC::set_pin_in_cache(time_t time_pin)
-{
- if (total_len) {
- ink_assert(!"should Pin the document before writing");
- return false;
- }
- if (vio.op != VIO::WRITE) {
- ink_assert(!"Pinning only allowed while writing objects to the cache");
- return false;
- }
- pin_in_cache = time_pin;
- return true;
-}
-
-time_t
-CacheVC::get_pin_in_cache()
-{
- return pin_in_cache;
-}
-
int
CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
{
@@ -291,12 +307,6 @@ CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e
ATS_UNUSED */)
return EVENT_DONE;
}
-bool
-CacheVC::is_pread_capable()
-{
- return !f.read_from_writer_called;
-}
-
static void
unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
{
@@ -607,3 +617,470 @@ Lremoved:
Lfree:
return free_CacheVC(this);
}
+
+/* Scan driver: advance to the next volume to scan and schedule scanObject() on it, or report
+ * CACHE_EVENT_SCAN_DONE and free this VC when no volume is left. On the first call (vol == nullptr)
+ * the first volume of the matching host record is chosen. */
+int
+CacheVC::scanVol(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanVol", this);
+  if (_action.cancelled) {
+    return free_CacheVC(this);
+  }
+
+  ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&theCache->hosttable);
+
+  // Use the generic record unless the scan is restricted to a host that has its own record.
+  const CacheHostRecord *rec = &hosttable->gen_host_rec;
+  if (host_len) {
+    CacheHostResult res;
+    hosttable->Match(hostname, host_len, &res);
+    if (res.record) {
+      rec = res.record;
+    }
+  }
+
+  // Pick the successor of the current volume (or the first volume on the initial call).
+  Vol *next_vol = nullptr;
+  if (vol == nullptr) {
+    if (rec->num_vols) {
+      next_vol = rec->vols[0];
+    }
+  } else {
+    for (int idx = 0; idx + 1 < rec->num_vols; ++idx) {
+      if (rec->vols[idx] == vol) {
+        next_vol = rec->vols[idx + 1];
+        break;
+      }
+    }
+  }
+
+  if (next_vol == nullptr) {
+    // Nothing left to scan.
+    _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, nullptr);
+    return free_CacheVC(this);
+  }
+
+  vol      = next_vol;
+  fragment = 0;
+  SET_HANDLER(&CacheVC::scanObject);
+  eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
+  return EVENT_CONT;
+}
+
+/* Scan handler for one volume.
+ *
+ * Reads the volume in SCAN_BUF_SIZE chunks, skipping chunks the volume map marks as empty.
+ * For every HTTP head document whose directory entry still points at it, each valid alternate
+ * is reported to the scan client with CACHE_EVENT_SCAN_OBJECT. Depending on the client's answers,
+ * the object is removed (scanRemoveDone), rewritten with an updated alternate vector
+ * (scanOpenWrite), or left alone. When the volume is exhausted, control returns to scanVol().
+ * A read failure reports CACHE_EVENT_SCAN_DONE with -ECACHE_READ_FAIL. */
+int
+CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanObject", this);
+
+  Doc *doc                     = nullptr;
+  void *result                 = nullptr;
+  int hlen                     = 0;
+  char hname[500];
+  bool hostinfo_copied         = false;
+  off_t next_object_len        = 0;
+  bool might_need_overlap_read = false;
+
+  cancel_trigger();
+  set_io_not_in_progress();
+  if (_action.cancelled) {
+    return free_CacheVC(this);
+  }
+
+  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+  if (!lock.is_locked()) {
+    Dbg(dbg_ctl_cache_scan_truss, "delay %p:scanObject", this);
+    mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
+    return EVENT_CONT;
+  }
+
+  if (!fragment) { // initialize for first read
+    fragment = 1;
+    // A map built for a previously scanned volume is no longer needed; release it instead of
+    // leaking one map per volume.
+    ats_free(scan_vol_map);
+    scan_vol_map        = make_vol_map(vol);
+    io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, vol->vol_offset_to_offset(0));
+    if (io.aiocb.aio_offset >= static_cast<off_t>(vol->skip + vol->len)) {
+      goto Lnext_vol;
+    }
+    io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
+    io.aiocb.aio_buf    = buf->data();
+    io.action           = this;
+    io.thread           = AIO_CALLBACK_THREAD_ANY;
+    Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject", this);
+    goto Lread;
+  }
+
+  if (!io.ok()) {
+    result = (void *)-ECACHE_READ_FAIL;
+    goto Ldone;
+  }
+
+  doc = reinterpret_cast<Doc *>(buf->data() + offset);
+  // If there is data in the buffer before the start that is from a partial object read previously
+  // Fix things as if we read it this time.
+  if (scan_fix_buffer_offset) {
+    io.aio_result          += scan_fix_buffer_offset;
+    io.aiocb.aio_nbytes    += scan_fix_buffer_offset;
+    io.aiocb.aio_offset    -= scan_fix_buffer_offset;
+    io.aiocb.aio_buf        = static_cast<char *>(io.aiocb.aio_buf) - scan_fix_buffer_offset;
+    scan_fix_buffer_offset  = 0;
+  }
+  while (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len <
+         static_cast<off_t>(io.aiocb.aio_nbytes)) {
+    might_need_overlap_read = false;
+    doc                     = reinterpret_cast<Doc *>(reinterpret_cast<char *>(doc) + next_object_len);
+    next_object_len         = vol->round_to_approx_size(doc->len);
+    int i;
+    bool changed;
+
+    if (doc->magic != DOC_MAGIC) {
+      next_object_len = CACHE_BLOCK_SIZE;
+      Dbg(dbg_ctl_cache_scan_truss, "blockskip %p:scanObject", this);
+      continue;
+    }
+
+    if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen) {
+      goto Lskip;
+    }
+
+    // Only report documents that a head directory entry still points at.
+    last_collision = nullptr;
+    while (true) {
+      if (!dir_probe(&doc->first_key, vol, &dir, &last_collision)) {
+        goto Lskip;
+      }
+      if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) ||
+          (vol->vol_offset(&dir) != io.aiocb.aio_offset + (reinterpret_cast<char *>(doc) - buf->data()))) {
+        continue;
+      }
+      break;
+    }
+    if (doc->data() - buf->data() > static_cast<int>(io.aiocb.aio_nbytes)) {
+      might_need_overlap_read = true;
+      goto Lskip;
+    }
+    {
+      char *tmp = doc->hdr();
+      int len   = doc->hlen;
+      while (len > 0) {
+        int r = HTTPInfo::unmarshal(tmp, len, buf.get());
+        if (r < 0) {
+          ink_assert(!"CacheVC::scanObject unmarshal failed");
+          goto Lskip;
+        }
+        len -= r;
+        tmp += r;
+      }
+    }
+    if (this->load_http_info(&vector, doc) != doc->hlen) {
+      goto Lskip;
+    }
+    changed         = false;
+    hostinfo_copied = false;
+    for (i = 0; i < vector.count(); i++) {
+      if (!vector.get(i)->valid()) {
+        goto Lskip;
+      }
+      if (!hostinfo_copied) {
+        const char *host = vector.get(i)->request_get()->host_get(&hlen);
+        // Bound the copy by both the reported host length and the size of hname. The host
+        // string is not treated as NUL-terminated, so at most hlen bytes are read from it,
+        // and hname[hlen] always stays inside the buffer.
+        if (host == nullptr || hlen < 0) {
+          hlen = 0;
+        } else if (hlen > static_cast<int>(sizeof(hname)) - 1) {
+          hlen = static_cast<int>(sizeof(hname)) - 1;
+        }
+        if (hlen > 0) {
+          memcpy(hname, host, hlen);
+        }
+        hname[hlen] = 0;
+        Dbg(dbg_ctl_cache_scan, "hostname = '%s', hostlen = %d", hname, hlen);
+        hostinfo_copied = true;
+      }
+      vector.get(i)->object_key_get(&key);
+      alternate_index = i;
+      // verify that the earliest block exists, reducing 'false hit' callbacks
+      if (!(key == doc->key)) {
+        last_collision = nullptr;
+        if (!dir_probe(&key, vol, &earliest_dir, &last_collision)) {
+          continue;
+        }
+      }
+      earliest_key = key;
+      int result1  = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i));
+      switch (result1) {
+      case CACHE_SCAN_RESULT_CONTINUE:
+        continue;
+      case CACHE_SCAN_RESULT_DELETE:
+        changed = true;
+        vector.remove(i, true);
+        i--;
+        continue;
+      case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES:
+        changed = true;
+        vector.clear();
+        i = 0;
+        break;
+      case CACHE_SCAN_RESULT_UPDATE:
+        ink_assert(alternate_index >= 0);
+        vector.insert(&alternate, alternate_index);
+        if (!vector.get(alternate_index)->valid()) {
+          continue;
+        }
+        changed = true;
+        continue;
+      case EVENT_DONE:
+        goto Lcancel;
+      default:
+        ink_assert(!"unexpected CACHE_SCAN_RESULT");
+        continue;
+      }
+    }
+    if (changed) {
+      if (!vector.count()) {
+        ink_assert(hostinfo_copied);
+        SET_HANDLER(&CacheVC::scanRemoveDone);
+        // force remove even if there is a writer
+        cacheProcessor.remove(this, &doc->first_key, CACHE_FRAG_TYPE_HTTP, hname, hlen);
+        return EVENT_CONT;
+      } else {
+        offset            = reinterpret_cast<char *>(doc) - buf->data();
+        write_len         = 0;
+        frag_type         = CACHE_FRAG_TYPE_HTTP;
+        f.use_first_key   = 1;
+        f.evac_vector     = 1;
+        alternate_index   = CACHE_ALT_REMOVED;
+        writer_lock_retry = 0;
+
+        first_key = key = doc->first_key;
+        earliest_key.clear();
+
+        SET_HANDLER(&CacheVC::scanOpenWrite);
+        return scanOpenWrite(EVENT_NONE, nullptr);
+      }
+    }
+    continue;
+  Lskip:;
+  }
+  vector.clear();
+  // If we had an object that went past the end of the buffer, and it is small enough to fix,
+  // fix it.
+  if (might_need_overlap_read &&
+      (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len >
+       static_cast<off_t>(io.aiocb.aio_nbytes)) &&
+      next_object_len > 0) {
+    off_t partial_object_len = io.aiocb.aio_nbytes - (reinterpret_cast<char *>(doc) - buf->data());
+    // Copy partial object to beginning of the buffer.
+    memmove(buf->data(), reinterpret_cast<char *>(doc), partial_object_len);
+    io.aiocb.aio_offset    += io.aiocb.aio_nbytes;
+    io.aiocb.aio_nbytes     = SCAN_BUF_SIZE - partial_object_len;
+    io.aiocb.aio_buf        = buf->data() + partial_object_len;
+    scan_fix_buffer_offset  = partial_object_len;
+  } else { // Normal case, where we ended on a object boundary.
+    io.aiocb.aio_offset += (reinterpret_cast<char *>(doc) - buf->data()) + next_object_len;
+    Dbg(dbg_ctl_cache_scan_truss, "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
+    io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset);
+    Dbg(dbg_ctl_cache_scan_truss, "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
+    io.aiocb.aio_nbytes    = SCAN_BUF_SIZE;
+    io.aiocb.aio_buf       = buf->data();
+    scan_fix_buffer_offset = 0;
+  }
+
+  if (io.aiocb.aio_offset >= vol->skip + vol->len) {
+  Lnext_vol:
+    SET_HANDLER(&CacheVC::scanVol);
+    eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
+    return EVENT_CONT;
+  }
+
+Lread:
+  io.aiocb.aio_fildes = vol->fd;
+  if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) {
+    io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
+  }
+  offset = 0;
+  ink_assert(ink_aio_read(&io) >= 0);
+  Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset,
+      (size_t)io.aiocb.aio_nbytes);
+  return EVENT_CONT;
+
+Ldone:
+  Dbg(dbg_ctl_cache_scan_truss, "done %p:scanObject", this);
+  _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result);
+Lcancel:
+  return free_CacheVC(this);
+}
+
+/* Completion handler for the cache remove that scanObject() issues once the scan client has
+ * dropped every alternate of an object. Releases the alternate held by this VC and hands
+ * control back to scanObject(). */
+int
+CacheVC::scanRemoveDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanRemoveDone", this);
+  Dbg(dbg_ctl_cache_scan, "remove done.");
+  alternate.destroy();
+  // Re-enter the scan synchronously; scanObject() continues with the data in the read buffer.
+  SET_HANDLER(&CacheVC::scanObject);
+  return handleEvent(EVENT_IMMEDIATE, nullptr);
+}
+
+/* Rewrite an object's alternate vector after the scan client changed it.
+ *
+ * Retries the volume writer lock, and once more than SCAN_WRITER_LOCK_MAX_RETRY attempts have
+ * failed, asks the client (CACHE_EVENT_SCAN_OPERATION_BLOCKED) whether to keep retrying or to
+ * skip the object. With the lock held, the open-directory entry is primed, and the directory
+ * entry is checked to still match the one scanned; a mismatch reports
+ * CACHE_EVENT_SCAN_OPERATION_FAILED. The vector write is then issued, and scanUpdateDone()
+ * completes it. */
+int
+CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanOpenWrite", this);
+  cancel_trigger();
+  // get volume lock
+  if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) {
+    int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, nullptr);
+    Dbg(dbg_ctl_cache_scan, "still haven't got the writer lock, asking user..");
+    switch (r) {
+    case CACHE_SCAN_RESULT_RETRY:
+      writer_lock_retry = 0;
+      break;
+    case CACHE_SCAN_RESULT_CONTINUE:
+      SET_HANDLER(&CacheVC::scanObject);
+      return scanObject(EVENT_IMMEDIATE, nullptr);
+    default:
+      // Any other answer keeps trying for the lock.
+      break;
+    }
+  }
+  int ret = 0;
+  {
+    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+    if (!lock.is_locked()) {
+      Dbg(dbg_ctl_cache_scan, "vol->mutex %p:scanOpenWrite", this);
+      VC_SCHED_LOCK_RETRY();
+    }
+
+    Dbg(dbg_ctl_cache_scan, "trying for writer lock");
+    if (vol->open_write(this, false, 1)) {
+      writer_lock_retry++;
+      SET_HANDLER(&CacheVC::scanOpenWrite);
+      // scan_msec_delay is in milliseconds, while schedule_in_local() takes an ink_hrtime, so it
+      // must be converted. Without the conversion, retries fired after a few nanoseconds and the
+      // retry budget was exhausted almost immediately.
+      mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(scan_msec_delay));
+      return EVENT_CONT;
+    }
+
+    ink_assert(this->od);
+    // put all the alternates in the open directory vector
+    int alt_count = vector.count();
+    for (int i = 0; i < alt_count; i++) {
+      write_vector->insert(vector.get(i));
+    }
+    od->writing_vec = true;
+    vector.clear(false);
+    // check that the directory entry was not overwritten
+    // if so return failure
+    Dbg(dbg_ctl_cache_scan, "got writer lock");
+    Dir *l = nullptr;
+    Dir d;
+    Doc *doc = reinterpret_cast<Doc *>(buf->data() + offset);
+    offset   = reinterpret_cast<char *>(doc) - buf->data() + vol->round_to_approx_size(doc->len);
+    // if the doc contains some data, then we need to create
+    // a new directory entry for this fragment. Remember the
+    // offset and the key in earliest_key
+    dir_assign(&od->first_dir, &dir);
+    if (doc->total_len) {
+      dir_assign(&od->single_doc_dir, &dir);
+      dir_set_tag(&od->single_doc_dir, doc->key.slice32(2));
+      od->single_doc_key    = doc->key;
+      od->move_resident_alt = true;
+    }
+
+    while (true) {
+      if (!dir_probe(&first_key, vol, &d, &l)) {
+        vol->close_write(this);
+        _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, nullptr);
+        SET_HANDLER(&CacheVC::scanObject);
+        return handleEvent(EVENT_IMMEDIATE, nullptr);
+      }
+      if (memcmp(&dir, &d, SIZEOF_DIR)) {
+        Dbg(dbg_ctl_cache_scan, "dir entry has changed");
+        continue;
+      }
+      break;
+    }
+
+    // the document was not modified
+    // we are safe from now on as we hold the
+    // writer lock on the doc
+    if (f.evac_vector) {
+      header_len = write_vector->marshal_length();
+    }
+    SET_HANDLER(&CacheVC::scanUpdateDone);
+    ret = do_write_call();
+  }
+  if (ret == EVENT_RETURN) {
+    return handleEvent(AIO_EVENT_DONE, nullptr);
+  }
+  return ret;
+}
+
+/* Completion handler for the vector write issued by scanOpenWrite().
+ *
+ * Under the volume lock, re-points the directory at the rewritten head fragment, re-inserts
+ * the resident-alternate entry when one was recorded, releases the writer lock, and resumes
+ * the scan. If the volume lock is busy, retries after cache_config_mutex_retry_delay ms. */
+int
+CacheVC::scanUpdateDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+  Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanUpdateDone", this);
+  cancel_trigger();
+  // get volume lock
+  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+  if (!lock.is_locked()) {
+    mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
+    return EVENT_CONT;
+  }
+
+  // The open-directory entry obtained by scanOpenWrite() must still be attached. Check it
+  // before it is dereferenced below; previously the assertion came only after od had been used.
+  ink_assert(this->od);
+  // insert a directory entry for the previous fragment
+  dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
+  if (od->move_resident_alt) {
+    dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
+  }
+  ink_assert(vol->open_read(&first_key));
+  vol->close_write(this);
+  SET_HANDLER(&CacheVC::scanObject);
+  return handleEvent(EVENT_IMMEDIATE, nullptr);
+}
+
+/* Adopt @a ainfo as the alternate this VC will write.
+ *
+ * Callers must set the HTTP info before do_io_write() transfers any bytes. The info is stamped
+ * with the object key (and, for updates, the known object size). An empty body is allowed when
+ * the response has "Content-Length: 0" or status 204; in that case the size is forced to 0.
+ * Finally the info is shallow-copied into @c alternate and @a ainfo is cleared. */
+void
+CacheVC::set_http_info(CacheHTTPInfo *ainfo)
+{
+  ink_assert(!total_len);
+
+  if (!f.update) {
+    // New object: the key is known, but the total length is not yet.
+    ainfo->object_key_set(earliest_key);
+  } else {
+    ainfo->object_key_set(update_key);
+    ainfo->object_size_set(update_len);
+  }
+
+  auto &response         = ainfo->m_alt->m_response_hdr;
+  MIMEField *content_len = response.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
+  const bool zero_length = content_len != nullptr && content_len->value_get_int64() == 0;
+  const bool empty_body  = zero_length || response.status_get() == HTTP_STATUS_NO_CONTENT;
+
+  if (empty_body) {
+    f.allow_empty_doc = 1;
+    // Reset the size so that replacing a non-empty object with an empty one does not keep the
+    // old size.
+    ainfo->object_size_set(0);
+  } else {
+    f.allow_empty_doc = 0;
+  }
+
+  alternate.copy_shallow(ainfo);
+  ainfo->clear();
+}
+
+/* Return, through @a ainfo, a pointer to this VC's own alternate (the CacheVC keeps ownership). */
+void
+CacheVC::get_http_info(CacheHTTPInfo **ainfo)
+{
+  CacheHTTPInfo *own = &alternate;
+  *ainfo             = own;
+}
+
+/* Fragment table of the current alternate. Asserts that an alternate is loaded, and returns
+ * nullptr when it is not. */
+HTTPInfo::FragOffset *
+CacheVC::get_frag_table()
+{
+  ink_assert(alternate.valid());
+  if (!alternate.valid()) {
+    return nullptr;
+  }
+  return alternate.get_frag_table();
+}
+
+/* Positioned reads are reported as possible only while the read_from_writer_called flag is clear. */
+bool
+CacheVC::is_pread_capable()
+{
+  const bool reading_from_writer = f.read_from_writer_called != 0;
+  return !reading_from_writer;
+}
+
+/* Request that the object being written stay pinned until @a time_pin.
+ * Only accepted on a write VC before any data has been written; otherwise asserts and
+ * returns false. */
+bool
+CacheVC::set_pin_in_cache(time_t time_pin)
+{
+  if (total_len != 0) {
+    ink_assert(!"should Pin the document before writing");
+    return false;
+  }
+  if (vio.op == VIO::WRITE) {
+    pin_in_cache = time_pin;
+    return true;
+  }
+  ink_assert(!"Pinning only allowed while writing objects to the cache");
+  return false;
+}
+
+/* Pin time stored by set_pin_in_cache(); 0 when none was set, because the field is zeroed with
+ * the rest of region C. */
+time_t
+CacheVC::get_pin_in_cache()
+{
+  return this->pin_in_cache;
+}
diff --git a/iocore/cache/CacheVC.h b/iocore/cache/CacheVC.h
new file mode 100644
index 0000000000..e345c99c07
--- /dev/null
+++ b/iocore/cache/CacheVC.h
@@ -0,0 +1,337 @@
+/** @file
+
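+  Declaration of CacheVC, the cache virtual connection that carries out cache read, write,
+  remove, and volume scan operations.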
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#pragma once
+
+/*
+// inkcache
+#include "I_Cache.h"
+#include "P_CacheDir.h"
+#include "P_CacheVol.h"
+*/
+#include "P_CacheHttp.h"
+
+// aio
+#include "I_AIO.h"
+
+// inkevent
+#include "I_Action.h"
+#include "I_Continuation.h"
+#include "I_Event.h"
+#include "I_IOBuffer.h"
+#include "I_VIO.h"
+
+// tscore
+#include "tscore/ink_hrtime.h"
+#include "tscore/List.h"
+#include "tscore/Ptr.h"
+
+#include <cstdint>
+
+struct Vol;
+
+/** The cache virtual connection.
+
+    A CacheVC carries out a cache operation (read, write, remove, or volume scan) as a chain of
+    event handlers; the handler groups are declared below. The data members are split into
+    regions A, B, and C according to how they are (or are not) reset when a CacheVC is freed.
+
+    NOTE(review): several inline members dereference Vol and Doc, which this header does not
+    define (Vol is only forward-declared above). Their definitions must therefore be visible
+    before this header is included.
+ */
+struct CacheVC : public CacheVConnection {
+  CacheVC();
+
+  // VConnection / CacheVConnection interface.
+  VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
+  VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset) override;
+  VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false) override;
+  void do_io_close(int lerrno = -1) override;
+  void reenable(VIO *avio) override;
+  void reenable_re(VIO *avio) override;
+  bool get_data(int i, void *data) override;
+  bool set_data(int i, void *data) override;
+
+  /// @return @c true unless f.not_from_ram_cache is set. Only valid for read operations.
+  bool
+  is_ram_cache_hit() const override
+  {
+    ink_assert(vio.op == VIO::READ);
+    return !f.not_from_ram_cache;
+  }
+
+  /** Expose the header area of the first fragment.
+      @return 0 and set @a ptr / @a len when a first fragment buffer is loaded, otherwise -1.
+   */
+  int
+  get_header(void **ptr, int *len) override
+  {
+    if (first_buf) {
+      Doc *doc = (Doc *)first_buf->data();
+      *ptr     = doc->hdr();
+      *len     = doc->hlen;
+      return 0;
+    }
+
+    return -1;
+  }
+
+  /// Record a header to be written later. The pointer is stored, not copied.
+  int
+  set_header(void *ptr, int len) override
+  {
+    header_to_write     = ptr;
+    header_to_write_len = len;
+    return 0;
+  }
+
+  /** Expose the body of the first fragment when that fragment holds the whole object.
+      @return 0 and set @a ptr / @a len when the first fragment's data length equals the
+      object's total length, otherwise -1.
+   */
+  int
+  get_single_data(void **ptr, int *len) override
+  {
+    if (first_buf) {
+      Doc *doc = (Doc *)first_buf->data();
+      if (doc->data_len() == doc->total_len) {
+        *ptr = doc->data();
+        *len = doc->data_len();
+        return 0;
+      }
+    }
+
+    return -1;
+  }
+
+  /// @return the volume number of the cache volume this VC is bound to, or -1 if unknown.
+  int
+  get_volume_number() const override
+  {
+    if (vol && vol->cache_vol) {
+      return vol->cache_vol->vol_number;
+    }
+
+    return -1;
+  }
+
+  /// @return the path of the disk backing this VC's volume, or nullptr if unknown.
+  const char *
+  get_disk_path() const override
+  {
+    if (vol && vol->disk) {
+      return vol->disk->path;
+    }
+
+    return nullptr;
+  }
+
+  /// @return the f.compressed_in_ram flag. Only valid for read operations.
+  bool
+  is_compressed_in_ram() const override
+  {
+    ink_assert(vio.op == VIO::READ);
+    return f.compressed_in_ram;
+  }
+
+  bool writer_done();
+  int calluser(int event);
+  int callcont(int event);
+  int die();
+  int dead(int event, Event *e);
+
+  // Low-level read/write helpers.
+  int handleReadDone(int event, Event *e);
+  int handleRead(int event, Event *e);
+  int do_read_call(CacheKey *akey);
+  int handleWrite(int event, Event *e);
+  int handleWriteLock(int event, Event *e);
+  int do_write_call();
+  int do_write_lock();
+  int do_write_lock_call();
+  int do_sync(uint32_t target_write_serial);
+
+  // Read-path handlers.
+  int openReadClose(int event, Event *e);
+  int openReadReadDone(int event, Event *e);
+  int openReadMain(int event, Event *e);
+  int openReadStartEarliest(int event, Event *e);
+  int openReadVecWrite(int event, Event *e);
+  int openReadStartHead(int event, Event *e);
+  int openReadFromWriter(int event, Event *e);
+  int openReadFromWriterMain(int event, Event *e);
+  int openReadFromWriterFailure(int event, Event *);
+  int openReadChooseWriter(int event, Event *e);
+  int openReadDirDelete(int event, Event *e);
+
+  // Write-path handlers.
+  int openWriteCloseDir(int event, Event *e);
+  int openWriteCloseHeadDone(int event, Event *e);
+  int openWriteCloseHead(int event, Event *e);
+  int openWriteCloseDataDone(int event, Event *e);
+  int openWriteClose(int event, Event *e);
+  int openWriteRemoveVector(int event, Event *e);
+  int openWriteWriteDone(int event, Event *e);
+  int openWriteOverwrite(int event, Event *e);
+  int openWriteMain(int event, Event *e);
+  int openWriteStartDone(int event, Event *e);
+  int openWriteStartBegin(int event, Event *e);
+
+  // Alternate-vector update handlers.
+  int updateVector(int event, Event *e);
+  int updateReadDone(int event, Event *e);
+  int updateVecWrite(int event, Event *e);
+
+  int removeEvent(int event, Event *e);
+
+  // Volume scan handlers.
+  int scanVol(int event, Event *e);
+  int scanObject(int event, Event *e);
+  int scanUpdateDone(int event, Event *e);
+  int scanOpenWrite(int event, Event *e);
+  int scanRemoveDone(int event, Event *e);
+
+  // The AIO file descriptor doubles as an "I/O in progress" marker.
+  int
+  is_io_in_progress()
+  {
+    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
+  }
+  void
+  set_io_not_in_progress()
+  {
+    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
+  }
+  void
+  set_agg_write_in_progress()
+  {
+    io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
+  }
+  int evacuateDocDone(int event, Event *e);
+  int evacuateReadHead(int event, Event *e);
+
+  void cancel_trigger();
+  int64_t get_object_size() override;
+  void set_http_info(CacheHTTPInfo *info) override;
+  void get_http_info(CacheHTTPInfo **info) override;
+  /** Get the fragment table.
+      @return The address of the start of the fragment table,
+      or @c nullptr if there is no fragment table.
+  */
+  virtual HTTPInfo::FragOffset *get_frag_table();
+  /** Load alt pointers and do fixups if needed.
+      @return Length of header data used for alternates.
+  */
+  virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = nullptr);
+  bool is_pread_capable() override;
+  bool set_pin_in_cache(time_t time_pin) override;
+  time_t get_pin_in_cache() override;
+
+  // number of bytes to memset to 0 in the CacheVC when we free
+  // it. All member variables starting from vio are memset to 0.
+  // This variable is initialized in CacheVC constructor.
+  static int size_to_init;
+
+  // Start Region A
+  // This set of variables are not reset when the cacheVC is freed.
+  // A CacheVC must set these to the correct values whenever needed
+  // These are variables that are always set to the correct values
+  // before being used by the CacheVC
+  CacheKey key, first_key, earliest_key, update_key;
+  Dir dir, earliest_dir, overwrite_dir, first_dir;
+  // end Region A
+
+  // Start Region B
+  // These variables are individually cleared or reset when the
+  // CacheVC is freed. All these variables must be reset/cleared
+  // in free_CacheVC.
+  Action _action;
+  CacheHTTPHdr request;
+  CacheHTTPInfoVector vector;
+  CacheHTTPInfo alternate;
+  Ptr<IOBufferData> buf;
+  Ptr<IOBufferData> first_buf;
+  Ptr<IOBufferBlock> blocks; // data available to write
+  Ptr<IOBufferBlock> writer_buf;
+
+  OpenDirEntry *od = nullptr;
+  AIOCallbackInternal io;
+  int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in vector
+  LINK(CacheVC, opendir_link);
+#ifdef CACHE_STAT_PAGES
+  LINK(CacheVC, stat_link);
+#endif
+  // end Region B
+
+  // Start Region C
+  // These variables are memset to 0 when the structure is freed.
+  // The size of this region is size_to_init which is initialized
+  // in the CacheVC constructor. It assumes that vio is the start
+  // of this region.
+  // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
+  // size_to_init initialization
+  VIO vio;
+  CacheFragType frag_type;
+  CacheHTTPInfo *info;
+  CacheHTTPInfoVector *write_vector;
+  const OverridableHttpConfigParams *params;
+  int header_len;        // for communicating with agg_copy
+  int frag_len;          // for communicating with agg_copy
+  uint32_t write_len;    // for communicating with agg_copy
+  uint32_t agg_len;      // for communicating with aggWrite
+  uint32_t write_serial; // serial of the final write for SYNC
+  Vol *vol;
+  Dir *last_collision;
+  Event *trigger;
+  CacheKey *read_key;
+  ContinuationHandler save_handler;
+  time_t pin_in_cache;
+  ink_hrtime start_time;
+  int op_type; // Index into the metrics array for this operation, rather than a CacheOpType (fewer casts)
+  int recursive;
+  int closed;
+  uint64_t seek_to;      // pread offset
+  int64_t offset;        // offset into 'blocks' of data to write; the scan code uses it as the offset into 'buf'
+  int64_t writer_offset; // offset of the writer for reading from a writer
+  int64_t length;        // length of data available to write
+  int64_t doc_pos;       // read position in 'buf'
+  uint64_t write_pos;    // length written
+  uint64_t total_len;    // total length written and available to write
+  uint64_t doc_len;      // total_length (of the selected alternate for HTTP)
+  uint64_t update_len;
+  int fragment;        // scan: 0 until the first read of the current volume has been issued
+  int scan_msec_delay; // scan: delay between scan steps, in milliseconds
+  CacheVC *write_vc;
+  char *hostname;
+  int host_len;
+  int header_to_write_len;
+  void *header_to_write;
+  short writer_lock_retry;
+  union {
+    uint32_t flags;
+    struct {
+      unsigned int use_first_key           : 1;
+      unsigned int overwrite               : 1; // overwrite first_key Dir if it exists
+      unsigned int close_complete          : 1; // WRITE_COMPLETE is final
+      unsigned int sync                    : 1; // write to be committed to durable storage before WRITE_COMPLETE
+      unsigned int evacuator               : 1;
+      unsigned int single_fragment         : 1;
+      unsigned int evac_vector             : 1;
+      unsigned int lookup                  : 1;
+      unsigned int update                  : 1;
+      unsigned int remove                  : 1;
+      unsigned int remove_aborted_writers  : 1;
+      unsigned int open_read_timeout       : 1; // UNUSED
+      unsigned int data_done               : 1;
+      unsigned int read_from_writer_called : 1;
+      unsigned int not_from_ram_cache      : 1; // set => not a RAM cache hit (is_ram_cache_hit() returns its negation)
+      unsigned int rewrite_resident_alt    : 1;
+      unsigned int readers                 : 1;
+      unsigned int doc_from_ram_cache      : 1;
+      unsigned int hit_evacuate            : 1;
+      unsigned int compressed_in_ram       : 1; // compressed state in ram cache
+      unsigned int allow_empty_doc         : 1; // used for cache empty http document
+    } f;
+  };
+  // BTF optimization used to skip reading stuff in cache partition that doesn't contain any
+  // dir entries.
+  char *scan_vol_map;
+  // BTF fix to handle objects that overlapped over two different reads,
+  // this is how much we need to back up the buffer to get the start of the overlapping object.
+  off_t scan_fix_buffer_offset;
+  // end region C
+};
+
+// Link helpers for the opendir_link member declared with LINK() above (see tscore/List.h).
+LINK_DEFINITION(CacheVC, opendir_link)
diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
index bb34aa3263..ed1f59a293 100644
--- a/iocore/cache/CacheVol.cc
+++ b/iocore/cache/CacheVol.cc
@@ -23,17 +23,14 @@
#include "P_Cache.h"
-#define SCAN_BUF_SIZE RECOVERY_SIZE
-#define SCAN_WRITER_LOCK_MAX_RETRY 5
-
namespace
{
-
-DbgCtl dbg_ctl_cache_scan{"cache_scan"};
DbgCtl dbg_ctl_cache_scan_truss{"cache_scan_truss"};
-
} // end anonymous namespace
+#define SCAN_BUF_SIZE RECOVERY_SIZE
+#define SCAN_WRITER_LOCK_MAX_RETRY 5
+
Action *
Cache::scan(Continuation *cont, const char *hostname, int host_len, int
KB_per_second)
{
@@ -57,468 +54,3 @@ Cache::scan(Continuation *cont, const char *hostname, int
host_len, int KB_per_s
cont->handleEvent(CACHE_EVENT_SCAN, c);
return &c->_action;
}
-
-int
-CacheVC::scanVol(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanVol", this);
- if (_action.cancelled) {
- return free_CacheVC(this);
- }
-
- ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&theCache->hosttable);
-
- const CacheHostRecord *rec = &hosttable->gen_host_rec;
- if (host_len) {
- CacheHostResult res;
- hosttable->Match(hostname, host_len, &res);
- if (res.record) {
- rec = res.record;
- }
- }
-
- if (!vol) {
- if (!rec->num_vols) {
- goto Ldone;
- }
- vol = rec->vols[0];
- } else {
- for (int i = 0; i < rec->num_vols - 1; i++) {
- if (vol == rec->vols[i]) {
- vol = rec->vols[i + 1];
- goto Lcont;
- }
- }
- goto Ldone;
- }
-Lcont:
- fragment = 0;
- SET_HANDLER(&CacheVC::scanObject);
- eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
- return EVENT_CONT;
-Ldone:
- _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, nullptr);
- return free_CacheVC(this);
-}
-
-/* Next block with some data in it in this partition. Returns end of
partition if no more
- * locations.
- *
- * d - Vol
- * vol_map - precalculated map
- * offset - offset to start looking at (and data at this location has not been
read yet). */
-static off_t
-next_in_map(Vol *vol, char *vol_map, off_t offset)
-{
- off_t start_offset = vol->vol_offset_to_offset(0);
- off_t new_off = (offset - start_offset);
- off_t vol_len = vol->vol_relative_length(start_offset);
-
- while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) {
- new_off += SCAN_BUF_SIZE;
- }
- if (new_off >= vol_len) {
- return vol_len + start_offset;
- }
- return new_off + start_offset;
-}
-
-// Function in CacheDir.cc that we need for make_vol_map().
-int dir_bucket_loop_fix(Dir *start_dir, int s, Vol *vol);
-
-// TODO: If we used a bit vector, we could make a smaller map structure.
-// TODO: If we saved a high water mark we could have a smaller buf, and avoid
searching it
-// when we are asked about the highest interesting offset.
-/* Make map of what blocks in partition are used.
- *
- * d - Vol to make a map of. */
-static char *
-make_vol_map(Vol *vol)
-{
- // Map will be one byte for each SCAN_BUF_SIZE bytes.
- off_t start_offset = vol->vol_offset_to_offset(0);
- off_t vol_len = vol->vol_relative_length(start_offset);
- size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE;
- char *vol_map = static_cast<char *>(ats_malloc(map_len));
-
- memset(vol_map, 0, map_len);
-
- // Scan directories.
- // Copied from dir_entries_used() and modified to fill in the map instead.
- for (int s = 0; s < vol->segments; s++) {
- Dir *seg = vol->dir_segment(s);
- for (int b = 0; b < vol->buckets; b++) {
- Dir *e = dir_bucket(b, seg);
- if (dir_bucket_loop_fix(e, s, vol)) {
- break;
- }
- while (e) {
- if (dir_offset(e) && dir_valid(vol, e) && dir_agg_valid(vol, e) &&
dir_head(e)) {
- off_t offset = vol->vol_offset(e) - start_offset;
- if (offset <= vol_len) {
- vol_map[offset / SCAN_BUF_SIZE] = 1;
- }
- }
- e = next_dir(e, seg);
- if (!e) {
- break;
- }
- }
- }
- }
- return vol_map;
-}
-
-int
-CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanObject", this);
-
- Doc *doc = nullptr;
- void *result = nullptr;
- int hlen = 0;
- char hname[500];
- bool hostinfo_copied = false;
- off_t next_object_len = 0;
- bool might_need_overlap_read = false;
-
- cancel_trigger();
- set_io_not_in_progress();
- if (_action.cancelled) {
- return free_CacheVC(this);
- }
-
- CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
- if (!lock.is_locked()) {
- Dbg(dbg_ctl_cache_scan_truss, "delay %p:scanObject", this);
- mutex->thread_holding->schedule_in_local(this,
HRTIME_MSECONDS(cache_config_mutex_retry_delay));
- return EVENT_CONT;
- }
-
- if (!fragment) { // initialize for first read
- fragment = 1;
- scan_vol_map = make_vol_map(vol);
- io.aiocb.aio_offset = next_in_map(vol, scan_vol_map,
vol->vol_offset_to_offset(0));
- if (io.aiocb.aio_offset >= static_cast<off_t>(vol->skip + vol->len)) {
- goto Lnext_vol;
- }
- io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
- io.aiocb.aio_buf = buf->data();
- io.action = this;
- io.thread = AIO_CALLBACK_THREAD_ANY;
- Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject", this);
- goto Lread;
- }
-
- if (!io.ok()) {
- result = (void *)-ECACHE_READ_FAIL;
- goto Ldone;
- }
-
- doc = reinterpret_cast<Doc *>(buf->data() + offset);
- // If there is data in the buffer before the start that is from a partial
object read previously
- // Fix things as if we read it this time.
- if (scan_fix_buffer_offset) {
- io.aio_result += scan_fix_buffer_offset;
- io.aiocb.aio_nbytes += scan_fix_buffer_offset;
- io.aiocb.aio_offset -= scan_fix_buffer_offset;
- io.aiocb.aio_buf = static_cast<char *>(io.aiocb.aio_buf) -
scan_fix_buffer_offset;
- scan_fix_buffer_offset = 0;
- }
- while (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) +
next_object_len <
- static_cast<off_t>(io.aiocb.aio_nbytes)) {
- might_need_overlap_read = false;
- doc = reinterpret_cast<Doc *>(reinterpret_cast<char
*>(doc) + next_object_len);
- next_object_len = vol->round_to_approx_size(doc->len);
- int i;
- bool changed;
-
- if (doc->magic != DOC_MAGIC) {
- next_object_len = CACHE_BLOCK_SIZE;
- Dbg(dbg_ctl_cache_scan_truss, "blockskip %p:scanObject", this);
- continue;
- }
-
- if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen) {
- goto Lskip;
- }
-
- last_collision = nullptr;
- while (true) {
- if (!dir_probe(&doc->first_key, vol, &dir, &last_collision)) {
- goto Lskip;
- }
- if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) ||
- (vol->vol_offset(&dir) != io.aiocb.aio_offset +
(reinterpret_cast<char *>(doc) - buf->data()))) {
- continue;
- }
- break;
- }
- if (doc->data() - buf->data() > static_cast<int>(io.aiocb.aio_nbytes)) {
- might_need_overlap_read = true;
- goto Lskip;
- }
- {
- char *tmp = doc->hdr();
- int len = doc->hlen;
- while (len > 0) {
- int r = HTTPInfo::unmarshal(tmp, len, buf.get());
- if (r < 0) {
- ink_assert(!"CacheVC::scanObject unmarshal failed");
- goto Lskip;
- }
- len -= r;
- tmp += r;
- }
- }
- if (this->load_http_info(&vector, doc) != doc->hlen) {
- goto Lskip;
- }
- changed = false;
- hostinfo_copied = false;
- for (i = 0; i < vector.count(); i++) {
- if (!vector.get(i)->valid()) {
- goto Lskip;
- }
- if (!hostinfo_copied) {
- memccpy(hname, vector.get(i)->request_get()->host_get(&hlen), 0, 500);
- hname[hlen] = 0;
- Dbg(dbg_ctl_cache_scan, "hostname = '%s', hostlen = %d", hname, hlen);
- hostinfo_copied = true;
- }
- vector.get(i)->object_key_get(&key);
- alternate_index = i;
- // verify that the earliest block exists, reducing 'false hit' callbacks
- if (!(key == doc->key)) {
- last_collision = nullptr;
- if (!dir_probe(&key, vol, &earliest_dir, &last_collision)) {
- continue;
- }
- }
- earliest_key = key;
- int result1 =
_action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i));
- switch (result1) {
- case CACHE_SCAN_RESULT_CONTINUE:
- continue;
- case CACHE_SCAN_RESULT_DELETE:
- changed = true;
- vector.remove(i, true);
- i--;
- continue;
- case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES:
- changed = true;
- vector.clear();
- i = 0;
- break;
- case CACHE_SCAN_RESULT_UPDATE:
- ink_assert(alternate_index >= 0);
- vector.insert(&alternate, alternate_index);
- if (!vector.get(alternate_index)->valid()) {
- continue;
- }
- changed = true;
- continue;
- case EVENT_DONE:
- goto Lcancel;
- default:
- ink_assert(!"unexpected CACHE_SCAN_RESULT");
- continue;
- }
- }
- if (changed) {
- if (!vector.count()) {
- ink_assert(hostinfo_copied);
- SET_HANDLER(&CacheVC::scanRemoveDone);
- // force remove even if there is a writer
- cacheProcessor.remove(this, &doc->first_key, CACHE_FRAG_TYPE_HTTP,
hname, hlen);
- return EVENT_CONT;
- } else {
- offset = reinterpret_cast<char *>(doc) - buf->data();
- write_len = 0;
- frag_type = CACHE_FRAG_TYPE_HTTP;
- f.use_first_key = 1;
- f.evac_vector = 1;
- alternate_index = CACHE_ALT_REMOVED;
- writer_lock_retry = 0;
-
- first_key = key = doc->first_key;
- earliest_key.clear();
-
- SET_HANDLER(&CacheVC::scanOpenWrite);
- return scanOpenWrite(EVENT_NONE, nullptr);
- }
- }
- continue;
- Lskip:;
- }
- vector.clear();
- // If we had an object that went past the end of the buffer, and it is small
enough to fix,
- // fix it.
- if (might_need_overlap_read &&
- (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) +
next_object_len >
- static_cast<off_t>(io.aiocb.aio_nbytes)) &&
- next_object_len > 0) {
- off_t partial_object_len = io.aiocb.aio_nbytes - (reinterpret_cast<char
*>(doc) - buf->data());
- // Copy partial object to beginning of the buffer.
- memmove(buf->data(), reinterpret_cast<char *>(doc), partial_object_len);
- io.aiocb.aio_offset += io.aiocb.aio_nbytes;
- io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len;
- io.aiocb.aio_buf = buf->data() + partial_object_len;
- scan_fix_buffer_offset = partial_object_len;
- } else { // Normal case, where we ended on a object boundary.
- io.aiocb.aio_offset += (reinterpret_cast<char *>(doc) - buf->data()) +
next_object_len;
- Dbg(dbg_ctl_cache_scan_truss, "next %p:scanObject %" PRId64, this,
(int64_t)io.aiocb.aio_offset);
- io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset);
- Dbg(dbg_ctl_cache_scan_truss, "next_in_map %p:scanObject %" PRId64, this,
(int64_t)io.aiocb.aio_offset);
- io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
- io.aiocb.aio_buf = buf->data();
- scan_fix_buffer_offset = 0;
- }
-
- if (io.aiocb.aio_offset >= vol->skip + vol->len) {
- Lnext_vol:
- SET_HANDLER(&CacheVC::scanVol);
- eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
- return EVENT_CONT;
- }
-
-Lread:
- io.aiocb.aio_fildes = vol->fd;
- if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) >
static_cast<off_t>(vol->skip + vol->len)) {
- io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
- }
- offset = 0;
- ink_assert(ink_aio_read(&io) >= 0);
- Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this,
(int64_t)io.aiocb.aio_offset,
- (size_t)io.aiocb.aio_nbytes);
- return EVENT_CONT;
-
-Ldone:
- Dbg(dbg_ctl_cache_scan_truss, "done %p:scanObject", this);
- _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result);
-Lcancel:
- return free_CacheVC(this);
-}
-
-int
-CacheVC::scanRemoveDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanRemoveDone", this);
- Dbg(dbg_ctl_cache_scan, "remove done.");
- alternate.destroy();
- SET_HANDLER(&CacheVC::scanObject);
- return handleEvent(EVENT_IMMEDIATE, nullptr);
-}
-
-int
-CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanOpenWrite", this);
- cancel_trigger();
- // get volume lock
- if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) {
- int r =
_action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, nullptr);
- Dbg(dbg_ctl_cache_scan, "still haven't got the writer lock, asking
user..");
- switch (r) {
- case CACHE_SCAN_RESULT_RETRY:
- writer_lock_retry = 0;
- break;
- case CACHE_SCAN_RESULT_CONTINUE:
- SET_HANDLER(&CacheVC::scanObject);
- return scanObject(EVENT_IMMEDIATE, nullptr);
- }
- }
- int ret = 0;
- {
- CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
- if (!lock.is_locked()) {
- Dbg(dbg_ctl_cache_scan, "vol->mutex %p:scanOpenWrite", this);
- VC_SCHED_LOCK_RETRY();
- }
-
- Dbg(dbg_ctl_cache_scan, "trying for writer lock");
- if (vol->open_write(this, false, 1)) {
- writer_lock_retry++;
- SET_HANDLER(&CacheVC::scanOpenWrite);
- mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
- return EVENT_CONT;
- }
-
- ink_assert(this->od);
- // put all the alternates in the open directory vector
- int alt_count = vector.count();
- for (int i = 0; i < alt_count; i++) {
- write_vector->insert(vector.get(i));
- }
- od->writing_vec = true;
- vector.clear(false);
- // check that the directory entry was not overwritten
- // if so return failure
- Dbg(dbg_ctl_cache_scan, "got writer lock");
- Dir *l = nullptr;
- Dir d;
- Doc *doc = reinterpret_cast<Doc *>(buf->data() + offset);
- offset = reinterpret_cast<char *>(doc) - buf->data() +
vol->round_to_approx_size(doc->len);
- // if the doc contains some data, then we need to create
- // a new directory entry for this fragment. Remember the
- // offset and the key in earliest_key
- dir_assign(&od->first_dir, &dir);
- if (doc->total_len) {
- dir_assign(&od->single_doc_dir, &dir);
- dir_set_tag(&od->single_doc_dir, doc->key.slice32(2));
- od->single_doc_key = doc->key;
- od->move_resident_alt = true;
- }
-
- while (true) {
- if (!dir_probe(&first_key, vol, &d, &l)) {
- vol->close_write(this);
- _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED,
nullptr);
- SET_HANDLER(&CacheVC::scanObject);
- return handleEvent(EVENT_IMMEDIATE, nullptr);
- }
- if (memcmp(&dir, &d, SIZEOF_DIR)) {
- Dbg(dbg_ctl_cache_scan, "dir entry has changed");
- continue;
- }
- break;
- }
-
- // the document was not modified
- // we are safe from now on as we hold the
- // writer lock on the doc
- if (f.evac_vector) {
- header_len = write_vector->marshal_length();
- }
- SET_HANDLER(&CacheVC::scanUpdateDone);
- ret = do_write_call();
- }
- if (ret == EVENT_RETURN) {
- return handleEvent(AIO_EVENT_DONE, nullptr);
- }
- return ret;
-}
-
-int
-CacheVC::scanUpdateDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanUpdateDone", this);
- cancel_trigger();
- // get volume lock
- CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
- if (lock.is_locked()) {
- // insert a directory entry for the previous fragment
- dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
- if (od->move_resident_alt) {
- dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
- }
- ink_assert(vol->open_read(&first_key));
- ink_assert(this->od);
- vol->close_write(this);
- SET_HANDLER(&CacheVC::scanObject);
- return handleEvent(EVENT_IMMEDIATE, nullptr);
- } else {
- mutex->thread_holding->schedule_in_local(this,
HRTIME_MSECONDS(cache_config_mutex_retry_delay));
- return EVENT_CONT;
- }
-}
diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h
index 2721ee0eba..2b5cd7c9a2 100644
--- a/iocore/cache/P_CacheDir.h
+++ b/iocore/cache/P_CacheDir.h
@@ -28,6 +28,9 @@
#include "I_EventSystem.h"
#include "I_Continuation.h"
+// aio
+#include "I_AIO.h"
+
struct Vol;
struct InterimCacheVol;
struct CacheVC;
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index 8c6dba1aba..a102633475 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -31,6 +31,8 @@
#include "P_CacheHosting.h"
#include "api/Metrics.h"
+#include "CacheVC.h"
+
using ts::Metrics;
struct EvacuationBlock;
@@ -125,291 +127,6 @@ extern int cache_config_mutex_retry_delay;
extern int cache_read_while_writer_retry_delay;
extern int cache_config_read_while_writer_max_retries;
-// CacheVC
-struct CacheVC : public CacheVConnection {
- CacheVC();
-
- VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
- VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t
offset) override;
- VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool
owner = false) override;
- void do_io_close(int lerrno = -1) override;
- void reenable(VIO *avio) override;
- void reenable_re(VIO *avio) override;
- bool get_data(int i, void *data) override;
- bool set_data(int i, void *data) override;
-
- bool
- is_ram_cache_hit() const override
- {
- ink_assert(vio.op == VIO::READ);
- return !f.not_from_ram_cache;
- }
-
- int
- get_header(void **ptr, int *len) override
- {
- if (first_buf) {
- Doc *doc = (Doc *)first_buf->data();
- *ptr = doc->hdr();
- *len = doc->hlen;
- return 0;
- }
-
- return -1;
- }
-
- int
- set_header(void *ptr, int len) override
- {
- header_to_write = ptr;
- header_to_write_len = len;
- return 0;
- }
-
- int
- get_single_data(void **ptr, int *len) override
- {
- if (first_buf) {
- Doc *doc = (Doc *)first_buf->data();
- if (doc->data_len() == doc->total_len) {
- *ptr = doc->data();
- *len = doc->data_len();
- return 0;
- }
- }
-
- return -1;
- }
-
- int
- get_volume_number() const override
- {
- if (vol && vol->cache_vol) {
- return vol->cache_vol->vol_number;
- }
-
- return -1;
- }
-
- const char *
- get_disk_path() const override
- {
- if (vol && vol->disk) {
- return vol->disk->path;
- }
-
- return nullptr;
- }
-
- bool
- is_compressed_in_ram() const override
- {
- ink_assert(vio.op == VIO::READ);
- return f.compressed_in_ram;
- }
-
- bool writer_done();
- int calluser(int event);
- int callcont(int event);
- int die();
- int dead(int event, Event *e);
-
- int handleReadDone(int event, Event *e);
- int handleRead(int event, Event *e);
- int do_read_call(CacheKey *akey);
- int handleWrite(int event, Event *e);
- int handleWriteLock(int event, Event *e);
- int do_write_call();
- int do_write_lock();
- int do_write_lock_call();
- int do_sync(uint32_t target_write_serial);
-
- int openReadClose(int event, Event *e);
- int openReadReadDone(int event, Event *e);
- int openReadMain(int event, Event *e);
- int openReadStartEarliest(int event, Event *e);
- int openReadVecWrite(int event, Event *e);
- int openReadStartHead(int event, Event *e);
- int openReadFromWriter(int event, Event *e);
- int openReadFromWriterMain(int event, Event *e);
- int openReadFromWriterFailure(int event, Event *);
- int openReadChooseWriter(int event, Event *e);
- int openReadDirDelete(int event, Event *e);
-
- int openWriteCloseDir(int event, Event *e);
- int openWriteCloseHeadDone(int event, Event *e);
- int openWriteCloseHead(int event, Event *e);
- int openWriteCloseDataDone(int event, Event *e);
- int openWriteClose(int event, Event *e);
- int openWriteRemoveVector(int event, Event *e);
- int openWriteWriteDone(int event, Event *e);
- int openWriteOverwrite(int event, Event *e);
- int openWriteMain(int event, Event *e);
- int openWriteStartDone(int event, Event *e);
- int openWriteStartBegin(int event, Event *e);
-
- int updateVector(int event, Event *e);
- int updateReadDone(int event, Event *e);
- int updateVecWrite(int event, Event *e);
-
- int removeEvent(int event, Event *e);
-
- int scanVol(int event, Event *e);
- int scanObject(int event, Event *e);
- int scanUpdateDone(int event, Event *e);
- int scanOpenWrite(int event, Event *e);
- int scanRemoveDone(int event, Event *e);
-
- int
- is_io_in_progress()
- {
- return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
- }
- void
- set_io_not_in_progress()
- {
- io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
- }
- void
- set_agg_write_in_progress()
- {
- io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
- }
- int evacuateDocDone(int event, Event *e);
- int evacuateReadHead(int event, Event *e);
-
- void cancel_trigger();
- int64_t get_object_size() override;
- void set_http_info(CacheHTTPInfo *info) override;
- void get_http_info(CacheHTTPInfo **info) override;
- /** Get the fragment table.
- @return The address of the start of the fragment table,
- or @c nullptr if there is no fragment table.
- */
- virtual HTTPInfo::FragOffset *get_frag_table();
- /** Load alt pointers and do fixups if needed.
- @return Length of header data used for alternates.
- */
- virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc,
RefCountObj *block_ptr = nullptr);
- bool is_pread_capable() override;
- bool set_pin_in_cache(time_t time_pin) override;
- time_t get_pin_in_cache() override;
-
- // number of bytes to memset to 0 in the CacheVC when we free
- // it. All member variables starting from vio are memset to 0.
- // This variable is initialized in CacheVC constructor.
- static int size_to_init;
-
- // Start Region A
- // This set of variables are not reset when the cacheVC is freed.
- // A CacheVC must set these to the correct values whenever needed
- // These are variables that are always set to the correct values
- // before being used by the CacheVC
- CacheKey key, first_key, earliest_key, update_key;
- Dir dir, earliest_dir, overwrite_dir, first_dir;
- // end Region A
-
- // Start Region B
- // These variables are individually cleared or reset when the
- // CacheVC is freed. All these variables must be reset/cleared
- // in free_CacheVC.
- Action _action;
- CacheHTTPHdr request;
- CacheHTTPInfoVector vector;
- CacheHTTPInfo alternate;
- Ptr<IOBufferData> buf;
- Ptr<IOBufferData> first_buf;
- Ptr<IOBufferBlock> blocks; // data available to write
- Ptr<IOBufferBlock> writer_buf;
-
- OpenDirEntry *od = nullptr;
- AIOCallbackInternal io;
- int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in
vector
- LINK(CacheVC, opendir_link);
-#ifdef CACHE_STAT_PAGES
- LINK(CacheVC, stat_link);
-#endif
- // end Region B
-
- // Start Region C
- // These variables are memset to 0 when the structure is freed.
- // The size of this region is size_to_init which is initialized
- // in the CacheVC constructor. It assumes that vio is the start
- // of this region.
- // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
- // size_to_init initialization
- VIO vio;
- CacheFragType frag_type;
- CacheHTTPInfo *info;
- CacheHTTPInfoVector *write_vector;
- const OverridableHttpConfigParams *params;
- int header_len; // for communicating with agg_copy
- int frag_len; // for communicating with agg_copy
- uint32_t write_len; // for communicating with agg_copy
- uint32_t agg_len; // for communicating with aggWrite
- uint32_t write_serial; // serial of the final write for SYNC
- Vol *vol;
- Dir *last_collision;
- Event *trigger;
- CacheKey *read_key;
- ContinuationHandler save_handler;
- time_t pin_in_cache;
- ink_hrtime start_time;
- int op_type; // Index into the metrics array for this operation, rather than
a CacheOpType (fewer casts)
- int recursive;
- int closed;
- uint64_t seek_to; // pread offset
- int64_t offset; // offset into 'blocks' of data to write
- int64_t writer_offset; // offset of the writer for reading from a writer
- int64_t length; // length of data available to write
- int64_t doc_pos; // read position in 'buf'
- uint64_t write_pos; // length written
- uint64_t total_len; // total length written and available to write
- uint64_t doc_len; // total_length (of the selected alternate for HTTP)
- uint64_t update_len;
- int fragment;
- int scan_msec_delay;
- CacheVC *write_vc;
- char *hostname;
- int host_len;
- int header_to_write_len;
- void *header_to_write;
- short writer_lock_retry;
- union {
- uint32_t flags;
- struct {
- unsigned int use_first_key : 1;
- unsigned int overwrite : 1; // overwrite first_key Dir if
it exists
- unsigned int close_complete : 1; // WRITE_COMPLETE is final
- unsigned int sync : 1; // write to be committed to
durable storage before WRITE_COMPLETE
- unsigned int evacuator : 1;
- unsigned int single_fragment : 1;
- unsigned int evac_vector : 1;
- unsigned int lookup : 1;
- unsigned int update : 1;
- unsigned int remove : 1;
- unsigned int remove_aborted_writers : 1;
- unsigned int open_read_timeout : 1; // UNUSED
- unsigned int data_done : 1;
- unsigned int read_from_writer_called : 1;
- unsigned int not_from_ram_cache : 1; // entire object was from ram
cache
- unsigned int rewrite_resident_alt : 1;
- unsigned int readers : 1;
- unsigned int doc_from_ram_cache : 1;
- unsigned int hit_evacuate : 1;
- unsigned int compressed_in_ram : 1; // compressed state in ram
cache
- unsigned int allow_empty_doc : 1; // used for cache empty http
document
- } f;
- };
- // BTF optimization used to skip reading stuff in cache partition that
doesn't contain any
- // dir entries.
- char *scan_vol_map;
- // BTF fix to handle objects that overlapped over two different reads,
- // this is how much we need to back up the buffer to get the start of the
overlapping object.
- off_t scan_fix_buffer_offset;
- // end region C
-};
-
#define PUSH_HANDLER(_x) \
do { \
ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
@@ -917,5 +634,3 @@ cache_hash(const CryptoHash &hash)
unsigned int mhash = (unsigned int)(f >> 32);
return mhash;
}
-
-LINK_DEFINITION(CacheVC, opendir_link)