http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/RamCacheLRU.cc ---------------------------------------------------------------------- diff --git a/iocore/cache/RamCacheLRU.cc b/iocore/cache/RamCacheLRU.cc index fc3c949..249e193 100644 --- a/iocore/cache/RamCacheLRU.cc +++ b/iocore/cache/RamCacheLRU.cc @@ -34,7 +34,7 @@ struct RamCacheLRUEntry { #define ENTRY_OVERHEAD 128 // per-entry overhead to consider when computing sizes -struct RamCacheLRU: public RamCache { +struct RamCacheLRU : public RamCache { int64_t max_bytes; int64_t bytes; int64_t objects; @@ -49,7 +49,7 @@ struct RamCacheLRU: public RamCache { // private uint16_t *seen; Que(RamCacheLRUEntry, lru_link) lru; - DList(RamCacheLRUEntry, hash_link) *bucket; + DList(RamCacheLRUEntry, hash_link) * bucket; int nbuckets; int ibuckets; Vol *vol; @@ -57,18 +57,18 @@ struct RamCacheLRU: public RamCache { void resize_hashtable(); RamCacheLRUEntry *remove(RamCacheLRUEntry *e); - RamCacheLRU():bytes(0), objects(0), seen(0), bucket(0), nbuckets(0), ibuckets(0), vol(NULL) {} + RamCacheLRU() : bytes(0), objects(0), seen(0), bucket(0), nbuckets(0), ibuckets(0), vol(NULL) {} }; ClassAllocator<RamCacheLRUEntry> ramCacheLRUEntryAllocator("RamCacheLRUEntry"); -static const int bucket_sizes[] = { - 127, 251, 509, 1021, 2039, 4093, 8191, 16381, 32749, 65521, 131071, 262139, - 524287, 1048573, 2097143, 4194301, 8388593, 16777213, 33554393, 67108859, - 134217689, 268435399, 536870909 -}; +static const int bucket_sizes[] = {127, 251, 509, 1021, 2039, 4093, 8191, 16381, + 32749, 65521, 131071, 262139, 524287, 1048573, 2097143, 4194301, + 8388593, 16777213, 33554393, 67108859, 134217689, 268435399, 536870909}; -void RamCacheLRU::resize_hashtable() { +void +RamCacheLRU::resize_hashtable() +{ int anbuckets = bucket_sizes[ibuckets]; DDebug("ram_cache", "resize hashtable %d", anbuckets); int64_t s = anbuckets * sizeof(DList(RamCacheLRUEntry, hash_link)); @@ -87,13 +87,14 @@ void RamCacheLRU::resize_hashtable() { ats_free(seen); int size = bucket_sizes[ibuckets] * sizeof(uint16_t); if (cache_config_ram_cache_use_seen_filter) { - seen = (uint16_t*)ats_malloc(size); + seen = (uint16_t *)ats_malloc(size); memset(seen, 0, size); } } void -RamCacheLRU::init(int64_t abytes, Vol *avol) { +RamCacheLRU::init(int64_t abytes, Vol *avol) +{ vol = avol; max_bytes = abytes; DDebug("ram_cache", "initializing ram_cache %" PRId64 " bytes", abytes); @@ -103,7 +104,8 @@ RamCacheLRU::init(int64_t abytes, Vol *avol) { } int -RamCacheLRU::get(INK_MD5 * key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1, uint32_t auxkey2) { +RamCacheLRU::get(INK_MD5 *key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1, uint32_t auxkey2) +{ if (!max_bytes) return 0; uint32_t i = key->slice32(3) % nbuckets; @@ -124,7 +126,9 @@ RamCacheLRU::get(INK_MD5 * key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1, u return 0; } -RamCacheLRUEntry * RamCacheLRU::remove(RamCacheLRUEntry *e) { +RamCacheLRUEntry * +RamCacheLRU::remove(RamCacheLRUEntry *e) +{ RamCacheLRUEntry *ret = e->hash_link.next; uint32_t b = e->key.slice32(3) % nbuckets; bucket[b].remove(e); @@ -139,7 +143,9 @@ RamCacheLRUEntry * RamCacheLRU::remove(RamCacheLRUEntry *e) { } // ignore 'copy' since we don't touch the data -int RamCacheLRU::put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool, uint32_t auxkey1, uint32_t auxkey2) { +int +RamCacheLRU::put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool, uint32_t auxkey1, uint32_t auxkey2) +{ if (!max_bytes) return 0; uint32_t i = key->slice32(3) % nbuckets; @@ -191,7 +197,9 @@ int RamCacheLRU::put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool, uint3 return 1; } -int RamCacheLRU::fixup(INK_MD5 * key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1, uint32_t new_auxkey2) { +int +RamCacheLRU::fixup(INK_MD5 *key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1, uint32_t new_auxkey2) +{ if (!max_bytes) return 0; uint32_t i = key->slice32(3) % nbuckets; @@ -207,6 +215,8 @@ int RamCacheLRU::fixup(INK_MD5 * key, uint32_t old_auxkey1, uint32_t old_auxkey2 return 0; } -RamCache *new_RamCacheLRU() { +RamCache * +new_RamCacheLRU() +{ return new RamCacheLRU; }
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cache/Store.cc ---------------------------------------------------------------------- diff --git a/iocore/cache/Store.cc b/iocore/cache/Store.cc index debe16c..2fac7b4 100644 --- a/iocore/cache/Store.cc +++ b/iocore/cache/Store.cc @@ -40,10 +40,13 @@ static span_error_t make_span_error(int error) { switch (error) { - case ENOENT: return SPAN_ERROR_NOT_FOUND; + case ENOENT: + return SPAN_ERROR_NOT_FOUND; case EPERM: /* fallthru */ - case EACCES: return SPAN_ERROR_NO_ACCESS; - default: return SPAN_ERROR_UNKNOWN; + case EACCES: + return SPAN_ERROR_NO_ACCESS; + default: + return SPAN_ERROR_UNKNOWN; } } @@ -51,31 +54,38 @@ static const char * span_file_typename(mode_t st_mode) { switch (st_mode & S_IFMT) { - case S_IFBLK: return "block device"; - case S_IFCHR: return "character device"; - case S_IFDIR: return "directory"; - case S_IFREG: return "file"; - default: return "<unsupported>"; + case S_IFBLK: + return "block device"; + case S_IFCHR: + return "character device"; + case S_IFDIR: + return "directory"; + case S_IFREG: + return "file"; + default: + return "<unsupported>"; } } Ptr<ProxyMutex> tmp_p; -Store::Store():n_disks(0), disk(NULL) +Store::Store() + : n_disks(0), disk(NULL) #if TS_USE_INTERIM_CACHE == 1 - ,n_interim_disks(0), interim_disk(NULL) + , + n_interim_disks(0), interim_disk(NULL) #endif { } void -Store::add(Span * ds) +Store::add(Span *ds) { extend(n_disks + 1); disk[n_disks - 1] = ds; } void -Store::add(Store & s) +Store::add(Store &s) { // assume on different disks for (unsigned i = 0; i < s.n_disks; i++) { @@ -86,17 +96,16 @@ Store::add(Store & s) } - // should be changed to handle offset in more general // case (where this is not a free of a "just" allocated // store void -Store::free(Store & s) +Store::free(Store &s) { for (unsigned i = 0; i < s.n_disks; i++) { - for (Span * sd = s.disk[i]; sd; sd = sd->link.next) { + for (Span *sd = s.disk[i]; sd; sd = sd->link.next) { for (unsigned j = 0; j < n_disks; j++) - for (Span * d = disk[j]; d; d = d->link.next) + for (Span *d = disk[j]; d; d = d->link.next) if (!strcmp(sd->pathname, d->pathname)) { if (sd->offset < d->offset) d->offset = sd->offset; @@ -104,7 +113,8 @@ Store::free(Store & s) goto Lfound; } ink_release_assert(!"Store::free failed"); - Lfound:; + Lfound: + ; } } } @@ -112,7 +122,7 @@ Store::free(Store & s) void Store::sort() { - Span **vec = (Span **) alloca(sizeof(Span *) * n_disks); + Span **vec = (Span **)alloca(sizeof(Span *) * n_disks); memset(vec, 0, sizeof(Span *) * n_disks); for (unsigned i = 0; i < n_disks; i++) { vec[i] = disk[i]; @@ -123,7 +133,7 @@ Store::sort() unsigned n = 0; for (unsigned i = 0; i < n_disks; i++) { - for (Span * sd = vec[i]; sd; sd = vec[i]) { + for (Span *sd = vec[i]; sd; sd = vec[i]) { vec[i] = vec[i]->link.next; for (unsigned d = 0; d < n; d++) { if (sd->disk_id == disk[d]->disk_id) { @@ -133,7 +143,8 @@ Store::sort() } } disk[n++] = sd; - Ldone:; + Ldone: + ; } } n_disks = n; @@ -142,12 +153,11 @@ Store::sort() for (unsigned i = 0; i < n_disks; i++) { Lagain: - Span * prev = 0; - for (Span * sd = disk[i]; sd;) { + Span *prev = 0; + for (Span *sd = disk[i]; sd;) { Span *next = sd->link.next; if (next && - ((strcmp(sd->pathname, next->pathname) < 0) || - (!strcmp(sd->pathname, next->pathname) && sd->offset > next->offset))) { + ((strcmp(sd->pathname, next->pathname) < 0) || (!strcmp(sd->pathname, next->pathname) && sd->offset > next->offset))) { if (!prev) { disk[i] = next; sd->link.next = next->link.next; @@ -167,7 +177,7 @@ Store::sort() // merge adjacent spans for (unsigned i = 0; i < n_disks; i++) { - for (Span * sd = disk[i]; sd;) { + for (Span *sd = disk[i]; sd;) { Span *next = sd->link.next; if (next && !strcmp(sd->pathname, next->pathname)) { if (!sd->file_pathname) { @@ -211,7 +221,7 @@ Span::errorstr(span_error_t serr) } int -Span::path(char *filename, int64_t * aoffset, char *buf, int buflen) +Span::path(char *filename, int64_t *aoffset, char *buf, int buflen) { ink_assert(!aoffset); Span *ds = this; @@ -228,7 +238,7 @@ Span::path(char *filename, int64_t * aoffset, char *buf, int buflen) } void -Span::hash_base_string_set(char const* s) +Span::hash_base_string_set(char const *s) { hash_base_string = s ? ats_strdup(s) : NULL; } @@ -263,7 +273,7 @@ Span::~Span() } static int -get_int64(int fd, int64_t & data) +get_int64(int fd, int64_t &data) { char buf[PATH_NAME_MAX + 1]; if (ink_file_fd_readline(fd, PATH_NAME_MAX, buf) <= 0) @@ -284,7 +294,7 @@ Store::remove(char *n) Lagain: for (unsigned i = 0; i < n_disks; i++) { Span *p = NULL; - for (Span * sd = disk[i]; sd; sd = sd->link.next) { + for (Span *sd = disk[i]; sd; sd = sd->link.next) { if (!strcmp(n, sd->pathname)) { found = true; if (p) @@ -325,15 +335,16 @@ Store::read_config() char line[1024]; int len; while ((len = ink_file_fd_readline(fd, sizeof(line), line)) > 0) { - char const* path; - char const* seed = 0; + char const *path; + char const *seed = 0; // update lines ++ln; // Because the SimpleTokenizer is a bit too simple, we have to normalize whitespace. - for ( char *spot = line, *limit = line+len ; spot < limit ; ++spot ) - if (ParseRules::is_space(*spot)) *spot = ' '; // force whitespace to literal space. + for (char *spot = line, *limit = line + len; spot < limit; ++spot) + if (ParseRules::is_space(*spot)) + *spot = ' '; // force whitespace to literal space. SimpleTokenizer tokens(line, ' ', SimpleTokenizer::OVERWRITE_INPUT_STRING); @@ -347,21 +358,23 @@ Store::read_config() int64_t size = -1; int volume_num = -1; - char const* e; + char const *e; while (0 != (e = tokens.getNext())) { if (ParseRules::is_digit(*e)) { if ((size = ink_atoi64(e)) <= 0) { err = "error parsing size"; goto Lfail; } - } else if (0 == strncasecmp(HASH_BASE_STRING_KEY, e, sizeof(HASH_BASE_STRING_KEY)-1)) { + } else if (0 == strncasecmp(HASH_BASE_STRING_KEY, e, sizeof(HASH_BASE_STRING_KEY) - 1)) { e += sizeof(HASH_BASE_STRING_KEY) - 1; - if ('=' == *e) ++e; + if ('=' == *e) + ++e; if (*e && !ParseRules::is_space(*e)) seed = e; - } else if (0 == strncasecmp(VOLUME_KEY, e, sizeof(VOLUME_KEY)-1)) { + } else if (0 == strncasecmp(VOLUME_KEY, e, sizeof(VOLUME_KEY) - 1)) { e += sizeof(VOLUME_KEY) - 1; - if ('=' == *e) ++e; + if ('=' == *e) + ++e; if (!*e || !ParseRules::is_digit(*e) || 0 >= (volume_num = ink_atoi(e))) { err = "error parsing volume number"; goto Lfail; @@ -371,8 +384,8 @@ Store::read_config() char *pp = Layout::get()->relative(path); ns = new Span; - Debug("cache_init", "Store::read_config - ns = new Span; ns->init(\"%s\",%" PRId64 "), forced volume=%d%s%s", - pp, size, volume_num, seed ? " id=" : "", seed ? seed : ""); + Debug("cache_init", "Store::read_config - ns = new Span; ns->init(\"%s\",%" PRId64 "), forced volume=%d%s%s", pp, size, + volume_num, seed ? " id=" : "", seed ? seed : ""); if ((err = ns->init(pp, size))) { RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR, "could not initialize storage \"%s\" [%s]", pp, err); Debug("cache_init", "Store::read_config - could not initialize storage \"%s\" [%s]", pp, err); @@ -384,8 +397,10 @@ Store::read_config() n_dsstore++; // Set side values if present. - if (seed) ns->hash_base_string_set(seed); - if (volume_num > 0) ns->volume_number_set(volume_num); + if (seed) + ns->hash_base_string_set(seed); + if (volume_num > 0) + ns->volume_number_set(volume_num); // new Span { @@ -402,7 +417,7 @@ Store::read_config() extend(n_dsstore); cur = sd; while (cur) { - Span* next = cur->link.next; + Span *next = cur->link.next; cur->link.next = NULL; disk[i++] = cur; cur = next; @@ -421,7 +436,8 @@ Lfail: #if TS_USE_INTERIM_CACHE == 1 const char * -Store::read_interim_config() { +Store::read_interim_config() +{ char p[PATH_NAME_MAX + 1]; Span *sd = NULL; Span *ns; @@ -449,7 +465,7 @@ Store::read_interim_config() { } n_interim_disks = interim_store; - interim_disk = (Span **) ats_malloc(interim_store * sizeof(Span *)); + interim_disk = (Span **)ats_malloc(interim_store * sizeof(Span *)); { int i = 0; while (sd) { @@ -467,9 +483,9 @@ int Store::write_config_data(int fd) const { for (unsigned i = 0; i < n_disks; i++) - for (Span * sd = disk[i]; sd; sd = sd->link.next) { + for (Span *sd = disk[i]; sd; sd = sd->link.next) { char buf[PATH_NAME_MAX + 64]; - snprintf(buf, sizeof(buf), "%s %" PRId64 "\n", sd->pathname.get(), (int64_t) sd->blocks * (int64_t) STORE_BLOCK_SIZE); + snprintf(buf, sizeof(buf), "%s %" PRId64 "\n", sd->pathname.get(), (int64_t)sd->blocks * (int64_t)STORE_BLOCK_SIZE); if (ink_file_fd_writestring(fd, buf) == -1) return (-1); } @@ -477,11 +493,11 @@ Store::write_config_data(int fd) const } const char * -Span::init(const char * path, int64_t size) +Span::init(const char *path, int64_t size) { - struct stat sbuf; - struct statvfs vbuf; - span_error_t serr; + struct stat sbuf; + struct statvfs vbuf; + span_error_t serr; ink_device_geometry geometry; ats_scoped_fd fd(socketManager.open(path, O_RDONLY)); @@ -596,7 +612,7 @@ Span::init(const char * path, int64_t size) int64_t newsz = MIN(size, this->size()); Note("cache %s '%s' is %" PRId64 " bytes, but the configured size is %" PRId64 " bytes, using the minimum", - span_file_typename(sbuf.st_mode), path, this->size(), size); + span_file_typename(sbuf.st_mode), path, this->size(), size); this->blocks = newsz / STORE_BLOCK_SIZE; } @@ -607,7 +623,7 @@ Span::init(const char * path, int64_t size) Debug("cache_init", "initialized span '%s'", this->pathname.get()); Debug("cache_init", "hw_sector_size=%d, size=%" PRId64 ", blocks=%" PRId64 ", disk_id=%" PRId64 "/%" PRId64 ", file_pathname=%d", - this->hw_sector_size, this->size(), this->blocks, this->disk_id[0], this->disk_id[1], this->file_pathname); + this->hw_sector_size, this->size(), this->blocks, this->disk_id[0], this->disk_id[1], this->file_pathname); return NULL; @@ -628,13 +644,13 @@ Store::normalize() } static unsigned int -try_alloc(Store & target, Span * source, unsigned int start_blocks, bool one_only = false) +try_alloc(Store &target, Span *source, unsigned int start_blocks, bool one_only = false) { unsigned int blocks = start_blocks; Span *ds = NULL; while (source && blocks) { if (source->blocks) { - unsigned int a; // allocated + unsigned int a; // allocated if (blocks > source->blocks) a = source->blocks; else @@ -660,7 +676,7 @@ try_alloc(Store & target, Span * source, unsigned int start_blocks, bool one_onl } void -Store::spread_alloc(Store & s, unsigned int blocks, bool mmapable) +Store::spread_alloc(Store &s, unsigned int blocks, bool mmapable) { // // Count the eligable disks.. @@ -695,13 +711,13 @@ Store::spread_alloc(Store & s, unsigned int blocks, bool mmapable) } void -Store::try_realloc(Store & s, Store & diff) +Store::try_realloc(Store &s, Store &diff) { for (unsigned i = 0; i < s.n_disks; i++) { Span *prev = 0; - for (Span * sd = s.disk[i]; sd;) { + for (Span *sd = s.disk[i]; sd;) { for (unsigned j = 0; j < n_disks; j++) - for (Span * d = disk[j]; d; d = d->link.next) + for (Span *d = disk[j]; d; d = d->link.next) if (!strcmp(sd->pathname, d->pathname)) { if (sd->offset >= d->offset && (sd->end() <= d->end())) { if (!sd->file_pathname || (sd->end() == d->end())) { @@ -734,7 +750,8 @@ Store::try_realloc(Store & s, Store & diff) sd = prev ? prev->link.next : s.disk[i]; continue; } - Lfound:; + Lfound: + ; prev = sd; sd = sd->link.next; } @@ -748,7 +765,7 @@ Store::try_realloc(Store & s, Store & diff) // Stupid grab first availabled space allocator // void -Store::alloc(Store & s, unsigned int blocks, bool one_only, bool mmapable) +Store::alloc(Store &s, unsigned int blocks, bool one_only, bool mmapable) { unsigned int oblocks = blocks; for (unsigned i = 0; blocks && i < n_disks; i++) { @@ -782,7 +799,7 @@ Span::write(int fd) const if (ink_file_fd_writestring(fd, buf) == -1) return (-1); - snprintf(buf, sizeof(buf), "%d\n", (int) is_mmapable()); + snprintf(buf, sizeof(buf), "%d\n", (int)is_mmapable()); if (ink_file_fd_writestring(fd, buf) == -1) return (-1); @@ -939,7 +956,7 @@ Span::dup() } void -Store::dup(Store & s) +Store::dup(Store &s) { s.n_disks = n_disks; s.disk = (Span **)ats_malloc(sizeof(Span *) * n_disks); @@ -963,7 +980,7 @@ Store::clear(char *filename, bool clear_dirs) int r = d->path(filename, NULL, path, PATH_NAME_MAX); if (r < 0) return -1; - int fd =::open(path, O_RDWR | O_CREAT, 0644); + int fd = ::open(path, O_RDWR | O_CREAT, 0644); if (fd < 0) return -1; for (int b = 0; d->blocks; b++) http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterAPI.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterAPI.cc b/iocore/cluster/ClusterAPI.cc index 3345713..7c08330 100644 --- a/iocore/cluster/ClusterAPI.cc +++ b/iocore/cluster/ClusterAPI.cc @@ -26,7 +26,7 @@ ClusterAPI.cc - Support for Cluster RPC API. + Support for Cluster RPC API. ****************************************************************************/ #include "P_Cluster.h" @@ -35,17 +35,16 @@ class ClusterAPIPeriodicSM; static void send_machine_online_list(TSClusterStatusHandle_t *); -typedef struct node_callout_entry -{ +typedef struct node_callout_entry { Ptr<ProxyMutex> mutex; TSClusterStatusFunction func; - int state; // See NE_STATE_XXX defines + int state; // See NE_STATE_XXX defines } node_callout_entry_t; -#define NE_STATE_FREE 0 -#define NE_STATE_INITIALIZED 1 +#define NE_STATE_FREE 0 +#define NE_STATE_INITIALIZED 1 -#define MAX_CLUSTERSTATUS_CALLOUTS 32 +#define MAX_CLUSTERSTATUS_CALLOUTS 32 static ProxyMutex *ClusterAPI_mutex; static ClusterAPIPeriodicSM *periodicSM; @@ -54,21 +53,17 @@ static node_callout_entry_t status_callouts[MAX_CLUSTERSTATUS_CALLOUTS]; static TSClusterRPCFunction RPC_Functions[API_END_CLUSTER_FUNCTION]; #define INDEX_TO_CLUSTER_STATUS_HANDLE(i) ((TSClusterStatusHandle_t)((i))) -#define CLUSTER_STATUS_HANDLE_TO_INDEX(h) ((int) ((h))) -#define NODE_HANDLE_TO_IP(h) (*((struct in_addr *) &((h)))) +#define CLUSTER_STATUS_HANDLE_TO_INDEX(h) ((int)((h))) +#define NODE_HANDLE_TO_IP(h) (*((struct in_addr *)&((h)))) #define RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k) ((int)((k))) #define IP_TO_NODE_HANDLE(ip) ((TSNodeHandle_t)((ip))) -#define SIZEOF_RPC_MSG_LESS_DATA (sizeof(TSClusterRPCMsg_t) - \ - (sizeof(TSClusterRPCMsg_t) - sizeof(TSClusterRPCHandle_t))) +#define SIZEOF_RPC_MSG_LESS_DATA (sizeof(TSClusterRPCMsg_t) - (sizeof(TSClusterRPCMsg_t) - sizeof(TSClusterRPCHandle_t))) -typedef struct RPCHandle -{ - union - { // Note: All union elements are assumed to be the same size +typedef struct RPCHandle { + union { // Note: All union elements are assumed to be the same size // sizeof(u.internal) == sizeof(u.external) TSClusterRPCHandle_t external; - struct real_format - { + struct real_format { int cluster_function; int magic; } internal; @@ -78,39 +73,35 @@ typedef struct RPCHandle #define RPC_HANDLE_MAGIC 0x12345678 class MachineStatusSM; -typedef int (MachineStatusSM::*MachineStatusSMHandler) (int, void *); -class MachineStatusSM:public Continuation +typedef int (MachineStatusSM::*MachineStatusSMHandler)(int, void *); +class MachineStatusSM : public Continuation { public: // Broadcast constructor - MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s):_node_handle(h), _node_status(s), _status_handle(0), - _broadcast(1), _restart(0), _next_n(0) + MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s) + : _node_handle(h), _node_status(s), _status_handle(0), _broadcast(1), _restart(0), _next_n(0) { - SET_HANDLER((MachineStatusSMHandler) - & MachineStatusSM::MachineStatusSMEvent); + SET_HANDLER((MachineStatusSMHandler)&MachineStatusSM::MachineStatusSMEvent); } // Unicast constructor - MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s, - TSClusterStatusHandle_t sh):_node_handle(h), _node_status(s), _status_handle(sh), - _broadcast(0), _restart(0), _next_n(0) + MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s, TSClusterStatusHandle_t sh) + : _node_handle(h), _node_status(s), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0) { - SET_HANDLER((MachineStatusSMHandler) - & MachineStatusSM::MachineStatusSMEvent); + SET_HANDLER((MachineStatusSMHandler)&MachineStatusSM::MachineStatusSMEvent); } // Send machine online list constructor -MachineStatusSM(TSClusterStatusHandle_t sh): - _node_handle(0), _node_status(NODE_ONLINE), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0) { - SET_HANDLER((MachineStatusSMHandler) - & MachineStatusSM::MachineStatusSMEvent); - } - ~MachineStatusSM() { + MachineStatusSM(TSClusterStatusHandle_t sh) + : _node_handle(0), _node_status(NODE_ONLINE), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0) + { + SET_HANDLER((MachineStatusSMHandler)&MachineStatusSM::MachineStatusSMEvent); } - int MachineStatusSMEvent(Event * e, void *d); + ~MachineStatusSM() {} + int MachineStatusSMEvent(Event *e, void *d); private: TSNodeHandle_t _node_handle; TSNodeStatus_t _node_status; - TSClusterStatusHandle_t _status_handle; // Valid only if !_broadcast + TSClusterStatusHandle_t _status_handle; // Valid only if !_broadcast int _broadcast; int _restart; int _next_n; @@ -129,7 +120,6 @@ MachineStatusSM::MachineStatusSMEvent(Event * /* e ATS_UNUSED */, void * /* d AT n = _restart ? _next_n : 0; for (; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) { if (status_callouts[n].func && (status_callouts[n].state == NE_STATE_INITIALIZED)) { - MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et); if (lock.is_locked()) { status_callouts[n].func(&_node_handle, _node_status); @@ -163,8 +153,8 @@ MachineStatusSM::MachineStatusSMEvent(Event * /* e ATS_UNUSED */, void * /* d AT nh = IP_TO_NODE_HANDLE(cc->machines[mi]->ip); status_callouts[n].func(&nh, NODE_ONLINE); - Debug("cluster_api", - "initial callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(cc->machines[mi]->ip), NODE_ONLINE); + Debug("cluster_api", "initial callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(cc->machines[mi]->ip), + NODE_ONLINE); } } } @@ -186,8 +176,7 @@ MachineStatusSM::MachineStatusSMEvent(Event * /* e ATS_UNUSED */, void * /* d AT if (lock.is_locked()) { status_callouts[n].func(&_node_handle, _node_status); - Debug("cluster_api", - "directed callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status); + Debug("cluster_api", "directed callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status); } else { _restart = 1; _next_n = n; @@ -201,23 +190,20 @@ MachineStatusSM::MachineStatusSMEvent(Event * /* e ATS_UNUSED */, void * /* d AT } class ClusterAPIPeriodicSM; -typedef int (ClusterAPIPeriodicSM::*ClusterAPIPeriodicSMHandler) (int, void *); -class ClusterAPIPeriodicSM:public Continuation +typedef int (ClusterAPIPeriodicSM::*ClusterAPIPeriodicSMHandler)(int, void *); +class ClusterAPIPeriodicSM : public Continuation { public: - ClusterAPIPeriodicSM(ProxyMutex * m):Continuation(m), _active_msmp(0) - { - SET_HANDLER((ClusterAPIPeriodicSMHandler) - & ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent); - } - ~ClusterAPIPeriodicSM() + ClusterAPIPeriodicSM(ProxyMutex *m) : Continuation(m), _active_msmp(0) { + SET_HANDLER((ClusterAPIPeriodicSMHandler)&ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent); } + ~ClusterAPIPeriodicSM() {} int ClusterAPIPeriodicSMEvent(int, void *); MachineStatusSM *GetNextSM(); private: - MachineStatusSM * _active_msmp; + MachineStatusSM *_active_msmp; }; static InkAtomicList status_callout_atomic_q; @@ -232,11 +218,10 @@ ClusterAPIPeriodicSM::GetNextSM() while (1) { msmp = status_callout_q.pop(); if (!msmp) { - msmp = (MachineStatusSM *) - ink_atomiclist_popall(&status_callout_atomic_q); + msmp = (MachineStatusSM *)ink_atomiclist_popall(&status_callout_atomic_q); if (msmp) { while (msmp) { - msmp_next = (MachineStatusSM *) msmp->link.next; + msmp_next = (MachineStatusSM *)msmp->link.next; msmp->link.next = 0; status_callout_q.push(msmp); msmp = msmp_next; @@ -277,11 +262,10 @@ void clusterAPI_init() { MachineStatusSM *mssmp = 0; - ink_atomiclist_init(&status_callout_atomic_q, - "cluster API status_callout_q", (char *) &mssmp->link.next - (char *) mssmp); + ink_atomiclist_init(&status_callout_atomic_q, "cluster API status_callout_q", (char *)&mssmp->link.next - (char *)mssmp); ClusterAPI_mutex = new_ProxyMutex(); MUTEX_TRY_LOCK(lock, ClusterAPI_mutex, this_ethread()); - ink_release_assert(lock.is_locked()); // Should never fail + ink_release_assert(lock.is_locked()); // Should never fail periodicSM = new ClusterAPIPeriodicSM(ClusterAPI_mutex); // TODO: Should we do something with this return value? @@ -296,7 +280,7 @@ clusterAPI_init() * called at plugin load time. */ int -TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, TSClusterStatusHandle_t * h) +TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, TSClusterStatusHandle_t *h) { Debug("cluster_api", "TSAddClusterStatusFunction func %p", Status_Function); int n; @@ -306,7 +290,7 @@ TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, T MUTEX_TAKE_LOCK(ClusterAPI_mutex, e); for (n = 0; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) { if (!status_callouts[n].func) { - status_callouts[n].mutex = (ProxyMutex *) m; + status_callouts[n].mutex = (ProxyMutex *)m; status_callouts[n].func = Status_Function; MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e); *h = INDEX_TO_CLUSTER_STATUS_HANDLE(n); @@ -327,7 +311,7 @@ TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, T * called at plugin unload time (unload currently not supported). */ int -TSDeleteClusterStatusFunction(TSClusterStatusHandle_t * h) +TSDeleteClusterStatusFunction(TSClusterStatusHandle_t *h) { int n = CLUSTER_STATUS_HANDLE_TO_INDEX(*h); EThread *e = this_ethread(); @@ -337,7 +321,7 @@ TSDeleteClusterStatusFunction(TSClusterStatusHandle_t * h) MUTEX_TAKE_LOCK(ClusterAPI_mutex, e); status_callouts[n].mutex = 0; - status_callouts[n].func = (TSClusterStatusFunction) 0; + status_callouts[n].func = (TSClusterStatusFunction)0; status_callouts[n].state = NE_STATE_FREE; MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e); @@ -345,14 +329,14 @@ TSDeleteClusterStatusFunction(TSClusterStatusHandle_t * h) } int -TSNodeHandleToIPAddr(TSNodeHandle_t * h, struct in_addr *in) +TSNodeHandleToIPAddr(TSNodeHandle_t *h, struct in_addr *in) { *in = NODE_HANDLE_TO_IP(*h); return 0; } void -TSGetMyNodeHandle(TSNodeHandle_t * h) +TSGetMyNodeHandle(TSNodeHandle_t *h) { *h = IP_TO_NODE_HANDLE((this_cluster_machine())->ip); } @@ -364,7 +348,7 @@ TSGetMyNodeHandle(TSNodeHandle_t * h) * callouts are updates to the state obtained at this point. */ void -TSEnableClusterStatusCallout(TSClusterStatusHandle_t * h) +TSEnableClusterStatusCallout(TSClusterStatusHandle_t *h) { int ci = CLUSTER_STATUS_HANDLE_TO_INDEX(*h); // This isn't used. @@ -380,11 +364,11 @@ TSEnableClusterStatusCallout(TSClusterStatusHandle_t * h) } static void -send_machine_online_list(TSClusterStatusHandle_t * h) +send_machine_online_list(TSClusterStatusHandle_t *h) { MachineStatusSM *msm = new MachineStatusSM(*h); - ink_atomiclist_push(&status_callout_atomic_q, (void *) msm); + ink_atomiclist_push(&status_callout_atomic_q, (void *)msm); } /* @@ -393,11 +377,11 @@ send_machine_online_list(TSClusterStatusHandle_t * h) // This doesn't seem to be used... #ifdef NOT_USED_HERE static void -directed_machine_online(int Ipaddr, TSClusterStatusHandle_t * h) +directed_machine_online(int Ipaddr, TSClusterStatusHandle_t *h) { MachineStatusSM *msm = new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE, *h); - ink_atomiclist_push(&status_callout_atomic_q, (void *) msm); + ink_atomiclist_push(&status_callout_atomic_q, (void *)msm); } #endif @@ -409,7 +393,7 @@ machine_online_APIcallout(int Ipaddr) { MachineStatusSM *msm = new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE); - ink_atomiclist_push(&status_callout_atomic_q, (void *) msm); + ink_atomiclist_push(&status_callout_atomic_q, (void *)msm); } /* @@ -420,7 +404,7 @@ machine_offline_APIcallout(int Ipaddr) { MachineStatusSM *msm = new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_OFFLINE); - ink_atomiclist_push(&status_callout_atomic_q, (void *) msm); + ink_atomiclist_push(&status_callout_atomic_q, (void *)msm); } /* @@ -430,15 +414,14 @@ machine_offline_APIcallout(int Ipaddr) * called at plugin load time. */ int -TSAddClusterRPCFunction(TSClusterRPCKey_t k, TSClusterRPCFunction func, TSClusterRPCHandle_t * h) +TSAddClusterRPCFunction(TSClusterRPCKey_t k, TSClusterRPCFunction func, TSClusterRPCHandle_t *h) { RPCHandle_t handle; int n = RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k); EThread *e = this_ethread(); ink_release_assert(func); - ink_release_assert((n >= API_STARECT_CLUSTER_FUNCTION) - && (n <= API_END_CLUSTER_FUNCTION)); + ink_release_assert((n >= API_STARECT_CLUSTER_FUNCTION) && (n <= API_END_CLUSTER_FUNCTION)); Debug("cluster_api", "TSAddClusterRPCFunction: key %d func %p", k, func); handle.u.internal.cluster_function = n; @@ -460,13 +443,13 @@ TSAddClusterRPCFunction(TSClusterRPCKey_t k, TSClusterRPCFunction func, TSCluste * called at plugin unload time (unload currently not supported). */ int -TSDeleteClusterRPCFunction(TSClusterRPCHandle_t * rpch) +TSDeleteClusterRPCFunction(TSClusterRPCHandle_t *rpch) { - RPCHandle_t *h = (RPCHandle_t *) rpch; + RPCHandle_t *h = (RPCHandle_t *)rpch; EThread *e = this_ethread(); - ink_release_assert(((h->u.internal.cluster_function >= API_STARECT_CLUSTER_FUNCTION) - && (h->u.internal.cluster_function <= API_END_CLUSTER_FUNCTION))); + ink_release_assert(((h->u.internal.cluster_function >= API_STARECT_CLUSTER_FUNCTION) && + (h->u.internal.cluster_function <= API_END_CLUSTER_FUNCTION))); Debug("cluster_api", "TSDeleteClusterRPCFunction: n %d", h->u.internal.cluster_function); MUTEX_TAKE_LOCK(ClusterAPI_mutex, e); @@ -483,20 +466,19 @@ default_api_ClusterFunction(ClusterHandler *ch, void *data, int len) { Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data %p len %d", DOT_SEPARATED(ch->machine->ip), data, len); - TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *) data; - RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle; + TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *)data; + RPCHandle_t *rpch = (RPCHandle_t *)&msg->m_handle; int cluster_function = rpch->u.internal.cluster_function; - ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t)); - ink_release_assert(((cluster_function >= API_STARECT_CLUSTER_FUNCTION) - && (cluster_function <= API_END_CLUSTER_FUNCTION))); + ink_release_assert((size_t)len >= sizeof(TSClusterRPCMsg_t)); + ink_release_assert(((cluster_function >= API_STARECT_CLUSTER_FUNCTION) && (cluster_function <= API_END_CLUSTER_FUNCTION))); if (cluster_function < API_END_CLUSTER_FUNCTION && RPC_Functions[cluster_function]) { int msg_data_len = len - SIZEOF_RPC_MSG_LESS_DATA; TSNodeHandle_t nh = IP_TO_NODE_HANDLE(ch->machine->ip); - (*RPC_Functions[cluster_function]) (&nh, msg, msg_data_len); + (*RPC_Functions[cluster_function])(&nh, msg, msg_data_len); } else { - clusterProcessor.free_remote_data((char *) data, len); + clusterProcessor.free_remote_data((char *)data, len); } } @@ -504,25 +486,25 @@ default_api_ClusterFunction(ClusterHandler *ch, void *data, int len) * Free TSClusterRPCMsg_t received via the RPC function. */ void -TSFreeRPCMsg(TSClusterRPCMsg_t * msg, int msg_data_len) +TSFreeRPCMsg(TSClusterRPCMsg_t *msg, int msg_data_len) { - RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle; + RPCHandle_t *rpch = (RPCHandle_t *)&msg->m_handle; ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC); Debug("cluster_api", "TSFreeRPCMsg: msg %p msg_data_len %d", msg, msg_data_len); - clusterProcessor.free_remote_data((char *) msg, msg_data_len + SIZEOF_RPC_MSG_LESS_DATA); + clusterProcessor.free_remote_data((char *)msg, msg_data_len + SIZEOF_RPC_MSG_LESS_DATA); } /* * Allocate a message structure for use in the call to TSSendClusterRPC(). */ TSClusterRPCMsg_t * -TSAllocClusterRPCMsg(TSClusterRPCHandle_t * h, int data_size) +TSAllocClusterRPCMsg(TSClusterRPCHandle_t *h, int data_size) { ink_assert(data_size >= 4); if (data_size < 4) { /* Message must be at least 4 bytes in length */ - return (TSClusterRPCMsg_t *) 0; + return (TSClusterRPCMsg_t *)0; } TSClusterRPCMsg_t *rpcm; @@ -530,9 +512,9 @@ TSAllocClusterRPCMsg(TSClusterRPCHandle_t * h, int data_size) c->len = sizeof(OutgoingControl *) + SIZEOF_RPC_MSG_LESS_DATA + data_size; c->alloc_data(); - *((OutgoingControl **) c->data) = c; + *((OutgoingControl **)c->data) = c; - rpcm = (TSClusterRPCMsg_t *) (c->data + sizeof(OutgoingControl *)); + rpcm = (TSClusterRPCMsg_t *)(c->data + sizeof(OutgoingControl *)); rpcm->m_handle = *h; /* @@ -548,25 +530,24 @@ TSAllocClusterRPCMsg(TSClusterRPCHandle_t * h, int data_size) * Send the given message to the specified node. */ int -TSSendClusterRPC(TSNodeHandle_t * nh, TSClusterRPCMsg_t * msg) +TSSendClusterRPC(TSNodeHandle_t *nh, TSClusterRPCMsg_t *msg) { struct in_addr ipaddr = NODE_HANDLE_TO_IP(*nh); - RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle; + RPCHandle_t *rpch = (RPCHandle_t *)&msg->m_handle; - OutgoingControl *c = *((OutgoingControl **) - ((char *) msg - sizeof(OutgoingControl *))); - ClusterConfiguration * cc = this_cluster()->current_configuration(); + OutgoingControl *c = *((OutgoingControl **)((char *)msg - sizeof(OutgoingControl *))); + ClusterConfiguration *cc = this_cluster()->current_configuration(); ClusterMachine *m; ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC); if ((m = cc->find(ipaddr.s_addr))) { int len = c->len - sizeof(OutgoingControl *); - ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t)); + ink_release_assert((size_t)len >= sizeof(TSClusterRPCMsg_t)); Debug("cluster_api", "TSSendClusterRPC: msg %p dlen %d [%u.%u.%u.%u] sent", msg, len, DOT_SEPARATED(ipaddr.s_addr)); - clusterProcessor.invoke_remote(m->pop_ClusterHandler(), rpch->u.internal.cluster_function, - msg, len, (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL)); + clusterProcessor.invoke_remote(m->pop_ClusterHandler(), rpch->u.internal.cluster_function, msg, len, + (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL)); } else { Debug("cluster_api", "TSSendClusterRPC: msg %p to [%u.%u.%u.%u] dropped", msg, DOT_SEPARATED(ipaddr.s_addr)); c->freeall();
