TS-2275: fix interim cache losing data if the server process crashes. We had disabled persistent storage on the interim cache device for consistency reasons, which is why I pointed out this as the #1 problem of the current implementation in https://blog.zymlinux.net/index.php/archives/555 as: data loss if the server process crashes.
after this patch, we can declare the interim cache stable Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/64b5a6fc Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/64b5a6fc Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/64b5a6fc Branch: refs/heads/5.0.x Commit: 64b5a6fca3a58bdd6010ad8d9ad08456a4f21ad5 Parents: 40deb91 Author: Gang Li <que...@taobao.com> Authored: Mon Oct 21 00:33:13 2013 +0800 Committer: Zhao Yongming <ming....@gmail.com> Committed: Mon Oct 21 15:13:21 2013 +0800 ---------------------------------------------------------------------- CHANGES | 3 + iocore/cache/Cache.cc | 276 ++++++++++++++++++++++++++++++++++++++-- iocore/cache/CacheDir.cc | 40 +++++- iocore/cache/CacheWrite.cc | 5 +- iocore/cache/P_CacheDir.h | 3 + iocore/cache/P_CacheVol.h | 65 +++++++--- 6 files changed, 361 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/64b5a6fc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e0c3452..0a1ded9 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,9 @@ -*- coding: utf-8 -*- Changes with Apache Traffic Server 4.1.0 + *) [TS-2275] fix interim cache lossing data if the server process crash + Author: Gang Li. + *) [TS-2291] Add remap_stats plugin to experimental. 
*) [TS-2242] Update core plugins' support_email and vendor_name for http://git-wip-us.apache.org/repos/asf/trafficserver/blob/64b5a6fc/iocore/cache/Cache.cc ---------------------------------------------------------------------- diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc index ffd69b7..9256ea5 100644 --- a/iocore/cache/Cache.cc +++ b/iocore/cache/Cache.cc @@ -1163,6 +1163,24 @@ vol_init_dir(Vol *d) } } +#if TS_USE_INTERIM_CACHE == 1 +void +interimvol_clear_init(InterimCacheVol *d) +{ + memset(d->header, 0, sizeof(InterimVolHeaderFooter)); + d->header->magic = VOL_MAGIC; + d->header->version.ink_major = CACHE_DB_MAJOR_VERSION; + d->header->version.ink_minor = CACHE_DB_MINOR_VERSION; + d->header->agg_pos = d->header->write_pos = d->start; + d->header->last_write_pos = d->header->write_pos; + d->header->phase = 0; + d->header->cycle = 0; + d->header->create_time = time(NULL); + d->header->dirty = 0; + d->sector_size = d->header->sector_size = d->disk->hw_sector_size; +} +#endif + void vol_clear_init(Vol *d) { @@ -1180,6 +1198,12 @@ vol_clear_init(Vol *d) d->header->dirty = 0; d->sector_size = d->header->sector_size = d->disk->hw_sector_size; *d->footer = *d->header; + +#if TS_USE_INTERIM_CACHE == 1 + for (int i = 0; i < d->num_interim_vols; i++) { + interimvol_clear_init(&(d->interim_vols[i])); + } +#endif } int @@ -1251,11 +1275,6 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear) header = (VolHeaderFooter *) raw_dir; footer = (VolHeaderFooter *) (raw_dir + vol_dirlen(this) - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter))); - if (clear) { - Note("clearing cache directory '%s'", hash_id); - return clear_dir(); - } - #if TS_USE_INTERIM_CACHE == 1 num_interim_vols = good_interim_disks; ink_assert(num_interim_vols >= 0 && num_interim_vols <= 8); @@ -1264,11 +1283,16 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear) off_t vlen = off_t (r * g_interim_disks[i]->len * STORE_BLOCK_SIZE); vlen = (vlen / STORE_BLOCK_SIZE) * 
STORE_BLOCK_SIZE; off_t start = ink_atomic_increment(&g_interim_disks[i]->skip, vlen); - interim_vols[i].init(start, vlen, g_interim_disks[i], this); + interim_vols[i].init(start, vlen, g_interim_disks[i], this, &(this->header->interim_header[i])); ink_assert(interim_vols[i].start + interim_vols[i].len <= g_interim_disks[i]->len * STORE_BLOCK_SIZE); } #endif + if (clear) { + Note("clearing cache directory '%s'", hash_id); + return clear_dir(); + } + init_info = new VolInitInfo(); int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)); off_t footer_offset = vol_dirlen(this) - footerlen; @@ -1349,11 +1373,30 @@ Vol::handle_dir_read(int event, void *data) return EVENT_DONE; } CHECK_DIR(this); + + sector_size = header->sector_size; + #if TS_USE_INTERIM_CACHE == 1 - if (gn_interim_disks > 0) - clear_interim_dir(this); + if (num_interim_vols > 0) { + interim_done = 0; + for (int i = 0; i < num_interim_vols; i++) { + interim_vols[i].recover_data(); + } + } else { #endif - sector_size = header->sector_size; + + return this->recover_data(); + +#if TS_USE_INTERIM_CACHE == 1 + } +#endif + + return EVENT_CONT; +} + +int +Vol::recover_data() +{ SET_HANDLER(&Vol::handle_recover_from_data); return handle_recover_from_data(EVENT_IMMEDIATE, 0); } @@ -1732,6 +1775,221 @@ Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */ ) } } +#if TS_USE_INTERIM_CACHE == 1 +int +InterimCacheVol::recover_data() +{ + io.aiocb.aio_fildes = fd; + io.action = this; + io.thread = AIO_CALLBACK_THREAD_ANY; + io.then = 0; + + SET_HANDLER(&InterimCacheVol::handle_recover_from_data); + return handle_recover_from_data(EVENT_IMMEDIATE, 0); +} + +int +InterimCacheVol::handle_recover_from_data(int event, void *data) +{ + (void)data; + uint32_t got_len = 0; + uint32_t max_sync_serial = header->sync_serial; + char *s, *e; + int ndone, offset; + + if (event == EVENT_IMMEDIATE) { + if (header->magic != VOL_MAGIC || header->version.ink_major != CACHE_DB_MAJOR_VERSION) { + Warning("bad 
header in cache directory for '%s', clearing", hash_id); + goto Lclear; + } else if (header->sync_serial == 0) { + io.aiocb.aio_buf = NULL; + goto Lfinish; + } + + // initialize + recover_wrapped = 0; + last_sync_serial = 0; + last_write_serial = 0; + recover_pos = header->last_write_pos; + if (recover_pos >= skip + len) { + recover_wrapped = 1; + recover_pos = start; + } + + io.aiocb.aio_buf = (char *)ats_memalign(sysconf(_SC_PAGESIZE), RECOVERY_SIZE); + io.aiocb.aio_nbytes = RECOVERY_SIZE; + if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len)) + io.aiocb.aio_nbytes = (skip + len) - recover_pos; + + } else if (event == AIO_EVENT_DONE) { + if ((size_t) io.aiocb.aio_nbytes != (size_t) io.aio_result) { + Warning("disk read error on recover '%s', clearing", hash_id); + goto Lclear; + } + + if (io.aiocb.aio_offset == header->last_write_pos) { + uint32_t to_check = header->write_pos - header->last_write_pos; + ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes); + uint32_t done = 0; + s = (char *) io.aiocb.aio_buf; + while (done < to_check) { + Doc *doc = (Doc *) (s + done); + if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) { + Warning("no valid directory found while recovering '%s', clearing", hash_id); + goto Lclear; + } + done += round_to_approx_size(doc->len); + if (doc->sync_serial > last_write_serial) + last_sync_serial = doc->sync_serial; + } + ink_assert(done == to_check); + + got_len = io.aiocb.aio_nbytes - done; + recover_pos += io.aiocb.aio_nbytes; + s = (char *) io.aiocb.aio_buf + done; + e = s + got_len; + } else { + got_len = io.aiocb.aio_nbytes; + recover_pos += io.aiocb.aio_nbytes; + s = (char *) io.aiocb.aio_buf; + e = s + got_len; + } + } + + // examine what we got + if (got_len) { + + Doc *doc = NULL; + + if (recover_wrapped && start == io.aiocb.aio_offset) { + doc = (Doc *) s; + if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) { + recover_pos = skip + len - EVACUATION_SIZE; + 
goto Ldone; + } + } + + while (s < e) { + doc = (Doc *) s; + + if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) { + + if (doc->magic == DOC_MAGIC) { + if (doc->sync_serial > header->sync_serial) + max_sync_serial = doc->sync_serial; + + if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) { + last_sync_serial = doc->sync_serial; + s += round_to_approx_size(doc->len); + continue; + + } else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) { + recover_wrapped = 1; + recover_pos = start; + io.aiocb.aio_nbytes = RECOVERY_SIZE; + break; + } + + recover_pos -= e - s; + goto Ldone; + + } else { + recover_pos -= e - s; + if (recover_pos > (skip + len) - AGG_SIZE) { + recover_wrapped = 1; + recover_pos = start; + io.aiocb.aio_nbytes = RECOVERY_SIZE; + break; + } + + goto Ldone; + } + } + + last_write_serial = doc->write_serial; + s += round_to_approx_size(doc->len); + } + + if (s >= e) { + + if (s > e) + s -= round_to_approx_size(doc->len); + + recover_pos -= e - s; + if (recover_pos >= skip + len) + recover_pos = start; + + io.aiocb.aio_nbytes = RECOVERY_SIZE; + if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len)) + io.aiocb.aio_nbytes = (skip + len) - recover_pos; + } + } + + if (recover_pos == prev_recover_pos) + goto Lclear; + + prev_recover_pos = recover_pos; + io.aiocb.aio_offset = recover_pos; + ink_assert(ink_aio_read(&io)); + return EVENT_CONT; + +Ldone: { + + if (recover_pos == header->write_pos && recover_wrapped) { + goto Lfinish; + } + + recover_pos += EVACUATION_SIZE; + if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) { + Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped); + Warning("no valid directory found while recovering '%s', clearing", hash_id); + goto Lclear; + } + + if (recover_pos > skip + len) + recover_pos -= skip + len; + + uint32_t next_sync_serial = 
max_sync_serial + 1; + if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) + next_sync_serial++; + + off_t clear_start = offset_to_vol_offset(this, header->write_pos); + off_t clear_end = offset_to_vol_offset(this, recover_pos); + + if (clear_start <= clear_end) + dir_clean_range_interimvol(clear_start, clear_end, this); + else { + dir_clean_range_interimvol(clear_end, DIR_OFFSET_MAX, this); + dir_clean_range_interimvol(1, clear_start, this); + } + + header->sync_serial = next_sync_serial; + + goto Lfinish; + } + +Lclear: + + interimvol_clear_init(this); + offset = this - vol->interim_vols; + clear_interimvol_dir(vol, offset); // remove this interimvol dir + +Lfinish: + + free((char*)io.aiocb.aio_buf); + io.aiocb.aio_buf = NULL; + + set_io_not_in_progress(); + + ndone = ink_atomic_increment(&vol->interim_done, 1); + if (ndone == vol->num_interim_vols - 1) { // all interim finished + return vol->recover_data(); + } + + return EVENT_CONT; +} +#endif + // explicit pair for random table in build_vol_hash_table struct rtable_pair { unsigned int rval; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/64b5a6fc/iocore/cache/CacheDir.cc ---------------------------------------------------------------------- diff --git a/iocore/cache/CacheDir.cc b/iocore/cache/CacheDir.cc index 055e7fd..2ac5305 100644 --- a/iocore/cache/CacheDir.cc +++ b/iocore/cache/CacheDir.cc @@ -373,12 +373,12 @@ dir_clean_vol(Vol *d) #if TS_USE_INTERIM_CACHE == 1 static inline void -interim_dir_clean_bucket(Dir *b, int s, Vol *vol) +interim_dir_clean_bucket(Dir *b, int s, Vol *vol, int offset) { Dir *e = b, *p = NULL; Dir *seg = dir_segment(s, vol); do { - if (dir_ininterim(e)) { + if (dir_ininterim(e) && dir_get_index(e) == offset) { e = dir_delete_entry(e, p, s, vol); continue; } @@ -388,12 +388,12 @@ interim_dir_clean_bucket(Dir *b, int s, Vol *vol) } void -clear_interim_dir(Vol *v) +clear_interimvol_dir(Vol *v, int offset) { for (int i = 0; i < v->segments; i++) { Dir *seg = 
dir_segment(i, v); for (int j = 0; j < v->buckets; j++) { - interim_dir_clean_bucket(dir_bucket(j, seg), i, v); + interim_dir_clean_bucket(dir_bucket(j, seg), i, v, offset); } } } @@ -437,13 +437,30 @@ dir_clean_segment(int s, InterimCacheVol *d) } } void -clean_interimvol(InterimCacheVol *d) +dir_clean_interimvol(InterimCacheVol *d) { - Warning("Note: clean interim"); for (int i = 0; i < d->vol->segments; i++) dir_clean_segment(i, d); CHECK_DIR(d); } + +void +dir_clean_range_interimvol(off_t start, off_t end, InterimCacheVol *svol) +{ + Vol *vol = svol->vol; + int offset = svol - vol->interim_vols; + + for (int i = 0; i < vol->buckets * DIR_DEPTH * vol->segments; i++) { + Dir *e = dir_index(vol, i); + if (dir_ininterim(e) && dir_get_index(e) == offset && !dir_token(e) && + dir_offset(e) >= (int64_t)start && dir_offset(e) < (int64_t)end) { + CACHE_DEC_DIR_USED(vol->mutex); + dir_set_offset(e, 0); // delete + } + } + + dir_clean_interimvol(svol); +} #endif void @@ -1068,6 +1085,12 @@ sync_cache_dir_on_shutdown(void) Debug("cache_dir_sync", "Periodic dir sync in progress -- overwriting"); } d->footer->sync_serial = d->header->sync_serial; + +#if TS_USE_INTERIM_CACHE == 1 + for (int j = 0; j < d->num_interim_vols; j++) { + d->interim_vols[j].header->sync_serial = d->header->sync_serial; + } +#endif CHECK_DIR(d); memcpy(buf, d->raw_dir, dirlen); size_t B = d->header->sync_serial & 1; @@ -1172,6 +1195,11 @@ Lrestart: } d->header->sync_serial++; d->footer->sync_serial = d->header->sync_serial; +#if TS_USE_INTERIM_CACHE == 1 + for (int j = 0; j < d->num_interim_vols; j++) { + d->interim_vols[j].header->sync_serial = d->header->sync_serial; + } +#endif CHECK_DIR(d); memcpy(buf, d->raw_dir, dirlen); d->dir_sync_in_progress = 1; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/64b5a6fc/iocore/cache/CacheWrite.cc ---------------------------------------------------------------------- diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc index 
210e84f..841bcbf 100644 --- a/iocore/cache/CacheWrite.cc +++ b/iocore/cache/CacheWrite.cc @@ -1834,6 +1834,9 @@ Lagain: if (!mts->notMigrate) { old_off = dir_get_offset(&mts->dir); Dir old_dir = mts->dir; + doc->sync_serial = header->sync_serial; + doc->write_serial = header->write_serial; + memcpy(agg_buffer + agg_buf_pos, doc, doc->len); off_t o = header->write_pos + agg_buf_pos; dir_set_offset(&mts->dir, offset_to_vol_offset(this, o)); @@ -1876,7 +1879,7 @@ Lagain: header->cycle++; header->agg_pos = header->write_pos; - clean_interimvol(this); + dir_clean_interimvol(this); goto Lagain; } return EVENT_CONT; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/64b5a6fc/iocore/cache/P_CacheDir.h ---------------------------------------------------------------------- diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h index d862d51..013698a 100644 --- a/iocore/cache/P_CacheDir.h +++ b/iocore/cache/P_CacheDir.h @@ -28,6 +28,7 @@ #include "P_CacheHttp.h" struct Vol; +struct InterimCacheVol; struct CacheVC; /* @@ -191,6 +192,8 @@ struct FreeDir (((uint64_t)(_e)->w[4]) << 24))) void clear_interim_dir(Vol *v); +void clear_interimvol_dir(Vol *v, int offset); +void dir_clean_range_interimvol(off_t start, off_t end, InterimCacheVol *svol); #else #define dir_offset(_e) ((int64_t) \ http://git-wip-us.apache.org/repos/asf/trafficserver/blob/64b5a6fc/iocore/cache/P_CacheVol.h ---------------------------------------------------------------------- diff --git a/iocore/cache/P_CacheVol.h b/iocore/cache/P_CacheVol.h index ba328a5..ecf2d41 100644 --- a/iocore/cache/P_CacheVol.h +++ b/iocore/cache/P_CacheVol.h @@ -71,6 +71,26 @@ #define sizeofDoc (((uint32_t)(uintptr_t)&((Doc*)0)->checksum)+(uint32_t)sizeof(uint32_t)) +#if TS_USE_INTERIM_CACHE == 1 +struct InterimVolHeaderFooter +{ + unsigned int magic; + VersionNumber version; + time_t create_time; + off_t write_pos; + off_t last_write_pos; + off_t agg_pos; + uint32_t generation; // token generation (vary), this 
cannot be 0 + uint32_t phase; + uint32_t cycle; + uint32_t sync_serial; + uint32_t write_serial; + uint32_t dirty; + uint32_t sector_size; + int32_t unused; // pad out to 8 byte boundary +}; +#endif + struct Cache; struct Vol; struct CacheDisk; @@ -94,6 +114,9 @@ struct VolHeaderFooter uint32_t dirty; uint32_t sector_size; uint32_t unused; // pad out to 8 byte boundary +#if TS_USE_INTERIM_CACHE == 1 + InterimVolHeaderFooter interim_header[8]; +#endif uint16_t freelist[1]; }; @@ -133,6 +156,7 @@ struct EvacuationBlock extern int migrate_threshold; extern int good_interim_disks; + union AccessEntry { uintptr_t v[2]; struct { @@ -313,8 +337,15 @@ struct MigrateToInterimCache struct InterimCacheVol: public Continuation { - VolHeaderFooter hh; - VolHeaderFooter *header; + char *hash_id; + InterimVolHeaderFooter *header; + + off_t recover_pos; + off_t prev_recover_pos; + uint32_t last_sync_serial; + uint32_t last_write_serial; + bool recover_wrapped; + off_t scan_pos; off_t skip; // start of headers off_t start; // start of data @@ -335,6 +366,9 @@ struct InterimCacheVol: public Continuation return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS; } + int recover_data(); + int handle_recover_from_data(int event, void *data); + void set_io_not_in_progress() { io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; } @@ -346,7 +380,11 @@ struct InterimCacheVol: public Continuation return INK_ALIGN(ll, disk->hw_sector_size); } - void init(off_t s, off_t l, CacheDisk *interim, Vol *v) { + void init(off_t s, off_t l, CacheDisk *interim, Vol *v, InterimVolHeaderFooter *hptr) { + const size_t hash_id_size = strlen(interim->path) + 32; + hash_id = (char *)ats_malloc(hash_id_size); + snprintf(hash_id, hash_id_size, "%s %" PRIu64 ":%" PRIu64 "", interim->path, s, l); + skip = start = s; len = l; disk = interim; @@ -355,17 +393,7 @@ struct InterimCacheVol: public Continuation transistor_range_threshold = len / 5; // 20% storage size for transistor sync = false; - header = &hh; - header->magic = 
VOL_MAGIC; - header->version.ink_major = CACHE_DB_MAJOR_VERSION; - header->version.ink_minor = CACHE_DB_MINOR_VERSION; - header->agg_pos = header->write_pos = start; - header->last_write_pos = header->write_pos; - header->phase = 0; - header->cycle = 0; - header->create_time = time(NULL); - header->dirty = 0; - sector_size = header->sector_size = disk->hw_sector_size; + header = hptr; agg_todo_size = 0; agg_buf_pos = 0; @@ -379,7 +407,7 @@ struct InterimCacheVol: public Continuation void dir_clean_bucket(Dir *b, int s, InterimCacheVol *d); void dir_clean_segment(int s, InterimCacheVol *d); -void clean_interimvol(InterimCacheVol *d); +void dir_clean_interimvol(InterimCacheVol *d); #endif @@ -445,6 +473,7 @@ struct Vol: public Continuation AccessHistory history; uint32_t interim_index; Queue<MigrateToInterimCache, MigrateToInterimCache::Link_hash_link> mig_hash[MIGRATE_BUCKETS]; + volatile int interim_done; bool migrate_probe(CacheKey *key, MigrateToInterimCache **result) { @@ -477,6 +506,8 @@ struct Vol: public Continuation void cancel_trigger(); + int recover_data(); + int open_write(CacheVC *cont, int allow_if_writers, int max_writers); int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers); int close_write(CacheVC *cont); @@ -500,6 +531,10 @@ struct Vol: public Continuation int handle_recover_write_dir(int event, void *data); int handle_header_read(int event, void *data); +#if TS_USE_INTERIM_CACHE == 1 + int recover_interim_vol(); +#endif + int dir_init_done(int event, void *data); int dir_check(bool fix);