This is an automated email from the ASF dual-hosted git repository. bneradt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push: new fdf2aa0b89 Move CacheVC struct and methods from Vol (#10601) fdf2aa0b89 is described below commit fdf2aa0b8955d2996c531e7825f506807d95bace Author: JosiahWI <41302989+josia...@users.noreply.github.com> AuthorDate: Fri Oct 13 11:18:58 2023 -0500 Move CacheVC struct and methods from Vol (#10601) This is part of the follow-up requested by @masaori335 in #10599. The remaining work is to deal with the methods defined in CacheRead.cc and CacheWrite.cc. --- iocore/cache/CacheVC.cc | 621 ++++++++++++++++++++++++++++++++++++----- iocore/cache/CacheVC.h | 337 ++++++++++++++++++++++ iocore/cache/CacheVol.cc | 474 +------------------------------ iocore/cache/P_CacheDir.h | 3 + iocore/cache/P_CacheInternal.h | 289 +------------------ 5 files changed, 894 insertions(+), 830 deletions(-) diff --git a/iocore/cache/CacheVC.cc b/iocore/cache/CacheVC.cc index ef8d5f6609..4852d268dd 100644 --- a/iocore/cache/CacheVC.cc +++ b/iocore/cache/CacheVC.cc @@ -21,12 +21,16 @@ limitations under the License. */ +#include "I_Cache.h" #include "I_CacheDefs.h" #include "P_CacheDisk.h" #include "P_CacheHttp.h" #include "P_CacheInternal.h" #include "P_CacheVol.h" +// must be included after the others +#include "CacheVC.h" + // hdrs #include "HTTP.h" #include "MIME.h" @@ -53,6 +57,7 @@ // tscore #include "tscore/I_Version.h" #include "tscore/ink_assert.h" +#include "tscore/ink_hrtime.h" #include "tscore/Ptr.h" // ts @@ -63,21 +68,96 @@ #include <cstring> #include <ctime> +namespace +{ DbgCtl dbg_ctl_cache_bc{"cache_bc"}; DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"}; DbgCtl dbg_ctl_cache_read{"cache_read"}; +DbgCtl dbg_ctl_cache_scan{"cache_scan"}; +DbgCtl dbg_ctl_cache_scan_truss{"cache_scan_truss"}; #ifdef DEBUG DbgCtl dbg_ctl_cache_close{"cache_close"}; DbgCtl dbg_ctl_cache_reenable{"cache_reenable"}; #endif +} // end anonymous namespace // Compilation Options -#define STORE_COLLISION 1 -#define USELESS_REENABLES // allow them for now +#define SCAN_BUF_SIZE RECOVERY_SIZE +#define SCAN_WRITER_LOCK_MAX_RETRY 5 +#define STORE_COLLISION 1 +#define USELESS_REENABLES // allow them for now // #define VERIFY_JTEST_DATA extern int cache_config_ram_cache_cutoff; +/* Next block with some data in it in this partition. Returns end of partition if no more + * locations. + * + * d - Vol + * vol_map - precalculated map + * offset - offset to start looking at (and data at this location has not been read yet). */ +static off_t +next_in_map(Vol *vol, char *vol_map, off_t offset) +{ + off_t start_offset = vol->vol_offset_to_offset(0); + off_t new_off = (offset - start_offset); + off_t vol_len = vol->vol_relative_length(start_offset); + + while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) { + new_off += SCAN_BUF_SIZE; + } + if (new_off >= vol_len) { + return vol_len + start_offset; + } + return new_off + start_offset; +} + +// Function in CacheDir.cc that we need for make_vol_map(). +int dir_bucket_loop_fix(Dir *start_dir, int s, Vol *vol); + +// TODO: If we used a bit vector, we could make a smaller map structure. +// TODO: If we saved a high water mark we could have a smaller buf, and avoid searching it +// when we are asked about the highest interesting offset. +/* Make map of what blocks in partition are used. + * + * d - Vol to make a map of. */ +static char * +make_vol_map(Vol *vol) +{ + // Map will be one byte for each SCAN_BUF_SIZE bytes. + off_t start_offset = vol->vol_offset_to_offset(0); + off_t vol_len = vol->vol_relative_length(start_offset); + size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE; + char *vol_map = static_cast<char *>(ats_malloc(map_len)); + + memset(vol_map, 0, map_len); + + // Scan directories. + // Copied from dir_entries_used() and modified to fill in the map instead. + for (int s = 0; s < vol->segments; s++) { + Dir *seg = vol->dir_segment(s); + for (int b = 0; b < vol->buckets; b++) { + Dir *e = dir_bucket(b, seg); + if (dir_bucket_loop_fix(e, s, vol)) { + break; + } + while (e) { + if (dir_offset(e) && dir_valid(vol, e) && dir_agg_valid(vol, e) && dir_head(e)) { + off_t offset = vol->vol_offset(e) - start_offset; + if (offset <= vol_len) { + vol_map[offset / SCAN_BUF_SIZE] = 1; + } + } + e = next_dir(e, seg); + if (!e) { + break; + } + } + } + } + return vol_map; +} + int CacheVC::size_to_init = -1; CacheVC::CacheVC() @@ -86,13 +166,6 @@ CacheVC::CacheVC() memset((void *)&vio, 0, size_to_init); } -HTTPInfo::FragOffset * -CacheVC::get_frag_table() -{ - ink_assert(alternate.valid()); - return alternate.valid() ? alternate.get_frag_table() : nullptr; -} - VIO * CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf) { @@ -227,63 +300,6 @@ CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */) return true; } -void -CacheVC::get_http_info(CacheHTTPInfo **ainfo) -{ - *ainfo = &(this)->alternate; -} - -// set_http_info must be called before do_io_write -// cluster vc does an optimization where it calls do_io_write() before -// calling set_http_info(), but it guarantees that the info will -// be set before transferring any bytes -void -CacheVC::set_http_info(CacheHTTPInfo *ainfo) -{ - ink_assert(!total_len); - if (f.update) { - ainfo->object_key_set(update_key); - ainfo->object_size_set(update_len); - } else { - ainfo->object_key_set(earliest_key); - // don't know the total len yet - } - - MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH); - if ((field && !field->value_get_int64()) || ainfo->m_alt->m_response_hdr.status_get() == HTTP_STATUS_NO_CONTENT) { - f.allow_empty_doc = 1; - // Set the object size here to zero in case this is a cache replace where the new object - // length is zero but the old object was not. - ainfo->object_size_set(0); - } else { - f.allow_empty_doc = 0; - } - - alternate.copy_shallow(ainfo); - ainfo->clear(); -} - -bool -CacheVC::set_pin_in_cache(time_t time_pin) -{ - if (total_len) { - ink_assert(!"should Pin the document before writing"); - return false; - } - if (vio.op != VIO::WRITE) { - ink_assert(!"Pinning only allowed while writing objects to the cache"); - return false; - } - pin_in_cache = time_pin; - return true; -} - -time_t -CacheVC::get_pin_in_cache() -{ - return pin_in_cache; -} - int CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */) { @@ -291,12 +307,6 @@ CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */) return EVENT_DONE; } -bool -CacheVC::is_pread_capable() -{ - return !f.read_from_writer_called; -} - static void unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay) { @@ -607,3 +617,470 @@ Lremoved: Lfree: return free_CacheVC(this); } + +int +CacheVC::scanVol(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) +{ + Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanVol", this); + if (_action.cancelled) { + return free_CacheVC(this); + } + + ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&theCache->hosttable); + + const CacheHostRecord *rec = &hosttable->gen_host_rec; + if (host_len) { + CacheHostResult res; + hosttable->Match(hostname, host_len, &res); + if (res.record) { + rec = res.record; + } + } + + if (!vol) { + if (!rec->num_vols) { + goto Ldone; + } + vol = rec->vols[0]; + } else { + for (int i = 0; i < rec->num_vols - 1; i++) { + if (vol == rec->vols[i]) { + vol = rec->vols[i + 1]; + goto Lcont; + } + } + goto Ldone; + } +Lcont: + fragment = 0; + SET_HANDLER(&CacheVC::scanObject); + eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay)); + return EVENT_CONT; +Ldone: + _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, nullptr); + return free_CacheVC(this); +} + +int +CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) +{ + Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanObject", this); + + Doc *doc = nullptr; + void *result = nullptr; + int hlen = 0; + char hname[500]; + bool hostinfo_copied = false; + off_t next_object_len = 0; + bool might_need_overlap_read = false; + + cancel_trigger(); + set_io_not_in_progress(); + if (_action.cancelled) { + return free_CacheVC(this); + } + + CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding); + if (!lock.is_locked()) { + Dbg(dbg_ctl_cache_scan_truss, "delay %p:scanObject", this); + mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); + return EVENT_CONT; + } + + if (!fragment) { // initialize for first read + fragment = 1; + scan_vol_map = make_vol_map(vol); + io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, vol->vol_offset_to_offset(0)); + if (io.aiocb.aio_offset >= static_cast<off_t>(vol->skip + vol->len)) { + goto Lnext_vol; + } + io.aiocb.aio_nbytes = SCAN_BUF_SIZE; + io.aiocb.aio_buf = buf->data(); + io.action = this; + io.thread = AIO_CALLBACK_THREAD_ANY; + Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject", this); + goto Lread; + } + + if (!io.ok()) { + result = (void *)-ECACHE_READ_FAIL; + goto Ldone; + } + + doc = reinterpret_cast<Doc *>(buf->data() + offset); + // If there is data in the buffer before the start that is from a partial object read previously + // Fix things as if we read it this time. + if (scan_fix_buffer_offset) { + io.aio_result += scan_fix_buffer_offset; + io.aiocb.aio_nbytes += scan_fix_buffer_offset; + io.aiocb.aio_offset -= scan_fix_buffer_offset; + io.aiocb.aio_buf = static_cast<char *>(io.aiocb.aio_buf) - scan_fix_buffer_offset; + scan_fix_buffer_offset = 0; + } + while (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len < + static_cast<off_t>(io.aiocb.aio_nbytes)) { + might_need_overlap_read = false; + doc = reinterpret_cast<Doc *>(reinterpret_cast<char *>(doc) + next_object_len); + next_object_len = vol->round_to_approx_size(doc->len); + int i; + bool changed; + + if (doc->magic != DOC_MAGIC) { + next_object_len = CACHE_BLOCK_SIZE; + Dbg(dbg_ctl_cache_scan_truss, "blockskip %p:scanObject", this); + continue; + } + + if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen) { + goto Lskip; + } + + last_collision = nullptr; + while (true) { + if (!dir_probe(&doc->first_key, vol, &dir, &last_collision)) { + goto Lskip; + } + if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) || + (vol->vol_offset(&dir) != io.aiocb.aio_offset + (reinterpret_cast<char *>(doc) - buf->data()))) { + continue; + } + break; + } + if (doc->data() - buf->data() > static_cast<int>(io.aiocb.aio_nbytes)) { + might_need_overlap_read = true; + goto Lskip; + } + { + char *tmp = doc->hdr(); + int len = doc->hlen; + while (len > 0) { + int r = HTTPInfo::unmarshal(tmp, len, buf.get()); + if (r < 0) { + ink_assert(!"CacheVC::scanObject unmarshal failed"); + goto Lskip; + } + len -= r; + tmp += r; + } + } + if (this->load_http_info(&vector, doc) != doc->hlen) { + goto Lskip; + } + changed = false; + hostinfo_copied = false; + for (i = 0; i < vector.count(); i++) { + if (!vector.get(i)->valid()) { + goto Lskip; + } + if (!hostinfo_copied) { + memccpy(hname, vector.get(i)->request_get()->host_get(&hlen), 0, 500); + hname[hlen] = 0; + Dbg(dbg_ctl_cache_scan, "hostname = '%s', hostlen = %d", hname, hlen); + hostinfo_copied = true; + } + vector.get(i)->object_key_get(&key); + alternate_index = i; + // verify that the earliest block exists, reducing 'false hit' callbacks + if (!(key == doc->key)) { + last_collision = nullptr; + if (!dir_probe(&key, vol, &earliest_dir, &last_collision)) { + continue; + } + } + earliest_key = key; + int result1 = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i)); + switch (result1) { + case CACHE_SCAN_RESULT_CONTINUE: + continue; + case CACHE_SCAN_RESULT_DELETE: + changed = true; + vector.remove(i, true); + i--; + continue; + case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES: + changed = true; + vector.clear(); + i = 0; + break; + case CACHE_SCAN_RESULT_UPDATE: + ink_assert(alternate_index >= 0); + vector.insert(&alternate, alternate_index); + if (!vector.get(alternate_index)->valid()) { + continue; + } + changed = true; + continue; + case EVENT_DONE: + goto Lcancel; + default: + ink_assert(!"unexpected CACHE_SCAN_RESULT"); + continue; + } + } + if (changed) { + if (!vector.count()) { + ink_assert(hostinfo_copied); + SET_HANDLER(&CacheVC::scanRemoveDone); + // force remove even if there is a writer + cacheProcessor.remove(this, &doc->first_key, CACHE_FRAG_TYPE_HTTP, hname, hlen); + return EVENT_CONT; + } else { + offset = reinterpret_cast<char *>(doc) - buf->data(); + write_len = 0; + frag_type = CACHE_FRAG_TYPE_HTTP; + f.use_first_key = 1; + f.evac_vector = 1; + alternate_index = CACHE_ALT_REMOVED; + writer_lock_retry = 0; + + first_key = key = doc->first_key; + earliest_key.clear(); + + SET_HANDLER(&CacheVC::scanOpenWrite); + return scanOpenWrite(EVENT_NONE, nullptr); + } + } + continue; + Lskip:; + } + vector.clear(); + // If we had an object that went past the end of the buffer, and it is small enough to fix, + // fix it. + if (might_need_overlap_read && + (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len > + static_cast<off_t>(io.aiocb.aio_nbytes)) && + next_object_len > 0) { + off_t partial_object_len = io.aiocb.aio_nbytes - (reinterpret_cast<char *>(doc) - buf->data()); + // Copy partial object to beginning of the buffer. + memmove(buf->data(), reinterpret_cast<char *>(doc), partial_object_len); + io.aiocb.aio_offset += io.aiocb.aio_nbytes; + io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len; + io.aiocb.aio_buf = buf->data() + partial_object_len; + scan_fix_buffer_offset = partial_object_len; + } else { // Normal case, where we ended on a object boundary. + io.aiocb.aio_offset += (reinterpret_cast<char *>(doc) - buf->data()) + next_object_len; + Dbg(dbg_ctl_cache_scan_truss, "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset); + io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset); + Dbg(dbg_ctl_cache_scan_truss, "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset); + io.aiocb.aio_nbytes = SCAN_BUF_SIZE; + io.aiocb.aio_buf = buf->data(); + scan_fix_buffer_offset = 0; + } + + if (io.aiocb.aio_offset >= vol->skip + vol->len) { + Lnext_vol: + SET_HANDLER(&CacheVC::scanVol); + eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay)); + return EVENT_CONT; + } + +Lread: + io.aiocb.aio_fildes = vol->fd; + if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) { + io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset; + } + offset = 0; + ink_assert(ink_aio_read(&io) >= 0); + Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset, + (size_t)io.aiocb.aio_nbytes); + return EVENT_CONT; + +Ldone: + Dbg(dbg_ctl_cache_scan_truss, "done %p:scanObject", this); + _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result); +Lcancel: + return free_CacheVC(this); +} + +int +CacheVC::scanRemoveDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) +{ + Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanRemoveDone", this); + Dbg(dbg_ctl_cache_scan, "remove done."); + alternate.destroy(); + SET_HANDLER(&CacheVC::scanObject); + return handleEvent(EVENT_IMMEDIATE, nullptr); +} + +int +CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) +{ + Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanOpenWrite", this); + cancel_trigger(); + // get volume lock + if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) { + int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, nullptr); + Dbg(dbg_ctl_cache_scan, "still haven't got the writer lock, asking user.."); + switch (r) { + case CACHE_SCAN_RESULT_RETRY: + writer_lock_retry = 0; + break; + case CACHE_SCAN_RESULT_CONTINUE: + SET_HANDLER(&CacheVC::scanObject); + return scanObject(EVENT_IMMEDIATE, nullptr); + } + } + int ret = 0; + { + CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding); + if (!lock.is_locked()) { + Dbg(dbg_ctl_cache_scan, "vol->mutex %p:scanOpenWrite", this); + VC_SCHED_LOCK_RETRY(); + } + + Dbg(dbg_ctl_cache_scan, "trying for writer lock"); + if (vol->open_write(this, false, 1)) { + writer_lock_retry++; + SET_HANDLER(&CacheVC::scanOpenWrite); + mutex->thread_holding->schedule_in_local(this, scan_msec_delay); + return EVENT_CONT; + } + + ink_assert(this->od); + // put all the alternates in the open directory vector + int alt_count = vector.count(); + for (int i = 0; i < alt_count; i++) { + write_vector->insert(vector.get(i)); + } + od->writing_vec = true; + vector.clear(false); + // check that the directory entry was not overwritten + // if so return failure + Dbg(dbg_ctl_cache_scan, "got writer lock"); + Dir *l = nullptr; + Dir d; + Doc *doc = reinterpret_cast<Doc *>(buf->data() + offset); + offset = reinterpret_cast<char *>(doc) - buf->data() + vol->round_to_approx_size(doc->len); + // if the doc contains some data, then we need to create + // a new directory entry for this fragment. Remember the + // offset and the key in earliest_key + dir_assign(&od->first_dir, &dir); + if (doc->total_len) { + dir_assign(&od->single_doc_dir, &dir); + dir_set_tag(&od->single_doc_dir, doc->key.slice32(2)); + od->single_doc_key = doc->key; + od->move_resident_alt = true; + } + + while (true) { + if (!dir_probe(&first_key, vol, &d, &l)) { + vol->close_write(this); + _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, nullptr); + SET_HANDLER(&CacheVC::scanObject); + return handleEvent(EVENT_IMMEDIATE, nullptr); + } + if (memcmp(&dir, &d, SIZEOF_DIR)) { + Dbg(dbg_ctl_cache_scan, "dir entry has changed"); + continue; + } + break; + } + + // the document was not modified + // we are safe from now on as we hold the + // writer lock on the doc + if (f.evac_vector) { + header_len = write_vector->marshal_length(); + } + SET_HANDLER(&CacheVC::scanUpdateDone); + ret = do_write_call(); + } + if (ret == EVENT_RETURN) { + return handleEvent(AIO_EVENT_DONE, nullptr); + } + return ret; +} + +int +CacheVC::scanUpdateDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) +{ + Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanUpdateDone", this); + cancel_trigger(); + // get volume lock + CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding); + if (lock.is_locked()) { + // insert a directory entry for the previous fragment + dir_overwrite(&first_key, vol, &dir, &od->first_dir, false); + if (od->move_resident_alt) { + dir_insert(&od->single_doc_key, vol, &od->single_doc_dir); + } + ink_assert(vol->open_read(&first_key)); + ink_assert(this->od); + vol->close_write(this); + SET_HANDLER(&CacheVC::scanObject); + return handleEvent(EVENT_IMMEDIATE, nullptr); + } else { + mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); + return EVENT_CONT; + } +} + +// set_http_info must be called before do_io_write +// cluster vc does an optimization where it calls do_io_write() before +// calling set_http_info(), but it guarantees that the info will +// be set before transferring any bytes +void +CacheVC::set_http_info(CacheHTTPInfo *ainfo) +{ + ink_assert(!total_len); + if (f.update) { + ainfo->object_key_set(update_key); + ainfo->object_size_set(update_len); + } else { + ainfo->object_key_set(earliest_key); + // don't know the total len yet + } + + MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH); + if ((field && !field->value_get_int64()) || ainfo->m_alt->m_response_hdr.status_get() == HTTP_STATUS_NO_CONTENT) { + f.allow_empty_doc = 1; + // Set the object size here to zero in case this is a cache replace where the new object + // length is zero but the old object was not. + ainfo->object_size_set(0); + } else { + f.allow_empty_doc = 0; + } + + alternate.copy_shallow(ainfo); + ainfo->clear(); +} + +void +CacheVC::get_http_info(CacheHTTPInfo **ainfo) +{ + *ainfo = &(this)->alternate; +} + +HTTPInfo::FragOffset * +CacheVC::get_frag_table() +{ + ink_assert(alternate.valid()); + return alternate.valid() ? alternate.get_frag_table() : nullptr; +} + +bool +CacheVC::is_pread_capable() +{ + return !f.read_from_writer_called; +} + +bool +CacheVC::set_pin_in_cache(time_t time_pin) +{ + if (total_len) { + ink_assert(!"should Pin the document before writing"); + return false; + } + if (vio.op != VIO::WRITE) { + ink_assert(!"Pinning only allowed while writing objects to the cache"); + return false; + } + pin_in_cache = time_pin; + return true; +} + +time_t +CacheVC::get_pin_in_cache() +{ + return pin_in_cache; +} diff --git a/iocore/cache/CacheVC.h b/iocore/cache/CacheVC.h new file mode 100644 index 0000000000..e345c99c07 --- /dev/null +++ b/iocore/cache/CacheVC.h @@ -0,0 +1,337 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +/* +// inkcache +#include "I_Cache.h" +#include "P_CacheDir.h" +#include "P_CacheVol.h" +*/ +#include "P_CacheHttp.h" + +// aio +#include "I_AIO.h" + +// inkevent +#include "I_Action.h" +#include "I_Continuation.h" +#include "I_Event.h" +#include "I_IOBuffer.h" +#include "I_VIO.h" + +// tscore +#include "tscore/ink_hrtime.h" +#include "tscore/List.h" +#include "tscore/Ptr.h" + +#include <cstdint> + +struct Vol; + +struct CacheVC : public CacheVConnection { + CacheVC(); + + VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override; + VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset) override; + VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false) override; + void do_io_close(int lerrno = -1) override; + void reenable(VIO *avio) override; + void reenable_re(VIO *avio) override; + bool get_data(int i, void *data) override; + bool set_data(int i, void *data) override; + + bool + is_ram_cache_hit() const override + { + ink_assert(vio.op == VIO::READ); + return !f.not_from_ram_cache; + } + + int + get_header(void **ptr, int *len) override + { + if (first_buf) { + Doc *doc = (Doc *)first_buf->data(); + *ptr = doc->hdr(); + *len = doc->hlen; + return 0; + } + + return -1; + } + + int + set_header(void *ptr, int len) override + { + header_to_write = ptr; + header_to_write_len = len; + return 0; + } + + int + get_single_data(void **ptr, int *len) override + { + if (first_buf) { + Doc *doc = (Doc *)first_buf->data(); + if (doc->data_len() == doc->total_len) { + *ptr = doc->data(); + *len = doc->data_len(); + return 0; + } + } + + return -1; + } + + int + get_volume_number() const override + { + if (vol && vol->cache_vol) { + return vol->cache_vol->vol_number; + } + + return -1; + } + + const char * + get_disk_path() const override + { + if (vol && vol->disk) { + return vol->disk->path; + } + + return nullptr; + } + + bool + is_compressed_in_ram() const override + { + ink_assert(vio.op == VIO::READ); + return f.compressed_in_ram; + } + + bool writer_done(); + int calluser(int event); + int callcont(int event); + int die(); + int dead(int event, Event *e); + + int handleReadDone(int event, Event *e); + int handleRead(int event, Event *e); + int do_read_call(CacheKey *akey); + int handleWrite(int event, Event *e); + int handleWriteLock(int event, Event *e); + int do_write_call(); + int do_write_lock(); + int do_write_lock_call(); + int do_sync(uint32_t target_write_serial); + + int openReadClose(int event, Event *e); + int openReadReadDone(int event, Event *e); + int openReadMain(int event, Event *e); + int openReadStartEarliest(int event, Event *e); + int openReadVecWrite(int event, Event *e); + int openReadStartHead(int event, Event *e); + int openReadFromWriter(int event, Event *e); + int openReadFromWriterMain(int event, Event *e); + int openReadFromWriterFailure(int event, Event *); + int openReadChooseWriter(int event, Event *e); + int openReadDirDelete(int event, Event *e); + + int openWriteCloseDir(int event, Event *e); + int openWriteCloseHeadDone(int event, Event *e); + int openWriteCloseHead(int event, Event *e); + int openWriteCloseDataDone(int event, Event *e); + int openWriteClose(int event, Event *e); + int openWriteRemoveVector(int event, Event *e); + int openWriteWriteDone(int event, Event *e); + int openWriteOverwrite(int event, Event *e); + int openWriteMain(int event, Event *e); + int openWriteStartDone(int event, Event *e); + int openWriteStartBegin(int event, Event *e); + + int updateVector(int event, Event *e); + int updateReadDone(int event, Event *e); + int updateVecWrite(int event, Event *e); + + int removeEvent(int event, Event *e); + + int scanVol(int event, Event *e); + int scanObject(int event, Event *e); + int scanUpdateDone(int event, Event *e); + int scanOpenWrite(int event, Event *e); + int scanRemoveDone(int event, Event *e); + + int + is_io_in_progress() + { + return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS; + } + void + set_io_not_in_progress() + { + io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; + } + void + set_agg_write_in_progress() + { + io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS; + } + int evacuateDocDone(int event, Event *e); + int evacuateReadHead(int event, Event *e); + + void cancel_trigger(); + int64_t get_object_size() override; + void set_http_info(CacheHTTPInfo *info) override; + void get_http_info(CacheHTTPInfo **info) override; + /** Get the fragment table. + @return The address of the start of the fragment table, + or @c nullptr if there is no fragment table. + */ + virtual HTTPInfo::FragOffset *get_frag_table(); + /** Load alt pointers and do fixups if needed. + @return Length of header data used for alternates. + */ + virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = nullptr); + bool is_pread_capable() override; + bool set_pin_in_cache(time_t time_pin) override; + time_t get_pin_in_cache() override; + + // number of bytes to memset to 0 in the CacheVC when we free + // it. All member variables starting from vio are memset to 0. + // This variable is initialized in CacheVC constructor. + static int size_to_init; + + // Start Region A + // This set of variables are not reset when the cacheVC is freed. + // A CacheVC must set these to the correct values whenever needed + // These are variables that are always set to the correct values + // before being used by the CacheVC + CacheKey key, first_key, earliest_key, update_key; + Dir dir, earliest_dir, overwrite_dir, first_dir; + // end Region A + + // Start Region B + // These variables are individually cleared or reset when the + // CacheVC is freed. All these variables must be reset/cleared + // in free_CacheVC. + Action _action; + CacheHTTPHdr request; + CacheHTTPInfoVector vector; + CacheHTTPInfo alternate; + Ptr<IOBufferData> buf; + Ptr<IOBufferData> first_buf; + Ptr<IOBufferBlock> blocks; // data available to write + Ptr<IOBufferBlock> writer_buf; + + OpenDirEntry *od = nullptr; + AIOCallbackInternal io; + int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in vector + LINK(CacheVC, opendir_link); +#ifdef CACHE_STAT_PAGES + LINK(CacheVC, stat_link); +#endif + // end Region B + + // Start Region C + // These variables are memset to 0 when the structure is freed. + // The size of this region is size_to_init which is initialized + // in the CacheVC constructor. It assumes that vio is the start + // of this region. + // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the + // size_to_init initialization + VIO vio; + CacheFragType frag_type; + CacheHTTPInfo *info; + CacheHTTPInfoVector *write_vector; + const OverridableHttpConfigParams *params; + int header_len; // for communicating with agg_copy + int frag_len; // for communicating with agg_copy + uint32_t write_len; // for communicating with agg_copy + uint32_t agg_len; // for communicating with aggWrite + uint32_t write_serial; // serial of the final write for SYNC + Vol *vol; + Dir *last_collision; + Event *trigger; + CacheKey *read_key; + ContinuationHandler save_handler; + time_t pin_in_cache; + ink_hrtime start_time; + int op_type; // Index into the metrics array for this operation, rather than a CacheOpType (fewer casts) + int recursive; + int closed; + uint64_t seek_to; // pread offset + int64_t offset; // offset into 'blocks' of data to write + int64_t writer_offset; // offset of the writer for reading from a writer + int64_t length; // length of data available to write + int64_t doc_pos; // read position in 'buf' + uint64_t write_pos; // length written + uint64_t total_len; // total length written and available to write + uint64_t doc_len; // total_length (of the selected alternate for HTTP) + uint64_t update_len; + int fragment; + int scan_msec_delay; + CacheVC *write_vc; + char *hostname; + int host_len; + int header_to_write_len; + void *header_to_write; + short writer_lock_retry; + union { + uint32_t flags; + struct { + unsigned int use_first_key : 1; + unsigned int overwrite : 1; // overwrite first_key Dir if it exists + unsigned int close_complete : 1; // WRITE_COMPLETE is final + unsigned int sync : 1; // write to be committed to durable storage before WRITE_COMPLETE + unsigned int evacuator : 1; + unsigned int single_fragment : 1; + unsigned int evac_vector : 1; + unsigned int lookup : 1; + unsigned int update : 1; + unsigned int remove : 1; + unsigned int remove_aborted_writers : 1; + unsigned int open_read_timeout : 1; // UNUSED + unsigned int data_done : 1; + unsigned int read_from_writer_called : 1; + unsigned int not_from_ram_cache : 1; // entire object was from ram cache + unsigned int rewrite_resident_alt : 1; + unsigned int readers : 1; + unsigned int doc_from_ram_cache : 1; + unsigned int hit_evacuate : 1; + unsigned int compressed_in_ram : 1; // compressed state in ram cache + unsigned int allow_empty_doc : 1; // used for cache empty http document + } f; + }; + // BTF optimization used to skip reading stuff in cache partition that doesn't contain any + // dir entries. + char *scan_vol_map; + // BTF fix to handle objects that overlapped over two different reads, + // this is how much we need to back up the buffer to get the start of the overlapping object. + off_t scan_fix_buffer_offset; + // end region C +}; + +LINK_DEFINITION(CacheVC, opendir_link) diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc index bb34aa3263..ed1f59a293 100644 --- a/iocore/cache/CacheVol.cc +++ b/iocore/cache/CacheVol.cc @@ -23,17 +23,14 @@ #include "P_Cache.h" -#define SCAN_BUF_SIZE RECOVERY_SIZE -#define SCAN_WRITER_LOCK_MAX_RETRY 5 - namespace { - -DbgCtl dbg_ctl_cache_scan{"cache_scan"}; DbgCtl dbg_ctl_cache_scan_truss{"cache_scan_truss"}; - } // end anonymous namespace +#define SCAN_BUF_SIZE RECOVERY_SIZE +#define SCAN_WRITER_LOCK_MAX_RETRY 5 + Action * Cache::scan(Continuation *cont, const char *hostname, int host_len, int KB_per_second) { @@ -57,468 +54,3 @@ Cache::scan(Continuation *cont, const char *hostname, int host_len, int KB_per_s cont->handleEvent(CACHE_EVENT_SCAN, c); return &c->_action; } - -int -CacheVC::scanVol(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) -{ - Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanVol", this); - if (_action.cancelled) { - return free_CacheVC(this); - } - - ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&theCache->hosttable); - - const CacheHostRecord *rec = &hosttable->gen_host_rec; - if (host_len) { - CacheHostResult res; - hosttable->Match(hostname, host_len, &res); - if (res.record) { - rec = res.record; - } - } - - if (!vol) { - if (!rec->num_vols) { - goto Ldone; - } - vol = rec->vols[0]; - } else { - for (int i = 0; i < rec->num_vols - 1; i++) { - if (vol == rec->vols[i]) { - vol = rec->vols[i + 1]; - goto Lcont; - } - } - goto Ldone; - } -Lcont: - fragment = 0; - SET_HANDLER(&CacheVC::scanObject); - eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay)); - return EVENT_CONT; -Ldone: - _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, nullptr); - return free_CacheVC(this); -} - -/* Next block with some data in it in this partition. Returns end of partition if no more - * locations. - * - * d - Vol - * vol_map - precalculated map - * offset - offset to start looking at (and data at this location has not been read yet). */ -static off_t -next_in_map(Vol *vol, char *vol_map, off_t offset) -{ - off_t start_offset = vol->vol_offset_to_offset(0); - off_t new_off = (offset - start_offset); - off_t vol_len = vol->vol_relative_length(start_offset); - - while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) { - new_off += SCAN_BUF_SIZE; - } - if (new_off >= vol_len) { - return vol_len + start_offset; - } - return new_off + start_offset; -} - -// Function in CacheDir.cc that we need for make_vol_map(). -int dir_bucket_loop_fix(Dir *start_dir, int s, Vol *vol); - -// TODO: If we used a bit vector, we could make a smaller map structure. -// TODO: If we saved a high water mark we could have a smaller buf, and avoid searching it -// when we are asked about the highest interesting offset. -/* Make map of what blocks in partition are used. - * - * d - Vol to make a map of. */ -static char * -make_vol_map(Vol *vol) -{ - // Map will be one byte for each SCAN_BUF_SIZE bytes. - off_t start_offset = vol->vol_offset_to_offset(0); - off_t vol_len = vol->vol_relative_length(start_offset); - size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE; - char *vol_map = static_cast<char *>(ats_malloc(map_len)); - - memset(vol_map, 0, map_len); - - // Scan directories. - // Copied from dir_entries_used() and modified to fill in the map instead. - for (int s = 0; s < vol->segments; s++) { - Dir *seg = vol->dir_segment(s); - for (int b = 0; b < vol->buckets; b++) { - Dir *e = dir_bucket(b, seg); - if (dir_bucket_loop_fix(e, s, vol)) { - break; - } - while (e) { - if (dir_offset(e) && dir_valid(vol, e) && dir_agg_valid(vol, e) && dir_head(e)) { - off_t offset = vol->vol_offset(e) - start_offset; - if (offset <= vol_len) { - vol_map[offset / SCAN_BUF_SIZE] = 1; - } - } - e = next_dir(e, seg); - if (!e) { - break; - } - } - } - } - return vol_map; -} - -int -CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) -{ - Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanObject", this); - - Doc *doc = nullptr; - void *result = nullptr; - int hlen = 0; - char hname[500]; - bool hostinfo_copied = false; - off_t next_object_len = 0; - bool might_need_overlap_read = false; - - cancel_trigger(); - set_io_not_in_progress(); - if (_action.cancelled) { - return free_CacheVC(this); - } - - CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding); - if (!lock.is_locked()) { - Dbg(dbg_ctl_cache_scan_truss, "delay %p:scanObject", this); - mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); - return EVENT_CONT; - } - - if (!fragment) { // initialize for first read - fragment = 1; - scan_vol_map = make_vol_map(vol); - io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, vol->vol_offset_to_offset(0)); - if (io.aiocb.aio_offset >= static_cast<off_t>(vol->skip + vol->len)) { - goto Lnext_vol; - } - io.aiocb.aio_nbytes = SCAN_BUF_SIZE; - io.aiocb.aio_buf = buf->data(); - io.action = this; - io.thread = AIO_CALLBACK_THREAD_ANY; - Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject", this); - goto Lread; - } - - if (!io.ok()) { - result = (void *)-ECACHE_READ_FAIL; - goto Ldone; - } - - doc = reinterpret_cast<Doc *>(buf->data() + offset); - // If there is data in the buffer before the start that is from a partial object read previously - // Fix things as if we read it this time. - if (scan_fix_buffer_offset) { - io.aio_result += scan_fix_buffer_offset; - io.aiocb.aio_nbytes += scan_fix_buffer_offset; - io.aiocb.aio_offset -= scan_fix_buffer_offset; - io.aiocb.aio_buf = static_cast<char *>(io.aiocb.aio_buf) - scan_fix_buffer_offset; - scan_fix_buffer_offset = 0; - } - while (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len < - static_cast<off_t>(io.aiocb.aio_nbytes)) { - might_need_overlap_read = false; - doc = reinterpret_cast<Doc *>(reinterpret_cast<char *>(doc) + next_object_len); - next_object_len = vol->round_to_approx_size(doc->len); - int i; - bool changed; - - if (doc->magic != DOC_MAGIC) { - next_object_len = CACHE_BLOCK_SIZE; - Dbg(dbg_ctl_cache_scan_truss, "blockskip %p:scanObject", this); - continue; - } - - if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen) { - goto Lskip; - } - - last_collision = nullptr; - while (true) { - if (!dir_probe(&doc->first_key, vol, &dir, &last_collision)) { - goto Lskip; - } - if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) || - (vol->vol_offset(&dir) != io.aiocb.aio_offset + (reinterpret_cast<char *>(doc) - buf->data()))) { - continue; - } - break; - } - if (doc->data() - buf->data() > static_cast<int>(io.aiocb.aio_nbytes)) { - might_need_overlap_read = true; - goto Lskip; - } - { - char *tmp = doc->hdr(); - int len = doc->hlen; - while (len > 0) { - int r = HTTPInfo::unmarshal(tmp, len, buf.get()); - if (r < 0) { - ink_assert(!"CacheVC::scanObject unmarshal failed"); - goto Lskip; - } - len -= r; - tmp += r; - } - } - if (this->load_http_info(&vector, doc) != doc->hlen) { - goto Lskip; - } - changed = false; - hostinfo_copied = false; - for (i = 0; i < vector.count(); i++) { - if (!vector.get(i)->valid()) { - goto Lskip; - } - if (!hostinfo_copied) { - memccpy(hname, vector.get(i)->request_get()->host_get(&hlen), 0, 500); - hname[hlen] = 0; - Dbg(dbg_ctl_cache_scan, "hostname = '%s', hostlen = %d", hname, hlen); - hostinfo_copied = true; - } - vector.get(i)->object_key_get(&key); - alternate_index = i; - // verify that the earliest block exists, reducing 'false hit' callbacks - if (!(key == doc->key)) { - last_collision = nullptr; - if (!dir_probe(&key, vol, &earliest_dir, &last_collision)) { - continue; - } - } - earliest_key = key; - int result1 = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i)); - switch (result1) { - case CACHE_SCAN_RESULT_CONTINUE: - continue; - case CACHE_SCAN_RESULT_DELETE: - changed = true; - vector.remove(i, true); - i--; - continue; - case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES: - changed = true; - vector.clear(); - i = 0; - break; - case CACHE_SCAN_RESULT_UPDATE: - ink_assert(alternate_index >= 0); - vector.insert(&alternate, alternate_index); - if (!vector.get(alternate_index)->valid()) { - continue; - } - changed = true; - continue; - case EVENT_DONE: - goto Lcancel; - default: - ink_assert(!"unexpected CACHE_SCAN_RESULT"); - continue; - } - } - if (changed) { - if (!vector.count()) { - ink_assert(hostinfo_copied); - SET_HANDLER(&CacheVC::scanRemoveDone); - // force remove even if there is a writer - cacheProcessor.remove(this, &doc->first_key, CACHE_FRAG_TYPE_HTTP, hname, hlen); - return EVENT_CONT; - } else { - offset = reinterpret_cast<char *>(doc) - buf->data(); - write_len = 0; - frag_type = CACHE_FRAG_TYPE_HTTP; - f.use_first_key = 1; - f.evac_vector = 1; - alternate_index = CACHE_ALT_REMOVED; - writer_lock_retry = 0; - - first_key = key = doc->first_key; - earliest_key.clear(); - - SET_HANDLER(&CacheVC::scanOpenWrite); - return scanOpenWrite(EVENT_NONE, nullptr); - } - } - continue; - Lskip:; - } - vector.clear(); - // If we had an object that went past the end of the buffer, and it is small enough to fix, - // fix it. - if (might_need_overlap_read && - (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len > - static_cast<off_t>(io.aiocb.aio_nbytes)) && - next_object_len > 0) { - off_t partial_object_len = io.aiocb.aio_nbytes - (reinterpret_cast<char *>(doc) - buf->data()); - // Copy partial object to beginning of the buffer. - memmove(buf->data(), reinterpret_cast<char *>(doc), partial_object_len); - io.aiocb.aio_offset += io.aiocb.aio_nbytes; - io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len; - io.aiocb.aio_buf = buf->data() + partial_object_len; - scan_fix_buffer_offset = partial_object_len; - } else { // Normal case, where we ended on a object boundary. - io.aiocb.aio_offset += (reinterpret_cast<char *>(doc) - buf->data()) + next_object_len; - Dbg(dbg_ctl_cache_scan_truss, "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset); - io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset); - Dbg(dbg_ctl_cache_scan_truss, "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset); - io.aiocb.aio_nbytes = SCAN_BUF_SIZE; - io.aiocb.aio_buf = buf->data(); - scan_fix_buffer_offset = 0; - } - - if (io.aiocb.aio_offset >= vol->skip + vol->len) { - Lnext_vol: - SET_HANDLER(&CacheVC::scanVol); - eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay)); - return EVENT_CONT; - } - -Lread: - io.aiocb.aio_fildes = vol->fd; - if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) { - io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset; - } - offset = 0; - ink_assert(ink_aio_read(&io) >= 0); - Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset, - (size_t)io.aiocb.aio_nbytes); - return EVENT_CONT; - -Ldone: - Dbg(dbg_ctl_cache_scan_truss, "done %p:scanObject", this); - _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result); -Lcancel: - return free_CacheVC(this); -} - -int -CacheVC::scanRemoveDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) -{ - Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanRemoveDone", this); - Dbg(dbg_ctl_cache_scan, "remove done."); - alternate.destroy(); - SET_HANDLER(&CacheVC::scanObject); - return handleEvent(EVENT_IMMEDIATE, nullptr); -} - -int -CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) -{ - Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanOpenWrite", this); - cancel_trigger(); - // get volume lock - if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) { - int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, nullptr); - Dbg(dbg_ctl_cache_scan, "still haven't got the writer lock, asking user.."); - switch (r) { - case CACHE_SCAN_RESULT_RETRY: - writer_lock_retry = 0; - break; - case CACHE_SCAN_RESULT_CONTINUE: - SET_HANDLER(&CacheVC::scanObject); - return scanObject(EVENT_IMMEDIATE, nullptr); - } - } - int ret = 0; - { - CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding); - if (!lock.is_locked()) { - Dbg(dbg_ctl_cache_scan, "vol->mutex %p:scanOpenWrite", this); - VC_SCHED_LOCK_RETRY(); - } - - Dbg(dbg_ctl_cache_scan, "trying for writer lock"); - if (vol->open_write(this, false, 1)) { - writer_lock_retry++; - SET_HANDLER(&CacheVC::scanOpenWrite); - mutex->thread_holding->schedule_in_local(this, scan_msec_delay); - return EVENT_CONT; - } - - ink_assert(this->od); - // put all the alternates in the open directory vector - int alt_count = vector.count(); - for (int i = 0; i < alt_count; i++) { - write_vector->insert(vector.get(i)); - } - od->writing_vec = true; - vector.clear(false); - // check that the directory entry was not overwritten - // if so return failure - Dbg(dbg_ctl_cache_scan, "got writer lock"); - Dir *l = nullptr; - Dir d; - Doc *doc = reinterpret_cast<Doc *>(buf->data() + offset); - offset = reinterpret_cast<char *>(doc) - buf->data() + vol->round_to_approx_size(doc->len); - // if the doc contains some data, then we need to create - // a new directory entry for this fragment. Remember the - // offset and the key in earliest_key - dir_assign(&od->first_dir, &dir); - if (doc->total_len) { - dir_assign(&od->single_doc_dir, &dir); - dir_set_tag(&od->single_doc_dir, doc->key.slice32(2)); - od->single_doc_key = doc->key; - od->move_resident_alt = true; - } - - while (true) { - if (!dir_probe(&first_key, vol, &d, &l)) { - vol->close_write(this); - _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, nullptr); - SET_HANDLER(&CacheVC::scanObject); - return handleEvent(EVENT_IMMEDIATE, nullptr); - } - if (memcmp(&dir, &d, SIZEOF_DIR)) { - Dbg(dbg_ctl_cache_scan, "dir entry has changed"); - continue; - } - break; - } - - // the document was not modified - // we are safe from now on as we hold the - // writer lock on the doc - if (f.evac_vector) { - header_len = write_vector->marshal_length(); - } - SET_HANDLER(&CacheVC::scanUpdateDone); - ret = do_write_call(); - } - if (ret == EVENT_RETURN) { - return handleEvent(AIO_EVENT_DONE, nullptr); - } - return ret; -} - -int -CacheVC::scanUpdateDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) -{ - Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanUpdateDone", this); - cancel_trigger(); - // get volume lock - CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding); - if (lock.is_locked()) { - // insert a directory entry for the previous fragment - dir_overwrite(&first_key, vol, &dir, &od->first_dir, false); - if (od->move_resident_alt) { - dir_insert(&od->single_doc_key, vol, &od->single_doc_dir); - } - ink_assert(vol->open_read(&first_key)); - ink_assert(this->od); - vol->close_write(this); - SET_HANDLER(&CacheVC::scanObject); - return handleEvent(EVENT_IMMEDIATE, nullptr); - } else { - mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); - return EVENT_CONT; - } -} diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h index 2721ee0eba..2b5cd7c9a2 100644 --- a/iocore/cache/P_CacheDir.h +++ b/iocore/cache/P_CacheDir.h @@ -28,6 +28,9 @@ #include "I_EventSystem.h" #include "I_Continuation.h" +// aio +#include "I_AIO.h" + struct Vol; struct InterimCacheVol; struct CacheVC; diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h index 8c6dba1aba..a102633475 100644 --- a/iocore/cache/P_CacheInternal.h +++ b/iocore/cache/P_CacheInternal.h @@ -31,6 +31,8 @@ #include "P_CacheHosting.h" #include "api/Metrics.h" +#include "CacheVC.h" + using ts::Metrics; struct EvacuationBlock; @@ -125,291 +127,6 @@ extern int cache_config_mutex_retry_delay; extern int cache_read_while_writer_retry_delay; extern int cache_config_read_while_writer_max_retries; -// CacheVC -struct CacheVC : public CacheVConnection { - CacheVC(); - - VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override; - VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset) override; - VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false) override; - void do_io_close(int lerrno = -1) override; - void reenable(VIO *avio) override; - void reenable_re(VIO *avio) override; - bool get_data(int i, void *data) override; - bool set_data(int i, void *data) override; - - bool - is_ram_cache_hit() const override - { - ink_assert(vio.op == VIO::READ); - return !f.not_from_ram_cache; - } - - int - get_header(void **ptr, int *len) override - { - if (first_buf) { - Doc *doc = (Doc *)first_buf->data(); - *ptr = doc->hdr(); - *len = doc->hlen; - return 0; - } - - return -1; - } - - int - set_header(void *ptr, int len) override - { - header_to_write = ptr; - header_to_write_len = len; - return 0; - } - - int - get_single_data(void **ptr, int *len) override - { - if (first_buf) { - Doc *doc = (Doc *)first_buf->data(); - if (doc->data_len() == doc->total_len) { - *ptr = doc->data(); - *len = doc->data_len(); - return 0; - } - } - - return -1; - } - - int - get_volume_number() const override - { - if (vol && vol->cache_vol) { - return vol->cache_vol->vol_number; - } - - return -1; - } - - const char * - get_disk_path() const override - { - if (vol && vol->disk) { - return vol->disk->path; - } - - return nullptr; - } - - bool - is_compressed_in_ram() const override - { - ink_assert(vio.op == VIO::READ); - return f.compressed_in_ram; - } - - bool writer_done(); - int calluser(int event); - int callcont(int event); - int die(); - int dead(int event, Event *e); - - int handleReadDone(int event, Event *e); - int handleRead(int event, Event *e); - int do_read_call(CacheKey *akey); - int handleWrite(int event, Event *e); - int handleWriteLock(int event, Event *e); - int do_write_call(); - int do_write_lock(); - int do_write_lock_call(); - int do_sync(uint32_t target_write_serial); - - int openReadClose(int event, Event *e); - int openReadReadDone(int event, Event *e); - int openReadMain(int event, Event *e); - int openReadStartEarliest(int event, Event *e); - int openReadVecWrite(int event, Event *e); - int openReadStartHead(int event, Event *e); - int openReadFromWriter(int event, Event *e); - int openReadFromWriterMain(int event, Event *e); - int openReadFromWriterFailure(int event, Event *); - int openReadChooseWriter(int event, Event *e); - int openReadDirDelete(int event, Event *e); - - int openWriteCloseDir(int event, Event *e); - int openWriteCloseHeadDone(int event, Event *e); - int openWriteCloseHead(int event, Event *e); - int openWriteCloseDataDone(int event, Event *e); - int openWriteClose(int event, Event *e); - int openWriteRemoveVector(int event, Event *e); - int openWriteWriteDone(int event, Event *e); - int openWriteOverwrite(int event, Event *e); - int openWriteMain(int event, Event *e); - int openWriteStartDone(int event, Event *e); - int openWriteStartBegin(int event, Event *e); - - int updateVector(int event, Event *e); - int updateReadDone(int event, Event *e); - int updateVecWrite(int event, Event *e); - - int removeEvent(int event, Event *e); - - int scanVol(int event, Event *e); - int scanObject(int event, Event *e); - int scanUpdateDone(int event, Event *e); - int scanOpenWrite(int event, Event *e); - int scanRemoveDone(int event, Event *e); - - int - is_io_in_progress() - { - return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS; - } - void - set_io_not_in_progress() - { - io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; - } - void - set_agg_write_in_progress() - { - io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS; - } - int evacuateDocDone(int event, Event *e); - int evacuateReadHead(int event, Event *e); - - void cancel_trigger(); - int64_t get_object_size() override; - void set_http_info(CacheHTTPInfo *info) override; - void get_http_info(CacheHTTPInfo **info) override; - /** Get the fragment table. - @return The address of the start of the fragment table, - or @c nullptr if there is no fragment table. - */ - virtual HTTPInfo::FragOffset *get_frag_table(); - /** Load alt pointers and do fixups if needed. - @return Length of header data used for alternates. - */ - virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = nullptr); - bool is_pread_capable() override; - bool set_pin_in_cache(time_t time_pin) override; - time_t get_pin_in_cache() override; - - // number of bytes to memset to 0 in the CacheVC when we free - // it. All member variables starting from vio are memset to 0. - // This variable is initialized in CacheVC constructor. - static int size_to_init; - - // Start Region A - // This set of variables are not reset when the cacheVC is freed. - // A CacheVC must set these to the correct values whenever needed - // These are variables that are always set to the correct values - // before being used by the CacheVC - CacheKey key, first_key, earliest_key, update_key; - Dir dir, earliest_dir, overwrite_dir, first_dir; - // end Region A - - // Start Region B - // These variables are individually cleared or reset when the - // CacheVC is freed. All these variables must be reset/cleared - // in free_CacheVC. - Action _action; - CacheHTTPHdr request; - CacheHTTPInfoVector vector; - CacheHTTPInfo alternate; - Ptr<IOBufferData> buf; - Ptr<IOBufferData> first_buf; - Ptr<IOBufferBlock> blocks; // data available to write - Ptr<IOBufferBlock> writer_buf; - - OpenDirEntry *od = nullptr; - AIOCallbackInternal io; - int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in vector - LINK(CacheVC, opendir_link); -#ifdef CACHE_STAT_PAGES - LINK(CacheVC, stat_link); -#endif - // end Region B - - // Start Region C - // These variables are memset to 0 when the structure is freed. - // The size of this region is size_to_init which is initialized - // in the CacheVC constructor. It assumes that vio is the start - // of this region. - // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the - // size_to_init initialization - VIO vio; - CacheFragType frag_type; - CacheHTTPInfo *info; - CacheHTTPInfoVector *write_vector; - const OverridableHttpConfigParams *params; - int header_len; // for communicating with agg_copy - int frag_len; // for communicating with agg_copy - uint32_t write_len; // for communicating with agg_copy - uint32_t agg_len; // for communicating with aggWrite - uint32_t write_serial; // serial of the final write for SYNC - Vol *vol; - Dir *last_collision; - Event *trigger; - CacheKey *read_key; - ContinuationHandler save_handler; - time_t pin_in_cache; - ink_hrtime start_time; - int op_type; // Index into the metrics array for this operation, rather than a CacheOpType (fewer casts) - int recursive; - int closed; - uint64_t seek_to; // pread offset - int64_t offset; // offset into 'blocks' of data to write - int64_t writer_offset; // offset of the writer for reading from a writer - int64_t length; // length of data available to write - int64_t doc_pos; // read position in 'buf' - uint64_t write_pos; // length written - uint64_t total_len; // total length written and available to write - uint64_t doc_len; // total_length (of the selected alternate for HTTP) - uint64_t update_len; - int fragment; - int scan_msec_delay; - CacheVC *write_vc; - char *hostname; - int host_len; - int header_to_write_len; - void *header_to_write; - short writer_lock_retry; - union { - uint32_t flags; - struct { - unsigned int use_first_key : 1; - unsigned int overwrite : 1; // overwrite first_key Dir if it exists - unsigned int close_complete : 1; // WRITE_COMPLETE is final - unsigned int sync : 1; // write to be committed to durable storage before WRITE_COMPLETE - unsigned int evacuator : 1; - unsigned int single_fragment : 1; - unsigned int evac_vector : 1; - unsigned int lookup : 1; - unsigned int update : 1; - unsigned int remove : 1; - unsigned int remove_aborted_writers : 1; - unsigned int open_read_timeout : 1; // UNUSED - unsigned int data_done : 1; - unsigned int read_from_writer_called : 1; - unsigned int not_from_ram_cache : 1; // entire object was from ram cache - unsigned int rewrite_resident_alt : 1; - unsigned int readers : 1; - unsigned int doc_from_ram_cache : 1; - unsigned int hit_evacuate : 1; - unsigned int compressed_in_ram : 1; // compressed state in ram cache - unsigned int allow_empty_doc : 1; // used for cache empty http document - } f; - }; - // BTF optimization used to skip reading stuff in cache partition that doesn't contain any - // dir entries. - char *scan_vol_map; - // BTF fix to handle objects that overlapped over two different reads, - // this is how much we need to back up the buffer to get the start of the overlapping object. - off_t scan_fix_buffer_offset; - // end region C -}; - #define PUSH_HANDLER(_x) \ do { \ ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \ @@ -917,5 +634,3 @@ cache_hash(const CryptoHash &hash) unsigned int mhash = (unsigned int)(f >> 32); return mhash; } - -LINK_DEFINITION(CacheVC, opendir_link)