http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterHandlerBase.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterHandlerBase.cc b/iocore/cluster/ClusterHandlerBase.cc index 1fec3ff..d116af0 100644 --- a/iocore/cluster/ClusterHandlerBase.cc +++ b/iocore/cluster/ClusterHandlerBase.cc @@ -40,14 +40,10 @@ extern int num_of_cluster_threads; // Incoming message continuation for periodic callout threads /////////////////////////////////////////////////////////////// -ClusterCalloutContinuation::ClusterCalloutContinuation(struct ClusterHandler *ch) - : -Continuation(0), -_ch(ch) +ClusterCalloutContinuation::ClusterCalloutContinuation(struct ClusterHandler *ch) : Continuation(0), _ch(ch) { mutex = new_ProxyMutex(); - SET_HANDLER((ClstCoutContHandler) - & ClusterCalloutContinuation::CalloutHandler); + SET_HANDLER((ClstCoutContHandler)&ClusterCalloutContinuation::CalloutHandler); } ClusterCalloutContinuation::~ClusterCalloutContinuation() @@ -64,8 +60,8 @@ ClusterCalloutContinuation::CalloutHandler(int /* event ATS_UNUSED */, Event * / /*************************************************************************/ // ClusterControl member functions (Internal Class) /*************************************************************************/ -ClusterControl::ClusterControl(): -Continuation(NULL), len(0), size_index(-1), real_data(0), data(0), free_proc(0), free_proc_arg(0), iob_block(0) +ClusterControl::ClusterControl() + : Continuation(NULL), len(0), size_index(-1), real_data(0), data(0), free_proc(0), free_proc_arg(0), iob_block(0) { } @@ -79,25 +75,25 @@ ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_b if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) { size_index = buffer_size_to_index(len + DATA_HDR + sizeof(int32_t), MAX_BUFFER_SIZE_INDEX); iob_block = new_IOBufferBlock(); - iob_block->alloc(size_index); // aligns on 8 byte boundary - real_data = (int64_t *) iob_block->buf(); + iob_block->alloc(size_index); // aligns on 8 byte boundary + real_data = (int64_t *)iob_block->buf(); if (align_int32_on_non_int64_boundary) { - data = ((char *) real_data) + sizeof(int32_t) + DATA_HDR; + data = ((char *)real_data) + sizeof(int32_t) + DATA_HDR; } else { - data = ((char *) real_data) + DATA_HDR; + data = ((char *)real_data) + DATA_HDR; } } else { int size = sizeof(int64_t) * (((len + DATA_HDR + sizeof(int32_t) + sizeof(int64_t) - 1) / sizeof(int64_t)) + 1); size_index = -1; iob_block = new_IOBufferBlock(); iob_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size)); - real_data = (int64_t *) iob_block->buf(); + real_data = (int64_t *)iob_block->buf(); if (align_int32_on_non_int64_boundary) { - data = (char *) DOUBLE_ALIGN(real_data) + sizeof(int32_t) + DATA_HDR; + data = (char *)DOUBLE_ALIGN(real_data) + sizeof(int32_t) + DATA_HDR; } else { - data = (char *) DOUBLE_ALIGN(real_data) + DATA_HDR; + data = (char *)DOUBLE_ALIGN(real_data) + DATA_HDR; } CLUSTER_INCREMENT_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT); } @@ -105,26 +101,26 @@ ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_b // IOBufferBlock adjustments if (read_access) { // Make iob_block->read_avail() == len - iob_block->fill((char *) data - (char *) real_data); // skip header - iob_block->consume((char *) data - (char *) real_data); // skip header + iob_block->fill((char *)data - (char *)real_data); // skip header + iob_block->consume((char *)data - (char *)real_data); // skip header iob_block->fill(len); } else { // Make iob_block->write_avail() == len - iob_block->fill((char *) data - (char *) real_data); // skip header - iob_block->consume((char *) data - (char *) real_data); // skip header + iob_block->fill((char *)data - (char *)real_data); // skip header + iob_block->consume((char *)data - (char *)real_data); // skip header iob_block->_buf_end = iob_block->end() + len; } // Write size_index, magic number and 'this' in leading bytes - char *size_index_ptr = (char *) data - DATA_HDR; + char *size_index_ptr = (char *)data - DATA_HDR; *size_index_ptr = size_index; ++size_index_ptr; - *size_index_ptr = (char) ALLOC_DATA_MAGIC; + *size_index_ptr = (char)ALLOC_DATA_MAGIC; ++size_index_ptr; - void *val = (void *) this; - memcpy(size_index_ptr, (char *) &val, sizeof(void *)); + void *val = (void *)this; + memcpy(size_index_ptr, (char *)&val, sizeof(void *)); } void @@ -133,20 +129,20 @@ ClusterControl::free_data() if (data && iob_block) { if (free_proc) { // Free memory via callback proc - (*free_proc) (free_proc_arg); - iob_block = 0; // really free memory + (*free_proc)(free_proc_arg); + iob_block = 0; // really free memory return; } if (real_data) { - ink_release_assert(*(((uint8_t *) data) - DATA_HDR + 1) == (uint8_t) ALLOC_DATA_MAGIC); - *(((uint8_t *) data) - DATA_HDR + 1) = (uint8_t) ~ ALLOC_DATA_MAGIC; + ink_release_assert(*(((uint8_t *)data) - DATA_HDR + 1) == (uint8_t)ALLOC_DATA_MAGIC); + *(((uint8_t *)data) - DATA_HDR + 1) = (uint8_t)~ALLOC_DATA_MAGIC; - ink_release_assert(*(((char *) data) - DATA_HDR) == size_index); + ink_release_assert(*(((char *)data) - DATA_HDR) == size_index); } else { // malloc'ed memory, not alloced via real_alloc_data(). // Data will be ats_free()'ed when IOBufferBlock is freed } - iob_block = 0; // free memory + iob_block = 0; // free memory } } @@ -159,8 +155,7 @@ IncomingControl::alloc() return inControlAllocator.alloc(); } -IncomingControl::IncomingControl() -:recognized_time(0) +IncomingControl::IncomingControl() : recognized_time(0) { } @@ -180,27 +175,26 @@ OutgoingControl::alloc() return outControlAllocator.alloc(); } -OutgoingControl::OutgoingControl() -:ch(NULL), submit_time(0) +OutgoingControl::OutgoingControl() : ch(NULL), submit_time(0) { } int -OutgoingControl::startEvent(int event, Event * e) +OutgoingControl::startEvent(int event, Event *e) { // // This event handler is used by ClusterProcessor::invoke_remote() // to delay (CLUSTER_OPT_DELAY) the enqueuing of the control message. // - (void) event; - (void) e; + (void)event; + (void)e; // verify that the machine has not gone down if (!ch || !ch->thread) return EVENT_DONE; - int32_t cluster_fn = *(int32_t *) this->data; + int32_t cluster_fn = *(int32_t *)this->data; int32_t pri = ClusterFuncToQpri(cluster_fn); - ink_atomiclist_push(&ch->outgoing_control_al[pri], (void *) this); + ink_atomiclist_push(&ch->outgoing_control_al[pri], (void *)this); return EVENT_DONE; } @@ -215,26 +209,11 @@ OutgoingControl::freeall() /*************************************************************************/ // ClusterState member functions (Internal Class) /*************************************************************************/ -ClusterState::ClusterState(ClusterHandler * c, bool read_chan): -Continuation(0), -ch(c), -read_channel(read_chan), -do_iodone_event(false), -n_descriptors(0), -sequence_number(0), -to_do(0), -did(0), -n_iov(0), -io_complete(1), -io_complete_event(0), -v(0), -bytes_xfered(0), -last_ndone(0), -total_bytes_xfered(0), -iov(NULL), -iob_iov(NULL), -byte_bank(NULL), -n_byte_bank(0), byte_bank_size(0), missed(0), missed_msg(false), read_state_t(READ_START), write_state_t(WRITE_START) +ClusterState::ClusterState(ClusterHandler *c, bool read_chan) + : Continuation(0), ch(c), read_channel(read_chan), do_iodone_event(false), n_descriptors(0), sequence_number(0), to_do(0), did(0), + n_iov(0), io_complete(1), io_complete_event(0), v(0), bytes_xfered(0), last_ndone(0), total_bytes_xfered(0), iov(NULL), + iob_iov(NULL), byte_bank(NULL), n_byte_bank(0), byte_bank_size(0), missed(0), missed_msg(false), read_state_t(READ_START), + write_state_t(WRITE_START) { mutex = new_ProxyMutex(); if (read_channel) { @@ -258,23 +237,22 @@ n_byte_bank(0), byte_bank_size(0), missed(0), missed_msg(false), read_state_t(RE size_t pagesize = ats_pagesize(); size = ((MAX_TCOUNT + 1) * sizeof(IOVec)) + (2 * pagesize); iob_iov = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(size)); - char *addr = (char *) align_pointer_forward(iob_iov->data(), pagesize); + char *addr = (char *)align_pointer_forward(iob_iov->data(), pagesize); - iov = (IOVec *) (addr + pagesize); + iov = (IOVec *)(addr + pagesize); /////////////////////////////////////////////////// // Place an invalid page in front of message data. /////////////////////////////////////////////////// - size = sizeof(ClusterMsgHeader) + (MAX_TCOUNT + 1) * sizeof(Descriptor) - + CONTROL_DATA + (2 * pagesize); + size = sizeof(ClusterMsgHeader) + (MAX_TCOUNT + 1) * sizeof(Descriptor) + CONTROL_DATA + (2 * pagesize); msg.iob_descriptor_block = new_IOBufferBlock(); msg.iob_descriptor_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size)); - addr = (char *) align_pointer_forward(msg.iob_descriptor_block->data->data(), pagesize); + addr = (char *)align_pointer_forward(msg.iob_descriptor_block->data->data(), pagesize); addr = addr + pagesize; memset(addr, 0, size - (2 * pagesize)); - msg.descriptor = (Descriptor *) (addr + sizeof(ClusterMsgHeader)); + msg.descriptor = (Descriptor *)(addr + sizeof(ClusterMsgHeader)); mbuf = new_empty_MIOBuffer(); } @@ -283,11 +261,11 @@ ClusterState::~ClusterState() { mutex = 0; if (iov) { - iob_iov = 0; // Free memory + iob_iov = 0; // Free memory } if (msg.descriptor) { - msg.iob_descriptor_block = 0; // Free memory + msg.iob_descriptor_block = 0; // Free memory } // Deallocate IO Core structures int n; @@ -330,17 +308,17 @@ ClusterState::build_do_io_vector() } #ifdef CLUSTER_TOMCAT -#define REENABLE_IO() \ +#define REENABLE_IO() \ if (!ch->on_stolen_thread && !io_complete) { \ - v->reenable_re(); \ + v->reenable_re(); \ } #else // !CLUSTER_TOMCAT #ifdef CLUSTER_IMMEDIATE_NETIO -#define REENABLE_IO() \ - if (!io_complete) { \ - ((NetVConnection *) v->vc_server)->reenable_re_now(v); \ +#define REENABLE_IO() \ + if (!io_complete) { \ + ((NetVConnection *)v->vc_server)->reenable_re_now(v); \ } #else // !CLUSTER_IMMEDIATE_NETIO @@ -360,7 +338,7 @@ ClusterState::doIO() #if !defined(CLUSTER_IMMEDIATE_NETIO) MUTEX_TRY_LOCK(lock, this->mutex, this_ethread()); if (!lock.is_locked()) { - return 0; // unable to initiate operation + return 0; // unable to initiate operation } #endif @@ -375,7 +353,6 @@ ClusterState::doIO() // Setup and initiate or resume Cluster i/o request to the NetProcessor. // if ((to_do && (io_complete_event == VC_EVENT_READ_READY)) || (io_complete_event == VC_EVENT_WRITE_READY)) { - if (read_channel) { // Partial read case ink_assert(v->buffer.writer()->current_write_avail() == to_do); @@ -425,7 +402,7 @@ ClusterState::doIO() REENABLE_IO(); } } - return 1; // operation initiated + return 1; // operation initiated } int @@ -433,48 +410,45 @@ ClusterState::doIO_read_event(int event, void *d) { ink_release_assert(!io_complete); if (!v) { - v = (VIO *) d; // Immediate callback on first NetVC read + v = (VIO *)d; // Immediate callback on first NetVC read } - ink_assert((VIO *) d == v); + ink_assert((VIO *)d == v); switch (event) { - case VC_EVENT_READ_READY: - { - // Disable read processing - v->nbytes = v->ndone; - // fall through + case VC_EVENT_READ_READY: { + // Disable read processing + v->nbytes = v->ndone; + // fall through + } + case VC_EVENT_READ_COMPLETE: { + bytes_xfered = v->ndone - last_ndone; + if (bytes_xfered) { + total_bytes_xfered += bytes_xfered; + did += bytes_xfered; + to_do -= bytes_xfered; } - case VC_EVENT_READ_COMPLETE: - { - bytes_xfered = v->ndone - last_ndone; - if (bytes_xfered) { - total_bytes_xfered += bytes_xfered; - did += bytes_xfered; - to_do -= bytes_xfered; - } - last_ndone = v->ndone; - io_complete_event = event; - INK_WRITE_MEMORY_BARRIER; + last_ndone = v->ndone; + io_complete_event = event; + INK_WRITE_MEMORY_BARRIER; - io_complete = 1; - IOComplete(); + io_complete = 1; + IOComplete(); - break; - } + break; + } case VC_EVENT_EOS: case VC_EVENT_ERROR: case VC_EVENT_INACTIVITY_TIMEOUT: case VC_EVENT_ACTIVE_TIMEOUT: - default: - { - io_complete_event = event; - INK_WRITE_MEMORY_BARRIER; + default: { + io_complete_event = event; + INK_WRITE_MEMORY_BARRIER; - io_complete = -1; - IOComplete(); - break; - } - } // End of switch + io_complete = -1; + IOComplete(); + break; + } + } // End of switch return EVENT_DONE; } @@ -484,66 +458,64 @@ ClusterState::doIO_write_event(int event, void *d) { ink_release_assert(!io_complete); if (!v) { - v = (VIO *) d; // Immediate callback on first NetVC write + v = (VIO *)d; // Immediate callback on first NetVC write } - ink_assert((VIO *) d == v); + ink_assert((VIO *)d == v); switch (event) { case VC_EVENT_WRITE_READY: #ifdef CLUSTER_IMMEDIATE_NETIO - { - // Disable write processing - v->nbytes = v->ndone; - // fall through - } + { + // Disable write processing + v->nbytes = v->ndone; + // fall through + } #endif - case VC_EVENT_WRITE_COMPLETE: - { - bytes_xfered = v->ndone - last_ndone; - if (bytes_xfered) { - total_bytes_xfered += bytes_xfered; - did += bytes_xfered; - to_do -= bytes_xfered; - } - last_ndone = v->ndone; + case VC_EVENT_WRITE_COMPLETE: { + bytes_xfered = v->ndone - last_ndone; + if (bytes_xfered) { + total_bytes_xfered += bytes_xfered; + did += bytes_xfered; + to_do -= bytes_xfered; + } + last_ndone = v->ndone; #ifdef CLUSTER_IMMEDIATE_NETIO + io_complete_event = event; + INK_WRITE_MEMORY_BARRIER; + + io_complete = 1; + IOComplete(); +#else + if (event == VC_EVENT_WRITE_COMPLETE) { io_complete_event = event; INK_WRITE_MEMORY_BARRIER; io_complete = 1; IOComplete(); -#else - if (event == VC_EVENT_WRITE_COMPLETE) { - io_complete_event = event; - INK_WRITE_MEMORY_BARRIER; - - io_complete = 1; - IOComplete(); + } else { + if (bytes_xfered) { + v->reenable_re(); // Immediate action } else { - if (bytes_xfered) { - v->reenable_re(); // Immediate action - } else { - v->reenable(); - } - return EVENT_DONE; + v->reenable(); } -#endif - break; + return EVENT_DONE; } +#endif + break; + } case VC_EVENT_EOS: case VC_EVENT_ERROR: case VC_EVENT_INACTIVITY_TIMEOUT: case VC_EVENT_ACTIVE_TIMEOUT: - default: - { - io_complete_event = event; - INK_WRITE_MEMORY_BARRIER; + default: { + io_complete_event = event; + INK_WRITE_MEMORY_BARRIER; - io_complete = -1; - IOComplete(); - break; - } - } // End of switch + io_complete = -1; + IOComplete(); + break; + } + } // End of switch return EVENT_DONE; } @@ -559,7 +531,7 @@ ClusterState::IOComplete() if (do_iodone_event && !ch->mutex->thread_holding) { MUTEX_TRY_LOCK(lock, ch->mutex, this_ethread()); if (lock.is_locked()) { - ch->handleEvent(EVENT_IMMEDIATE, (void *) 0); + ch->handleEvent(EVENT_IMMEDIATE, (void *)0); } else { eventProcessor.schedule_imm_signal(ch, ET_CLUSTER); } @@ -567,7 +539,7 @@ ClusterState::IOComplete() } int -ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s) +ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection *vc, ClusterVConnState *s) { s->vio._cont->handleEvent(event, &s->vio); @@ -583,7 +555,7 @@ ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection * vc, Cl } int -ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s) +ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection *vc, ClusterVConnState *s) { // should assert we have s->vio.mutex s->vio._cont->handleEvent(event, &s->vio); @@ -598,28 +570,28 @@ ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection * } int -ClusterHandler::cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno) +ClusterHandler::cluster_signal_error_and_update(ClusterVConnection *vc, ClusterVConnState *s, int lerrno) { s->enabled = 0; vc->lerrno = lerrno; return cluster_signal_and_update(VC_EVENT_ERROR, vc, s); } -bool ClusterHandler::check_channel(int c) +bool +ClusterHandler::check_channel(int c) { // // Check to see that there is enough room to store channel c // while (n_channels <= c) { - int - old_channels = n_channels; + int old_channels = n_channels; if (!n_channels) { n_channels = MIN_CHANNELS; } else { if ((n_channels * 2) <= MAX_CHANNELS) { n_channels = n_channels * 2; } else { - return false; // Limit exceeded + return false; // Limit exceeded } } // Allocate ClusterVConnection table entries @@ -631,7 +603,7 @@ bool ClusterHandler::check_channel(int c) for (int i = old_channels; i < n_channels; i++) { if (local_channel(i)) { if (i > LAST_DEDICATED_CHANNEL) { - channels[i] = (ClusterVConnection *) 1; // mark as invalid + channels[i] = (ClusterVConnection *)1; // mark as invalid channel_data[i] = (struct ChannelData *)ats_malloc(sizeof(struct ChannelData)); memset(channel_data[i], 0, sizeof(struct ChannelData)); channel_data[i]->channel_number = i; @@ -646,11 +618,11 @@ bool ClusterHandler::check_channel(int c) } } } - return true; // OK + return true; // OK } int -ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested) +ClusterHandler::alloc_channel(ClusterVConnection *vc, int requested) { // // Allocate a channel @@ -664,7 +636,7 @@ ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested) cdp = free_local_channels.dequeue(); if (!cdp) { if (!check_channel(n_channels)) { - return -2; // Limit exceeded + return -2; // Limit exceeded } } else { ink_assert(cdp == channel_data[cdp->channel_number]); @@ -673,17 +645,17 @@ ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested) } } while (loops--); - ink_release_assert(i != 0); // required - ink_release_assert(channels[i] == (ClusterVConnection *) 1); // required + ink_release_assert(i != 0); // required + ink_release_assert(channels[i] == (ClusterVConnection *)1); // required Debug(CL_TRACE, "alloc_channel local chan=%d VC=%p", i, vc); } else { if (!check_channel(i)) { - return -2; // Limit exceeded + return -2; // Limit exceeded } if (channels[i]) { Debug(CL_TRACE, "alloc_channel remote inuse chan=%d VC=%p", i, vc); - return -1; // channel in use + return -1; // channel in use } else { Debug(CL_TRACE, "alloc_channel remote chan=%d VC=%p", i, vc); } @@ -694,7 +666,7 @@ ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested) } void -ClusterHandler::free_channel(ClusterVConnection * vc) +ClusterHandler::free_channel(ClusterVConnection *vc) { // // Free a channel @@ -702,7 +674,7 @@ ClusterHandler::free_channel(ClusterVConnection * vc) int i = vc->channel; if (i > LAST_DEDICATED_CHANNEL && channels[i] == vc) { if (local_channel(i)) { - channels[i] = (ClusterVConnection *) 1; + channels[i] = (ClusterVConnection *)1; free_local_channels.enqueue(channel_data[i]); Debug(CL_TRACE, "free_channel local chan=%d VC=%p", i, vc); } else { @@ -721,13 +693,13 @@ ClusterHandler::machine_down() if (dead) { return EVENT_DONE; } - // - // Looks like this machine dropped out of the cluster. - // Deal with it. - // Fatal read/write errors on the node to node connection along - // with failure of the cluster membership check in the periodic event - // result in machine_down(). - // +// +// Looks like this machine dropped out of the cluster. +// Deal with it. +// Fatal read/write errors on the node to node connection along +// with failure of the cluster membership check in the periodic event +// result in machine_down(). +// #ifdef LOCAL_CLUSTER_TEST_MODE Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port); #else @@ -759,7 +731,7 @@ ClusterHandler::machine_down() Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip)); clusterProcessor.connect(ip, port, id); } - return zombify(); // defer deletion of *this + return zombify(); // defer deletion of *this } int @@ -776,7 +748,7 @@ ClusterHandler::zombify(Event * /* e ATS_UNUSED */) } clm->cancel_monitor(); - SET_HANDLER((ClusterContHandler) & ClusterHandler::protoZombieEvent); + SET_HANDLER((ClusterContHandler)&ClusterHandler::protoZombieEvent); // // At this point, allow the caller (either process_read/write to complete) // prior to performing node down actions. @@ -786,9 +758,8 @@ ClusterHandler::zombify(Event * /* e ATS_UNUSED */) } int -ClusterHandler::connectClusterEvent(int event, Event * e) +ClusterHandler::connectClusterEvent(int event, Event *e) { - if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) { // // Attempt connect to target node and if successful, setup the event @@ -811,8 +782,8 @@ ClusterHandler::connectClusterEvent(int event, Event * e) return EVENT_DONE; } // Connect to cluster member - Debug(CL_NOTE, "connect_re from %u.%u.%u.%u to %u.%u.%u.%u", - DOT_SEPARATED(this_cluster_machine()->ip), DOT_SEPARATED(machine->ip)); + Debug(CL_NOTE, "connect_re from %u.%u.%u.%u to %u.%u.%u.%u", DOT_SEPARATED(this_cluster_machine()->ip), + DOT_SEPARATED(machine->ip)); ip = machine->ip; NetVCOptions opt; @@ -826,16 +797,15 @@ ClusterHandler::connectClusterEvent(int event, Event * e) opt.local_ip = this_cluster_machine()->ip; struct sockaddr_in addr; - ats_ip4_set(&addr, machine->ip, - htons(machine->cluster_port ? machine->cluster_port : cluster_port)); + ats_ip4_set(&addr, machine->ip, htons(machine->cluster_port ? machine->cluster_port : cluster_port)); // TODO: Should we check the Action* returned here? netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &opt); return EVENT_DONE; } else { if (event == NET_EVENT_OPEN) { - net_vc = (NetVConnection *) e; - SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent); + net_vc = (NetVConnection *)e; + SET_HANDLER((ClusterContHandler)&ClusterHandler::startClusterEvent); eventProcessor.schedule_imm(this, ET_CLUSTER); return EVENT_DONE; @@ -847,13 +817,13 @@ ClusterHandler::connectClusterEvent(int event, Event * e) } int -ClusterHandler::startClusterEvent(int event, Event * e) +ClusterHandler::startClusterEvent(int event, Event *e) { char textbuf[sizeof("255.255.255.255:65535")]; // Perform the node to node connection establish protocol. - (void) event; + (void)event; ink_assert(!read_vcs); ink_assert(!write_vcs); @@ -868,26 +838,25 @@ ClusterHandler::startClusterEvent(int event, Event * e) } for (;;) { - switch (cluster_connect_state) { - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_INITIAL: //////////////////////////////////////////////////////////////////////////// { ink_release_assert(!"Invalid state [CLCON_INITIAL]"); } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_SEND_MSG: //////////////////////////////////////////////////////////////////////////// { - // Send initial message. +// Send initial message. #ifdef LOCAL_CLUSTER_TEST_MODE nodeClusteringVersion._port = cluster_port; #endif cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE; if (connector) nodeClusteringVersion._id = id; - build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false); + build_data_vector((char *)&nodeClusteringVersion, sizeof(nodeClusteringVersion), false); if (!write.doIO()) { // i/o not initiated, delay and retry cluster_connect_state = ClusterHandler::CLCON_SEND_MSG; @@ -896,20 +865,18 @@ ClusterHandler::startClusterEvent(int event, Event * e) } break; } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_SEND_MSG_COMPLETE: //////////////////////////////////////////////////////////////////////////// { if (write.io_complete) { - if ((write.io_complete < 0) - || ((size_t) write.did < sizeof(nodeClusteringVersion))) { - Debug(CL_NOTE, "unable to write to cluster node %u.%u.%u.%u: %d", - DOT_SEPARATED(ip), write.io_complete_event); + if ((write.io_complete < 0) || ((size_t)write.did < sizeof(nodeClusteringVersion))) { + Debug(CL_NOTE, "unable to write to cluster node %u.%u.%u.%u: %d", DOT_SEPARATED(ip), write.io_complete_event); cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT; - break; // goto next state + break; // goto next state } // Write OK, await message from peer node. - build_data_vector((char *) &clusteringVersion, sizeof(clusteringVersion), true); + build_data_vector((char *)&clusteringVersion, sizeof(clusteringVersion), true); cluster_connect_state = ClusterHandler::CLCON_READ_MSG; break; } else { @@ -918,7 +885,7 @@ ClusterHandler::startClusterEvent(int event, Event * e) return EVENT_DONE; } } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_READ_MSG: //////////////////////////////////////////////////////////////////////////// { @@ -931,7 +898,7 @@ ClusterHandler::startClusterEvent(int event, Event * e) } break; } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_READ_MSG_COMPLETE: //////////////////////////////////////////////////////////////////////////// { @@ -939,9 +906,9 @@ ClusterHandler::startClusterEvent(int event, Event * e) if (read.io_complete < 0) { // Read error, abort connect cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT; - break; // goto next state + break; // goto next state } - if ((size_t) read.did < sizeof(clusteringVersion)) { + if ((size_t)read.did < sizeof(clusteringVersion)) { // Partial read, resume read. cluster_connect_state = ClusterHandler::CLCON_READ_MSG; break; @@ -954,7 +921,7 @@ ClusterHandler::startClusterEvent(int event, Event * e) return EVENT_DONE; } } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_VALIDATE_MSG: //////////////////////////////////////////////////////////////////////////// { @@ -982,17 +949,17 @@ ClusterHandler::startClusterEvent(int event, Event * e) proto_minor = clusteringVersion._minor; if (proto_minor != nodeClusteringVersion._minor) - Warning("Different clustering minor versions (%d,%d) for node %u.%u.%u.%u, continuing", - proto_minor, nodeClusteringVersion._minor, DOT_SEPARATED(ip)); + Warning("Different clustering minor versions (%d,%d) for node %u.%u.%u.%u, continuing", proto_minor, + nodeClusteringVersion._minor, DOT_SEPARATED(ip)); } else { proto_minor = 0; } } else { - Warning("Bad cluster major version range (%d-%d) for node %u.%u.%u.%u connect failed", - clusteringVersion._min_major, clusteringVersion._major, DOT_SEPARATED(ip)); + Warning("Bad cluster major version range (%d-%d) for node %u.%u.%u.%u connect failed", clusteringVersion._min_major, + clusteringVersion._major, DOT_SEPARATED(ip)); cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT; - break; // goto next state + break; // goto next state } #ifdef LOCAL_CLUSTER_TEST_MODE @@ -1018,152 +985,149 @@ ClusterHandler::startClusterEvent(int event, Event * e) } } - case ClusterHandler::CLCON_CONN_BIND_CLEAR: - { - UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; - MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread); - MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread); - if (lock.is_locked() && lock1.is_locked()) { - vc->ep.stop(); - vc->nh->open_list.remove(vc); - vc->thread = NULL; - if (vc->nh->read_ready_list.in(vc)) - vc->nh->read_ready_list.remove(vc); - if (vc->nh->write_ready_list.in(vc)) - vc->nh->write_ready_list.remove(vc); - if (vc->read.in_enabled_list) - vc->nh->read_enable_list.remove(vc); - if (vc->write.in_enabled_list) - vc->nh->write_enable_list.remove(vc); - - // CLCON_CONN_BIND handle in bind vc->thread (bind thread nh) - cluster_connect_state = ClusterHandler::CLCON_CONN_BIND; - thread->schedule_in(this, CLUSTER_PERIOD); - return EVENT_DONE; - } else { - // CLCON_CONN_BIND_CLEAR handle in origin vc->thread (origin thread nh) - vc->thread->schedule_in(this, CLUSTER_PERIOD); - return EVENT_DONE; - } + case ClusterHandler::CLCON_CONN_BIND_CLEAR: { + UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; + MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread); + MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread); + if (lock.is_locked() && lock1.is_locked()) { + vc->ep.stop(); + vc->nh->open_list.remove(vc); + vc->thread = NULL; + if (vc->nh->read_ready_list.in(vc)) + vc->nh->read_ready_list.remove(vc); + if (vc->nh->write_ready_list.in(vc)) + vc->nh->write_ready_list.remove(vc); + if (vc->read.in_enabled_list) + vc->nh->read_enable_list.remove(vc); + if (vc->write.in_enabled_list) + vc->nh->write_enable_list.remove(vc); + + // CLCON_CONN_BIND handle in bind vc->thread (bind thread nh) + cluster_connect_state = ClusterHandler::CLCON_CONN_BIND; + thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; + } else { + // CLCON_CONN_BIND_CLEAR handle in origin vc->thread (origin thread nh) + vc->thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; } + } - case ClusterHandler::CLCON_CONN_BIND: - { - // - NetHandler *nh = get_NetHandler(e->ethread); - UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; - MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread); - MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread); - if (lock.is_locked() && lock1.is_locked()) { - if (vc->read.in_enabled_list) - nh->read_enable_list.push(vc); - if (vc->write.in_enabled_list) - nh->write_enable_list.push(vc); - - vc->nh = nh; - vc->thread = e->ethread; - PollDescriptor *pd = get_PollDescriptor(e->ethread); - if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) { - cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT; - break; // goto next state - } - - nh->open_list.enqueue(vc); - cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_OK; - } else { - thread->schedule_in(this, CLUSTER_PERIOD); - return EVENT_DONE; + case ClusterHandler::CLCON_CONN_BIND: { + // + NetHandler *nh = get_NetHandler(e->ethread); + UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; + MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread); + MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread); + if (lock.is_locked() && lock1.is_locked()) { + if (vc->read.in_enabled_list) + nh->read_enable_list.push(vc); + if (vc->write.in_enabled_list) + nh->write_enable_list.push(vc); + + vc->nh = nh; + vc->thread = e->ethread; + PollDescriptor *pd = get_PollDescriptor(e->ethread); + if (vc->ep.start(pd, vc, EVENTIO_READ | EVENTIO_WRITE) < 0) { + cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT; + break; // goto next state } + + nh->open_list.enqueue(vc); + cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_OK; + } else { + thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; } + } - case ClusterHandler::CLCON_CONN_BIND_OK: - { - int failed = 0; - - // include this node into the cluster configuration - MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread()); - MachineList *cc = the_cluster_config(); - if (cc && cc->find(ip, port)) { - ClusterConfiguration *c = this_cluster()->current_configuration(); - ClusterMachine *m = c->find(ip, port); - - if (!m) { // this first connection - ClusterConfiguration *cconf = configuration_add_machine(c, machine); - CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT); - this_cluster()->configurations.push(cconf); - } else { - // close new connection if old connections is exist - if (id >= m->num_connections || m->clusterHandlers[id]) { - failed = -2; - MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread()); - goto failed; - } - machine = m; - } - machine->now_connections++; - machine->clusterHandlers[id] = this; - machine->dead = false; - dead = false; + case ClusterHandler::CLCON_CONN_BIND_OK: { + int failed = 0; + + // include this node into the cluster configuration + MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread()); + MachineList *cc = the_cluster_config(); + if (cc && cc->find(ip, port)) { + ClusterConfiguration *c = this_cluster()->current_configuration(); + ClusterMachine *m = c->find(ip, port); + + if (!m) { // this first connection + ClusterConfiguration *cconf = configuration_add_machine(c, machine); + CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT); + this_cluster()->configurations.push(cconf); } else { - Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port); - failed = -1; + // close new connection if old connections is exist + if (id >= m->num_connections || m->clusterHandlers[id]) { + failed = -2; + MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread()); + goto failed; + } + machine = m; } - MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread()); -failed: - if (failed) { - if (failed == -1) { - if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) { - thread->schedule_in(this, CLUSTER_PERIOD); - return EVENT_DONE; - } + machine->now_connections++; + machine->clusterHandlers[id] = this; + machine->dead = false; + dead = false; + } else { + Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port); + failed = -1; + } + MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread()); + failed: + if (failed) { + if (failed == -1) { + if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) { + thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; } - cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT; - break; // goto next state } + cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT; + break; // goto next state + } - this->needByteSwap = !clusteringVersion.NativeByteOrder(); - machine_online_APIcallout(ip); + this->needByteSwap = !clusteringVersion.NativeByteOrder(); + machine_online_APIcallout(ip); - // Signal the manager - snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port); - RecSignalManager(REC_SIGNAL_MACHINE_UP, textbuf); + // Signal the manager + snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port); + RecSignalManager(REC_SIGNAL_MACHINE_UP, textbuf); #ifdef LOCAL_CLUSTER_TEST_MODE - Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d", - DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor); + Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d", DOT_SEPARATED(ip), port, clusteringVersion._major, + clusteringVersion._minor); #else - Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d", - DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor); + Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d", DOT_SEPARATED(ip), id, clusteringVersion._major, + clusteringVersion._minor); #endif - read_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS]; - write_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS]; - SET_HANDLER((ClusterContHandler) & ClusterHandler::beginClusterEvent); + read_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS]; + write_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS]; + SET_HANDLER((ClusterContHandler)&ClusterHandler::beginClusterEvent); - // enable schedule_imm() on i/o completion (optimization) - read.do_iodone_event = true; - write.do_iodone_event = true; + // enable schedule_imm() on i/o completion (optimization) + read.do_iodone_event = true; + write.do_iodone_event = true; - cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD); + cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD); - // Startup the periodic events to process entries in - // external_incoming_control. + // Startup the periodic events to process entries in + // external_incoming_control. - int procs_online = ink_number_of_processors(); - int total_callbacks = min(procs_online, MAX_COMPLETION_CALLBACK_EVENTS); - for (int n = 0; n < total_callbacks; ++n) { - callout_cont[n] = new ClusterCalloutContinuation(this); - callout_events[n] = eventProcessor.schedule_every(callout_cont[n], COMPLETION_CALLBACK_PERIOD, ET_NET); - } + int procs_online = ink_number_of_processors(); + int total_callbacks = min(procs_online, MAX_COMPLETION_CALLBACK_EVENTS); + for (int n = 0; n < total_callbacks; ++n) { + callout_cont[n] = new ClusterCalloutContinuation(this); + callout_events[n] = eventProcessor.schedule_every(callout_cont[n], COMPLETION_CALLBACK_PERIOD, ET_NET); + } - // Start cluster interconnect load monitoring + // Start cluster interconnect load monitoring - if (!clm) { - clm = new ClusterLoadMonitor(this); - clm->init(); - } - return EVENT_DONE; + if (!clm) { + clm = new ClusterLoadMonitor(this); + clm->init(); } - //////////////////////////////////////////////////////////////////////////// + return EVENT_DONE; + } + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_ABORT_CONNECT: //////////////////////////////////////////////////////////////////////////// { @@ -1173,9 +1137,9 @@ failed: clusterProcessor.connect(ip, port, id, true); } cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT; - break; // goto next state + break; // goto next state } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// case ClusterHandler::CLCON_DELETE_CONNECT: //////////////////////////////////////////////////////////////////////////// { @@ -1186,7 +1150,7 @@ failed: Debug(CL_NOTE, "Failed cluster connect, deleting"); return EVENT_DONE; } - //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// default: //////////////////////////////////////////////////////////////////////////// { @@ -1195,38 +1159,38 @@ failed: return EVENT_DONE; } - } // End of switch - } // End of for + } // End of switch + } // End of for return EVENT_DONE; } int -ClusterHandler::beginClusterEvent(int /* event ATS_UNUSED */, Event * e) +ClusterHandler::beginClusterEvent(int /* event ATS_UNUSED */, Event *e) { - // Establish the main periodic Cluster event +// Establish the main periodic Cluster event #ifdef CLUSTER_IMMEDIATE_NETIO build_poll(false); #endif - SET_HANDLER((ClusterContHandler) & ClusterHandler::mainClusterEvent); + SET_HANDLER((ClusterContHandler)&ClusterHandler::mainClusterEvent); return handleEvent(EVENT_INTERVAL, e); } int -ClusterHandler::zombieClusterEvent(int event, Event * e) +ClusterHandler::zombieClusterEvent(int event, Event *e) { // // The ZOMBIE state is entered when the handler may still be referenced // by short running tasks (one scheduling quanta). The object is delayed // after some unreasonably long (in comparison) time. // - (void) event; - (void) e; - delete this; // I am out of here + (void)event; + (void)e; + delete this; // I am out of here return EVENT_DONE; } int -ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event * e) +ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event *e) { // // Node associated with *this is declared down. @@ -1244,9 +1208,8 @@ ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event * e) mainClusterEvent(EVENT_INTERVAL, e); item.data = external_incoming_open_local.head.data; - if (TO_PTR(FREELIST_POINTER(item)) || - delayed_reads.head || pw_write_descriptors_built - || pw_freespace_descriptors_built || pw_controldata_descriptors_built) { + if (TO_PTR(FREELIST_POINTER(item)) || delayed_reads.head || pw_write_descriptors_built || pw_freespace_descriptors_built || + pw_controldata_descriptors_built) { // Operations still pending, retry later if (e) { e->schedule_in(delay); @@ -1282,8 +1245,7 @@ ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event * e) } } vc = channels[i]; - if (VALID_CHANNEL(vc) - && !vc->closed && vc->write.vio.op == VIO::WRITE) { + if (VALID_CHANNEL(vc) && !vc->closed && vc->write.vio.op == VIO::WRITE) { MUTEX_TRY_LOCK(lock, vc->write.vio.mutex, t); if (lock.is_locked()) { cluster_signal_error_and_update(vc, &vc->write, 0); @@ -1332,7 +1294,7 @@ ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event * e) if (!failed) { Debug("cluster_down", "ClusterHandler zombie [%u.%u.%u.%u]", DOT_SEPARATED(ip)); - SET_HANDLER((ClusterContHandler) & ClusterHandler::zombieClusterEvent); + SET_HANDLER((ClusterContHandler)&ClusterHandler::zombieClusterEvent); delay = NO_RACE_DELAY; } if (e) { @@ -1357,10 +1319,8 @@ ClusterHandler::compute_active_channels() if (VALID_CHANNEL(vc) && (vc->iov_map != CLUSTER_IOV_NOT_OPEN)) { ++active_chans; if (dump_verbose) { - printf("ch[%d] vc=0x%p remote_free=%d last_local_free=%d\n", i, vc, - vc->remote_free, vc->last_local_free); - printf(" r_bytes=%d r_done=%d w_bytes=%d w_done=%d\n", - (int)vc->read.vio.nbytes, (int)vc->read.vio.ndone, + printf("ch[%d] vc=0x%p remote_free=%d last_local_free=%d\n", i, vc, vc->remote_free, vc->last_local_free); + printf(" r_bytes=%d r_done=%d w_bytes=%d w_done=%d\n", (int)vc->read.vio.nbytes, (int)vc->read.vio.ndone, (int)vc->write.vio.nbytes, (int)vc->write.vio.ndone); } } @@ -1383,41 +1343,34 @@ ClusterHandler::dump_internal_data() r = snprintf(&b[n], b_size - n, "Host: %hhu.%hhu.%hhu.%hhu\n", DOT_SEPARATED(ip)); n += r; - r = snprintf(&b[n], b_size - n, - "chans: %d vc_writes: %" PRId64 " write_bytes: %" PRId64 "(d)+%" PRId64 "(c)=%" PRId64 "\n", - compute_active_channels(), - _vc_writes, _vc_write_bytes, _control_write_bytes, _vc_write_bytes + _control_write_bytes); + r = + snprintf(&b[n], b_size - n, "chans: %d vc_writes: %" PRId64 " write_bytes: %" PRId64 "(d)+%" PRId64 "(c)=%" PRId64 "\n", + compute_active_channels(), _vc_writes, _vc_write_bytes, _control_write_bytes, _vc_write_bytes + _control_write_bytes); n += r; - r = snprintf(&b[n], b_size - n, - "dw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n", - _dw_missed_lock, _dw_not_enabled, _dw_wait_remote_fill, _dw_no_active_vio); + r = snprintf(&b[n], b_size - n, "dw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n", _dw_missed_lock, + _dw_not_enabled, _dw_wait_remote_fill, _dw_no_active_vio); n += r; - r = snprintf(&b[n], b_size - n, - "dw: not_enabled_or_no_write: %d set_data_pending: %d no_free_space: %d\n", + r = snprintf(&b[n], b_size - n, "dw: not_enabled_or_no_write: %d set_data_pending: %d no_free_space: %d\n", _dw_not_enabled_or_no_write, _dw_set_data_pending, _dw_no_free_space); n += r; - r = snprintf(&b[n], b_size - n, - "fw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n", - _fw_missed_lock, _fw_not_enabled, _fw_wait_remote_fill, _fw_no_active_vio); + r = snprintf(&b[n], b_size - n, "fw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n", _fw_missed_lock, + _fw_not_enabled, _fw_wait_remote_fill, _fw_no_active_vio); n += r; r = snprintf(&b[n], b_size - n, "fw: not_enabled_or_no_read: %d\n", _fw_not_enabled_or_no_read); n += r; - r = snprintf(&b[n], b_size - n, - "rd(%d): st:%d rh:%d ahd:%d sd:%d rd:%d ad:%d sda:%d rda:%d awd:%d p:%d c:%d\n", - _process_read_calls, _n_read_start, _n_read_header, _n_read_await_header, - _n_read_setup_descriptor, _n_read_descriptor, _n_read_await_descriptor, - _n_read_setup_data, _n_read_data, _n_read_await_data, _n_read_post_complete, _n_read_complete); + r = snprintf(&b[n], b_size - n, "rd(%d): st:%d rh:%d ahd:%d sd:%d rd:%d ad:%d sda:%d rda:%d awd:%d p:%d c:%d\n", + _process_read_calls, _n_read_start, _n_read_header, _n_read_await_header, _n_read_setup_descriptor, + _n_read_descriptor, _n_read_await_descriptor, _n_read_setup_data, _n_read_data, _n_read_await_data, + _n_read_post_complete, _n_read_complete); n += r; - r = snprintf(&b[n], b_size - n, - "wr(%d): st:%d set:%d ini:%d wait:%d post:%d comp:%d\n", - _process_write_calls, _n_write_start, _n_write_setup, _n_write_initiate, - _n_write_await_completion, _n_write_post_complete, _n_write_complete); + r = snprintf(&b[n], b_size - n, "wr(%d): st:%d set:%d ini:%d wait:%d post:%d comp:%d\n", _process_write_calls, _n_write_start, + _n_write_setup, _n_write_initiate, _n_write_await_completion, _n_write_post_complete, _n_write_complete); n += r; ink_release_assert((n + 1) <= BUFFER_SIZE_FOR_INDEX(MAX_IOBUFFER_SIZE)); @@ -1430,16 +1383,13 @@ ClusterHandler::dump_write_msg(int res) { // Debug support for inter cluster message trace Alias32 x; - x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr; + x.u32 = (uint32_t)((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr; - fprintf(stderr, - "[W] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d Todo=%d, Res=%d\n", - x.byte[0], x.byte[1], x.byte[2], x.byte[3], write.sequence_number, write.msg.count, write.msg.control_bytes, write.to_do, res); + fprintf(stderr, "[W] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d Todo=%d, Res=%d\n", x.byte[0], x.byte[1], x.byte[2], + x.byte[3], write.sequence_number, write.msg.count, write.msg.control_bytes, write.to_do, res); for (int i = 0; i < write.msg.count; ++i) { - fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n", - i, (write.msg.descriptor[i].type ? 1 : 0), - (int) write.msg.descriptor[i].channel, - (int) write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length); + fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n", i, (write.msg.descriptor[i].type ? 1 : 0), + (int)write.msg.descriptor[i].channel, (int)write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length); } } @@ -1448,15 +1398,13 @@ ClusterHandler::dump_read_msg() { // Debug support for inter cluster message trace Alias32 x; - x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr; + x.u32 = (uint32_t)((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr; - fprintf(stderr, "[R] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d\n", - x.byte[0], x.byte[1], x.byte[2], x.byte[3], read.sequence_number, read.msg.count, read.msg.control_bytes); + fprintf(stderr, "[R] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d\n", x.byte[0], x.byte[1], x.byte[2], x.byte[3], + read.sequence_number, read.msg.count, read.msg.control_bytes); for (int i = 0; i < read.msg.count; ++i) { - fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n", - i, (read.msg.descriptor[i].type ? 1 : 0), - (int) read.msg.descriptor[i].channel, - (int) read.msg.descriptor[i].sequence_number, read.msg.descriptor[i].length); + fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n", i, (read.msg.descriptor[i].type ? 1 : 0), + (int)read.msg.descriptor[i].channel, (int)read.msg.descriptor[i].sequence_number, read.msg.descriptor[i].length); } }
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterHash.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterHash.cc b/iocore/cluster/ClusterHash.cc index 08dc0d1..b171f98 100644 --- a/iocore/cluster/ClusterHash.cc +++ b/iocore/cluster/ClusterHash.cc @@ -53,7 +53,6 @@ bool randClusterHash = false; // bool randClusterHash = true; - // // Cluster Hash Table // @@ -86,7 +85,7 @@ next_rnd15(unsigned int *p) // Overall it is roughly linear in the number of nodes. // void -build_hash_table_machine(ClusterConfiguration * c) +build_hash_table_machine(ClusterConfiguration *c) { int left = CLUSTER_HASH_TABLE_SIZE; int m = 0; @@ -105,8 +104,7 @@ build_hash_table_machine(ClusterConfiguration * c) // do a little xor folding to get it into 15 bits // for (m = 0; m < c->n_machines; m++) - rnd[m] = (((c->machines[m]->ip >> 15) & 0x7FFF) ^ (c->machines[m]->ip & 0x7FFF)) - ^ (c->machines[m]->ip >> 30); + rnd[m] = (((c->machines[m]->ip >> 15) & 0x7FFF) ^ (c->machines[m]->ip & 0x7FFF)) ^ (c->machines[m]->ip >> 30); // Initialize the table to "empty" // @@ -136,7 +134,7 @@ build_hash_table_machine(ClusterConfiguration * c) } static void -build_hash_table_bucket(ClusterConfiguration * c) +build_hash_table_bucket(ClusterConfiguration *c) { int i = 0; unsigned int rnd[CLUSTER_HASH_TABLE_SIZE]; @@ -166,7 +164,7 @@ build_hash_table_bucket(ClusterConfiguration * c) } void -build_cluster_hash_table(ClusterConfiguration * c) +build_cluster_hash_table(ClusterConfiguration *c) { if (machineClusterHash) build_hash_table_machine(c); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterLib.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterLib.cc b/iocore/cluster/ClusterLib.cc index 6ff5c27..37360fc 100644 --- a/iocore/cluster/ClusterLib.cc +++ b/iocore/cluster/ClusterLib.cc @@ -35,7 +35,7 @@ // scheduling only occurs after they move into the data_bucket. // void -cluster_schedule(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns) +cluster_schedule(ClusterHandler *ch, ClusterVConnection *vc, ClusterVConnState *ns) { // // actually schedule into new bucket @@ -52,7 +52,7 @@ cluster_schedule(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState } void -cluster_reschedule_offset(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns, int offset) +cluster_reschedule_offset(ClusterHandler *ch, ClusterVConnection *vc, ClusterVConnState *ns, int offset) { if (ns == &vc->read) { if (vc->read.queue) @@ -79,7 +79,7 @@ ClusterVCToken::alloc() #else ip_created = this_cluster_machine()->ip; #endif - sequence_number = ink_atomic_increment((int *) &cluster_sequence_number, 1); + sequence_number = ink_atomic_increment((int *)&cluster_sequence_number, 1); } /////////////////////////////////////////// @@ -87,7 +87,7 @@ ClusterVCToken::alloc() /////////////////////////////////////////// IOBufferBlock * -clone_IOBufferBlockList(IOBufferBlock * b, int start_off, int n, IOBufferBlock ** b_tail) +clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock **b_tail) { //////////////////////////////////////////////////////////////// // Create a clone list of IOBufferBlock(s) where the sum @@ -112,7 +112,6 @@ clone_IOBufferBlockList(IOBufferBlock * b, int start_off, int n, IOBufferBlock * bclone->next = bsrc->clone(); bclone = bclone->next; } else { - // Skip bytes already processed if (bytes_to_skip) { bytes_to_skip -= bsrc->read_avail(); @@ -149,7 +148,7 @@ clone_IOBufferBlockList(IOBufferBlock * b, int start_off, int n, IOBufferBlock * } IOBufferBlock * -consume_IOBufferBlockList(IOBufferBlock * b, int64_t n) +consume_IOBufferBlockList(IOBufferBlock *b, int64_t n) { IOBufferBlock *b_remainder = 0; int64_t nbytes = n; @@ -160,8 +159,8 @@ consume_IOBufferBlockList(IOBufferBlock * b, int64_t n) if (nbytes < 0) { // Consumed a partial block, clone remainder b_remainder = b->clone(); - b->fill(nbytes); // make read_avail match nbytes - b_remainder->consume(b->read_avail()); // clone for remaining bytes + b->fill(nbytes); // make read_avail match nbytes + b_remainder->consume(b->read_avail()); // clone for remaining bytes b_remainder->next = b->next; b->next = 0; nbytes = 0; @@ -177,13 +176,14 @@ consume_IOBufferBlockList(IOBufferBlock * b, int64_t n) } } ink_release_assert(nbytes == 0); - return b_remainder; // return remaining blocks + return b_remainder; // return remaining blocks } int64_t -bytes_IOBufferBlockList(IOBufferBlock * b, int64_t read_avail_bytes) +bytes_IOBufferBlockList(IOBufferBlock *b, int64_t read_avail_bytes) { - int64_t n = 0;; + int64_t n = 0; + ; while (b) { if (read_avail_bytes) { @@ -205,19 +205,19 @@ bytes_IOBufferBlockList(IOBufferBlock * b, int64_t read_avail_bytes) // Test code which mimic the network slowdown // int -partial_readv(int fd, IOVec * iov, int n_iov, int seq) +partial_readv(int fd, IOVec *iov, int n_iov, int seq) { IOVec tiov[16]; for (int i = 0; i < n_iov; i++) tiov[i] = iov[i]; int tn_iov = n_iov; int rnd = seq; - int element = rand_r((unsigned int *) &rnd); + int element = rand_r((unsigned int *)&rnd); element = element % n_iov; - int byte = rand_r((unsigned int *) &rnd); + int byte = rand_r((unsigned int *)&rnd); byte = byte % iov[element].iov_len; - int stop = rand_r((unsigned int *) &rnd); - if (!(stop % 3)) { // 33% chance + int stop = rand_r((unsigned int *)&rnd); + if (!(stop % 3)) { // 33% chance tn_iov = element + 1; tiov[element].iov_len = byte; if (!byte) @@ -237,32 +237,32 @@ partial_readv(int fd, IOVec * iov, int n_iov, int seq) // Test code which mimic the network backing up (too little buffering) // int -partial_writev(int fd, IOVec * iov, int n_iov, int seq) +partial_writev(int fd, IOVec *iov, int n_iov, int seq) { int rnd = seq; int sum = 0; int i = 0; - for (i = 0; i < n_iov; i++) { - int l = iov[i].iov_len; - int r = rand_r((unsigned int *) &rnd); - if ((r >> 4) & 1) { - l = ((unsigned int) rand_r((unsigned int *) &rnd)) % iov[i].iov_len; - if (!l) { - l = iov[i].iov_len; - } - } - ink_assert(l <= iov[i].iov_len); - fprintf(stderr, "writing %d: [%d] &%X %d of %d\n", seq, i, iov[i].iov_base, l, iov[i].iov_len); - int res = socketManager.write(fd, iov[i].iov_base, l); - if (res < 0) { - return res; - } - sum += res; - if (res != iov[i].iov_len) { - return sum; + for (i = 0; i < n_iov; i++) { + int l = iov[i].iov_len; + int r = rand_r((unsigned int *)&rnd); + if ((r >> 4) & 1) { + l = ((unsigned int)rand_r((unsigned int *)&rnd)) % iov[i].iov_len; + if (!l) { + l = iov[i].iov_len; } } - return sum; + ink_assert(l <= iov[i].iov_len); + fprintf(stderr, "writing %d: [%d] &%X %d of %d\n", seq, i, iov[i].iov_base, l, iov[i].iov_len); + int res = socketManager.write(fd, iov[i].iov_base, l); + if (res < 0) { + return res; + } + sum += res; + if (res != iov[i].iov_len) { + return sum; + } + } + return sum; } #endif // TEST_PARTIAL_WRITES @@ -316,9 +316,9 @@ dump_time_buckets() #endif // ENABLE_TIME_TRACE } -GlobalClusterPeriodicEvent::GlobalClusterPeriodicEvent():Continuation(new_ProxyMutex()) +GlobalClusterPeriodicEvent::GlobalClusterPeriodicEvent() : Continuation(new_ProxyMutex()) { - SET_HANDLER((GClusterPEHandler) & GlobalClusterPeriodicEvent::calloutEvent); + SET_HANDLER((GClusterPEHandler)&GlobalClusterPeriodicEvent::calloutEvent); } GlobalClusterPeriodicEvent::~GlobalClusterPeriodicEvent() http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterLoadMonitor.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterLoadMonitor.cc b/iocore/cluster/ClusterLoadMonitor.cc index a3a267e..6d79a0b 100644 --- a/iocore/cluster/ClusterLoadMonitor.cc +++ b/iocore/cluster/ClusterLoadMonitor.cc @@ -28,31 +28,20 @@ ****************************************************************************/ #include "P_Cluster.h" -int - ClusterLoadMonitor::cf_monitor_enabled; -int - ClusterLoadMonitor::cf_ping_message_send_msec_interval; -int - ClusterLoadMonitor::cf_num_ping_response_buckets; -int - ClusterLoadMonitor::cf_msecs_per_ping_response_bucket; -int - ClusterLoadMonitor::cf_ping_latency_threshold_msecs; -int - ClusterLoadMonitor::cf_cluster_load_compute_msec_interval; -int - ClusterLoadMonitor::cf_cluster_periodic_msec_interval; -int - ClusterLoadMonitor::cf_ping_history_buf_length; -int - ClusterLoadMonitor::cf_cluster_load_clear_duration; -int - ClusterLoadMonitor::cf_cluster_load_exceed_duration; - -ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch) -:Continuation(0), ch(ch), ping_history_buf_head(0), -periodic_action(0), cluster_overloaded(0), cancel_periodic(0), -cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0) +int ClusterLoadMonitor::cf_monitor_enabled; +int ClusterLoadMonitor::cf_ping_message_send_msec_interval; +int ClusterLoadMonitor::cf_num_ping_response_buckets; +int ClusterLoadMonitor::cf_msecs_per_ping_response_bucket; +int ClusterLoadMonitor::cf_ping_latency_threshold_msecs; +int ClusterLoadMonitor::cf_cluster_load_compute_msec_interval; +int ClusterLoadMonitor::cf_cluster_periodic_msec_interval; +int ClusterLoadMonitor::cf_ping_history_buf_length; +int ClusterLoadMonitor::cf_cluster_load_clear_duration; +int ClusterLoadMonitor::cf_cluster_load_exceed_duration; + +ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler *ch) + : Continuation(0), ch(ch), ping_history_buf_head(0), periodic_action(0), cluster_overloaded(0), cancel_periodic(0), + cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0) { mutex = this->ch->mutex; SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic); @@ -69,8 +58,7 @@ cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0) ping_latency_threshold_msecs = cf_ping_latency_threshold_msecs ? cf_ping_latency_threshold_msecs : 500; Debug("cluster_monitor", "ping_latency_threshold_msecs=%d", ping_latency_threshold_msecs); - cluster_load_compute_msec_interval = - cf_cluster_load_compute_msec_interval ? cf_cluster_load_compute_msec_interval : 5000; + cluster_load_compute_msec_interval = cf_cluster_load_compute_msec_interval ? cf_cluster_load_compute_msec_interval : 5000; Debug("cluster_monitor", "cluster_load_compute_msec_interval=%d", cluster_load_compute_msec_interval); cluster_periodic_msec_interval = cf_cluster_periodic_msec_interval ? cf_cluster_periodic_msec_interval : 100; @@ -87,11 +75,11 @@ cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0) int nbytes = sizeof(int) * num_ping_response_buckets; ping_response_buckets = (int *)ats_malloc(nbytes); - memset((char *) ping_response_buckets, 0, nbytes); + memset((char *)ping_response_buckets, 0, nbytes); nbytes = sizeof(ink_hrtime) * ping_history_buf_length; ping_response_history_buf = (ink_hrtime *)ats_malloc(nbytes); - memset((char *) ping_response_history_buf, 0, nbytes); + memset((char *)ping_response_history_buf, 0, nbytes); last_ping_message_sent = HRTIME_SECONDS(0); last_cluster_load_compute = HRTIME_SECONDS(0); @@ -131,7 +119,8 @@ ClusterLoadMonitor::cancel_monitor() cancel_periodic = 1; } -bool ClusterLoadMonitor::is_cluster_overloaded() +bool +ClusterLoadMonitor::is_cluster_overloaded() { return (cluster_overloaded ? true : false); } @@ -191,11 +180,9 @@ ClusterLoadMonitor::compute_cluster_load() end = start; if (cluster_overloaded) { - end -= (cluster_load_clear_duration <= ping_history_buf_length ? - cluster_load_clear_duration : ping_history_buf_length); + end -= (cluster_load_clear_duration <= ping_history_buf_length ? cluster_load_clear_duration : ping_history_buf_length); } else { - end -= (cluster_load_exceed_duration <= ping_history_buf_length ? - cluster_load_exceed_duration : ping_history_buf_length); + end -= (cluster_load_exceed_duration <= ping_history_buf_length ? cluster_load_exceed_duration : ping_history_buf_length); } if (end < 0) end += ping_history_buf_length; @@ -218,21 +205,19 @@ ClusterLoadMonitor::compute_cluster_load() if (threshold_exceeded && (threshold_clear == 0)) cluster_overloaded = 1; } - Debug("cluster_monitor", - "[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d", - DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket); + Debug("cluster_monitor", "[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d", DOT_SEPARATED(this->ch->machine->ip), + cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket); } void ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number) { #ifdef CLUSTER_TOMCAT - ProxyMutex *mutex = this->ch->mutex; // hack for stats + ProxyMutex *mutex = this->ch->mutex; // hack for stats #endif CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time); - int bucket = (int) - (response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket)); + int bucket = (int)(response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket)); Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), bucket, sequence_number); if (bucket >= num_ping_response_buckets) @@ -241,14 +226,13 @@ ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequen } void -ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m) +ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg *m) { // We have received back our ping message. ink_hrtime now = ink_get_hrtime(); - if ((now >= m->send_time) - && ((m->sequence_number >= cluster_load_msg_start_sequence_number) - && (m->sequence_number < cluster_load_msg_sequence_number))) { + if ((now >= m->send_time) && + ((m->sequence_number >= cluster_load_msg_start_sequence_number) && (m->sequence_number < cluster_load_msg_sequence_number))) { // Valid message, note response time. note_ping_response_time(now - m->send_time, m->sequence_number); } @@ -263,10 +247,10 @@ ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, if (ch) { if (len == sizeof(struct cluster_load_ping_msg)) { struct cluster_load_ping_msg m; - memcpy((void *) &m, data, len); // unmarshal + memcpy((void *)&m, data, len); // unmarshal - if (m.monitor && (m.magicno == cluster_load_ping_msg::CL_MSG_MAGICNO) - && (m.version == cluster_load_ping_msg::CL_MSG_VERSION)) { + if (m.monitor && (m.magicno == cluster_load_ping_msg::CL_MSG_MAGICNO) && + (m.version == cluster_load_ping_msg::CL_MSG_VERSION)) { m.monitor->recv_cluster_load_msg(&m); } } @@ -282,7 +266,7 @@ ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time) m.sequence_number = cluster_load_msg_sequence_number++; m.send_time = current_time; - cluster_ping(ch, cluster_load_ping_rethandler, (void *) &m, sizeof(m)); + cluster_ping(ch, cluster_load_ping_rethandler, (void *)&m, sizeof(m)); } int http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterMachine.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterMachine.cc b/iocore/cluster/ClusterMachine.cc index 0756579..2dee977 100644 --- a/iocore/cluster/ClusterMachine.cc +++ b/iocore/cluster/ClusterMachine.cc @@ -65,17 +65,8 @@ create_this_cluster_machine() } ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport) - : dead(false), - hostname(ahostname), - ip(aip), - cluster_port(aport), - num_connections(0), - now_connections(0), - free_connections(0), - rr_count(0), - msg_proto_major(0), - msg_proto_minor(0), - clusterHandlers(0) + : dead(false), hostname(ahostname), ip(aip), cluster_port(aport), num_connections(0), now_connections(0), free_connections(0), + rr_count(0), msg_proto_major(0), msg_proto_minor(0), clusterHandlers(0) { EThread *thread = this_ethread(); ProxyMutex *mutex = thread->mutex; @@ -88,9 +79,9 @@ ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport) } hostname = ats_strdup(ahostname); - // If we are running if the manager, it the our ip address for - // clustering from the manager, so the manager can control what - // interface we cluster over. Otherwise figure it out ourselves +// If we are running if the manager, it the our ip address for +// clustering from the manager, so the manager can control what +// interface we cluster over. Otherwise figure it out ourselves #ifdef LOCAL_CLUSTER_TEST_MODE ip = inet_addr("127.0.0.1"); #else @@ -104,32 +95,29 @@ ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport) Debug("cluster_note", "[Machine::Machine] Cluster IP addr: %s\n", clusterIP); ip = inet_addr(clusterIP); } else { - ink_gethostbyname_r_data data; struct hostent *r = ink_gethostbyname_r(ahostname, &data); if (!r) { Warning("unable to DNS %s: %d", ahostname, data.herrno); ip = 0; } else { - // lowest IP address - ip = (unsigned int) -1; // 0xFFFFFFFF + ip = (unsigned int)-1; // 0xFFFFFFFF for (int i = 0; r->h_addr_list[i]; i++) - if (ip > *(unsigned int *) r->h_addr_list[i]) - ip = *(unsigned int *) r->h_addr_list[i]; - if (ip == (unsigned int) -1) + if (ip > *(unsigned int *)r->h_addr_list[i]) + ip = *(unsigned int *)r->h_addr_list[i]; + if (ip == (unsigned int)-1) ip = 0; } - //ip = htonl(ip); for the alpha! + // ip = htonl(ip); for the alpha! } #endif // LOCAL_CLUSTER_TEST_MODE } else { - ip = aip; ink_gethostbyaddr_r_data data; - struct hostent *r = ink_gethostbyaddr_r((char *) &ip, sizeof(int), AF_INET, &data); + struct hostent *r = ink_gethostbyaddr_r((char *)&ip, sizeof(int), AF_INET, &data); if (r == NULL) { Alias32 x; @@ -147,7 +135,8 @@ ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport) clusterHandlers = (ClusterHandler **)ats_calloc(num_connections, sizeof(ClusterHandler *)); } -ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr) +ClusterHandler * +ClusterMachine::pop_ClusterHandler(int no_rr) { int find = 0; int64_t now = rr_count; @@ -170,28 +159,27 @@ ClusterMachine::~ClusterMachine() } struct MachineTimeoutContinuation; -typedef int (MachineTimeoutContinuation::*McTimeoutContHandler) (int, void *); -struct MachineTimeoutContinuation: public Continuation -{ +typedef int (MachineTimeoutContinuation::*McTimeoutContHandler)(int, void *); +struct MachineTimeoutContinuation : public Continuation { ClusterMachine *m; - int dieEvent(int event, Event * e) + int + dieEvent(int event, Event *e) { - (void) event; - (void) e; + (void)event; + (void)e; delete m; delete this; return EVENT_DONE; } - MachineTimeoutContinuation(ClusterMachine * am) - : Continuation(NULL), m(am) + MachineTimeoutContinuation(ClusterMachine *am) : Continuation(NULL), m(am) { - SET_HANDLER((McTimeoutContHandler) & MachineTimeoutContinuation::dieEvent); + SET_HANDLER((McTimeoutContHandler)&MachineTimeoutContinuation::dieEvent); } }; void -free_ClusterMachine(ClusterMachine * m) +free_ClusterMachine(ClusterMachine *m) { EThread *thread = this_ethread(); ProxyMutex *mutex = thread->mutex; @@ -202,7 +190,7 @@ free_ClusterMachine(ClusterMachine * m) } void -free_MachineList(MachineList * l) +free_MachineList(MachineList *l) { new_Freer(l, MACHINE_TIMEOUT); } @@ -238,14 +226,14 @@ read_MachineList(const char *filename, int afd) goto Lfail; *port++ = 0; l->machine[i].ip = inet_addr(line); - if (-1 == (int) l->machine[i].ip) { + if (-1 == (int)l->machine[i].ip) { if (afd == -1) { Warning("read machine list failure, bad ip, line %d", ln); return NULL; } else { char s[256]; snprintf(s, sizeof s, "bad ip, line %d", ln); - return (MachineList *) ats_strdup(s); + return (MachineList *)ats_strdup(s); } } l->machine[i].port = atoi(port); @@ -261,7 +249,7 @@ read_MachineList(const char *filename, int afd) } else { char s[256]; snprintf(s, sizeof s, "bad port, line %d", ln); - return (MachineList *) ats_strdup(s); + return (MachineList *)ats_strdup(s); } } } @@ -277,8 +265,8 @@ read_MachineList(const char *filename, int afd) return NULL; } else ats_free(l); - return (MachineList *) ats_strdup("number of machines does not match length of list\n"); + return (MachineList *)ats_strdup("number of machines does not match length of list\n"); } } - return (afd != -1) ? (MachineList *) NULL : l; + return (afd != -1) ? (MachineList *)NULL : l; }
