http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/P_CacheInternal.h ---------------------------------------------------------------------- diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h index 438c66a..3d95778 100644 --- a/iocore/cache/P_CacheInternal.h +++ b/iocore/cache/P_CacheInternal.h @@ -36,13 +36,13 @@ struct EvacuationBlock; // Compilation Options -#define ALTERNATES 1 +#define ALTERNATES 1 // #define CACHE_LOCK_FAIL_RATE 0.001 // #define CACHE_AGG_FAIL_RATE 0.005 // #define CACHE_INSPECTOR_PAGES -#define MAX_CACHE_VCS_PER_THREAD 500 +#define MAX_CACHE_VCS_PER_THREAD 500 -#define INTEGRAL_FRAGS 4 +#define INTEGRAL_FRAGS 4 #ifdef CACHE_INSPECTOR_PAGES #ifdef DEBUG @@ -53,64 +53,63 @@ struct EvacuationBlock; #ifdef DEBUG #define DDebug Debug #else -#define DDebug if (0) dummy_debug +#define DDebug \ + if (0) \ + dummy_debug #endif -#define AIO_SOFT_FAILURE -100000 +#define AIO_SOFT_FAILURE -100000 // retry read from writer delay -#define WRITER_RETRY_DELAY HRTIME_MSECONDS(50) +#define WRITER_RETRY_DELAY HRTIME_MSECONDS(50) #ifndef CACHE_LOCK_FAIL_RATE #define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t) #else -#define CACHE_TRY_LOCK(_l, _m, _t) \ - MUTEX_TRY_LOCK(_l, _m, _t); \ - if ((uint32_t)_t->generator.random() < \ - (uint32_t)(UINT_MAX *CACHE_LOCK_FAIL_RATE)) \ - CACHE_MUTEX_RELEASE(_l) +#define CACHE_TRY_LOCK(_l, _m, _t) \ + MUTEX_TRY_LOCK(_l, _m, _t); \ + if ((uint32_t)_t->generator.random() < (uint32_t)(UINT_MAX * CACHE_LOCK_FAIL_RATE)) \ + CACHE_MUTEX_RELEASE(_l) #endif -#define VC_LOCK_RETRY_EVENT() \ - do { \ +#define VC_LOCK_RETRY_EVENT() \ + do { \ trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \ - return EVENT_CONT; \ + return EVENT_CONT; \ } while (0) -#define VC_SCHED_LOCK_RETRY() \ - do { \ +#define VC_SCHED_LOCK_RETRY() \ + do { \ trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \ - return EVENT_CONT; \ + return EVENT_CONT; \ } while (0) -#define CONT_SCHED_LOCK_RETRY_RET(_c) \ - do { \ +#define CONT_SCHED_LOCK_RETRY_RET(_c) \ + do { \ _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \ - return EVENT_CONT; \ + return EVENT_CONT; \ } while (0) -#define CONT_SCHED_LOCK_RETRY(_c) \ - _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)) - -#define VC_SCHED_WRITER_RETRY() \ - do { \ - ink_assert(!trigger); \ - writer_lock_retry++; \ - ink_hrtime _t = WRITER_RETRY_DELAY; \ - if (writer_lock_retry > 2) \ - _t = WRITER_RETRY_DELAY * 2; \ - else if (writer_lock_retry > 5) \ - _t = WRITER_RETRY_DELAY * 10; \ - else if (writer_lock_retry > 10) \ - _t = WRITER_RETRY_DELAY * 100; \ +#define CONT_SCHED_LOCK_RETRY(_c) _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)) + +#define VC_SCHED_WRITER_RETRY() \ + do { \ + ink_assert(!trigger); \ + writer_lock_retry++; \ + ink_hrtime _t = WRITER_RETRY_DELAY; \ + if (writer_lock_retry > 2) \ + _t = WRITER_RETRY_DELAY * 2; \ + else if (writer_lock_retry > 5) \ + _t = WRITER_RETRY_DELAY * 10; \ + else if (writer_lock_retry > 10) \ + _t = WRITER_RETRY_DELAY * 100; \ trigger = mutex->thread_holding->schedule_in_local(this, _t); \ - return EVENT_CONT; \ + return EVENT_CONT; \ } while (0) - // cache stats definitions -enum -{ +// cache stats definitions +enum { cache_bytes_used_stat, cache_bytes_total_stat, cache_ram_cache_bytes_stat, @@ -170,46 +169,41 @@ enum extern RecRawStatBlock *cache_rsb; -#define GLOBAL_CACHE_SET_DYN_STAT(x,y) \ - RecSetGlobalRawStatSum(cache_rsb, (x), (y)) +#define GLOBAL_CACHE_SET_DYN_STAT(x, y) RecSetGlobalRawStatSum(cache_rsb, (x), (y)) -#define CACHE_SET_DYN_STAT(x,y) \ - RecSetGlobalRawStatSum(cache_rsb, (x), (y)) \ - RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)) +#define CACHE_SET_DYN_STAT(x, y) \ + RecSetGlobalRawStatSum(cache_rsb, (x), (y)) RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)) -#define CACHE_INCREMENT_DYN_STAT(x) \ - RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), 1); \ - RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), 1); +#define CACHE_INCREMENT_DYN_STAT(x) \ + RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), 1); \ + RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), 1); -#define CACHE_DECREMENT_DYN_STAT(x) \ - RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), -1); \ - RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), -1); +#define CACHE_DECREMENT_DYN_STAT(x) \ + RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), -1); \ + RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), -1); -#define CACHE_VOL_SUM_DYN_STAT(x,y) \ - RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) y); +#define CACHE_VOL_SUM_DYN_STAT(x, y) RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)y); -#define CACHE_SUM_DYN_STAT(x, y) \ - RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), (int64_t) (y)); \ - RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) (y)); +#define CACHE_SUM_DYN_STAT(x, y) \ + RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), (int64_t)(y)); \ + RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)(y)); -#define CACHE_SUM_DYN_STAT_THREAD(x, y) \ - RecIncrRawStat(cache_rsb, this_ethread(), (int) (x), (int64_t) (y)); \ - RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int) (x), (int64_t) (y)); +#define CACHE_SUM_DYN_STAT_THREAD(x, y) \ + RecIncrRawStat(cache_rsb, this_ethread(), (int)(x), (int64_t)(y)); \ + RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int)(x), (int64_t)(y)); -#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) \ - RecIncrGlobalRawStatSum(cache_rsb,(x),(y)) +#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) RecIncrGlobalRawStatSum(cache_rsb, (x), (y)) #define CACHE_SUM_GLOBAL_DYN_STAT(x, y) \ - RecIncrGlobalRawStatSum(cache_rsb,(x),(y)) \ - RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb,(x),(y)) + RecIncrGlobalRawStatSum(cache_rsb, (x), (y)) RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)) -#define CACHE_CLEAR_DYN_STAT(x) \ -do { \ - RecSetRawStatSum(cache_rsb, (x), 0); \ - RecSetRawStatCount(cache_rsb, (x), 0); \ - RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0); \ - RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \ -} while (0); +#define CACHE_CLEAR_DYN_STAT(x) \ + do { \ + RecSetRawStatSum(cache_rsb, (x), 0); \ + RecSetRawStatCount(cache_rsb, (x), 0); \ + RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0); \ + RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \ + } while (0); // Configuration extern int cache_config_dir_sync_frequency; @@ -236,8 +230,7 @@ extern int cache_config_mutex_retry_delay; extern int good_interim_disks; #endif // CacheVC -struct CacheVC: public CacheVConnection -{ +struct CacheVC : public CacheVConnection { CacheVC(); VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf); @@ -249,31 +242,35 @@ struct CacheVC: public CacheVConnection bool get_data(int i, void *data); bool set_data(int i, void *data); - bool is_ram_cache_hit() const + bool + is_ram_cache_hit() const { ink_assert(vio.op == VIO::READ); return !f.not_from_ram_cache; } - int get_header(void **ptr, int *len) + int + get_header(void **ptr, int *len) { if (first_buf.m_ptr) { - Doc *doc = (Doc*)first_buf->data(); + Doc *doc = (Doc *)first_buf->data(); *ptr = doc->hdr(); *len = doc->hlen; return 0; } else return -1; } - int set_header(void *ptr, int len) + int + set_header(void *ptr, int len) { header_to_write = ptr; header_to_write_len = len; return 0; } - int get_single_data(void **ptr, int *len) + int + get_single_data(void **ptr, int *len) { if (first_buf.m_ptr) { - Doc *doc = (Doc*)first_buf->data(); + Doc *doc = (Doc *)first_buf->data(); if (doc->data_len() == doc->total_len) { *ptr = doc->data(); *len = doc->data_len(); @@ -339,15 +336,18 @@ struct CacheVC: public CacheVConnection int scanOpenWrite(int event, Event *e); int scanRemoveDone(int event, Event *e); - int is_io_in_progress() + int + is_io_in_progress() { return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS; } - void set_io_not_in_progress() + void + set_io_not_in_progress() { io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; } - void set_agg_write_in_progress() + void + set_agg_write_in_progress() { io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS; } @@ -358,16 +358,16 @@ struct CacheVC: public CacheVConnection virtual int64_t get_object_size(); #ifdef HTTP_CACHE virtual void set_http_info(CacheHTTPInfo *info); - virtual void get_http_info(CacheHTTPInfo ** info); + virtual void get_http_info(CacheHTTPInfo **info); /** Get the fragment table. @return The address of the start of the fragment table, or @c NULL if there is no fragment table. */ - virtual HTTPInfo::FragOffset* get_frag_table(); + virtual HTTPInfo::FragOffset *get_frag_table(); /** Load alt pointers and do fixups if needed. @return Length of header data used for alternates. */ - virtual uint32_t load_http_info(CacheHTTPInfoVector* info, struct Doc* doc, RefCountObj * block_ptr = NULL); + virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = NULL); #endif virtual bool is_pread_capable(); virtual bool set_pin_in_cache(time_t time_pin); @@ -375,8 +375,8 @@ struct CacheVC: public CacheVConnection virtual bool set_disk_io_priority(int priority); virtual int get_disk_io_priority(); - // offsets from the base stat -#define CACHE_STAT_ACTIVE 0 +// offsets from the base stat +#define CACHE_STAT_ACTIVE 0 #define CACHE_STAT_SUCCESS 1 #define CACHE_STAT_FAILURE 2 @@ -411,7 +411,7 @@ struct CacheVC: public CacheVConnection OpenDirEntry *od; AIOCallbackInternal io; - int alternate_index; // preferred position in vector + int alternate_index; // preferred position in vector LINK(CacheVC, opendir_link); #ifdef CACHE_STAT_PAGES LINK(CacheVC, stat_link); @@ -426,18 +426,18 @@ struct CacheVC: public CacheVConnection // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the // size_to_init initialization VIO vio; - EThread *initial_thread; // initial thread open_XX was called on + EThread *initial_thread; // initial thread open_XX was called on CacheFragType frag_type; CacheHTTPInfo *info; CacheHTTPInfoVector *write_vector; #ifdef HTTP_CACHE CacheLookupHttpConfig *params; #endif - int header_len; // for communicating with agg_copy - int frag_len; // for communicating with agg_copy - uint32_t write_len; // for communicating with agg_copy - uint32_t agg_len; // for communicating with aggWrite - uint32_t write_serial; // serial of the final write for SYNC + int header_len; // for communicating with agg_copy + int frag_len; // for communicating with agg_copy + uint32_t write_len; // for communicating with agg_copy + uint32_t agg_len; // for communicating with aggWrite + uint32_t write_serial; // serial of the final write for SYNC Vol *vol; Dir *last_collision; Event *trigger; @@ -448,14 +448,14 @@ struct CacheVC: public CacheVConnection int base_stat; int recursive; int closed; - uint64_t seek_to; // pread offset - int64_t offset; // offset into 'blocks' of data to write - int64_t writer_offset; // offset of the writer for reading from a writer - int64_t length; // length of data available to write - int64_t doc_pos; // read position in 'buf' - uint64_t write_pos; // length written - uint64_t total_len; // total length written and available to write - uint64_t doc_len; // total_length (of the selected alternate for HTTP) + uint64_t seek_to; // pread offset + int64_t offset; // offset into 'blocks' of data to write + int64_t writer_offset; // offset of the writer for reading from a writer + int64_t length; // length of data available to write + int64_t doc_pos; // read position in 'buf' + uint64_t write_pos; // length written + uint64_t total_len; // total length written and available to write + uint64_t doc_len; // total_length (of the selected alternate for HTTP) uint64_t update_len; int fragment; int scan_msec_delay; @@ -470,38 +470,36 @@ struct CacheVC: public CacheVConnection MigrateToInterimCache *mts; uint64_t dir_off; #endif - union - { + union { uint32_t flags; - struct - { - unsigned int use_first_key:1; - unsigned int overwrite:1; // overwrite first_key Dir if it exists - unsigned int close_complete:1; // WRITE_COMPLETE is final - unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE - unsigned int evacuator:1; - unsigned int single_fragment:1; - unsigned int evac_vector:1; - unsigned int lookup:1; - unsigned int update:1; - unsigned int remove:1; - unsigned int remove_aborted_writers:1; - unsigned int open_read_timeout:1; // UNUSED - unsigned int data_done:1; - unsigned int read_from_writer_called:1; - unsigned int not_from_ram_cache:1; // entire object was from ram cache - unsigned int rewrite_resident_alt:1; - unsigned int readers:1; - unsigned int doc_from_ram_cache:1; - unsigned int hit_evacuate:1; + struct { + unsigned int use_first_key : 1; + unsigned int overwrite : 1; // overwrite first_key Dir if it exists + unsigned int close_complete : 1; // WRITE_COMPLETE is final + unsigned int sync : 1; // write to be committed to durable storage before WRITE_COMPLETE + unsigned int evacuator : 1; + unsigned int single_fragment : 1; + unsigned int evac_vector : 1; + unsigned int lookup : 1; + unsigned int update : 1; + unsigned int remove : 1; + unsigned int remove_aborted_writers : 1; + unsigned int open_read_timeout : 1; // UNUSED + unsigned int data_done : 1; + unsigned int read_from_writer_called : 1; + unsigned int not_from_ram_cache : 1; // entire object was from ram cache + unsigned int rewrite_resident_alt : 1; + unsigned int readers : 1; + unsigned int doc_from_ram_cache : 1; + unsigned int hit_evacuate : 1; #if TS_USE_INTERIM_CACHE == 1 - unsigned int read_from_interim:1; - unsigned int write_into_interim:1; - unsigned int ram_fixup:1; - unsigned int transistor:1; + unsigned int read_from_interim : 1; + unsigned int write_into_interim : 1; + unsigned int ram_fixup : 1; + unsigned int transistor : 1; #endif #ifdef HTTP_CACHE - unsigned int allow_empty_doc:1; // used for cache empty http document + unsigned int allow_empty_doc : 1; // used for cache empty http document #endif } f; }; @@ -511,26 +509,26 @@ struct CacheVC: public CacheVConnection // BTF fix to handle objects that overlapped over two different reads, // this is how much we need to back up the buffer to get the start of the overlapping object. off_t scan_fix_buffer_offset; - //end region C + // end region C }; -#define PUSH_HANDLER(_x) do { \ - ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \ - save_handler = handler; handler = (ContinuationHandler)(_x); \ -} while (0) +#define PUSH_HANDLER(_x) \ + do { \ + ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \ + save_handler = handler; \ + handler = (ContinuationHandler)(_x); \ + } while (0) -#define POP_HANDLER do { \ +#define POP_HANDLER \ + do { \ handler = save_handler; \ ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \ } while (0) -struct CacheRemoveCont: public Continuation -{ +struct CacheRemoveCont : public Continuation { int event_handler(int event, void *data); - CacheRemoveCont() - : Continuation(NULL) - { } + CacheRemoveCont() : Continuation(NULL) {} }; @@ -593,7 +591,7 @@ free_CacheVC(CacheVC *cont) } } #endif - } // else abort,cancel + } // else abort,cancel } ink_assert(mutex->thread_holding == this_ethread()); if (cont->trigger) @@ -631,7 +629,7 @@ free_CacheVC(CacheVC *cont) cont->alternate_index = CACHE_ALT_INDEX_DEFAULT; if (cont->scan_vol_map) ats_free(cont->scan_vol_map); - memset((char *) &cont->vio, 0, cont->size_to_init); + memset((char *)&cont->vio, 0, cont->size_to_init); #ifdef CACHE_STAT_PAGES ink_assert(!cont->stat_link.next && !cont->stat_link.prev); #endif @@ -647,7 +645,7 @@ CacheVC::calluser(int event) { recursive++; ink_assert(!vol || this_ethread() != vol->mutex->thread_holding); - vio._cont->handleEvent(event, (void *) &vio); + vio._cont->handleEvent(event, (void *)&vio); recursive--; if (closed) { die(); @@ -685,7 +683,7 @@ CacheVC::do_read_call(CacheKey *akey) f.transistor = 0; f.read_from_interim = dir_ininterim(&dir); - if (!f.read_from_interim && vio.op == VIO::READ && good_interim_disks > 0){ + if (!f.read_from_interim && vio.op == VIO::READ && good_interim_disks > 0) { vol->history.put_key(read_key); if (vol->history.is_hot(read_key) && !vol->migrate_probe(read_key, NULL) && !od) { f.write_into_interim = 1; @@ -693,8 +691,7 @@ CacheVC::do_read_call(CacheKey *akey) } if (f.read_from_interim) { interim_vol = &vol->interim_vols[dir_get_index(&dir)]; - if (vio.op == VIO::READ && vol_transistor_range_valid(interim_vol, &dir) - && !vol->migrate_probe(read_key, NULL) && !od) + if (vio.op == VIO::READ && vol_transistor_range_valid(interim_vol, &dir) && !vol->migrate_probe(read_key, NULL) && !od) f.transistor = 1; } if (f.write_into_interim || f.transistor) { @@ -739,11 +736,11 @@ CacheVC::die() SET_HANDLER(&CacheVC::openWriteClose); if (!recursive) openWriteClose(EVENT_NONE, NULL); - } // else catch it at the end of openWriteWriteDone + } // else catch it at the end of openWriteWriteDone return EVENT_CONT; } else { if (is_io_in_progress()) - save_handler = (ContinuationHandler) & CacheVC::openReadClose; + save_handler = (ContinuationHandler)&CacheVC::openReadClose; else { SET_HANDLER(&CacheVC::openReadClose); if (!recursive) @@ -797,7 +794,8 @@ CacheVC::writer_done() // original writer, since we never choose a writer that started // after the reader. The original writer was deallocated and then // reallocated for the same first_key - for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *) w->opendir_link.next); + for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *)w->opendir_link.next) + ; if (!w) return true; return false; @@ -823,8 +821,7 @@ Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers) if (!cont->f.remove) { agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog); #ifdef CACHE_AGG_FAIL_RATE - agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() < - (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE)); + agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE)); #endif } if (agg_error) { @@ -880,8 +877,8 @@ Vol::open_read_lock(INK_MD5 *key, EThread *t) TS_INLINE int Vol::begin_read_lock(CacheVC *cont) { - // no need for evacuation as the entire document is already in memory -#ifndef CACHE_STAT_PAGES +// no need for evacuation as the entire document is already in memory +#ifndef CACHE_STAT_PAGES if (cont->f.single_fragment) return 0; #endif @@ -944,8 +941,8 @@ extern uint8_t CacheKey_next_table[]; void TS_INLINE next_CacheKey(CacheKey *next_key, CacheKey *key) { - uint8_t *b = (uint8_t *) next_key; - uint8_t *k = (uint8_t *) key; + uint8_t *b = (uint8_t *)next_key; + uint8_t *k = (uint8_t *)key; b[0] = CacheKey_next_table[k[0]]; for (int i = 1; i < 16; i++) b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF]; @@ -954,8 +951,8 @@ extern uint8_t CacheKey_prev_table[]; void TS_INLINE prev_CacheKey(CacheKey *prev_key, CacheKey *key) { - uint8_t *b = (uint8_t *) prev_key; - uint8_t *k = (uint8_t *) key; + uint8_t *b = (uint8_t *)prev_key; + uint8_t *k = (uint8_t *)key; for (int i = 15; i > 0; i--) b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1]; b[0] = CacheKey_prev_table[k[0]]; @@ -992,8 +989,8 @@ free_CacheRemoveCont(CacheRemoveCont *cache_rm) TS_INLINE int CacheRemoveCont::event_handler(int event, void *data) { - (void) event; - (void) data; + (void)event; + (void)data; free_CacheRemoveCont(this); return EVENT_DONE; } @@ -1003,23 +1000,22 @@ int64_t cache_bytes_total(void); #ifdef DEBUG #define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x) -#define CACHE_DEBUG_SUM_DYN_STAT(_x,_y) CACHE_SUM_DYN_STAT(_x,_y) +#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y) CACHE_SUM_DYN_STAT(_x, _y) #else #define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) -#define CACHE_DEBUG_SUM_DYN_STAT(_x,_y) +#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y) #endif struct CacheHostRecord; struct Vol; class CacheHostTable; -struct Cache -{ +struct Cache { volatile int cache_read_done; volatile int total_good_nvol; volatile int total_nvol; volatile int ready; - int64_t cache_size; //in store block size + int64_t cache_size; // in store block size CacheHostTable *hosttable; volatile int total_initialized_vol; CacheType scheme; @@ -1027,30 +1023,22 @@ struct Cache int open(bool reconfigure, bool fix); int close(); - Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const* hostname, int host_len); + Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const *hostname, int host_len); inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len); - inkcoreapi Action *open_write(Continuation *cont, CacheKey *key, - CacheFragType frag_type, int options = 0, - time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0); - inkcoreapi Action *remove(Continuation *cont, CacheKey *key, - CacheFragType type = CACHE_FRAG_TYPE_HTTP, - bool user_agents = true, bool link = false, - char *hostname = 0, int host_len = 0); + inkcoreapi Action *open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, int options = 0, + time_t pin_in_cache = (time_t)0, char *hostname = 0, int host_len = 0); + inkcoreapi Action *remove(Continuation *cont, CacheKey *key, CacheFragType type = CACHE_FRAG_TYPE_HTTP, bool user_agents = true, + bool link = false, char *hostname = 0, int host_len = 0); Action *scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500); #ifdef HTTP_CACHE Action *lookup(Continuation *cont, URL *url, CacheFragType type); - inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, - CacheHTTPHdr *request, - CacheLookupHttpConfig *params, CacheFragType type, char *hostname, int host_len); - Action *open_read(Continuation *cont, URL *url, CacheHTTPHdr *request, - CacheLookupHttpConfig *params, CacheFragType type); - Action *open_write(Continuation *cont, CacheKey *key, - CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0, - CacheKey *key1 = NULL, - CacheFragType type = CACHE_FRAG_TYPE_HTTP, char *hostname = 0, int host_len = 0); - Action *open_write(Continuation *cont, URL *url, CacheHTTPHdr *request, - CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0, + inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheHTTPHdr *request, CacheLookupHttpConfig *params, + CacheFragType type, char *hostname, int host_len); + Action *open_read(Continuation *cont, URL *url, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type); + Action *open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t)0, + CacheKey *key1 = NULL, CacheFragType type = CACHE_FRAG_TYPE_HTTP, char *hostname = 0, int host_len = 0); + Action *open_write(Continuation *cont, URL *url, CacheHTTPHdr *request, CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t)0, CacheFragType type = CACHE_FRAG_TYPE_HTTP); static void generate_key(INK_MD5 *md5, URL *url); #endif @@ -1062,12 +1050,13 @@ struct Cache int open_done(); - Vol *key_to_vol(CacheKey *key, char const* hostname, int host_len); + Vol *key_to_vol(CacheKey *key, char const *hostname, int host_len); Cache() - : cache_read_done(0), total_good_nvol(0), total_nvol(0), ready(CACHE_INITIALIZING), cache_size(0), // in store block size + : cache_read_done(0), total_good_nvol(0), total_nvol(0), ready(CACHE_INITIALIZING), cache_size(0), // in store block size hosttable(NULL), total_initialized_vol(0), scheme(CACHE_NONE_TYPE) - { } + { + } }; extern Cache *theCache; @@ -1076,14 +1065,13 @@ inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES]; #ifdef HTTP_CACHE TS_INLINE Action * -Cache::open_read(Continuation *cont, CacheURL *url, CacheHTTPHdr *request, - CacheLookupHttpConfig *params, CacheFragType type) +Cache::open_read(Continuation *cont, CacheURL *url, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type) { INK_MD5 md5; int len; url->hash_get(&md5); const char *hostname = url->host_get(&len); - return open_read(cont, &md5, request, params, type, (char *) hostname, len); + return open_read(cont, &md5, request, params, type, (char *)hostname, len); } TS_INLINE void @@ -1093,24 +1081,24 @@ Cache::generate_key(INK_MD5 *md5, URL *url) } TS_INLINE Action * -Cache::open_write(Continuation *cont, CacheURL *url, CacheHTTPHdr *request, - CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type) +Cache::open_write(Continuation *cont, CacheURL *url, CacheHTTPHdr *request, CacheHTTPInfo *old_info, time_t pin_in_cache, + CacheFragType type) { - (void) request; + (void)request; INK_MD5 url_md5; url->hash_get(&url_md5); int len; const char *hostname = url->host_get(&len); - return open_write(cont, &url_md5, old_info, pin_in_cache, NULL, type, (char *) hostname, len); + return open_write(cont, &url_md5, old_info, pin_in_cache, NULL, type, (char *)hostname, len); } #endif TS_INLINE unsigned int -cache_hash(INK_MD5 & md5) +cache_hash(INK_MD5 &md5) { uint64_t f = md5.fold(); - unsigned int mhash = (unsigned int) (f >> 32); + unsigned int mhash = (unsigned int)(f >> 32); return mhash; } @@ -1126,8 +1114,8 @@ cache_hash(INK_MD5 & md5) #endif TS_INLINE Action * -CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, - bool local_only ATS_UNUSED, CacheFragType frag_type, char *hostname, int host_len) +CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, bool local_only ATS_UNUSED, + CacheFragType frag_type, char *hostname, int host_len) { #ifdef CLUSTER_CACHE // Try to send remote, if not possible, handle locally @@ -1142,32 +1130,28 @@ CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool cluster_cache_loc } TS_INLINE inkcoreapi Action * -CacheProcessor::open_read(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, - CacheFragType frag_type, char *hostname, int host_len) +CacheProcessor::open_read(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, CacheFragType frag_type, + char *hostname, int host_len) { #ifdef CLUSTER_CACHE if (cache_clustering_enabled > 0 && !cluster_cache_local) { - return open_read_internal(CACHE_OPEN_READ, cont, (MIOBuffer *) 0, - (CacheURL *) 0, (CacheHTTPHdr *) 0, - (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len); + return open_read_internal(CACHE_OPEN_READ, cont, (MIOBuffer *)0, (CacheURL *)0, (CacheHTTPHdr *)0, (CacheLookupHttpConfig *)0, + key, 0, frag_type, hostname, host_len); } #endif return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len); } TS_INLINE inkcoreapi Action * -CacheProcessor::open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, - CacheFragType frag_type, int expected_size ATS_UNUSED, int options, - time_t pin_in_cache, char *hostname, int host_len) +CacheProcessor::open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, CacheFragType frag_type, + int expected_size ATS_UNUSED, int options, time_t pin_in_cache, char *hostname, int host_len) { #ifdef CLUSTER_CACHE if (cache_clustering_enabled > 0 && !cluster_cache_local) { ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key)); if (m) - return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m, - key, frag_type, options, pin_in_cache, - CACHE_OPEN_WRITE, key, (CacheURL *) 0, - (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len); + return Cluster_write(cont, expected_size, (MIOBuffer *)0, m, key, frag_type, options, pin_in_cache, CACHE_OPEN_WRITE, key, + (CacheURL *)0, (CacheHTTPHdr *)0, (CacheHTTPInfo *)0, hostname, host_len); } #endif return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len); @@ -1190,25 +1174,25 @@ CacheProcessor::remove(Continuation *cont, CacheKey *key, bool cluster_cache_loc return caches[frag_type]->remove(cont, key, frag_type, rm_user_agents, rm_link, hostname, host_len); } -# if 0 +#if 0 TS_INLINE Action * scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500) { return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second); } -# endif +#endif #ifdef HTTP_CACHE TS_INLINE Action * CacheProcessor::lookup(Continuation *cont, URL *url, bool cluster_cache_local, bool local_only, CacheFragType frag_type) { - (void) local_only; + (void)local_only; INK_MD5 md5; url->hash_get(&md5); int host_len = 0; const char *hostname = url->host_get(&host_len); - return lookup(cont, &md5, cluster_cache_local, local_only, frag_type, (char *) hostname, host_len); + return lookup(cont, &md5, cluster_cache_local, local_only, frag_type, (char *)hostname, host_len); } #endif @@ -1216,13 +1200,9 @@ CacheProcessor::lookup(Continuation *cont, URL *url, bool cluster_cache_local, b #ifdef CLUSTER_CACHE TS_INLINE Action * -CacheProcessor::open_read_internal(int opcode, - Continuation *cont, MIOBuffer *buf, - CacheURL *url, - CacheHTTPHdr *request, - CacheLookupHttpConfig *params, - CacheKey *key, - time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len) +CacheProcessor::open_read_internal(int opcode, Continuation *cont, MIOBuffer *buf, CacheURL *url, CacheHTTPHdr *request, + CacheLookupHttpConfig *params, CacheKey *key, time_t pin_in_cache, CacheFragType frag_type, + char *hostname, int host_len) { INK_MD5 url_md5; if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) { @@ -1233,11 +1213,9 @@ CacheProcessor::open_read_internal(int opcode, ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5)); if (m) { - return Cluster_read(m, opcode, cont, buf, url, - request, params, key, pin_in_cache, frag_type, hostname, host_len); + return Cluster_read(m, opcode, cont, buf, url, request, params, key, pin_in_cache, frag_type, hostname, host_len); } else { - if ((opcode == CACHE_OPEN_READ_LONG) - || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) { + if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) { return caches[frag_type]->open_read(cont, &url_md5, request, params, frag_type, hostname, host_len); } else { return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len); @@ -1248,8 +1226,8 @@ CacheProcessor::open_read_internal(int opcode, #ifdef CLUSTER_CACHE TS_INLINE Action * -CacheProcessor::link(Continuation *cont, CacheKey *from, CacheKey *to, bool cluster_cache_local, - CacheFragType type, char *hostname, int host_len) +CacheProcessor::link(Continuation *cont, CacheKey *from, CacheKey *to, bool cluster_cache_local, CacheFragType type, char *hostname, + int host_len) { if (cache_clustering_enabled > 0 && !cluster_cache_local) { // Use INK_MD5 in "from" to determine target machine
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/P_CacheTest.h ---------------------------------------------------------------------- diff --git a/iocore/cache/P_CacheTest.h b/iocore/cache/P_CacheTest.h index 8ef83ca..f3928e9 100644 --- a/iocore/cache/P_CacheTest.h +++ b/iocore/cache/P_CacheTest.h @@ -31,25 +31,21 @@ #define PINNED_DOC_TABLE_SIZE 16 #define PINNED_DOC_TABLES 246 -struct PinnedDocEntry -{ +struct PinnedDocEntry { CacheKey key; ink_time_t time; LINK(PinnedDocEntry, link); }; -struct PinnedDocTable: public Continuation -{ +struct PinnedDocTable : public Continuation { Queue<PinnedDocEntry> bucket[PINNED_DOC_TABLE_SIZE]; - void insert(CacheKey * key, ink_time_t time, int update); - int probe(CacheKey * key); - int remove(CacheKey * key); - int cleanup(int event, Event * e); + void insert(CacheKey *key, ink_time_t time, int update); + int probe(CacheKey *key); + int remove(CacheKey *key); + int cleanup(int event, Event *e); - PinnedDocTable():Continuation(new_ProxyMutex()) { - memset(bucket, 0, sizeof(Queue<PinnedDocEntry>) * PINNED_DOC_TABLE_SIZE); - } + PinnedDocTable() : Continuation(new_ProxyMutex()) { memset(bucket, 0, sizeof(Queue<PinnedDocEntry>) * PINNED_DOC_TABLE_SIZE); } }; struct CacheTestHost { @@ -58,8 +54,7 @@ struct CacheTestHost { double xprev_host_prob; double xnext_host_prob; - CacheTestHost():name(NULL), xlast_cachable_id(0), - xprev_host_prob(0), xnext_host_prob(0) {} + CacheTestHost() : name(NULL), xlast_cachable_id(0), xprev_host_prob(0), xnext_host_prob(0) {} }; struct CacheTestHeader { @@ -96,7 +91,9 @@ struct CacheTestSM : public RegressionSM { int check_result(int event); int complete(int event); int event_handler(int event, void *edata); - void make_request() { + void + make_request() + { start_time = ink_get_hrtime(); make_request_internal(); } @@ -104,13 +101,21 @@ struct CacheTestSM : public RegressionSM { virtual int open_read_callout(); virtual int open_write_callout(); - void cancel_timeout() { - if (timeout) timeout->cancel(); + void + cancel_timeout() + { + if (timeout) + timeout->cancel(); timeout = 0; } // RegressionSM API - void run() { MUTEX_LOCK(lock, mutex, this_ethread()); timeout = eventProcessor.schedule_imm(this); } + void + run() + { + MUTEX_LOCK(lock, mutex, this_ethread()); + timeout = eventProcessor.schedule_imm(this); + } virtual RegressionSM *clone() = 0; CacheTestSM(RegressionTest *t); @@ -119,13 +124,20 @@ struct CacheTestSM : public RegressionSM { }; // It is 2010 and C++ STILL doesn't have closures, a technology of the 1950s, unbelievable -#define CACHE_SM(_t, _sm, _f) \ - struct CacheTestSM__##_sm : public CacheTestSM { \ - void make_request_internal() _f \ - CacheTestSM__##_sm(RegressionTest *t) : CacheTestSM(t) {} \ +#define CACHE_SM(_t, _sm, _f) \ + struct CacheTestSM__##_sm : public CacheTestSM { \ + void \ + make_request_internal() _f CacheTestSM__##_sm(RegressionTest *t) \ + : CacheTestSM(t) \ + { \ + } \ CacheTestSM__##_sm(const CacheTestSM__##_sm &xsm) : CacheTestSM(xsm) {} \ - RegressionSM *clone() { return new CacheTestSM__##_sm(*this); } \ -} _sm(_t); + RegressionSM * \ + clone() \ + { \ + return new CacheTestSM__##_sm(*this); \ + } \ + } _sm(_t); void force_link_CacheTest(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/P_CacheVol.h ---------------------------------------------------------------------- diff --git a/iocore/cache/P_CacheVol.h b/iocore/cache/P_CacheVol.h index 53f24c4..5545992 100644 --- a/iocore/cache/P_CacheVol.h +++ b/iocore/cache/P_CacheVol.h @@ -25,69 +25,67 @@ #ifndef _P_CACHE_VOL_H__ #define _P_CACHE_VOL_H__ -#define CACHE_BLOCK_SHIFT 9 -#define CACHE_BLOCK_SIZE (1<<CACHE_BLOCK_SHIFT) // 512, smallest sector size -#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), STORE_BLOCK_SIZE) -#define ROUND_TO_CACHE_BLOCK(_x) INK_ALIGN((_x), CACHE_BLOCK_SIZE) -#define ROUND_TO_SECTOR(_p, _x) INK_ALIGN((_x), _p->sector_size) -#define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y)) +#define CACHE_BLOCK_SHIFT 9 +#define CACHE_BLOCK_SIZE (1 << CACHE_BLOCK_SHIFT) // 512, smallest sector size +#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), STORE_BLOCK_SIZE) +#define ROUND_TO_CACHE_BLOCK(_x) INK_ALIGN((_x), CACHE_BLOCK_SIZE) +#define ROUND_TO_SECTOR(_p, _x) INK_ALIGN((_x), _p->sector_size) +#define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y)) // Vol (volumes) -#define VOL_MAGIC 0xF1D0F00D -#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE -#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE) -#define AGG_SIZE (4 * 1024 * 1024) // 4MB -#define AGG_HIGH_WATER (AGG_SIZE / 2) // 2MB -#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB -#define MAX_VOL_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) -#define STORE_BLOCKS_PER_CACHE_BLOCK (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE) -#define MAX_VOL_BLOCKS (MAX_VOL_SIZE / CACHE_BLOCK_SIZE) -#define MAX_FRAG_SIZE (AGG_SIZE - sizeofDoc) // true max -#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE -#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk -#define VOL_HASH_TABLE_SIZE 32707 -#define VOL_HASH_EMPTY 0xFFFF -#define VOL_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit -#define LOOKASIDE_SIZE 256 -#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB -#define RECOVERY_SIZE EVACUATION_SIZE // 8MB -#define AIO_NOT_IN_PROGRESS 0 -#define AIO_AGG_WRITE_IN_PROGRESS -1 -#define AUTO_SIZE_RAM_CACHE -1 // 1-1 with directory size -#define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeofDoc) // 1MB - - -#define dir_offset_evac_bucket(_o) \ - (_o / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE)) +#define VOL_MAGIC 0xF1D0F00D +#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE +#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE) +#define AGG_SIZE (4 * 1024 * 1024) // 4MB +#define AGG_HIGH_WATER (AGG_SIZE / 2) // 2MB +#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB +#define MAX_VOL_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) +#define STORE_BLOCKS_PER_CACHE_BLOCK (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE) +#define MAX_VOL_BLOCKS (MAX_VOL_SIZE / CACHE_BLOCK_SIZE) +#define MAX_FRAG_SIZE (AGG_SIZE - sizeofDoc) // true max +#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE +#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk +#define VOL_HASH_TABLE_SIZE 32707 +#define VOL_HASH_EMPTY 0xFFFF +#define VOL_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit +#define LOOKASIDE_SIZE 256 +#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB +#define RECOVERY_SIZE EVACUATION_SIZE // 8MB +#define AIO_NOT_IN_PROGRESS 0 +#define AIO_AGG_WRITE_IN_PROGRESS -1 +#define AUTO_SIZE_RAM_CACHE -1 // 1-1 with directory size +#define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeofDoc) // 1MB + + +#define dir_offset_evac_bucket(_o) (_o / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE)) #define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e)) #define offset_evac_bucket(_d, _o) \ dir_offset_evac_bucket((offset_to_vol_offset(_d, _o) // Documents -#define DOC_MAGIC ((uint32_t)0x5F129B13) -#define DOC_CORRUPT ((uint32_t)0xDEADBABE) -#define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0) +#define DOC_MAGIC ((uint32_t)0x5F129B13) +#define DOC_CORRUPT ((uint32_t)0xDEADBABE) +#define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0) -#define sizeofDoc (((uint32_t)(uintptr_t)&((Doc*)0)->checksum)+(uint32_t)sizeof(uint32_t)) +#define sizeofDoc (((uint32_t)(uintptr_t) & ((Doc *)0)->checksum) + (uint32_t)sizeof(uint32_t)) #if TS_USE_INTERIM_CACHE == 1 -struct InterimVolHeaderFooter -{ +struct InterimVolHeaderFooter { unsigned int magic; VersionNumber version; time_t create_time; off_t write_pos; off_t last_write_pos; off_t agg_pos; - uint32_t generation; // token generation (vary), this cannot be 0 + uint32_t generation; // token generation (vary), this cannot be 0 uint32_t phase; uint32_t cycle; uint32_t sync_serial; uint32_t write_serial; uint32_t dirty; uint32_t sector_size; - int32_t unused; // pad out to 8 byte boundary + int32_t unused; // pad out to 8 byte boundary }; #endif @@ -98,22 +96,21 @@ struct VolInitInfo; struct DiskVol; struct CacheVol; -struct VolHeaderFooter -{ +struct VolHeaderFooter { unsigned int magic; VersionNumber version; time_t create_time; off_t write_pos; off_t last_write_pos; off_t agg_pos; - uint32_t generation; // token generation (vary), this cannot be 0 + uint32_t generation; // token generation (vary), this cannot be 0 uint32_t phase; uint32_t cycle; uint32_t sync_serial; uint32_t write_serial; uint32_t dirty; uint32_t sector_size; - uint32_t unused; // pad out to 8 byte boundary + uint32_t unused; // pad out to 8 byte boundary #if TS_USE_INTERIM_CACHE == 1 InterimVolHeaderFooter interim_header[8]; #endif @@ -121,24 +118,20 @@ struct VolHeaderFooter }; // Key and Earliest key for each fragment that needs to be evacuated -struct EvacuationKey -{ +struct EvacuationKey { SLink<EvacuationKey> link; CryptoHash key; CryptoHash earliest_key; }; -struct EvacuationBlock -{ - union - { +struct EvacuationBlock { + union { unsigned int init; - struct - { - unsigned int done:1; // has been evacuated - unsigned int pinned:1; // check pinning timeout - unsigned int evacuate_head:1; // check pinning timeout - unsigned int unused:29; + struct { + unsigned int done : 1; // has been evacuated + unsigned int pinned : 1; // check pinning timeout + unsigned int evacuate_head : 1; // check pinning timeout + unsigned int unused : 29; } f; }; @@ -152,7 +145,7 @@ struct EvacuationBlock }; #if TS_USE_INTERIM_CACHE == 1 -#define MIGRATE_BUCKETS 1021 +#define MIGRATE_BUCKETS 1021 extern int migrate_threshold; extern int good_interim_disks; @@ -160,11 +153,11 @@ extern int good_interim_disks; union AccessEntry { uintptr_t v[2]; struct { - uint32_t next; - uint32_t prev; - uint32_t index; - uint16_t tag; - int16_t count; + uint32_t next; + uint32_t prev; + uint32_t index; + uint16_t tag; + int16_t count; } item; }; @@ -177,31 +170,37 @@ struct AccessHistory { AccessEntry *freelist; - void freeEntry(AccessEntry *entry) { - entry->v[0] = (uintptr_t) freelist; + void + freeEntry(AccessEntry *entry) + { + entry->v[0] = (uintptr_t)freelist; entry->v[1] = 0xABCD1234U; freelist = entry; } - void init(int size, int hash_size) { + void + init(int size, int hash_size) + { this->size = size; this->hash_size = hash_size; freelist = NULL; - base = (AccessEntry *) malloc(sizeof(AccessEntry) * size); - hash = (uint32_t *) malloc (sizeof(uint32_t) * hash_size); + base = (AccessEntry *)malloc(sizeof(AccessEntry) * size); + hash = (uint32_t *)malloc(sizeof(uint32_t) * hash_size); memset(hash, 0, sizeof(uint32_t) * hash_size); base[0].item.next = base[0].item.prev = 0; base[0].v[1] = 0xABCD1234UL; for (int i = size; --i > 0;) - freeEntry(&base[i]); + freeEntry(&base[i]); return; } - void remove(AccessEntry *entry) { + void + remove(AccessEntry *entry) + { if (entry == &(base[base[0].item.prev])) { // head base[0].item.prev = entry->item.next; } else { @@ -212,12 +211,14 @@ struct AccessHistory { } else { base[entry->item.next].item.prev = entry->item.prev; } - uint32_t hash_index = (uint32_t) (entry->item.index % hash_size); + uint32_t hash_index = (uint32_t)(entry->item.index % hash_size); hash[hash_index] = 0; } - void enqueue(AccessEntry *entry) { - uint32_t hash_index = (uint32_t) (entry->item.index % hash_size); + void + enqueue(AccessEntry *entry) + { + uint32_t hash_index = (uint32_t)(entry->item.index % hash_size); hash[hash_index] = entry - base; entry->item.prev = 0; @@ -228,7 +229,9 @@ struct AccessHistory { base[0].item.next = entry - base; } - AccessEntry* dequeue() { + AccessEntry * + dequeue() + { AccessEntry *tail = &base[base[0].item.next]; if (tail != base) remove(tail); @@ -236,10 +239,12 @@ struct AccessHistory { return tail; } - void set_in_progress(CryptoHash *key) { + void + set_in_progress(CryptoHash *key) + { uint32_t key_index = key->slice32(3); uint16_t tag = static_cast<uint16_t>(key->slice32(1)); - unsigned int hash_index = (uint32_t) (key_index % hash_size); + unsigned int hash_index = (uint32_t)(key_index % hash_size); uint32_t index = hash[hash_index]; AccessEntry *entry = &base[index]; @@ -248,10 +253,12 @@ struct AccessHistory { } } - void set_not_in_progress(CryptoHash *key) { + void + set_not_in_progress(CryptoHash *key) + { uint32_t key_index = key->slice32(3); uint16_t tag = static_cast<uint16_t>(key->slice32(1)); - unsigned int hash_index = (uint32_t) (key_index % hash_size); + unsigned int hash_index = (uint32_t)(key_index % hash_size); uint32_t index = hash[hash_index]; AccessEntry *entry = &base[index]; @@ -260,10 +267,12 @@ struct AccessHistory { } } - void put_key(CryptoHash *key) { + void + put_key(CryptoHash *key) + { uint32_t key_index = key->slice32(3); uint16_t tag = static_cast<uint16_t>(key->slice32(1)); - unsigned int hash_index = (uint32_t) (key_index % hash_size); + unsigned int hash_index = (uint32_t)(key_index % hash_size); uint32_t index = hash[hash_index]; AccessEntry *entry = &base[index]; @@ -280,7 +289,7 @@ struct AccessHistory { } } else { entry = freelist; - freelist = (AccessEntry *) entry->v[0]; + freelist = (AccessEntry *)entry->v[0]; } } else { // collation remove(entry); @@ -292,7 +301,9 @@ struct AccessHistory { } } - bool remove_key(CryptoHash *key) { + bool + remove_key(CryptoHash *key) + { unsigned int hash_index = static_cast<uint32_t>(key->slice32(3) % hash_size); uint32_t index = hash[hash_index]; AccessEntry *entry = &base[index]; @@ -304,27 +315,27 @@ struct AccessHistory { return false; } - bool is_hot(CryptoHash *key) { + bool + is_hot(CryptoHash *key) + { uint32_t key_index = key->slice32(3); - uint16_t tag = (uint16_t) key->slice32(1); - unsigned int hash_index = (uint32_t) (key_index % hash_size); + uint16_t tag = (uint16_t)key->slice32(1); + unsigned int hash_index = (uint32_t)(key_index % hash_size); uint32_t index = hash[hash_index]; AccessEntry *entry = &base[index]; - return (index != 0 && entry->item.tag == tag && entry->item.index == key_index - && entry->item.count >= migrate_threshold); + return (index != 0 && entry->item.tag == tag && entry->item.index == key_index && entry->item.count >= migrate_threshold); } }; struct InterimCacheVol; -struct MigrateToInterimCache -{ - MigrateToInterimCache() { } +struct MigrateToInterimCache { + MigrateToInterimCache() {} Ptr<IOBufferData> buf; uint32_t agg_len; - CacheKey key; + CacheKey key; Dir dir; InterimCacheVol *interim_vol; CacheVC *vc; @@ -335,8 +346,7 @@ struct MigrateToInterimCache LINK(MigrateToInterimCache, hash_link); }; -struct InterimCacheVol: public Continuation -{ +struct InterimCacheVol : public Continuation { ats_scoped_str hash_text; InterimVolHeaderFooter *header; @@ -347,7 +357,7 @@ struct InterimCacheVol: public Continuation bool recover_wrapped; off_t scan_pos; - off_t skip; // start of headers + off_t skip; // start of headers off_t start; // start of data off_t len; off_t data_blocks; @@ -362,26 +372,34 @@ struct InterimCacheVol: public Continuation Queue<MigrateToInterimCache, MigrateToInterimCache::Link_link> agg; int64_t transistor_range_threshold; bool sync; - bool is_io_in_progress() { + bool + is_io_in_progress() + { return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS; } int recover_data(); int handle_recover_from_data(int event, void *data); - void set_io_not_in_progress() { + void + set_io_not_in_progress() + { io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; } int aggWrite(int event, void *e); int aggWriteDone(int event, void *e); - uint32_t round_to_approx_size (uint32_t l) { + uint32_t + round_to_approx_size(uint32_t l) + { uint32_t ll = round_to_approx_dir_size(l); return INK_ALIGN(ll, disk->hw_sector_size); } - void init(off_t s, off_t l, CacheDisk *interim, Vol *v, InterimVolHeaderFooter *hptr) { - char* seed_str = interim->hash_base_string ? interim->hash_base_string : interim->path; + void + init(off_t s, off_t l, CacheDisk *interim, Vol *v, InterimVolHeaderFooter *hptr) + { + char *seed_str = interim->hash_base_string ? interim->hash_base_string : interim->path; const size_t hash_seed_size = strlen(seed_str); const size_t hash_text_size = hash_seed_size + 32; @@ -401,7 +419,7 @@ struct InterimCacheVol: public Continuation agg_todo_size = 0; agg_buf_pos = 0; - agg_buffer = (char *) ats_memalign(sysconf(_SC_PAGESIZE), AGG_SIZE); + agg_buffer = (char *)ats_memalign(sysconf(_SC_PAGESIZE), AGG_SIZE); memset(agg_buffer, 0, AGG_SIZE); this->mutex = ((Continuation *)vol)->mutex; } @@ -414,8 +432,7 @@ void dir_clean_interimvol(InterimCacheVol *d); #endif -struct Vol: public Continuation -{ +struct Vol : public Continuation { char *path; ats_scoped_str hash_text; CryptoHash hash_id; @@ -430,8 +447,8 @@ struct Vol: public Continuation off_t recover_pos; off_t prev_recover_pos; off_t scan_pos; - off_t skip; // start of headers - off_t start; // start of data + off_t skip; // start of headers + off_t start; // start of data off_t len; off_t data_blocks; int hit_evacuate_window; @@ -479,7 +496,9 @@ struct Vol: public Continuation volatile int interim_done; - bool migrate_probe(CacheKey *key, MigrateToInterimCache **result) { + bool + migrate_probe(CacheKey *key, MigrateToInterimCache **result) + { uint32_t indx = key->slice32(3) % MIGRATE_BUCKETS; MigrateToInterimCache *m = mig_hash[indx].head; while (m != NULL && !(m->key == *key)) { @@ -490,17 +509,23 @@ struct Vol: public Continuation return m != NULL; } - void set_migrate_in_progress(MigrateToInterimCache *m) { + void + set_migrate_in_progress(MigrateToInterimCache *m) + { uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS; mig_hash[indx].enqueue(m); } - void set_migrate_failed(MigrateToInterimCache *m) { + void + set_migrate_failed(MigrateToInterimCache *m) + { uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS; mig_hash[indx].remove(m); } - void set_migrate_done(MigrateToInterimCache *m) { + void + set_migrate_done(MigrateToInterimCache *m) + { uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS; mig_hash[indx].remove(m); history.remove_key(&m->key); @@ -543,11 +568,13 @@ struct Vol: public Continuation int dir_check(bool fix); int db_check(bool fix); - int is_io_in_progress() + int + is_io_in_progress() { return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS; } - int increment_generation() + int + increment_generation() { // this is stored in the offset field of the directory (!=0) ink_assert(mutex->thread_holding == this_ethread()); @@ -556,7 +583,8 @@ struct Vol: public Continuation header->generation++; return header->generation; } - void set_io_not_in_progress() + void + set_io_not_in_progress() { io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; } @@ -579,33 +607,27 @@ struct Vol: public Continuation uint32_t round_to_approx_size(uint32_t l); Vol() - : Continuation(new_ProxyMutex()), path(NULL), fd(-1), - dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0), - len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0), - evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false), - dir_sync_waiting(0), dir_sync_in_progress(0), writing_end_marker(0) { + : Continuation(new_ProxyMutex()), path(NULL), fd(-1), dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), + skip(0), start(0), len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0), + evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false), dir_sync_waiting(0), + dir_sync_in_progress(0), writing_end_marker(0) + { open_dir.mutex = mutex; agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE); memset(agg_buffer, 0, AGG_SIZE); SET_HANDLER(&Vol::aggWrite); } - ~Vol() { - ats_memalign_free(agg_buffer); - } + ~Vol() { ats_memalign_free(agg_buffer); } }; -struct AIO_Callback_handler: public Continuation -{ +struct AIO_Callback_handler : public Continuation { int handle_disk_failure(int event, void *data); - AIO_Callback_handler():Continuation(new_ProxyMutex()) { - SET_HANDLER(&AIO_Callback_handler::handle_disk_failure); - } + AIO_Callback_handler() : Continuation(new_ProxyMutex()) { SET_HANDLER(&AIO_Callback_handler::handle_disk_failure); } }; -struct CacheVol -{ +struct CacheVol { int vol_number; int scheme; off_t size; @@ -616,28 +638,25 @@ struct CacheVol // per volume stats RecRawStatBlock *vol_rsb; - CacheVol() - : vol_number(-1), scheme(0), size(0), num_vols(0), vols(NULL), disk_vols(0), vol_rsb(0) - { } + CacheVol() : vol_number(-1), scheme(0), size(0), num_vols(0), vols(NULL), disk_vols(0), vol_rsb(0) {} }; // Note : hdr() needs to be 8 byte aligned. // If you change this, change sizeofDoc above -struct Doc -{ - uint32_t magic; // DOC_MAGIC - uint32_t len; // length of this fragment (including hlen & sizeof(Doc), unrounded) - uint64_t total_len; // total length of document - CryptoHash first_key; ///< first key in object. - CryptoHash key; ///< Key for this doc. - uint32_t hlen; ///< Length of this header. - uint32_t doc_type:8; ///< Doc type - indicates the format of this structure and its content. - uint32_t v_major:8; ///< Major version number. - uint32_t v_minor:8; ///< Minor version number. - uint32_t unused:8; ///< Unused, forced to zero. +struct Doc { + uint32_t magic; // DOC_MAGIC + uint32_t len; // length of this fragment (including hlen & sizeof(Doc), unrounded) + uint64_t total_len; // total length of document + CryptoHash first_key; ///< first key in object. + CryptoHash key; ///< Key for this doc. + uint32_t hlen; ///< Length of this header. + uint32_t doc_type : 8; ///< Doc type - indicates the format of this structure and its content. + uint32_t v_major : 8; ///< Major version number. + uint32_t v_minor : 8; ///< Minor version number. + uint32_t unused : 8; ///< Unused, forced to zero. uint32_t sync_serial; uint32_t write_serial; - uint32_t pinned; // pinned until + uint32_t pinned; // pinned until uint32_t checksum; uint32_t data_len(); @@ -660,16 +679,16 @@ extern unsigned short *vol_hash_table; // inline Functions TS_INLINE int -vol_headerlen(Vol *d) { - return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (d->segments-1)); +vol_headerlen(Vol *d) +{ + return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (d->segments - 1)); } TS_INLINE size_t vol_dirlen(Vol *d) { - return vol_headerlen(d) + - ROUND_TO_STORE_BLOCK(((size_t)d->buckets) * DIR_DEPTH * d->segments * SIZEOF_DIR) + - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)); + return vol_headerlen(d) + ROUND_TO_STORE_BLOCK(((size_t)d->buckets) * DIR_DEPTH * d->segments * SIZEOF_DIR) + + ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)); } TS_INLINE int @@ -679,39 +698,31 @@ vol_direntries(Vol *d) } #if TS_USE_INTERIM_CACHE == 1 -#define vol_out_of_phase_valid(d, e) \ - (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE)) +#define vol_out_of_phase_valid(d, e) (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE)) -#define vol_out_of_phase_agg_valid(d, e) \ - (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE)) +#define vol_out_of_phase_agg_valid(d, e) (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE)) -#define vol_out_of_phase_write_valid(d, e) \ - (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE)) +#define vol_out_of_phase_write_valid(d, e) (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE)) -#define vol_in_phase_valid(d, e) \ - (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE)) +#define vol_in_phase_valid(d, e) (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE)) -#define vol_offset_to_offset(d, pos) \ - (d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE) +#define vol_offset_to_offset(d, pos) (d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE) -#define vol_dir_segment(d, s) \ - (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR) +#define vol_dir_segment(d, s) (Dir *)(((char *)d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR) -#define offset_to_vol_offset(d, pos) \ - ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE) +#define offset_to_vol_offset(d, pos) ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE) -#define vol_offset(d, e) \ - ((d)->start + (off_t) ((off_t)dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE) +#define vol_offset(d, e) ((d)->start + (off_t)((off_t)dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE) -#define vol_in_phase_agg_buf_valid(d, e) \ - ((vol_offset(d, e) >= d->header->write_pos) && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos)) +#define vol_in_phase_agg_buf_valid(d, e) \ + ((vol_offset(d, e) >= d->header->write_pos) && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos)) -#define vol_transistor_range_valid(d, e) \ - ((d->header->agg_pos + d->transistor_range_threshold < d->start + d->len) ? \ - (vol_out_of_phase_write_valid(d, e) && \ - (dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold) / CACHE_BLOCK_SIZE))) : \ - ((dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold - d->len) / CACHE_BLOCK_SIZE)) || \ - (dir_offset(e) > ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE)))) +#define vol_transistor_range_valid(d, e) \ + ((d->header->agg_pos + d->transistor_range_threshold < d->start + d->len) ? \ + (vol_out_of_phase_write_valid(d, e) && \ + (dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold) / CACHE_BLOCK_SIZE))) : \ + ((dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold - d->len) / CACHE_BLOCK_SIZE)) || \ + (dir_offset(e) > ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE)))) #else @@ -742,7 +753,7 @@ vol_in_phase_valid(Vol *d, Dir *e) TS_INLINE off_t vol_offset(Vol *d, Dir *e) { - return d->start + (off_t) dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE; + return d->start + (off_t)dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE; } TS_INLINE off_t @@ -760,7 +771,7 @@ vol_offset_to_offset(Vol *d, off_t pos) TS_INLINE Dir * vol_dir_segment(Vol *d, int s) { - return (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR); + return (Dir *)(((char *)d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR); } TS_INLINE int @@ -773,7 +784,7 @@ vol_in_phase_agg_buf_valid(Vol *d, Dir *e) TS_INLINE off_t vol_relative_length(Vol *v, off_t start_offset) { - return (v->len + v->skip) - start_offset; + return (v->len + v->skip) - start_offset; } TS_INLINE uint32_t @@ -797,13 +808,13 @@ Doc::single_fragment() TS_INLINE char * Doc::hdr() { - return reinterpret_cast<char*>(this) + sizeofDoc; + return reinterpret_cast<char *>(this) + sizeofDoc; } TS_INLINE char * Doc::data() { - return this->hdr() + hlen; + return this->hdr() + hlen; } int vol_dir_clear(Vol *d); @@ -872,64 +883,67 @@ Vol::within_hit_evacuate_window(Dir *xdir) } TS_INLINE uint32_t -Vol::round_to_approx_size(uint32_t l) { +Vol::round_to_approx_size(uint32_t l) +{ uint32_t ll = round_to_approx_dir_size(l); return ROUND_TO_SECTOR(this, ll); } #if TS_USE_INTERIM_CACHE == 1 inline bool -dir_valid(Vol *_d, Dir *_e) { +dir_valid(Vol *_d, Dir *_e) +{ if (!dir_ininterim(_e)) - return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) : - vol_out_of_phase_valid(_d, _e); + return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) : vol_out_of_phase_valid(_d, _e); else { int idx = dir_get_index(_e); - if (good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false; + if (good_interim_disks <= 0 || idx >= _d->num_interim_vols) + return false; InterimCacheVol *sv = &(_d->interim_vols[idx]); - return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : - vol_out_of_phase_valid(sv, _e)) : false; + return !DISK_BAD(sv->disk) ? + (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : vol_out_of_phase_valid(sv, _e)) : + false; } } inline bool -dir_valid(InterimCacheVol *_d, Dir *_e) { +dir_valid(InterimCacheVol *_d, Dir *_e) +{ if (!dir_ininterim(_e)) return true; InterimCacheVol *sv = &(_d->vol->interim_vols[dir_get_index(_e)]); if (_d != sv) return true; - return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : - vol_out_of_phase_valid(sv, _e)) : false; - + return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : vol_out_of_phase_valid(sv, _e)) : + false; } inline bool -dir_agg_valid(Vol *_d, Dir *_e) { +dir_agg_valid(Vol *_d, Dir *_e) +{ if (!dir_ininterim(_e)) - return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) : - vol_out_of_phase_agg_valid(_d, _e); + return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) : vol_out_of_phase_agg_valid(_d, _e); else { int idx = dir_get_index(_e); - if(good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false; + if (good_interim_disks <= 0 || idx >= _d->num_interim_vols) + return false; InterimCacheVol *sv = &(_d->interim_vols[idx]); - return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : - vol_out_of_phase_agg_valid(sv, _e); + return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : vol_out_of_phase_agg_valid(sv, _e); } } inline bool -dir_write_valid(Vol *_d, Dir *_e) { +dir_write_valid(Vol *_d, Dir *_e) +{ if (!dir_ininterim(_e)) - return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) : - vol_out_of_phase_write_valid(_d, _e); + return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) : vol_out_of_phase_write_valid(_d, _e); else { InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]); - return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : - vol_out_of_phase_write_valid(sv, _e); + return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) : vol_out_of_phase_write_valid(sv, _e); } } inline bool -dir_agg_buf_valid(Vol *_d, Dir *_e) { +dir_agg_buf_valid(Vol *_d, Dir *_e) +{ if (!dir_ininterim(_e)) return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e); else { @@ -939,7 +953,8 @@ dir_agg_buf_valid(Vol *_d, Dir *_e) { } inline bool -dir_agg_buf_valid(InterimCacheVol *_d, Dir *_e) { +dir_agg_buf_valid(InterimCacheVol *_d, Dir *_e) +{ return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e); } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/P_RamCache.h ---------------------------------------------------------------------- diff --git a/iocore/cache/P_RamCache.h b/iocore/cache/P_RamCache.h index 9a0ae68..d4350fa 100644 --- a/iocore/cache/P_RamCache.h +++ b/iocore/cache/P_RamCache.h @@ -31,11 +31,12 @@ struct RamCache { // returns 1 on found/stored, 0 on not found/stored, if provided auxkey1 and auxkey2 must match virtual int get(INK_MD5 *key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1 = 0, uint32_t auxkey2 = 0) = 0; - virtual int put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool copy = false, uint32_t auxkey1 = 0, uint32_t auxkey2 = 0) = 0; + virtual int put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool copy = false, uint32_t auxkey1 = 0, + uint32_t auxkey2 = 0) = 0; virtual int fixup(INK_MD5 *key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1, uint32_t new_auxkey2) = 0; virtual void init(int64_t max_bytes, Vol *vol) = 0; - virtual ~RamCache() {}; + virtual ~RamCache(){}; }; RamCache *new_RamCacheLRU(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/RamCacheCLFUS.cc ---------------------------------------------------------------------- diff --git a/iocore/cache/RamCacheCLFUS.cc b/iocore/cache/RamCacheCLFUS.cc index a1bfd79..6cfce51 100644 --- a/iocore/cache/RamCacheCLFUS.cc +++ b/iocore/cache/RamCacheCLFUS.cc @@ -34,14 +34,14 @@ #endif #define REQUIRED_COMPRESSION 0.9 // must get to this size or declared incompressible -#define REQUIRED_SHRINK 0.8 // must get to this size or keep orignal buffer (with padding) -#define HISTORY_HYSTERIA 10 // extra temporary history -#define ENTRY_OVERHEAD 256 // per-entry overhead to consider when computing cache value/size +#define REQUIRED_SHRINK 0.8 // must get to this size or keep orignal buffer (with padding) +#define HISTORY_HYSTERIA 10 // extra temporary history +#define ENTRY_OVERHEAD 256 // per-entry overhead to consider when computing cache value/size #define LZMA_BASE_MEMLIMIT (64 * 1024 * 1024) //#define CHECK_ACOUNTING 1 // very expensive double checking of all sizes #define REQUEUE_HITS(_h) ((_h) ? 1 : 0) -#define CACHE_VALUE_HITS_SIZE(_h, _s) ((float)((_h)+1) / ((_s) + ENTRY_OVERHEAD)) +#define CACHE_VALUE_HITS_SIZE(_h, _s) ((float)((_h) + 1) / ((_s) + ENTRY_OVERHEAD)) #define CACHE_VALUE(_x) CACHE_VALUE_HITS_SIZE((_x)->hits, (_x)->size) struct RamCacheCLFUSEntry { @@ -54,10 +54,10 @@ struct RamCacheCLFUSEntry { uint32_t compressed_len; union { struct { - uint32_t compressed:3; // compression type - uint32_t incompressible:1; - uint32_t lru:1; - uint32_t copy:1; // copy-in-copy-out + uint32_t compressed : 3; // compression type + uint32_t incompressible : 1; + uint32_t lru : 1; + uint32_t copy : 1; // copy-in-copy-out } flag_bits; uint32_t flags; }; @@ -83,7 +83,7 @@ struct RamCacheCLFUS : public RamCache { int64_t history; int ibuckets; int nbuckets; - DList(RamCacheCLFUSEntry, hash_link) *bucket; + DList(RamCacheCLFUSEntry, hash_link) * bucket; Que(RamCacheCLFUSEntry, lru_link) lru[2]; uint16_t *seen; int ncompressed; @@ -93,43 +93,43 @@ struct RamCacheCLFUS : public RamCache { void victimize(RamCacheCLFUSEntry *e); void move_compressed(RamCacheCLFUSEntry *e); RamCacheCLFUSEntry *destroy(RamCacheCLFUSEntry *e); - void requeue_victims(Que(RamCacheCLFUSEntry, lru_link) &victims); + void requeue_victims(Que(RamCacheCLFUSEntry, lru_link) & victims); void tick(); // move CLOCK on history - RamCacheCLFUS(): max_bytes(0), bytes(0), objects(0), vol(0), history(0), ibuckets(0), nbuckets(0), bucket(0), - seen(0), ncompressed(0), compressed(0) { } + RamCacheCLFUS() + : max_bytes(0), bytes(0), objects(0), vol(0), history(0), ibuckets(0), nbuckets(0), bucket(0), seen(0), ncompressed(0), + compressed(0) + { + } }; -class RamCacheCLFUSCompressor : public Continuation { +class RamCacheCLFUSCompressor : public Continuation +{ public: RamCacheCLFUS *rc; int mainEvent(int event, Event *e); - RamCacheCLFUSCompressor(RamCacheCLFUS *arc) - : rc(arc) - { - SET_HANDLER(&RamCacheCLFUSCompressor::mainEvent); - } + RamCacheCLFUSCompressor(RamCacheCLFUS *arc) : rc(arc) { SET_HANDLER(&RamCacheCLFUSCompressor::mainEvent); } }; int RamCacheCLFUSCompressor::mainEvent(int /* event ATS_UNUSED */, Event *e) { switch (cache_config_ram_cache_compress) { - default: - Warning("unknown RAM cache compression type: %d", cache_config_ram_cache_compress); - case CACHE_COMPRESSION_NONE: - case CACHE_COMPRESSION_FASTLZ: - break; - case CACHE_COMPRESSION_LIBZ: -#if ! TS_HAS_LIBZ - Warning("libz not available for RAM cache compression"); + default: + Warning("unknown RAM cache compression type: %d", cache_config_ram_cache_compress); + case CACHE_COMPRESSION_NONE: + case CACHE_COMPRESSION_FASTLZ: + break; + case CACHE_COMPRESSION_LIBZ: +#if !TS_HAS_LIBZ + Warning("libz not available for RAM cache compression"); #endif - break; - case CACHE_COMPRESSION_LIBLZMA: -#if ! TS_HAS_LZMA - Warning("lzma not available for RAM cache compression"); + break; + case CACHE_COMPRESSION_LIBLZMA: +#if !TS_HAS_LZMA + Warning("lzma not available for RAM cache compression"); #endif - break; + break; } if (cache_config_ram_cache_compress_percent) rc->compress_entries(e->ethread); @@ -138,11 +138,9 @@ RamCacheCLFUSCompressor::mainEvent(int /* event ATS_UNUSED */, Event *e) ClassAllocator<RamCacheCLFUSEntry> ramCacheCLFUSEntryAllocator("RamCacheCLFUSEntry"); -static const int bucket_sizes[] = { - 127, 251, 509, 1021, 2039, 4093, 8191, 16381, 32749, 65521, 131071, 262139, - 524287, 1048573, 2097143, 4194301, 8388593, 16777213, 33554393, 67108859, - 134217689, 268435399, 536870909, 1073741789, 2147483647 -}; +static const int bucket_sizes[] = {127, 251, 509, 1021, 2039, 4093, 8191, 16381, 32749, + 65521, 131071, 262139, 524287, 1048573, 2097143, 4194301, 8388593, 16777213, + 33554393, 67108859, 134217689, 268435399, 536870909, 1073741789, 2147483647}; void RamCacheCLFUS::resize_hashtable() @@ -165,7 +163,7 @@ RamCacheCLFUS::resize_hashtable() ats_free(seen); if (cache_config_ram_cache_use_seen_filter) { int size = bucket_sizes[ibuckets] * sizeof(uint16_t); - seen = (uint16_t*)ats_malloc(size); + seen = (uint16_t *)ats_malloc(size); memset(seen, 0, size); } } @@ -184,13 +182,21 @@ RamCacheCLFUS::init(int64_t abytes, Vol *avol) } #ifdef CHECK_ACOUNTING -static void check_accounting(RamCacheCLFUS *c) +static void +check_accounting(RamCacheCLFUS *c) { int64_t x = 0, xsize = 0, h = 0; RamCacheCLFUSEntry *y = c->lru[0].head; - while (y) { x++; xsize += y->size + ENTRY_OVERHEAD; y = y->lru_link.next; } + while (y) { + x++; + xsize += y->size + ENTRY_OVERHEAD; + y = y->lru_link.next; + } y = c->lru[1].head; - while (y) { h++; y = y->lru_link.next; } + while (y) { + h++; + y = y->lru_link.next; + } ink_assert(x == c->objects); ink_assert(xsize == c->bytes); ink_assert(h == c->history); @@ -215,32 +221,33 @@ RamCacheCLFUS::get(INK_MD5 *key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1, if (!e->flag_bits.lru) { // in memory e->hits++; if (e->flag_bits.compressed) { - b = (char*)ats_malloc(e->len); + b = (char *)ats_malloc(e->len); switch (e->flag_bits.compressed) { - default: goto Lfailed; - case CACHE_COMPRESSION_FASTLZ: { - int l = (int)e->len; - if ((l != (int)fastlz_decompress(e->data->data(), e->compressed_len, b, l))) - goto Lfailed; - break; - } + default: + goto Lfailed; + case CACHE_COMPRESSION_FASTLZ: { + int l = (int)e->len; + if ((l != (int)fastlz_decompress(e->data->data(), e->compressed_len, b, l))) + goto Lfailed; + break; + } #if TS_HAS_LIBZ - case CACHE_COMPRESSION_LIBZ: { - uLongf l = e->len; - if (Z_OK != uncompress((Bytef*)b, &l, (Bytef*)e->data->data(), e->compressed_len)) - goto Lfailed; - break; - } + case CACHE_COMPRESSION_LIBZ: { + uLongf l = e->len; + if (Z_OK != uncompress((Bytef *)b, &l, (Bytef *)e->data->data(), e->compressed_len)) + goto Lfailed; + break; + } #endif #if TS_HAS_LZMA - case CACHE_COMPRESSION_LIBLZMA: { - size_t l = (size_t)e->len, ipos = 0, opos = 0; - uint64_t memlimit = e->len * 2 + LZMA_BASE_MEMLIMIT; - if (LZMA_OK != lzma_stream_buffer_decode( - &memlimit, 0, NULL, (uint8_t*)e->data->data(), &ipos, e->compressed_len, (uint8_t*)b, &opos, l)) - goto Lfailed; - break; - } + case CACHE_COMPRESSION_LIBLZMA: { + size_t l = (size_t)e->len, ipos = 0, opos = 0; + uint64_t memlimit = e->len * 2 + LZMA_BASE_MEMLIMIT; + if (LZMA_OK != lzma_stream_buffer_decode(&memlimit, 0, NULL, (uint8_t *)e->data->data(), &ipos, e->compressed_len, + (uint8_t *)b, &opos, l)) + goto Lfailed; + break; + } #endif } IOBufferData *data = new_xmalloc_IOBufferData(b, e->len); @@ -286,7 +293,9 @@ Lfailed: goto Lerror; } -void RamCacheCLFUS::tick() { +void +RamCacheCLFUS::tick() +{ RamCacheCLFUSEntry *e = lru[1].dequeue(); if (!e) return; @@ -378,13 +387,20 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) uint32_t l = 0; int ctype = cache_config_ram_cache_compress; switch (ctype) { - default: goto Lcontinue; - case CACHE_COMPRESSION_FASTLZ: l = (uint32_t)((double)e->len * 1.05 + 66); break; + default: + goto Lcontinue; + case CACHE_COMPRESSION_FASTLZ: + l = (uint32_t)((double)e->len * 1.05 + 66); + break; #if TS_HAS_LIBZ - case CACHE_COMPRESSION_LIBZ: l = (uint32_t)compressBound(e->len); break; + case CACHE_COMPRESSION_LIBZ: + l = (uint32_t)compressBound(e->len); + break; #endif #if TS_HAS_LZMA - case CACHE_COMPRESSION_LIBLZMA: l = e->len; break; + case CACHE_COMPRESSION_LIBLZMA: + l = e->len; + break; #endif } // store transient data for lock release @@ -392,33 +408,35 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) uint32_t elen = e->len; INK_MD5 key = e->key; MUTEX_UNTAKE_LOCK(vol->mutex, thread); - b = (char*)ats_malloc(l); + b = (char *)ats_malloc(l); bool failed = false; switch (ctype) { - default: goto Lfailed; - case CACHE_COMPRESSION_FASTLZ: - if (e->len < 16) goto Lfailed; - if ((l = fastlz_compress(edata->data(), elen, b)) <= 0) - failed = true; - break; + default: + goto Lfailed; + case CACHE_COMPRESSION_FASTLZ: + if (e->len < 16) + goto Lfailed; + if ((l = fastlz_compress(edata->data(), elen, b)) <= 0) + failed = true; + break; #if TS_HAS_LIBZ - case CACHE_COMPRESSION_LIBZ: { - uLongf ll = l; - if ((Z_OK != compress((Bytef*)b, &ll, (Bytef*)edata->data(), elen))) - failed = true; - l = (int)ll; - break; - } + case CACHE_COMPRESSION_LIBZ: { + uLongf ll = l; + if ((Z_OK != compress((Bytef *)b, &ll, (Bytef *)edata->data(), elen))) + failed = true; + l = (int)ll; + break; + } #endif #if TS_HAS_LZMA - case CACHE_COMPRESSION_LIBLZMA: { - size_t pos = 0, ll = l; - if (LZMA_OK != lzma_easy_buffer_encode(LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE, NULL, - (uint8_t*)edata->data(), elen, (uint8_t*)b, &pos, ll)) - failed = true; - l = (int)pos; - break; - } + case CACHE_COMPRESSION_LIBLZMA: { + size_t pos = 0, ll = l; + if (LZMA_OK != lzma_easy_buffer_encode(LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE, NULL, (uint8_t *)edata->data(), elen, + (uint8_t *)b, &pos, ll)) + failed = true; + l = (int)pos; + break; + } #endif } MUTEX_TAKE_LOCK(vol->mutex, thread); @@ -429,7 +447,8 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) uint32_t i = key.slice32(3) % nbuckets; RamCacheCLFUSEntry *ee = bucket[i].head; while (ee) { - if (ee->key == key && ee->data == edata) break; + if (ee->key == key && ee->data == edata) + break; ee = ee->hash_link.next; } if (!ee || ee != e) { @@ -443,7 +462,7 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) goto Lfailed; if (l < e->len) { e->flag_bits.compressed = cache_config_ram_cache_compress; - bb = (char*)ats_malloc(l); + bb = (char *)ats_malloc(l); memcpy(bb, b, l); ats_free(b); e->compressed_len = l; @@ -454,7 +473,7 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) } else { ats_free(b); e->flag_bits.compressed = 0; - bb = (char*)ats_malloc(e->len); + bb = (char *)ats_malloc(e->len); memcpy(bb, e->data->data(), e->len); int64_t delta = ((int64_t)e->len) - (int64_t)e->size; bytes += delta; @@ -470,11 +489,10 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) Lfailed: ats_free(b); e->flag_bits.incompressible = 1; - Lcontinue:; - DDebug("ram_cache", "compress %X %d %d %d %d %d %d %d", - e->key.slice32(3), e->auxkey1, e->auxkey2, - e->flag_bits.incompressible, e->flag_bits.compressed, - e->len, e->compressed_len, ncompressed); + Lcontinue: + ; + DDebug("ram_cache", "compress %X %d %d %d %d %d %d %d", e->key.slice32(3), e->auxkey1, e->auxkey2, e->flag_bits.incompressible, + e->flag_bits.compressed, e->len, e->compressed_len, ncompressed); if (!e->lru_link.next) break; compressed = e->lru_link.next; @@ -484,8 +502,7 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most) return; } -void -RamCacheCLFUS::requeue_victims(Que(RamCacheCLFUSEntry, lru_link) &victims) +void RamCacheCLFUS::requeue_victims(Que(RamCacheCLFUSEntry, lru_link) & victims) { RamCacheCLFUSEntry *victim = 0; while ((victim = victims.dequeue())) { @@ -528,7 +545,7 @@ RamCacheCLFUS::put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool copy, ui e->size = size; e->data = data; } else { - char *b = (char*)ats_malloc(len); + char *b = (char *)ats_malloc(len); memcpy(b, data->data(), len); e->data = new_xmalloc_IOBufferData(b, len); e->data->_mem_type = DEFAULT_ALLOC; @@ -584,8 +601,7 @@ RamCacheCLFUS::put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool copy, ui if (bytes + victim->size + size > max_bytes && CACHE_VALUE(victim) > CACHE_VALUE(e)) { requeue_victims(victims); lru[1].enqueue(e); - DDebug("ram_cache", "put %X %d %d size %d INC %" PRId64" HISTORY", - key->slice32(3), auxkey1, auxkey2, e->size, e->hits); + DDebug("ram_cache", "put %X %d %d size %d INC %" PRId64 " HISTORY", key->slice32(3), auxkey1, auxkey2, e->size, e->hits); return 0; } } @@ -621,7 +637,7 @@ Linsert: if (!copy) e->data = data; else { - char *b = (char*)ats_malloc(len); + char *b = (char *)ats_malloc(len); memcpy(b, data->data(), len); e->data = new_xmalloc_IOBufferData(b, len); e->data->_mem_type = DEFAULT_ALLOC; @@ -655,8 +671,7 @@ Lhistory: } int -RamCacheCLFUS::fixup(INK_MD5 * key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1, - uint32_t new_auxkey2) +RamCacheCLFUS::fixup(INK_MD5 *key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1, uint32_t new_auxkey2) { if (!max_bytes) return 0;
