http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterCache.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterCache.cc b/iocore/cluster/ClusterCache.cc index d6315d9..198d8e1 100644 --- a/iocore/cluster/ClusterCache.cc +++ b/iocore/cluster/ClusterCache.cc @@ -29,7 +29,7 @@ #include "P_Cluster.h" #ifdef DEBUG -#define CLUSTER_TEST_DEBUG 1 +#define CLUSTER_TEST_DEBUG 1 #endif #ifdef ENABLE_TIME_TRACE @@ -62,7 +62,7 @@ static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH]; static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH]; // 0 is an illegal sequence number -#define CACHE_NO_RESPONSE 0 +#define CACHE_NO_RESPONSE 0 static int cluster_sequence_number = 1; #ifdef CLUSTER_TEST_DEBUG @@ -78,12 +78,11 @@ static CacheContinuation *find_cache_continuation(unsigned int, unsigned int); static unsigned int new_cache_sequence_number(); -#define DOT_SEPARATED(_x) \ -((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \ - ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3] +#define DOT_SEPARATED(_x) \ + ((unsigned char *)&(_x))[0], ((unsigned char *)&(_x))[1], ((unsigned char *)&(_x))[2], ((unsigned char *)&(_x))[3] -#define ET_CACHE_CONT_SM ET_NET -#define ALLOW_THREAD_STEAL true +#define ET_CACHE_CONT_SM ET_NET +#define ALLOW_THREAD_STEAL true /**********************************************************************/ #ifdef CACHE_MSG_TRACE @@ -93,9 +92,8 @@ static unsigned int new_cache_sequence_number(); // Debug trace support for cache RPC messages /**********************************************************************/ -#define MAX_TENTRIES 4096 -struct traceEntry -{ +#define MAX_TENTRIES 4096 +struct traceEntry { unsigned int seqno; int op; char *type; @@ -132,8 +130,8 @@ dump_recvtrace_table() int n; printf("\n"); for (n = 0; n < MAX_TENTRIES; ++n) - printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno, - recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : ""); + printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno, recvTraceTable[n].op, + recvTraceTable[n].type ? recvTraceTable[n].type : ""); } void @@ -142,8 +140,8 @@ dump_sndtrace_table() int n; printf("\n"); for (n = 0; n < MAX_TENTRIES; ++n) - printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno, - sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : ""); + printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno, sndTraceTable[n].op, + sndTraceTable[n].type ? sndTraceTable[n].type : ""); } /**********************************************************************/ @@ -165,61 +163,50 @@ dump_sndtrace_table() class ClusterVConnectionCache { public: - ClusterVConnectionCache() - { - memset(hash_event, 0, sizeof(hash_event)); - } + ClusterVConnectionCache() { memset(hash_event, 0, sizeof(hash_event)); } void init(); - int MD5ToIndex(INK_MD5 * p); + int MD5ToIndex(INK_MD5 *p); int insert(INK_MD5 *, ClusterVConnection *); ClusterVConnection *lookup(INK_MD5 *); public: - struct Entry - { + struct Entry { LINK(Entry, link); bool mark_for_delete; INK_MD5 key; ClusterVConnection *vc; - Entry():mark_for_delete(0), vc(0) - { - } - ~Entry() - { - } + Entry() : mark_for_delete(0), vc(0) {} + ~Entry() {} }; - enum - { MAX_TABLE_ENTRIES = 256, // must be power of 2 - SCAN_INTERVAL = 10 // seconds + enum { + MAX_TABLE_ENTRIES = 256, // must be power of 2 + SCAN_INTERVAL = 10 // seconds }; Queue<Entry> hash_table[MAX_TABLE_ENTRIES]; Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES]; Event *hash_event[MAX_TABLE_ENTRIES]; }; -static ClassAllocator < - ClusterVConnectionCache::Entry > -ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry"); +static ClassAllocator<ClusterVConnectionCache::Entry> ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry"); ClusterVConnectionCache *GlobalOpenWriteVCcache = 0; ///////////////////////////////////////////////////////////////// // Perform periodic purges of ClusterVConnectionCache entries ///////////////////////////////////////////////////////////////// -class ClusterVConnectionCacheEvent:public Continuation +class ClusterVConnectionCacheEvent : public Continuation { public: - ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n) - : Continuation(new_ProxyMutex()), cache(c), hash_index(n) + ClusterVConnectionCacheEvent(ClusterVConnectionCache *c, int n) : Continuation(new_ProxyMutex()), cache(c), hash_index(n) { SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler); } int eventHandler(int, Event *); private: - ClusterVConnectionCache * cache; + ClusterVConnectionCache *cache; int hash_index; }; @@ -236,12 +223,11 @@ ClusterVConnectionCache::init() // Setup up periodic purge events on each hash list eh = new ClusterVConnectionCacheEvent(this, n); - hash_event[n] = - eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM); + hash_event[n] = eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM); } } inline int -ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p) +ClusterVConnectionCache::MD5ToIndex(INK_MD5 *p) { uint64_t i = p->fold(); int32_t h, l; @@ -252,7 +238,7 @@ ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p) } int -ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc) +ClusterVConnectionCache::insert(INK_MD5 *key, ClusterVConnection *vc) { int index = MD5ToIndex(key); Entry *e; @@ -262,7 +248,7 @@ ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc) MUTEX_TRY_LOCK(lock, hash_lock[index], thread); if (!lock.is_locked()) { CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT); - return 0; // lock miss, retry later + return 0; // lock miss, retry later } else { // Add entry to list @@ -273,11 +259,11 @@ ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc) hash_table[index].enqueue(e); CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT); } - return 1; // Success + return 1; // Success } ClusterVConnection * -ClusterVConnectionCache::lookup(INK_MD5 * key) +ClusterVConnectionCache::lookup(INK_MD5 *key) { int index = MD5ToIndex(key); Entry *e; @@ -288,12 +274,12 @@ ClusterVConnectionCache::lookup(INK_MD5 * key) MUTEX_TRY_LOCK(lock, hash_lock[index], thread); if (!lock.is_locked()) { CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT); - return vc; // lock miss, retry later + return vc; // lock miss, retry later } else { e = hash_table[index].head; while (e) { - if (*key == e->key) { // Hit + if (*key == e->key) { // Hit vc = e->vc; hash_table[index].remove(e); ClusterVCCacheEntryAlloc.free(e); @@ -306,11 +292,11 @@ ClusterVConnectionCache::lookup(INK_MD5 * key) } } CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT); - return (ClusterVConnection *) - 1; // Miss + return (ClusterVConnection *)-1; // Miss } int -ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event * e) +ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event *e) { CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT); MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread()); @@ -321,8 +307,8 @@ ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event * e } // Perform purge action on unreferenced VC(s). - ClusterVConnectionCache::Entry * entry; - ClusterVConnectionCache::Entry * next_entry; + ClusterVConnectionCache::Entry *entry; + ClusterVConnectionCache::Entry *next_entry; entry = cache->hash_table[hash_index].head; while (entry) { @@ -372,8 +358,8 @@ CacheContinuation::init() // Main function to do a cluster cache operation /////////////////////////////////////////////////////////////////////// Action * -CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args, - int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b) +CacheContinuation::do_op(Continuation *c, ClusterMachine *mp, void *args, int user_opcode, char *data, int data_len, int nbytes, + MIOBuffer *b) { CacheContinuation *cc = 0; Action *act = 0; @@ -410,8 +396,7 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args, cc->start_time = ink_get_hrtime(); cc->from = mp; cc->result = op_failure(opcode); - SET_CONTINUATION_HANDLER(cc, (CacheContHandler) - & CacheContinuation::remoteOpEvent); + SET_CONTINUATION_HANDLER(cc, (CacheContHandler)&CacheContinuation::remoteOpEvent); act = &cc->action; // set up sequence number so we can find this continuation @@ -424,7 +409,6 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args, unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number); MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread()); if (!queuelock.is_locked()) { - // failed to acquire lock: no problem, retry later cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM); } else { @@ -437,176 +421,169 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args, // Determine the type of the "Over The Wire" (OTW) message header and // initialize it. // - Debug("cache_msg", - "do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p", - opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b); + Debug("cache_msg", "do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p", opcode, + (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b); switch (opcode) { case CACHE_OPEN_WRITE_BUFFER: - case CACHE_OPEN_WRITE_BUFFER_LONG: - { - ink_release_assert(!"write buffer not supported"); - break; - } + case CACHE_OPEN_WRITE_BUFFER_LONG: { + ink_release_assert(!"write buffer not supported"); + break; + } case CACHE_OPEN_READ_BUFFER: - case CACHE_OPEN_READ_BUFFER_LONG: - { - ink_release_assert(!"read buffer not supported"); - break; - } + case CACHE_OPEN_READ_BUFFER_LONG: { + ink_release_assert(!"read buffer not supported"); + break; + } case CACHE_OPEN_WRITE: - case CACHE_OPEN_READ: - { - ink_release_assert(c > 0); - ////////////////////// - // Use short format // - ////////////////////// - if (!data) { - data_len = op_to_sizeof_fixedlen_msg(opcode); - data = (char *) ALLOCA_DOUBLE(data_len); - } - msg = (char *) data; - CacheOpMsg_short *m = (CacheOpMsg_short *) msg; - m->init(); - m->opcode = opcode; - m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags; - m->md5 = *((CacheOpArgs_General *) args)->url_md5; - cc->url_md5 = m->md5; - m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); - m->frag_type = ((CacheOpArgs_General *) args)->frag_type; - if (opcode == CACHE_OPEN_WRITE) { - m->nbytes = nbytes; - m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache; - } else { - m->nbytes = 0; - m->data = 0; - } - - if (opcode == CACHE_OPEN_READ) { - // - // Set upper limit on initial data received with response - // for open read response - // - m->buffer_size = DEFAULT_MAX_BUFFER_SIZE; - } else { - m->buffer_size = 0; - } + case CACHE_OPEN_READ: { + ink_release_assert(c > 0); + ////////////////////// + // Use short format // + ////////////////////// + if (!data) { + data_len = op_to_sizeof_fixedlen_msg(opcode); + data = (char *)ALLOCA_DOUBLE(data_len); + } + msg = (char *)data; + CacheOpMsg_short *m = (CacheOpMsg_short *)msg; + m->init(); + m->opcode = opcode; + m->cfl_flags = ((CacheOpArgs_General *)args)->cfl_flags; + m->md5 = *((CacheOpArgs_General *)args)->url_md5; + cc->url_md5 = m->md5; + m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); + m->frag_type = ((CacheOpArgs_General *)args)->frag_type; + if (opcode == CACHE_OPEN_WRITE) { + m->nbytes = nbytes; + m->data = (uint32_t)((CacheOpArgs_General *)args)->pin_in_cache; + } else { + m->nbytes = 0; + m->data = 0; + } + if (opcode == CACHE_OPEN_READ) { // - // Establish the local VC + // Set upper limit on initial data received with response + // for open read response // - int res = setup_local_vc(msg, data_len, cc, mp, &act); - if (!res) { - ///////////////////////////////////////////////////// - // Unable to setup local VC, request aborted. - // Remove request from pending list and deallocate. - ///////////////////////////////////////////////////// - cc->remove_and_delete(0, (Event *) 0); - return act; - - } else if (res != -1) { - /////////////////////////////////////// - // VC established, send request - /////////////////////////////////////// - break; + m->buffer_size = DEFAULT_MAX_BUFFER_SIZE; + } else { + m->buffer_size = 0; + } - } else { - ////////////////////////////////////////////////////// - // Unable to setup VC, delay required, await callback - ////////////////////////////////////////////////////// - goto no_send_exit; - } + // + // Establish the local VC + // + int res = setup_local_vc(msg, data_len, cc, mp, &act); + if (!res) { + ///////////////////////////////////////////////////// + // Unable to setup local VC, request aborted. + // Remove request from pending list and deallocate. + ///////////////////////////////////////////////////// + cc->remove_and_delete(0, (Event *)0); + return act; + + } else if (res != -1) { + /////////////////////////////////////// + // VC established, send request + /////////////////////////////////////// + break; + + } else { + ////////////////////////////////////////////////////// + // Unable to setup VC, delay required, await callback + ////////////////////////////////////////////////////// + goto no_send_exit; } + } case CACHE_OPEN_READ_LONG: - case CACHE_OPEN_WRITE_LONG: - { - ink_release_assert(c > 0); - ////////////////////// - // Use long format // - ////////////////////// - msg = data; - CacheOpMsg_long *m = (CacheOpMsg_long *) msg; - m->init(); - m->opcode = opcode; - m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags; - m->url_md5 = *((CacheOpArgs_General *) args)->url_md5; - cc->url_md5 = m->url_md5; - m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); - m->nbytes = nbytes; - m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache; - m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type; - - if (opcode == CACHE_OPEN_READ_LONG) { - // - // Set upper limit on initial data received with response - // for open read response - // - m->buffer_size = DEFAULT_MAX_BUFFER_SIZE; - } else { - m->buffer_size = 0; - } + case CACHE_OPEN_WRITE_LONG: { + ink_release_assert(c > 0); + ////////////////////// + // Use long format // + ////////////////////// + msg = data; + CacheOpMsg_long *m = (CacheOpMsg_long *)msg; + m->init(); + m->opcode = opcode; + m->cfl_flags = ((CacheOpArgs_General *)args)->cfl_flags; + m->url_md5 = *((CacheOpArgs_General *)args)->url_md5; + cc->url_md5 = m->url_md5; + m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); + m->nbytes = nbytes; + m->data = (uint32_t)((CacheOpArgs_General *)args)->pin_in_cache; + m->frag_type = (uint32_t)((CacheOpArgs_General *)args)->frag_type; + + if (opcode == CACHE_OPEN_READ_LONG) { // - // Establish the local VC + // Set upper limit on initial data received with response + // for open read response // - int res = setup_local_vc(msg, data_len, cc, mp, &act); - if (!res) { - ///////////////////////////////////////////////////// - // Unable to setup local VC, request aborted. - // Remove request from pending list and deallocate. - ///////////////////////////////////////////////////// - cc->remove_and_delete(0, (Event *) 0); - return act; - - } else if (res != -1) { - /////////////////////////////////////// - // VC established, send request - /////////////////////////////////////// - break; + m->buffer_size = DEFAULT_MAX_BUFFER_SIZE; + } else { + m->buffer_size = 0; + } + // + // Establish the local VC + // + int res = setup_local_vc(msg, data_len, cc, mp, &act); + if (!res) { + ///////////////////////////////////////////////////// + // Unable to setup local VC, request aborted. + // Remove request from pending list and deallocate. + ///////////////////////////////////////////////////// + cc->remove_and_delete(0, (Event *)0); + return act; - } else { - ////////////////////////////////////////////////////// - // Unable to setup VC, delay required, await callback - ////////////////////////////////////////////////////// - goto no_send_exit; - } + } else if (res != -1) { + /////////////////////////////////////// + // VC established, send request + /////////////////////////////////////// + break; + + } else { + ////////////////////////////////////////////////////// + // Unable to setup VC, delay required, await callback + ////////////////////////////////////////////////////// + goto no_send_exit; } + } case CACHE_UPDATE: case CACHE_REMOVE: - case CACHE_DEREF: - { - ////////////////////// - // Use short format // - ////////////////////// - msg = data; - CacheOpMsg_short *m = (CacheOpMsg_short *) msg; - m->init(); - m->opcode = opcode; - m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type; - m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags; - if (opcode == CACHE_DEREF) - m->md5 = *((CacheOpArgs_Deref *) args)->md5; - else - m->md5 = *((CacheOpArgs_General *) args)->url_md5; - m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); - break; - } - case CACHE_LINK: - { - //////////////////////// - // Use short_2 format // - //////////////////////// - msg = data; - CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg; - m->init(); - m->opcode = opcode; - m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags; - m->md5_1 = *((CacheOpArgs_Link *) args)->from; - m->md5_2 = *((CacheOpArgs_Link *) args)->to; - m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); - m->frag_type = ((CacheOpArgs_Link *) args)->frag_type; - break; - } + case CACHE_DEREF: { + ////////////////////// + // Use short format // + ////////////////////// + msg = data; + CacheOpMsg_short *m = (CacheOpMsg_short *)msg; + m->init(); + m->opcode = opcode; + m->frag_type = ((CacheOpArgs_Deref *)args)->frag_type; + m->cfl_flags = ((CacheOpArgs_Deref *)args)->cfl_flags; + if (opcode == CACHE_DEREF) + m->md5 = *((CacheOpArgs_Deref *)args)->md5; + else + m->md5 = *((CacheOpArgs_General *)args)->url_md5; + m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); + break; + } + case CACHE_LINK: { + //////////////////////// + // Use short_2 format // + //////////////////////// + msg = data; + CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *)msg; + m->init(); + m->opcode = opcode; + m->cfl_flags = ((CacheOpArgs_Link *)args)->cfl_flags; + m->md5_1 = *((CacheOpArgs_Link *)args)->from; + m->md5_2 = *((CacheOpArgs_Link *)args)->to; + m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE); + m->frag_type = ((CacheOpArgs_Link *)args)->frag_type; + break; + } default: msg = 0; break; @@ -614,20 +591,19 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args, #ifdef CACHE_MSG_TRACE log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op"); #endif - clusterProcessor.invoke_remote(ch, - op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION - : CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len); + clusterProcessor.invoke_remote( + ch, op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION : CACHE_OP_CLUSTER_FUNCTION, (char *)msg, data_len); no_send_exit: if (c) { return act; } else { - return (Action *) 0; + return (Action *)0; } } int -CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act) +CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation *cc, ClusterMachine *mp, Action **act) { bool read_op = op_is_read(cc->request_opcode); bool short_msg = op_is_shortform(cc->request_opcode); @@ -637,13 +613,12 @@ CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc->allocMsgBuffer(); memcpy(cc->getMsgBuffer(), data, data_len); - SET_CONTINUATION_HANDLER(cc, (CacheContHandler) - & CacheContinuation::localVCsetupEvent); + SET_CONTINUATION_HANDLER(cc, (CacheContHandler)&CacheContinuation::localVCsetupEvent); if (short_msg) { - Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number); + Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *)data)->seq_number); } else { - Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number); + Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *)data)->seq_number); } // Create local VC @@ -655,17 +630,14 @@ CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * } else { vc = clusterProcessor.open_local(cc, mp, cc->open_local_token, - (CLUSTER_OPT_ALLOW_IMMEDIATE | - (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE))); + (CLUSTER_OPT_ALLOW_IMMEDIATE | (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE))); } if (!vc) { // Error, abort request if (short_msg) { - Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d", - (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number); + Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *)data)->seq_number); } else { - Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d", - (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number); + Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *)data)->seq_number); } cc->freeMsgBuffer(); if (cc->timeout) @@ -687,23 +659,20 @@ CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * vc->current_cont = cc; if (short_msg) { - CacheOpMsg_short *ms = (CacheOpMsg_short *) data; + CacheOpMsg_short *ms = (CacheOpMsg_short *)data; ms->channel = vc->channel; ms->token = cc->open_local_token; - Debug("cache_proto", - "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", - (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc); + Debug("cache_proto", "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ms->seq_number, + vc->channel, ms->token.ip_created, ms->token.sequence_number, vc); } else { - CacheOpMsg_long *ml = (CacheOpMsg_long *) data; + CacheOpMsg_long *ml = (CacheOpMsg_long *)data; ml->channel = vc->channel; ml->token = cc->open_local_token; - Debug("cache_proto", - "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", - (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc); + Debug("cache_proto", "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ml->seq_number, + vc->channel, ml->token.ip_created, ml->token.sequence_number, vc); } cc->freeMsgBuffer(); - SET_CONTINUATION_HANDLER(cc, (CacheContHandler) - & CacheContinuation::remoteOpEvent); + SET_CONTINUATION_HANDLER(cc, (CacheContHandler)&CacheContinuation::remoteOpEvent); return 1; } else { @@ -723,14 +692,13 @@ CacheContinuation::lookupOpenWriteVC() // failed. /////////////////////////////////////////////////////////////// ClusterVConnection *vc; - CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer(); + CacheOpMsg_long *ml = (CacheOpMsg_long *)getMsgBuffer(); vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5); - if (vc == ((ClusterVConnection *) 0)) { + if (vc == ((ClusterVConnection *)0)) { // Retry lookup - SET_CONTINUATION_HANDLER(this, (CacheContHandler) - & CacheContinuation::lookupOpenWriteVCEvent); + SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::lookupOpenWriteVCEvent); // // Note: In the lookupOpenWriteVCEvent handler, we use EVENT_IMMEDIATE // to distinguish the lookup retry from a request timeout @@ -738,15 +706,14 @@ CacheContinuation::lookupOpenWriteVC() // lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM); - } else if (vc != ((ClusterVConnection *) - 1)) { + } else if (vc != ((ClusterVConnection *)-1)) { // Hit, found open_write VC in cache. // Post open_write completion by simulating a // remote cache op result message. - vc->action_ = action; // establish new continuation + vc->action_ = action; // establish new continuation - SET_CONTINUATION_HANDLER(this, (CacheContHandler) - & CacheContinuation::localVCsetupEvent); + SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::localVCsetupEvent); this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc); CacheOpReplyMsg msg; @@ -757,15 +724,13 @@ CacheContinuation::lookupOpenWriteVC() msg.seq_number = seq_number; msg.token = vc->token; - cache_op_result_ClusterFunction(ch, (void *) &msg, msglen); + cache_op_result_ClusterFunction(ch, (void *)&msg, msglen); } else { // Miss, establish local VC and send remote open_write request - SET_CONTINUATION_HANDLER(this, (CacheContHandler) - & CacheContinuation::localVCsetupEvent); - vc = clusterProcessor.open_local(this, from, open_local_token, - (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE)); + SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::localVCsetupEvent); + vc = clusterProcessor.open_local(this, from, open_local_token, (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE)); if (!vc) { this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0); @@ -773,11 +738,11 @@ CacheContinuation::lookupOpenWriteVC() this->handleEvent(CLUSTER_EVENT_OPEN, vc); } } - return CLUSTER_DELAYED_OPEN; // force completion in callback + return CLUSTER_DELAYED_OPEN; // force completion in callback } int -CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e) +CacheContinuation::lookupOpenWriteVCEvent(int event, Event *e) { if (event == EVENT_IMMEDIATE) { // Retry open_write VC lookup @@ -785,15 +750,14 @@ CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e) } else { lookup_open_write_vc_event->cancel(); - SET_CONTINUATION_HANDLER(this, (CacheContHandler) - & CacheContinuation::localVCsetupEvent); + SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::localVCsetupEvent); this->handleEvent(event, e); } return EVENT_DONE; } int -CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e) +CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event *e) { unsigned int hash = FOLDHASH(target_ip, seq_number); MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread()); @@ -808,7 +772,7 @@ CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e) cacheContAllocator_free(this); } else { - SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete); + SET_HANDLER((CacheContHandler)&CacheContinuation::remove_and_delete); if (!e) { timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM); } else { @@ -819,15 +783,15 @@ CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e) } int -CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc) +CacheContinuation::localVCsetupEvent(int event, ClusterVConnection *vc) { - ink_assert(magicno == (int) MagicNo); + ink_assert(magicno == (int)MagicNo); ink_assert(getMsgBuffer()); bool short_msg = op_is_shortform(request_opcode); bool read_op = op_is_read(request_opcode); if (event == EVENT_INTERVAL) { - Event *e = (Event *) vc; + Event *e = (Event *)vc; unsigned int hash = FOLDHASH(target_ip, seq_number); MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread); @@ -854,7 +818,7 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc) MUTEX_RELEASE(queuelock); Debug("cluster_timeout", "0cluster op timeout %d", seq_number); CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT); - timeout = (Event *) 1; // Note timeout + timeout = (Event *)1; // Note timeout ///////////////////////////////////////////////////////////////// // Note: Failure callback is sent now, but the deallocation of // the CacheContinuation is deferred until we receive the @@ -865,8 +829,8 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc) return EVENT_DONE; } - } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS)) - && (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) { + } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS)) && + (((ptrdiff_t)timeout & (ptrdiff_t)1) == 0)) { ink_hrtime now; now = ink_get_hrtime(); CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time); @@ -880,43 +844,40 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc) vc->current_cont = this; if (short_msg) { - CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer(); + CacheOpMsg_short *ms = (CacheOpMsg_short *)getMsgBuffer(); ms->channel = vc->channel; ms->token = open_local_token; - Debug("cache_proto", - "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", - (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc); + Debug("cache_proto", "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ms->seq_number, + vc->channel, ms->token.ip_created, ms->token.sequence_number, vc); } else { - CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer(); + CacheOpMsg_long *ml = (CacheOpMsg_long *)getMsgBuffer(); ml->channel = vc->channel; ml->token = open_local_token; - Debug("cache_proto", - "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", - (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc); + Debug("cache_proto", "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ml->seq_number, + vc->channel, ml->token.ip_created, ml->token.sequence_number, vc); } - SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent); + SET_HANDLER((CacheContHandler)&CacheContinuation::remoteOpEvent); if (event != CLUSTER_EVENT_OPEN_EXISTS) { // Send request message - clusterProcessor.invoke_remote(ch, - (op_needs_marshalled_coi(request_opcode) ? - CACHE_OP_MALLOCED_CLUSTER_FUNCTION : - CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen()); + clusterProcessor.invoke_remote( + ch, (op_needs_marshalled_coi(request_opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION : CACHE_OP_CLUSTER_FUNCTION), + (char *)getMsgBuffer(), getMsgBufferLen()); } } else { int send_failure_callback = 1; - if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) { + if (((ptrdiff_t)timeout & (ptrdiff_t)1) == 0) { if (short_msg) { - Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d", - (read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number); + Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d", (read_op ? "R" : "W"), + ((CacheOpMsg_short *)getMsgBuffer())->seq_number); } else { - Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d", - (read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number); + Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d", (read_op ? "R" : "W"), + ((CacheOpMsg_long *)getMsgBuffer())->seq_number); } } else { @@ -927,10 +888,10 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc) if (event == CLUSTER_EVENT_OPEN) { vc->pending_remote_fill = 0; - vc->remote_closed = 1; // avoid remote close msg + vc->remote_closed = 1; // avoid remote close msg vc->do_io(VIO::CLOSE); } - send_failure_callback = 0; // already sent. + send_failure_callback = 0; // already sent. } if (this->timeout) @@ -947,7 +908,7 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc) this->use_deferred_callback = true; this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED); this->result_error = 0; - remove_and_delete(0, (Event *) 0); + remove_and_delete(0, (Event *)0); } else { cacheContAllocator_free(this); @@ -973,29 +934,29 @@ inline CacheOpMsg_long * unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap) { if (NeedByteSwap) - ((CacheOpMsg_long *) data)->SwapBytes(); - return (CacheOpMsg_long *) data; + ((CacheOpMsg_long *)data)->SwapBytes(); + return (CacheOpMsg_long *)data; } inline CacheOpMsg_short * unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap) { if (NeedByteSwap) - ((CacheOpMsg_short *) data)->SwapBytes(); - return (CacheOpMsg_short *) data; + ((CacheOpMsg_short *)data)->SwapBytes(); + return (CacheOpMsg_short *)data; } inline CacheOpMsg_short_2 * unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap) { if (NeedByteSwap) - ((CacheOpMsg_short_2 *) data)->SwapBytes(); - return (CacheOpMsg_short_2 *) data; + ((CacheOpMsg_short_2 *)data)->SwapBytes(); + return (CacheOpMsg_short_2 *)data; } // init_from_long() support routine for cache_op_ClusterFunction() inline void -init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m) +init_from_long(CacheContinuation *cont, CacheOpMsg_long *msg, ClusterMachine *m) { cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE); cont->seq_number = msg->seq_number; @@ -1003,15 +964,14 @@ init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * cont->from = m; cont->url_md5 = msg->url_md5; cont->cluster_vc_channel = msg->channel; - cont->frag_type = (CacheFragType) msg->frag_type; - if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG) - || (cont->request_opcode == CACHE_OPEN_READ_LONG)) { - cont->pin_in_cache = (time_t) msg->data; + cont->frag_type = (CacheFragType)msg->frag_type; + if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG) || (cont->request_opcode == CACHE_OPEN_READ_LONG)) { + cont->pin_in_cache = (time_t)msg->data; } else { cont->pin_in_cache = 0; } cont->token = msg->token; - cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes); + cont->nbytes = (((int)msg->nbytes < 0) ? 0 : msg->nbytes); if (cont->request_opcode == CACHE_OPEN_READ_LONG) { cont->caller_buf_freebytes = msg->buffer_size; @@ -1022,7 +982,7 @@ init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * // init_from_short() support routine for cache_op_ClusterFunction() inline void -init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m) +init_from_short(CacheContinuation *cont, CacheOpMsg_short *msg, ClusterMachine *m) { cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE); cont->seq_number = msg->seq_number; @@ -1031,11 +991,11 @@ init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine cont->url_md5 = msg->md5; cont->cluster_vc_channel = msg->channel; cont->token = msg->token; - cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes); - cont->frag_type = (CacheFragType) msg->frag_type; + cont->nbytes = (((int)msg->nbytes < 0) ? 0 : msg->nbytes); + cont->frag_type = (CacheFragType)msg->frag_type; if (cont->request_opcode == CACHE_OPEN_WRITE) { - cont->pin_in_cache = (time_t) msg->data; + cont->pin_in_cache = (time_t)msg->data; } else { cont->pin_in_cache = 0; } @@ -1049,18 +1009,18 @@ init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine // init_from_short_2() support routine for cache_op_ClusterFunction() inline void -init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m) +init_from_short_2(CacheContinuation *cont, CacheOpMsg_short_2 *msg, ClusterMachine *m) { cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE); cont->seq_number = msg->seq_number; cont->cfl_flags = msg->cfl_flags; cont->from = m; cont->url_md5 = msg->md5_1; - cont->frag_type = (CacheFragType) msg->frag_type; + cont->frag_type = (CacheFragType)msg->frag_type; } void -cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len) +cache_op_ClusterFunction(ClusterHandler *ch, void *data, int len) { EThread *thread = this_ethread(); ProxyMutex *mutex = thread->mutex; @@ -1070,14 +1030,14 @@ cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len) CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT); int opcode; - ClusterMessageHeader *mh = (ClusterMessageHeader *) data; + ClusterMessageHeader *mh = (ClusterMessageHeader *)data; - if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { //////////////////////////////////////////////// + if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { //////////////////////////////////////////////// // Convert from old to current message format //////////////////////////////////////////////// ink_release_assert(!"cache_op_ClusterFunction() bad msg version"); } - opcode = ((CacheOpMsg_long *) data)->opcode; + opcode = ((CacheOpMsg_long *)data)->opcode; // If necessary, create a continuation to reflect the response back @@ -1088,8 +1048,7 @@ cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len) c->token.clear(); c->start_time = ink_get_hrtime(); c->ch = ch; - SET_CONTINUATION_HANDLER(c, (CacheContHandler) - & CacheContinuation::replyOpEvent); + SET_CONTINUATION_HANDLER(c, (CacheContHandler)&CacheContinuation::replyOpEvent); switch (opcode) { case CACHE_OPEN_WRITE_BUFFER: @@ -1102,362 +1061,320 @@ cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len) ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported"); break; - case CACHE_OPEN_READ: - { - CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); - init_from_short(c, msg, ch->machine); - Debug("cache_msg", - "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); - // - // Establish the remote side of the ClusterVConnection - // - c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0, - &c->token, - c->cluster_vc_channel, - (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ)); - if (!c->write_cluster_vc) { - // Unable to setup channel, abort processing. - CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); - Debug("chan_inuse", - "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", - c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); - - // Send cluster op failed reply - c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE); - break; + case CACHE_OPEN_READ: { + CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); + init_from_short(c, msg, ch->machine); + Debug("cache_msg", "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + // + // Establish the remote side of the ClusterVConnection + // + c->write_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel, + (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ)); + if (!c->write_cluster_vc) { + // Unable to setup channel, abort processing. + CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); + Debug("chan_inuse", "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel, + DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); + + // Send cluster op failed reply + c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE); + break; - } else { - c->write_cluster_vc->current_cont = c; - } - ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN); - ink_release_assert((opcode == CACHE_OPEN_READ) - || c->write_cluster_vc->pending_remote_fill); - - SET_CONTINUATION_HANDLER(c, (CacheContHandler) - & CacheContinuation::setupVCdataRead); - Debug("cache_proto", - "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d", - msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number); + } else { + c->write_cluster_vc->current_cont = c; + } + ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN); + ink_release_assert((opcode == CACHE_OPEN_READ) || c->write_cluster_vc->pending_remote_fill); + + SET_CONTINUATION_HANDLER(c, (CacheContHandler)&CacheContinuation::setupVCdataRead); + Debug("cache_proto", "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d", msg->seq_number, msg->channel, msg->buffer_size, + msg->token.ip_created, msg->token.sequence_number); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_open_read"); + log_cache_op_msg(msg->seq_number, len, "cache_op_open_read"); #endif - CacheKey key(msg->md5); + CacheKey key(msg->md5); - char *hostname = NULL; - int host_len = len - op_to_sizeof_fixedlen_msg(opcode); - if (host_len) { - hostname = (char *) msg->moi.byte; - } - Cache *call_cache = caches[c->frag_type]; - c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len); - break; + char *hostname = NULL; + int host_len = len - op_to_sizeof_fixedlen_msg(opcode); + if (host_len) { + hostname = (char *)msg->moi.byte; } - case CACHE_OPEN_READ_LONG: - { - // Cache needs message data, copy it. - c->setMsgBufferLen(len); - c->allocMsgBuffer(); - memcpy(c->getMsgBuffer(), (char *) data, len); - - int flen = CacheOpMsg_long::sizeof_fixedlen_msg(); - CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap()); - init_from_long(c, msg, ch->machine); - Debug("cache_msg", - "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + Cache *call_cache = caches[c->frag_type]; + c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len); + break; + } + case CACHE_OPEN_READ_LONG: { + // Cache needs message data, copy it. + c->setMsgBufferLen(len); + c->allocMsgBuffer(); + memcpy(c->getMsgBuffer(), (char *)data, len); + + int flen = CacheOpMsg_long::sizeof_fixedlen_msg(); + CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap()); + init_from_long(c, msg, ch->machine); + Debug("cache_msg", "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long"); + log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long"); #endif - // - // Establish the remote side of the ClusterVConnection - // - c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0, - &c->token, - c->cluster_vc_channel, - (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ)); - if (!c->write_cluster_vc) { - // Unable to setup channel, abort processing. - CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); - Debug("chan_inuse", - "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", - c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); - - // Send cluster op failed reply - c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE); - break; + // + // Establish the remote side of the ClusterVConnection + // + c->write_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel, + (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ)); + if (!c->write_cluster_vc) { + // Unable to setup channel, abort processing. + CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); + Debug("chan_inuse", "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel, + DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); + + // Send cluster op failed reply + c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE); + break; - } else { - c->write_cluster_vc->current_cont = c; - } - ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN); - ink_release_assert((opcode == CACHE_OPEN_READ_LONG) - || c->write_cluster_vc->pending_remote_fill); - - SET_CONTINUATION_HANDLER(c, (CacheContHandler) - & CacheContinuation::setupReadWriteVC); - Debug("cache_proto", - "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d", - msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number); - - const char *p = (const char *) msg + flen; - int moi_len = len - flen; - int res; + } else { + c->write_cluster_vc->current_cont = c; + } + ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN); + ink_release_assert((opcode == CACHE_OPEN_READ_LONG) || c->write_cluster_vc->pending_remote_fill); - ink_assert(moi_len > 0); + SET_CONTINUATION_HANDLER(c, (CacheContHandler)&CacheContinuation::setupReadWriteVC); + Debug("cache_proto", "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d", msg->seq_number, msg->channel, msg->buffer_size, + msg->token.ip_created, msg->token.sequence_number); - // Unmarshal CacheHTTPHdr - res = c->ic_request.unmarshal((char *) p, moi_len, NULL); - ink_assert(res > 0); - ink_assert(c->ic_request.valid()); - c->request_purge = c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE; - moi_len -= res; - p += res; - ink_assert(moi_len > 0); - // Unmarshal CacheLookupHttpConfig - c->ic_params = new(CacheLookupHttpConfigAllocator.alloc()) - CacheLookupHttpConfig(); - res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len); - ink_assert(res > 0); + const char *p = (const char *)msg + flen; + int moi_len = len - flen; + int res; - moi_len -= res; - p += res; + ink_assert(moi_len > 0); - CacheKey key(msg->url_md5); + // Unmarshal CacheHTTPHdr + res = c->ic_request.unmarshal((char *)p, moi_len, NULL); + ink_assert(res > 0); + ink_assert(c->ic_request.valid()); + c->request_purge = + c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE; + moi_len -= res; + p += res; + ink_assert(moi_len > 0); + // Unmarshal CacheLookupHttpConfig + c->ic_params = new (CacheLookupHttpConfigAllocator.alloc()) CacheLookupHttpConfig(); + res = c->ic_params->unmarshal(&c->ic_arena, (const char *)p, moi_len); + ink_assert(res > 0); - char *hostname = NULL; - int host_len = 0; + moi_len -= res; + p += res; - if (moi_len) { - hostname = (char *) p; - host_len = moi_len; + CacheKey key(msg->url_md5); - // Save hostname and attach it to the continuation since we may - // need it if we convert this to an open_write. + char *hostname = NULL; + int host_len = 0; - c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len)); - c->ic_hostname_len = host_len; + if (moi_len) { + hostname = (char *)p; + host_len = moi_len; - memcpy(c->ic_hostname->data(), hostname, host_len); - } + // Save hostname and attach it to the continuation since we may + // need it if we convert this to an open_write. - Cache *call_cache = caches[c->frag_type]; - Action *a = call_cache->open_read(c, &key, &c->ic_request, - c->ic_params, - c->frag_type, hostname, host_len); - // Get rid of purify warnings since 'c' can be freed by open_read. - if (a != ACTION_RESULT_DONE) { - c->cache_action = a; - } - break; + c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len)); + c->ic_hostname_len = host_len; + + memcpy(c->ic_hostname->data(), hostname, host_len); } - case CACHE_OPEN_WRITE: - { - CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); - init_from_short(c, msg, ch->machine); - Debug("cache_msg", - "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + + Cache *call_cache = caches[c->frag_type]; + Action *a = call_cache->open_read(c, &key, &c->ic_request, c->ic_params, c->frag_type, hostname, host_len); + // Get rid of purify warnings since 'c' can be freed by open_read. + if (a != ACTION_RESULT_DONE) { + c->cache_action = a; + } + break; + } + case CACHE_OPEN_WRITE: { + CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); + init_from_short(c, msg, ch->machine); + Debug("cache_msg", "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_open_write"); + log_cache_op_msg(msg->seq_number, len, "cache_op_open_write"); #endif - // - // Establish the remote side of the ClusterVConnection - // - c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0, - &c->token, - c->cluster_vc_channel, - (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE)); - if (!c->read_cluster_vc) { - // Unable to setup channel, abort processing. - CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); - Debug("chan_inuse", - "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", - c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); - - // Send cluster op failed reply - c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE); - break; + // + // Establish the remote side of the ClusterVConnection + // + c->read_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel, + (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE)); + if (!c->read_cluster_vc) { + // Unable to setup channel, abort processing. + CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); + Debug("chan_inuse", "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel, + DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); + + // Send cluster op failed reply + c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE); + break; - } else { - c->read_cluster_vc->current_cont = c; - } - ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN); + } else { + c->read_cluster_vc->current_cont = c; + } + ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN); - CacheKey key(msg->md5); + CacheKey key(msg->md5); - char *hostname = NULL; - int host_len = len - op_to_sizeof_fixedlen_msg(opcode); - if (host_len) { - hostname = (char *) msg->moi.byte; - } + char *hostname = NULL; + int host_len = len - op_to_sizeof_fixedlen_msg(opcode); + if (host_len) { + hostname = (char *)msg->moi.byte; + } - Cache *call_cache = caches[c->frag_type]; - Action *a = call_cache->open_write(c, &key, c->frag_type, - !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE), - c->pin_in_cache, hostname, host_len); - if (a != ACTION_RESULT_DONE) { - c->cache_action = a; - } - break; + Cache *call_cache = caches[c->frag_type]; + Action *a = + call_cache->open_write(c, &key, c->frag_type, !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE), c->pin_in_cache, hostname, host_len); + if (a != ACTION_RESULT_DONE) { + c->cache_action = a; } - case CACHE_OPEN_WRITE_LONG: - { - // Cache needs message data, copy it. - c->setMsgBufferLen(len); - c->allocMsgBuffer(); - memcpy(c->getMsgBuffer(), (char *) data, len); - - int flen = CacheOpMsg_long::sizeof_fixedlen_msg(); - CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap()); - init_from_long(c, msg, ch->machine); - Debug("cache_msg", - "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + break; + } + case CACHE_OPEN_WRITE_LONG: { + // Cache needs message data, copy it. + c->setMsgBufferLen(len); + c->allocMsgBuffer(); + memcpy(c->getMsgBuffer(), (char *)data, len); + + int flen = CacheOpMsg_long::sizeof_fixedlen_msg(); + CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap()); + init_from_long(c, msg, ch->machine); + Debug("cache_msg", "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long"); + log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long"); #endif - // - // Establish the remote side of the ClusterVConnection - // - c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0, - &c->token, - c->cluster_vc_channel, - (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE)); - if (!c->read_cluster_vc) { - // Unable to setup channel, abort processing. - CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); - Debug("chan_inuse", - "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", - c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); - - // Send cluster op failed reply - c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE); - break; - - } else { - c->read_cluster_vc->current_cont = c; - } - ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN); + // + // Establish the remote side of the ClusterVConnection + // + c->read_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel, + (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE)); + if (!c->read_cluster_vc) { + // Unable to setup channel, abort processing. + CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT); + Debug("chan_inuse", "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel, + DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number); + + // Send cluster op failed reply + c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE); + break; - CacheHTTPInfo *ci = 0; - const char *p = (const char *) msg + flen; - int res = 0; - int moi_len = len - flen; + } else { + c->read_cluster_vc->current_cont = c; + } + ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN); - if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) { + CacheHTTPInfo *ci = 0; + const char *p = (const char *)msg + flen; + int res = 0; + int moi_len = len - flen; - // Unmarshal old CacheHTTPInfo - res = HTTPInfo::unmarshal((char *) p, moi_len, NULL); - ink_assert(res > 0); - c->ic_old_info.get_handle((char *) p, moi_len); - ink_assert(c->ic_old_info.valid()); - ci = &c->ic_old_info; - } - if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) { - ink_assert(!ci); - ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES; - } - moi_len -= res; - p += res; + if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) { + // Unmarshal old CacheHTTPInfo + res = HTTPInfo::unmarshal((char *)p, moi_len, NULL); + ink_assert(res > 0); + c->ic_old_info.get_handle((char *)p, moi_len); + ink_assert(c->ic_old_info.valid()); + ci = &c->ic_old_info; + } + if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) { + ink_assert(!ci); + ci = (CacheHTTPInfo *)CACHE_ALLOW_MULTIPLE_WRITES; + } + moi_len -= res; + p += res; - CacheKey key(msg->url_md5); - char *hostname = NULL; + CacheKey key(msg->url_md5); + char *hostname = NULL; - if (moi_len) { - hostname = (char *) p; - } + if (moi_len) { + hostname = (char *)p; + } - Cache *call_cache = caches[c->frag_type]; - Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache, - NULL, c->frag_type, hostname, moi_len); - if (a != ACTION_RESULT_DONE) { - c->cache_action = a; - } - break; + Cache *call_cache = caches[c->frag_type]; + Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache, NULL, c->frag_type, hostname, moi_len); + if (a != ACTION_RESULT_DONE) { + c->cache_action = a; } - case CACHE_REMOVE: - { - CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); - init_from_short(c, msg, ch->machine); - Debug("cache_msg", - "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + break; + } + case CACHE_REMOVE: { + CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); + init_from_short(c, msg, ch->machine); + Debug("cache_msg", "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_remove"); + log_cache_op_msg(msg->seq_number, len, "cache_op_remove"); #endif - CacheKey key(msg->md5); + CacheKey key(msg->md5); - char *hostname = NULL; - int host_len = len - op_to_sizeof_fixedlen_msg(opcode); - if (host_len) { - hostname = (char *) msg->moi.byte; - } + char *hostname = NULL; + int host_len = len - op_to_sizeof_fixedlen_msg(opcode); + if (host_len) { + hostname = (char *)msg->moi.byte; + } - Cache *call_cache = caches[c->frag_type]; - Action *a = call_cache->remove(c, &key, c->frag_type, - !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS), - !!(c->cfl_flags & CFL_REMOVE_LINK), - hostname, host_len); - if (a != ACTION_RESULT_DONE) { - c->cache_action = a; - } - break; + Cache *call_cache = caches[c->frag_type]; + Action *a = call_cache->remove(c, &key, c->frag_type, !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS), + !!(c->cfl_flags & CFL_REMOVE_LINK), hostname, host_len); + if (a != ACTION_RESULT_DONE) { + c->cache_action = a; } - case CACHE_LINK: - { - CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap()); - init_from_short_2(c, msg, ch->machine); - Debug("cache_msg", - "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + break; + } + case CACHE_LINK: { + CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap()); + init_from_short_2(c, msg, ch->machine); + Debug("cache_msg", "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_link"); + log_cache_op_msg(msg->seq_number, len, "cache_op_link"); #endif - CacheKey key1(msg->md5_1); - CacheKey key2(msg->md5_2); + CacheKey key1(msg->md5_1); + CacheKey key2(msg->md5_2); - char *hostname = NULL; - int host_len = len - op_to_sizeof_fixedlen_msg(opcode); - if (host_len) { - hostname = (char *) msg->moi.byte; - } + char *hostname = NULL; + int host_len = len - op_to_sizeof_fixedlen_msg(opcode); + if (host_len) { + hostname = (char *)msg->moi.byte; + } - Cache *call_cache = caches[c->frag_type]; - Action *a = call_cache->link(c, &key1, &key2, c->frag_type, - hostname, host_len); - if (a != ACTION_RESULT_DONE) { - c->cache_action = a; - } - break; + Cache *call_cache = caches[c->frag_type]; + Action *a = call_cache->link(c, &key1, &key2, c->frag_type, hostname, host_len); + if (a != ACTION_RESULT_DONE) { + c->cache_action = a; } - case CACHE_DEREF: - { - CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); - init_from_short(c, msg, ch->machine); - Debug("cache_msg", - "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); + break; + } + case CACHE_DEREF: { + CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap()); + init_from_short(c, msg, ch->machine); + Debug("cache_msg", "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine); #ifdef CACHE_MSG_TRACE - log_cache_op_msg(msg->seq_number, len, "cache_op_deref"); + log_cache_op_msg(msg->seq_number, len, "cache_op_deref"); #endif - CacheKey key(msg->md5); + CacheKey key(msg->md5); - char *hostname = NULL; - int host_len = len - op_to_sizeof_fixedlen_msg(opcode); - if (host_len) { - hostname = (char *) msg->moi.byte; - } - - Cache *call_cache = caches[c->frag_type]; - Action *a = call_cache->deref(c, &key, c->frag_type, - hostname, host_len); - if (a != ACTION_RESULT_DONE) { - c->cache_action = a; - } - break; + char *hostname = NULL; + int host_len = len - op_to_sizeof_fixedlen_msg(opcode); + if (host_len) { + hostname = (char *)msg->moi.byte; } - default: - { - ink_release_assert(0); + Cache *call_cache = caches[c->frag_type]; + Action *a = call_cache->deref(c, &key, c->frag_type, hostname, host_len); + if (a != ACTION_RESULT_DONE) { + c->cache_action = a; } - } // End of switch + break; + } + + default: { + ink_release_assert(0); + } + } // End of switch } void @@ -1465,13 +1382,13 @@ cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len) { cache_op_ClusterFunction(ch, data, len); // We own the message data, free it back to the Cluster subsystem - clusterProcessor.free_remote_data((char *) data, len); + clusterProcessor.free_remote_data((char *)data, len); } int -CacheContinuation::setupVCdataRead(int event, VConnection * vc) +CacheContinuation::setupVCdataRead(int event, VConnection *vc) { - ink_assert(magicno == (int) MagicNo); + ink_assert(magicno == (int)MagicNo); // // Setup the initial data read for the given Cache VC. // This data is sent back in the response message. @@ -1482,27 +1399,27 @@ CacheContinuation::setupVCdataRead(int event, VConnection * vc) ////////////////////////////////////////// Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number); ink_release_assert(caller_buf_freebytes); - SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead); + SET_HANDLER((CacheContHandler)&CacheContinuation::VCdataRead); int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes); MIOBuffer *buf = new_MIOBuffer(size_index); readahead_reader = buf->alloc_reader(); - MUTEX_TRY_LOCK(lock, mutex, this_ethread()); // prevent immediate callback + MUTEX_TRY_LOCK(lock, mutex, this_ethread()); // prevent immediate callback readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf); return EVENT_DONE; } else { // Error case, deflect processing to replyOpEvent. - SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent); + SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent); return handleEvent(event, vc); } } int -CacheContinuation::VCdataRead(int event, VIO * target_vio) +CacheContinuation::VCdataRead(int event, VIO *target_vio) { - ink_release_assert(magicno == (int) MagicNo); + ink_release_assert(magicno == (int)MagicNo); ink_release_assert(readahead_vio == target_vio); VConnection *vc = target_vio->vc_server; @@ -1510,129 +1427,121 @@ CacheContinuation::VCdataRead(int event, VIO * target_vio) int32_t object_size; switch (event) { - case VC_EVENT_EOS: - { - if (!target_vio->ndone) { - // Doc with zero byte body, handle as read failure - goto read_failed; - } - // Fall through + case VC_EVENT_EOS: { + if (!target_vio->ndone) { + // Doc with zero byte body, handle as read failure + goto read_failed; } + // Fall through + } case VC_EVENT_READ_READY: - case VC_EVENT_READ_COMPLETE: - { - int clone_bytes; - int current_ndone = target_vio->ndone; + case VC_EVENT_READ_COMPLETE: { + int clone_bytes; + int current_ndone = target_vio->ndone; - ink_assert(current_ndone); - ink_assert(current_ndone <= readahead_reader->read_avail()); + ink_assert(current_ndone); + ink_assert(current_ndone <= readahead_reader->read_avail()); - object_size = getObjectSize(vc, request_opcode, &cache_vc_info); - have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone)); + object_size = getObjectSize(vc, request_opcode, &cache_vc_info); + have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone)); - // Use no more than the caller's max buffer limit + // Use no more than the caller's max buffer limit - clone_bytes = current_ndone; - if (!have_all_data) { - if (current_ndone > caller_buf_freebytes) { - clone_bytes = caller_buf_freebytes; - } + clone_bytes = current_ndone; + if (!have_all_data) { + if (current_ndone > caller_buf_freebytes) { + clone_bytes = caller_buf_freebytes; } - // Clone data - - IOBufferBlock *tail; - readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(), - readahead_reader->start_offset, clone_bytes, &tail); + } + // Clone data - if (have_all_data) { - // Close VC, since no more data and also to avoid VC_EVENT_EOS + IOBufferBlock *tail; + readahead_data = + clone_IOBufferBlockList(readahead_reader->get_current_block(), readahead_reader->start_offset, clone_bytes, &tail); - MIOBuffer *mbuf = target_vio->buffer.writer(); - vc->do_io(VIO::CLOSE); - free_MIOBuffer(mbuf); - readahead_vio = 0; - } - SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent); - handleEvent(reply, vc); - return EVENT_CONT; - } - case VC_EVENT_ERROR: - case VC_EVENT_INACTIVITY_TIMEOUT: - case VC_EVENT_ACTIVE_TIMEOUT: - default: - { - read_failed: - // Read failed, deflect to replyOpEvent. + if (have_all_data) { + // Close VC, since no more data and also to avoid VC_EVENT_EOS - MIOBuffer * mbuf = target_vio->buffer.writer(); + MIOBuffer *mbuf = target_vio->buffer.writer(); vc->do_io(VIO::CLOSE); free_MIOBuffer(mbuf); readahead_vio = 0; - reply = CACHE_EVENT_OPEN_READ_FAILED; - - SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent); - handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ); - return EVENT_DONE; } - } // End of switch + SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent); + handleEvent(reply, vc); + return EVENT_CONT; + } + case VC_EVENT_ERROR: + case VC_EVENT_INACTIVITY_TIMEOUT: + case VC_EVENT_ACTIVE_TIMEOUT: + default: { + read_failed: + // Read failed, deflect to replyOpEvent. + + MIOBuffer *mbuf = target_vio->buffer.writer(); + vc->do_io(VIO::CLOSE); + free_MIOBuffer(mbuf); + readahead_vio = 0; + reply = CACHE_EVENT_OPEN_READ_FAILED; + + SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent); + handleEvent(reply, (VConnection *)-ECLUSTER_ORB_DATA_READ); + return EVENT_DONE; + } + } // End of switch } int -CacheContinuation::setupReadWriteVC(int event, VConnection * vc) +CacheContinuation::setupReadWriteVC(int event, VConnection *vc) { // Only handles OPEN_READ_LONG processing. switch (event) { - case CACHE_EVENT_OPEN_READ: - { - // setup readahead + case CACHE_EVENT_OPEN_READ: { + // setup readahead - SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead); - return handleEvent(event, vc); - break; - } - case CACHE_EVENT_OPEN_READ_FAILED: - { - if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) { - // HTTP open read failed, attempt open write now to avoid an additional - // message round trip - - CacheKey key(url_md5); - - Cache *call_cache = caches[frag_type]; - Action *a = call_cache->open_write(this, &key, 0, pin_in_cache, - NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL, - ic_hostname_len); - if (a != ACTION_RESULT_DONE) { - cache_action = a; - } - } else { - SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent); - return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0); + SET_HANDLER((CacheContHandler)&CacheContinuation::setupVCdataRead); + return handleEvent(event, vc); + break; + } + case CACHE_EVENT_OPEN_READ_FAILED: { + if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) { + // HTTP open read failed, attempt open write now to avoid an additional + // message round trip + + CacheKey key(url_md5); + + Cache *call_cache = caches[frag_type]; + Action *a = call_cache->open_write(this, &key, 0, pin_in_cache, NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL, + ic_hostname_len); + if (a != ACTION_RESULT_DONE) { + cache_action = a; } - break; + } else { + SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent); + return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0); } - case CACHE_EVENT_OPEN_WRITE: - { - // Convert from read to write connection + break; + } + case CACHE_EVENT_OPEN_WRITE: { + // Convert from read to write connection - ink_assert(!read_cluster_vc && write_cluster_vc); - read_cluster_vc = write_cluster_vc; - read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE); - write_cluster_vc = 0; + ink_assert(!read_cluster_vc && write_cluster_vc); + read_cluster_vc = write_cluster_vc; + read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE); + write_cluster_vc = 0; - SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent); - return handleEvent(event, vc); - break; - } + SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent); + return handleEvent(event, vc); + break; + } case CACHE_EVENT_OPEN_WRITE_FAILED: - default: - { - SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent); - return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0); - break; - } - } // end of switch + default: { + SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent); + return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0); + break; + } + } // end of switch return EVENT_DONE; } @@ -1642,16 +1551,16 @@ CacheContinuation::setupReadWriteVC(int event, VConnection * vc) // Reflect the (local) reply back to the (remote) requesting node. ///////////////////////////////////////////////////////////////////////// int -CacheContinuation::replyOpEvent(int event, VConnection * cvc) +CacheContinuation::replyOpEvent(int event, VConnection *cvc) { - ink_assert(magicno == (int) MagicNo); + ink_assert(magicno == (int)MagicNo); Debug("cache_proto", "replyOpEvent(this=%p,event=%d,VC=%p)", this, event, cvc); ink_hrtime now; now = ink_get_hrtime(); CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time); LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks); ink_release_assert(expect_cache_callback); - expect_cache_callback = false; // make sure we are called back exactly once + expect_cache_callback = false; // make sure we are called back exactly once result = event; @@ -1664,8 +1573,7 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) CacheOpReplyMsg *msg = &rmsg; msg->result = event; - if ((request_opcode == CACHE_OPEN_READ_LONG) - && cvc && (event == CACHE_EVENT_OPEN_WRITE)) { + if ((request_opcode == CACHE_OPEN_READ_LONG) && cvc && (event == CACHE_EVENT_OPEN_WRITE)) { ////////////////////////////////////////////////////////////////////////// // open read failed, but open write succeeded, set result to // CACHE_EVENT_OPEN_READ_FAILED and make result token non zero to @@ -1676,17 +1584,16 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) } msg->seq_number = seq_number; - int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token + int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token int len = 0; int vers = 0; int results_expected = 1; - if (no_reply_message) // CACHE_NO_RESPONSE request + if (no_reply_message) // CACHE_NO_RESPONSE request goto free_exit; if (open) { - // prepare for CACHE_OPEN_EVENT results_expected = 2; @@ -1695,20 +1602,20 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) if (read_op && !open_read_now_open_write) { ink_release_assert(write_cluster_vc->pending_remote_fill); - ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio)); + ink_assert(have_all_data || (readahead_vio == &((CacheVC *)cache_vc)->vio)); Debug("cache_proto", "connect_local success seqno=%d have_all_data=%d", seq_number, (have_all_data ? 1 : 0)); if (have_all_data) { - msg->token.clear(); // Tell sender no conn established + msg->token.clear(); // Tell sender no conn established write_cluster_vc->type = VC_CLUSTER_WRITE; } else { - msg->token = token; // Tell sender conn established + msg->token = token; // Tell sender conn established setupReadBufTunnel(cache_vc, write_cluster_vc); } } else { Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number); - msg->token = token; // Tell sender conn established + msg->token = token; // Tell sender conn established OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc(); pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex); @@ -1723,17 +1630,17 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) msg->is_ram_cache_hit = ((CacheVC *)cache_vc)->is_ram_cache_hit(); if (!cache_vc_info.valid()) { - (void) getObjectSize(cache_vc, request_opcode, &cache_vc_info); + (void)getObjectSize(cache_vc, request_opcode, &cache_vc_info); } // Determine data length and allocate len = cache_vc_info.marshal_length(); - CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len); + CacheOpReplyMsg *reply = (CacheOpReplyMsg *)ALLOCA_DOUBLE(flen + len); // Initialize reply message header *reply = *msg; // Marshal response data into reply message - res = cache_vc_info.marshal((char *) reply + flen, len); + res = cache_vc_info.marshal((char *)reply + flen, len); ink_assert(res >= 0 && res <= len); // Make reply message the current message @@ -1742,11 +1649,11 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) } else { Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%p)", event, seq_number, this); - msg->token.clear(); // Tell sender no conn established + msg->token.clear(); // Tell sender no conn established // Reallocate reply message, allowing for marshalled data len += sizeof(int32_t); - CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len); + CacheOpReplyMsg *reply = (CacheOpReplyMsg *)ALLOCA_DOUBLE(flen + len); // Initialize reply message header *reply = *msg; @@ -1756,24 +1663,24 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) // open read/write failed, close preallocated VC // if (read_cluster_vc) { - read_cluster_vc->remote_closed = 1; // avoid remote close msg + read_cluster_vc->remote_closed = 1; // avoid remote close msg read_cluster_vc->do_io(VIO::CLOSE); } if (write_cluster_vc) { write_cluster_vc->pending_remote_fill = 0; - write_cluster_vc->remote_closed = 1; // avoid remote close msg + write_cluster_vc->remote_closed = 1; // avoid remote close msg write_cluster_vc->do_io(VIO::CLOSE); } - reply->moi.u32 = (int32_t) ((uintptr_t) cvc & 0xffffffff); // code describing failure + reply->moi.u32 = (int32_t)((uintptr_t)cvc & 0xffffffff); // code describing failure } // Make reply message the current message msg = reply; } CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT); - // - // Send reply message - // +// +// Send reply message +// #ifdef CACHE_MSG_TRACE log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent"); #endif @@ -1781,18 +1688,14 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc) if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) { if (read_op) { // Transmit reply message and object data in same cluster message - Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64, - seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0); - clusterProcessor.invoke_remote_data(ch, - CACHE_OP_RESULT_CLUSTER_FUNCTION, - (void *) msg, (flen + len), - readahead_data, - cluster_vc_channel, &token, - &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL); + Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64, seq_number, + readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0); + clusterProcessor.invoke_remote_data(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), readahead_data, + cluster_vc_channel, &token, &CacheContinuation::disposeOfDataBuffer, (void *)this, + CLUSTER_OPT_STEAL); } else { Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this); - clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, - (void *) msg, (flen + len), CLUSTER_OPT_STEAL); + clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), CLUSTER_OPT_STEAL); } } else { @@ -1812,7 +1715,7 @@ free_exit: } void -CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc) +CacheContinuation::setupReadBufTunnel(VConnection *cache_read_vc, VConnection *cluster_write_vc) { //////////////////////////////////////////////////////////// // Setup OneWayTunnel and tunnel close event handler. @@ -1820,22 +1723,21 @@ CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * //////////////////////////////////////////////////////////// tunnel_cont = cacheContAllocator_alloc(); tunnel_cont->mutex = this->mutex; - SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler) - & CacheContinuation::tunnelClosedEvent); + SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)&CacheContinuation::tunnelClosedEvent); int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1); tunnel_mutex = tunnel_cont->mutex; tunnel_closed = false; tunnel = OneWayTunnel::OneWayTunnel_alloc(); - readahead_reader->consume(ravail); // allow for bytes sent in initial reply + readahead_reader->consume(ravail); // allow for bytes sent in initial reply tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader); tunnel_cont->action = this; tunnel_cont->tunnel = tunnel; tunnel_cont->tunnel_cont = tunnel_cont; // Disable cluster_write_vc - ((ClusterVConnection *) cluster_write_vc)->write.enabled = 0; + ((ClusterVConnection *)cluster_write_vc)->write.enabled = 0; // Disable cache read VC readahead_vio->nbytes = readahead_vio->ndone; @@ -1853,11 +1755,11 @@ CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * int CacheContinuation::tunnelClosedEvent(int /* event ATS_UNUSED */, void *c) { - ink_assert(magicno == (int) MagicNo); + ink_assert(magicno == (int)MagicNo); // Note: We are called with the tunnel_mutex held. - CacheContinuation *tc = (CacheContinuation *) c; + CacheContinuation *tc = (CacheContinuation *)c; ink_release_assert(tc->tunnel_cont == tc); - CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation; + CacheContinuation *real_cc = (CacheContinuation *)tc->action.continuation; if (real_cc) { // Notify the real continuation of the tunnel closed event @@ -1875,26 +1777,24 @@ CacheContinuation::tunnelClosedEvent(int /* event ATS_UNUSED */, void *c) // Retry DisposeOfDataBuffer continuation //////////////////////////////////////////////////////////// struct retryDisposeOfDataBuffer; -typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *); -struct retryDisposeOfDataBuffer:public Continuation -{ +typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler)(int, void *); +struct retryDisposeOfDataBuffer : public Continuation { CacheContinuation *c; - int handleRetryEvent(int event, Event * e) + int + handleRetryEvent(int event, Event *e) { if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) { delete this; - return EVENT_DONE; - } else - { + return EVENT_DONE; + } else { e->schedule_in(HRTIME_MSECONDS(10)); return EVENT_CONT; } } - retryDisposeOfDataBuffer(CacheContinuation * cont) -: Continuation(new_ProxyMutex()), c(cont) { - SET_HANDLER((rtryDisOfDBufHandler) - & retryDisposeOfDataBuffer::handleRetryEvent); + retryDisposeOfDataBuffer(CacheContinuation *cont) : Continuation(new_ProxyMutex()), c(cont) + { + SET_HANDLER((rtryDisOfDBufHandler)&retryDisposeOfDataBuffer::handleRetryEvent); } }; @@ -1906,9 +1806,9 @@ void CacheContinuation::disposeOfDataBuffer(void *d) { ink_assert(d); - CacheContinuation *cc = (CacheContinuation *) d; + CacheContinuation *cc = (CacheContinuation *)d; ink_assert(cc->have_all_data || cc->readahead_vio); - ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio)); + ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *)cc->cache_vc)->vio)); if (cc->have_all_data) { // @@ -1936,9 +1836,9 @@ CacheContinuation::disposeOfDataBuffer(void *d) } int -CacheContinuation::handleDisposeEvent(int /* event ATS_UNUSED */, CacheContinuation * cc) +CacheContinuation::handleDisposeEvent(int /* event ATS_UNUSED */, CacheContinuation *cc) { - ink_assert(cc->magicno == (int) MagicNo); + ink_assert(cc->magicno == (int)MagicNo); MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread()); if (lock.is_locked()) { // Write of initial object data is complete. @@ -1981,13 +1881,13 @@ cache_op_result_ClusterFunct
<TRUNCATED>
