http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterConfig.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterConfig.cc b/iocore/cluster/ClusterConfig.cc index 68bee6b..cac0014 100644 --- a/iocore/cluster/ClusterConfig.cc +++ b/iocore/cluster/ClusterConfig.cc @@ -32,13 +32,8 @@ int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER; ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize) - : Continuation(0), - p_cluster_port(port), - socket_send_bufsize(send_bufsize), - socket_recv_bufsize(recv_bufsize), - current_cluster_port(-1), - accept_action(0), - periodic_event(0) + : Continuation(0), p_cluster_port(port), socket_send_bufsize(send_bufsize), socket_recv_bufsize(recv_bufsize), + current_cluster_port(-1), accept_action(0), periodic_event(0) { mutex = new_ProxyMutex(); SET_HANDLER(&ClusterAccept::ClusterAcceptEvent); @@ -86,54 +81,50 @@ int ClusterAccept::ClusterAcceptEvent(int event, void *data) { switch (event) { - case EVENT_IMMEDIATE: - { - ShutdownDelete(); - return EVENT_DONE; - } - case EVENT_INTERVAL: - { - int cluster_port = *p_cluster_port; - - if (cluster_port != current_cluster_port) { - // Configuration changed cluster port, redo accept on new port. - if (accept_action) { - accept_action->cancel(); - accept_action = 0; - } + case EVENT_IMMEDIATE: { + ShutdownDelete(); + return EVENT_DONE; + } + case EVENT_INTERVAL: { + int cluster_port = *p_cluster_port; + + if (cluster_port != current_cluster_port) { + // Configuration changed cluster port, redo accept on new port. + if (accept_action) { + accept_action->cancel(); + accept_action = 0; + } - NetProcessor::AcceptOptions opt; - opt.recv_bufsize = socket_recv_bufsize; - opt.send_bufsize = socket_send_bufsize; - opt.etype = ET_CLUSTER; - opt.local_port = cluster_port; - opt.ip_family = AF_INET; - opt.localhost_only = false; - - accept_action = netProcessor.main_accept(this, NO_FD, opt); - if (!accept_action) { - Warning("Unable to accept cluster connections on port: %d", cluster_port); - } else { - current_cluster_port = cluster_port; - } + NetProcessor::AcceptOptions opt; + opt.recv_bufsize = socket_recv_bufsize; + opt.send_bufsize = socket_send_bufsize; + opt.etype = ET_CLUSTER; + opt.local_port = cluster_port; + opt.ip_family = AF_INET; + opt.localhost_only = false; + + accept_action = netProcessor.main_accept(this, NO_FD, opt); + if (!accept_action) { + Warning("Unable to accept cluster connections on port: %d", cluster_port); + } else { + current_cluster_port = cluster_port; } - return EVENT_CONT; - } - case NET_EVENT_ACCEPT: - { - ClusterAcceptMachine((NetVConnection *) data); - return EVENT_DONE; - } - default: - { - Warning("ClusterAcceptEvent: received unknown event %d", event); - return EVENT_DONE; } - } // End of switch + return EVENT_CONT; + } + case NET_EVENT_ACCEPT: { + ClusterAcceptMachine((NetVConnection *)data); + return EVENT_DONE; + } + default: { + Warning("ClusterAcceptEvent: received unknown event %d", event); + return EVENT_DONE; + } + } // End of switch } int -ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC) +ClusterAccept::ClusterAcceptMachine(NetVConnection *NetVC) { // Validate remote IP address. unsigned int remote_ip = NetVC->get_remote_ip(); @@ -155,7 +146,7 @@ ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC) } static void -make_cluster_connections(MachineList * l) +make_cluster_connections(MachineList *l) { // // Connect to all new machines. @@ -179,8 +170,7 @@ make_cluster_connections(MachineList * l) } int -machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, - void *cookie) +machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, void *cookie) { // Handle changes to the cluster.config or machines.config // file. cluster.config is the list of machines in the @@ -189,11 +179,11 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type // This may include front-end load redirectors, machines going // up or coming down etc. // - char *filename = (char *) data.rec_string; + char *filename = (char *)data.rec_string; MachineList *l = read_MachineList(filename); MachineList *old = NULL; #ifdef USE_SEPARATE_MACHINE_CONFIG - switch ((int) cookie) { + switch ((int)cookie) { case MACHINE_CONFIG: old = machines_config; machines_config = l; @@ -205,7 +195,7 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type break; } #else - (void) cookie; + (void)cookie; old = cluster_config; machines_config = l; cluster_config = l; @@ -229,7 +219,7 @@ do_machine_config_change(void *d, const char *s) /*************************************************************************/ // ClusterConfiguration member functions (Public Class) /*************************************************************************/ -ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0) +ClusterConfiguration::ClusterConfiguration() : n_machines(0), changed(0) { memset(machines, 0, sizeof(machines)); memset(hash_table, 0, sizeof(hash_table)); @@ -239,40 +229,39 @@ ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0) // ConfigurationContinuation member functions (Internal Class) /*************************************************************************/ struct ConfigurationContinuation; -typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *); -struct ConfigurationContinuation: public Continuation -{ +typedef int (ConfigurationContinuation::*CfgContHandler)(int, void *); +struct ConfigurationContinuation : public Continuation { ClusterConfiguration *c; ClusterConfiguration *prev; int - zombieEvent(int /* event ATS_UNUSED */, Event * e) + zombieEvent(int /* event ATS_UNUSED */, Event *e) { - prev->link.next = NULL; // remove that next pointer - SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent); + prev->link.next = NULL; // remove that next pointer + SET_HANDLER((CfgContHandler)&ConfigurationContinuation::dieEvent); e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE); return EVENT_CONT; } int - dieEvent(int event, Event * e) + dieEvent(int event, Event *e) { - (void) event; - (void) e; + (void)event; + (void)e; delete c; delete this; return EVENT_DONE; } - ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev) - : Continuation(NULL), c(cc), prev(aprev) { + ConfigurationContinuation(ClusterConfiguration *cc, ClusterConfiguration *aprev) : Continuation(NULL), c(cc), prev(aprev) + { mutex = new_ProxyMutex(); - SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent); + SET_HANDLER((CfgContHandler)&ConfigurationContinuation::zombieEvent); } }; static void -free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev) +free_configuration(ClusterConfiguration *c, ClusterConfiguration *prev) { // // Delete the configuration after a time. @@ -286,7 +275,7 @@ free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev) } ClusterConfiguration * -configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m) +configuration_add_machine(ClusterConfiguration *c, ClusterMachine *m) { // Build a new cluster configuration with the new machine. // Machines are stored in ip sorted order. @@ -318,7 +307,7 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m) ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES); build_cluster_hash_table(cc); - INK_MEMORY_BARRIER; // commit writes before freeing old hash table + INK_MEMORY_BARRIER; // commit writes before freeing old hash table CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT); free_configuration(c, cc); @@ -326,7 +315,7 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m) } ClusterConfiguration * -configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m) +configuration_remove_machine(ClusterConfiguration *c, ClusterMachine *m) { EThread *thread = this_ethread(); ProxyMutex *mutex = thread->mutex; @@ -349,7 +338,7 @@ configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m) cc->changed = ink_get_hrtime(); build_cluster_hash_table(cc); - INK_MEMORY_BARRIER; // commit writes before freeing old hash table + INK_MEMORY_BARRIER; // commit writes before freeing old hash table CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT); free_configuration(c, cc); @@ -365,16 +354,14 @@ configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m) // owner (machine now as opposed to in the past). // ClusterMachine * -cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes) +cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **past_probes) { #ifdef CLUSTER_TOMCAT if (!cache_clustering_enabled) return NULL; #endif - ClusterConfiguration * - cc = this_cluster()->current_configuration(); - ClusterConfiguration * - next_cc = cc; + ClusterConfiguration *cc = this_cluster()->current_configuration(); + ClusterConfiguration *next_cc = cc; ink_hrtime now = ink_get_hrtime(); int fake_probe_depth = 0; int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth; @@ -413,14 +400,12 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** continue; } - ClusterMachine * - m = cc->machine_hash(hash); + ClusterMachine *m = cc->machine_hash(hash); // If it is not this machine, or a machine we have done before // and one that is still up, try again // - bool - ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead); + bool ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead); // Store the all but the last probe, so that we never return // the same machine @@ -431,7 +416,7 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** if (!ok) { if (!pprobe_depth) - break; // don't go down if we don't have a depth + break; // don't go down if we don't have a depth continue; } @@ -447,9 +432,9 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** // stored in the ClusterMachine structures // void -initialize_thread_for_cluster(EThread * e) +initialize_thread_for_cluster(EThread *e) { - (void) e; + (void)e; } /*************************************************************************/
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterHandler.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterHandler.cc b/iocore/cluster/ClusterHandler.cc index afb0156..979bc8d 100644 --- a/iocore/cluster/ClusterHandler.cc +++ b/iocore/cluster/ClusterHandler.cc @@ -55,15 +55,15 @@ static int dump_msgs = 0; // VERIFY_PETERS_DATA support code ///////////////////////////////////////// #ifdef VERIFY_PETERS_DATA -#define DO_VERIFY_PETERS_DATA(_p,_l) verify_peters_data(_p,_l) +#define DO_VERIFY_PETERS_DATA(_p, _l) verify_peters_data(_p, _l) #else -#define DO_VERIFY_PETERS_DATA(_p,_l) +#define DO_VERIFY_PETERS_DATA(_p, _l) #endif void verify_peters_data(char *ap, int l) { - unsigned char *p = (unsigned char *) ap; + unsigned char *p = (unsigned char *)ap; for (int i = 0; i < l - 1; i++) { unsigned char x1 = p[i]; unsigned char x2 = p[i + 1]; @@ -134,71 +134,22 @@ verify_peters_data(char *ap, int l) /*************************************************************************/ ClusterHandler::ClusterHandler() - : net_vc(0), - thread(0), - ip(0), - port(0), - hostname(NULL), - machine(NULL), - ifd(-1), - id(-1), - dead(true), - downing(false), - active(false), - on_stolen_thread(false), - n_channels(0), - channels(NULL), - channel_data(NULL), - connector(false), - cluster_connect_state(ClusterHandler::CLCON_INITIAL), - needByteSwap(false), - configLookupFails(0), - cluster_periodic_event(0), - read(this, true), - write(this, false), - current_time(0), - last(0), - last_report(0), - n_since_last_report(0), - last_cluster_op_enable(0), - last_trace_dump(0), - clm(0), - disable_remote_cluster_ops(0), - pw_write_descriptors_built(0), - pw_freespace_descriptors_built(0), - pw_controldata_descriptors_built(0), pw_time_expired(0), started_on_stolen_thread(false), control_message_write(false) + : net_vc(0), thread(0), ip(0), port(0), hostname(NULL), machine(NULL), ifd(-1), id(-1), dead(true), downing(false), active(false), + on_stolen_thread(false), n_channels(0), channels(NULL), channel_data(NULL), connector(false), + cluster_connect_state(ClusterHandler::CLCON_INITIAL), needByteSwap(false), configLookupFails(0), cluster_periodic_event(0), + read(this, true), write(this, false), current_time(0), last(0), last_report(0), n_since_last_report(0), + last_cluster_op_enable(0), last_trace_dump(0), clm(0), disable_remote_cluster_ops(0), pw_write_descriptors_built(0), + pw_freespace_descriptors_built(0), pw_controldata_descriptors_built(0), pw_time_expired(0), started_on_stolen_thread(false), + control_message_write(false) #ifdef CLUSTER_STATS - , - _vc_writes(0), - _vc_write_bytes(0), - _control_write_bytes(0), - _dw_missed_lock(0), - _dw_not_enabled(0), - _dw_wait_remote_fill(0), - _dw_no_active_vio(0), - _dw_not_enabled_or_no_write(0), - _dw_set_data_pending(0), - _dw_no_free_space(0), - _fw_missed_lock(0), - _fw_not_enabled(0), - _fw_wait_remote_fill(0), - _fw_no_active_vio(0), - _fw_not_enabled_or_no_read(0), - _process_read_calls(0), - _n_read_start(0), - _n_read_header(0), - _n_read_await_header(0), - _n_read_setup_descriptor(0), - _n_read_descriptor(0), - _n_read_await_descriptor(0), - _n_read_setup_data(0), - _n_read_data(0), - _n_read_await_data(0), - _n_read_post_complete(0), - _n_read_complete(0), - _process_write_calls(0), - _n_write_start(0), - _n_write_setup(0), _n_write_initiate(0), _n_write_await_completion(0), _n_write_post_complete(0), _n_write_complete(0) + , + _vc_writes(0), _vc_write_bytes(0), _control_write_bytes(0), _dw_missed_lock(0), _dw_not_enabled(0), _dw_wait_remote_fill(0), + _dw_no_active_vio(0), _dw_not_enabled_or_no_write(0), _dw_set_data_pending(0), _dw_no_free_space(0), _fw_missed_lock(0), + _fw_not_enabled(0), _fw_wait_remote_fill(0), _fw_no_active_vio(0), _fw_not_enabled_or_no_read(0), _process_read_calls(0), + _n_read_start(0), _n_read_header(0), _n_read_await_header(0), _n_read_setup_descriptor(0), _n_read_descriptor(0), + _n_read_await_descriptor(0), _n_read_setup_data(0), _n_read_data(0), _n_read_await_data(0), _n_read_post_complete(0), + _n_read_complete(0), _process_write_calls(0), _n_write_start(0), _n_write_setup(0), _n_write_initiate(0), + _n_write_await_completion(0), _n_write_post_complete(0), _n_write_complete(0) #endif { #ifdef MSG_TRACE @@ -207,26 +158,24 @@ ClusterHandler::ClusterHandler() // we need to lead by at least 1 min_priority = 1; - SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent); + SET_HANDLER((ClusterContHandler)&ClusterHandler::startClusterEvent); mutex = new_ProxyMutex(); OutgoingControl oc; int n; for (n = 0; n < CLUSTER_CMSG_QUEUES; ++n) { - ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *) &oc.link.next - (char *) &oc); + ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *)&oc.link.next - (char *)&oc); } IncomingControl ic; - ink_atomiclist_init(&external_incoming_control, - "ExternalIncomingControlQueue", (char *) &ic.link.next - (char *) &ic); + ink_atomiclist_init(&external_incoming_control, "ExternalIncomingControlQueue", (char *)&ic.link.next - (char *)&ic); ClusterVConnection ivc; - ink_atomiclist_init(&external_incoming_open_local, - "ExternalIncomingOpenLocalQueue", (char *) &ivc.link.next - (char *) &ivc); + ink_atomiclist_init(&external_incoming_open_local, "ExternalIncomingOpenLocalQueue", (char *)&ivc.link.next - (char *)&ivc); ink_atomiclist_init(&read_vcs_ready, "ReadVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next)); ink_atomiclist_init(&write_vcs_ready, "WriteVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next)); - memset((char *) &callout_cont[0], 0, sizeof(callout_cont)); - memset((char *) &callout_events[0], 0, sizeof(callout_events)); + memset((char *)&callout_cont[0], 0, sizeof(callout_cont)); + memset((char *)&callout_events[0], 0, sizeof(callout_events)); } ClusterHandler::~ClusterHandler() @@ -260,11 +209,11 @@ ClusterHandler::~ClusterHandler() channel_data = NULL; } if (read_vcs) - delete[]read_vcs; + delete[] read_vcs; read_vcs = NULL; if (write_vcs) - delete[]write_vcs; + delete[] write_vcs; write_vcs = NULL; if (clm) { @@ -277,7 +226,7 @@ ClusterHandler::~ClusterHandler() } void -ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc) +ClusterHandler::close_ClusterVConnection(ClusterVConnection *vc) { // // Close down a ClusterVConnection @@ -315,7 +264,6 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc) ink_assert(!vc->write_bytes_in_transit); if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) { - CloseMessage msg; int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major); void *data; @@ -326,7 +274,7 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc) msg.status = (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL) ? FORCE_CLOSE_ON_OPEN_CHANNEL : vc->closed; msg.lerrno = vc->lerrno; msg.sequence_number = vc->token.sequence_number; - data = (void *) &msg; + data = (void *)&msg; len = sizeof(CloseMessage); } else { @@ -349,41 +297,41 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc) } inline bool -ClusterHandler::vc_ok_write(ClusterVConnection * vc) +ClusterHandler::vc_ok_write(ClusterVConnection *vc) { - return (((vc->closed > 0) - && (vc->write_list || vc->write_bytes_in_transit)) || + return (((vc->closed > 0) && (vc->write_list || vc->write_bytes_in_transit)) || (!vc->closed && vc->write.enabled && vc->write.vio.op == VIO::WRITE && vc->write.vio.buffer.writer())); } inline bool -ClusterHandler::vc_ok_read(ClusterVConnection * vc) +ClusterHandler::vc_ok_read(ClusterVConnection *vc) { return (!vc->closed && vc->read.vio.op == VIO::READ && vc->read.vio.buffer.writer()); } void -ClusterHandler::close_free_lock(ClusterVConnection * vc, ClusterVConnState * s) +ClusterHandler::close_free_lock(ClusterVConnection *vc, ClusterVConnState *s) { Ptr<ProxyMutex> m(s->vio.mutex); if (s == &vc->read) { - if ((ProxyMutex *) vc->read_locked) + if ((ProxyMutex *)vc->read_locked) MUTEX_UNTAKE_LOCK(vc->read_locked, thread); vc->read_locked = NULL; } else { - if ((ProxyMutex *) vc->write_locked) + if ((ProxyMutex *)vc->write_locked) MUTEX_UNTAKE_LOCK(vc->write_locked, thread); vc->write_locked = NULL; } close_ClusterVConnection(vc); } -bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag) +bool +ClusterHandler::build_data_vector(char *d, int len, bool read_flag) { // Internal interface to general network i/o facility allowing // single vector read/write to static data buffer. - ClusterState & s = (read_flag ? read : write); + ClusterState &s = (read_flag ? read : write); ink_assert(d); ink_assert(len); ink_assert(s.iov); @@ -409,7 +357,8 @@ bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag) return true; } -bool ClusterHandler::build_initial_vector(bool read_flag) +bool +ClusterHandler::build_initial_vector(bool read_flag) { // // Build initial read/write struct iovec and corresponding IOBufferData @@ -447,7 +396,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag) // MIOBuffer *w; ink_hrtime now = ink_get_hrtime(); - ClusterState & s = (read_flag ? read : write); + ClusterState &s = (read_flag ? read : write); OutgoingControl *oc = s.msg.outgoing_control.head; IncomingControl *ic = incoming_control.head; int new_n_iov = 0; @@ -512,8 +461,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag) // Note: We are assuming that free space descriptors follow // the data descriptors. ////////////////////////////////////////////////////////////// - for (i = 0; i<(read_flag ? ((s.msg.state>= 2) ? s.msg.count : 0) - : s.msg.count); i++) { + for (i = 0; i < (read_flag ? ((s.msg.state >= 2) ? s.msg.count : 0) : s.msg.count); i++) { if (s.msg.descriptor[i].type == CLUSTER_SEND_DATA) { /////////////////////////////////// // Control channel data @@ -533,7 +481,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag) CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_RECVD_STAT); } // Mark message data as invalid - *((uint32_t *) ic->data) = UNDEFINED_CLUSTER_FUNCTION; + *((uint32_t *)ic->data) = UNDEFINED_CLUSTER_FUNCTION; incoming_control.enqueue(ic); } s.iov[new_n_iov].iov_base = 0; @@ -541,7 +489,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag) s.block[new_n_iov] = ic->get_block(); to_do += s.iov[new_n_iov].iov_len; ++new_n_iov; - ic = (IncomingControl *) ic->link.next; + ic = (IncomingControl *)ic->link.next; } else { /////////////////////// // Outgoing Control @@ -552,23 +500,21 @@ bool ClusterHandler::build_initial_vector(bool read_flag) s.block[new_n_iov] = oc->get_block(); to_do += s.iov[new_n_iov].iov_len; ++new_n_iov; - oc = (OutgoingControl *) oc->link.next; + oc = (OutgoingControl *)oc->link.next; } } else { /////////////////////////////// // User channel data /////////////////////////////// - ClusterVConnection * - vc = channels[s.msg.descriptor[i].channel]; + ClusterVConnection *vc = channels[s.msg.descriptor[i].channel]; - if (VALID_CHANNEL(vc) && - (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { + if (VALID_CHANNEL(vc) && (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { if (read_flag) { ink_release_assert(!vc->initial_data_bytes); ///////////////////////////////////// // Try to get the read VIO mutex ///////////////////////////////////// - ink_release_assert(!(ProxyMutex *) vc->read_locked); + ink_release_assert(!(ProxyMutex *)vc->read_locked); #ifdef CLUSTER_TOMCAT if (!vc->read.vio.mutex || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) @@ -616,21 +562,21 @@ bool ClusterHandler::build_initial_vector(bool read_flag) bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block); // Sanity check, assert we have the lock if (!remote_write_fill) { - ink_assert((ProxyMutex *) vc->write_locked); + ink_assert((ProxyMutex *)vc->write_locked); } if (vc_ok_write(vc) || remote_write_fill) { if (remote_write_fill) { s.iov[new_n_iov].iov_base = 0; - ink_release_assert((int) s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1)); + ink_release_assert((int)s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1)); s.block[new_n_iov] = vc->remote_write_block; } else { s.iov[new_n_iov].iov_base = 0; - ink_release_assert((int) s.msg.descriptor[i].length <= vc->write_list_bytes); + ink_release_assert((int)s.msg.descriptor[i].length <= vc->write_list_bytes); s.block[new_n_iov] = vc->write_list; - vc->write_list = consume_IOBufferBlockList(vc->write_list, (int) s.msg.descriptor[i].length); - vc->write_list_bytes -= (int) s.msg.descriptor[i].length; - vc->write_bytes_in_transit += (int) s.msg.descriptor[i].length; + vc->write_list = consume_IOBufferBlockList(vc->write_list, (int)s.msg.descriptor[i].length); + vc->write_list_bytes -= (int)s.msg.descriptor[i].length; + vc->write_bytes_in_transit += (int)s.msg.descriptor[i].length; vc->write_list_tail = vc->write_list; while (vc->write_list_tail && vc->write_list_tail->next) @@ -680,8 +626,8 @@ bool ClusterHandler::build_initial_vector(bool read_flag) s.n_iov = new_n_iov; return true; - // TODO: This is apparently dead code, I added the #if 0 to avoid compiler - // warnings, but is this really intentional?? +// TODO: This is apparently dead code, I added the #if 0 to avoid compiler +// warnings, but is this really intentional?? #if 0 // Release all IOBufferBlock references. for (n = 0; n < MAX_TCOUNT; ++n) { @@ -694,22 +640,23 @@ bool ClusterHandler::build_initial_vector(bool read_flag) #endif } -bool ClusterHandler::get_read_locks() +bool +ClusterHandler::get_read_locks() { /////////////////////////////////////////////////////////////////////// // Reacquire locks for the request setup by build_initial_vector(). // We are called after each read completion prior to posting completion /////////////////////////////////////////////////////////////////////// - ClusterState & s = read; + ClusterState &s = read; int i, n; int bytes_processed; int vec_bytes_remainder; int iov_done[MAX_TCOUNT]; - memset((char *) iov_done, 0, sizeof(int) * MAX_TCOUNT); + memset((char *)iov_done, 0, sizeof(int) * MAX_TCOUNT); // Compute bytes transferred on a per vector basis - bytes_processed = s.did - s.bytes_xfered; // not including bytes in this xfer + bytes_processed = s.did - s.bytes_xfered; // not including bytes in this xfer i = -1; for (n = 0; n < s.n_iov; ++n) { @@ -719,7 +666,7 @@ bool ClusterHandler::get_read_locks() } else { iov_done[n] = s.iov[n].iov_len + bytes_processed; if (i < 0) { - i = n; // note i/o start vector + i = n; // note i/o start vector // Now at vector where last transfer started, // make considerations for the last transfer on this vector. @@ -748,25 +695,21 @@ bool ClusterHandler::get_read_locks() // the data descriptors. for (; i < s.n_iov; ++i) { - if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) - && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) { - + if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) { // Only user channels require locks - ClusterVConnection * - vc = channels[s.msg.descriptor[i].channel]; - if (!VALID_CHANNEL(vc) || - ((s.msg.descriptor[i].sequence_number) != - CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || !vc_ok_read(vc)) { + ClusterVConnection *vc = channels[s.msg.descriptor[i].channel]; + if (!VALID_CHANNEL(vc) || ((s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || + !vc_ok_read(vc)) { // Channel no longer valid, lock not needed since we // already have a reference to the buffer continue; } - ink_assert(!(ProxyMutex *) vc->read_locked); + ink_assert(!(ProxyMutex *)vc->read_locked); vc->read_locked = vc->read.vio.mutex; - if (vc->byte_bank_q.head - || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) { + if (vc->byte_bank_q.head || + !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) { // Pending byte bank completions or lock acquire failure. vc->read_locked = NULL; @@ -784,7 +727,7 @@ bool ClusterHandler::get_read_locks() int64_t read_avail = vc->read_block->read_avail(); if (!vc->pending_remote_fill && read_avail) { - Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64" bytes", vc->channel, vc, read_avail); + Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64 " bytes", vc->channel, vc, read_avail); vc->read.vio.buffer.writer()->append_block(vc->read_block->clone()); if (complete_channel_read(read_avail, vc)) { @@ -793,34 +736,31 @@ bool ClusterHandler::get_read_locks() } } } - return true; // success + return true; // success } -bool ClusterHandler::get_write_locks() +bool +ClusterHandler::get_write_locks() { /////////////////////////////////////////////////////////////////////// // Reacquire locks for the request setup by build_initial_vector(). // We are called after the entire write completes prior to // posting completion. /////////////////////////////////////////////////////////////////////// - ClusterState & s = write; + ClusterState &s = write; int i; for (i = 0; i < s.msg.count; ++i) { - if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) - && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) { - + if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) { // Only user channels require locks - ClusterVConnection * - vc = channels[s.msg.descriptor[i].channel]; - if (!VALID_CHANNEL(vc) || - (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { + ClusterVConnection *vc = channels[s.msg.descriptor[i].channel]; + if (!VALID_CHANNEL(vc) || (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { // Channel no longer valid, lock not needed since we // already have a reference to the buffer continue; } - ink_assert(!(ProxyMutex *) vc->write_locked); + ink_assert(!(ProxyMutex *)vc->write_locked); vc->write_locked = vc->write.vio.mutex; #ifdef CLUSTER_TOMCAT if (vc->write_locked && @@ -861,34 +801,34 @@ ClusterHandler::process_set_data_msgs() // Process small control set_data messages. ///////////////////////////////////////////// if (!read.msg.did_small_control_set_data) { - char *p = (char *) &read.msg.descriptor[read.msg.count]; + char *p = (char *)&read.msg.descriptor[read.msg.count]; char *endp = p + read.msg.control_bytes; while (p < endp) { if (needByteSwap) { - ats_swap32((uint32_t *) p); // length - ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code + ats_swap32((uint32_t *)p); // length + ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code } - int len = *(int32_t *) p; - cluster_function_index = *(uint32_t *) (p + sizeof(int32_t)); + int len = *(int32_t *)p; + cluster_function_index = *(uint32_t *)(p + sizeof(int32_t)); - if ((cluster_function_index < (uint32_t) SIZE_clusterFunction) - && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) { + if ((cluster_function_index < (uint32_t)SIZE_clusterFunction) && + (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) { clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t)); // Mark message as processed. - *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t))); + *((uint32_t *)(p + sizeof(uint32_t))) = ~*((uint32_t *)(p + sizeof(uint32_t))); p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t)); - p = (char *) DOUBLE_ALIGN(p); + p = (char *)DOUBLE_ALIGN(p); } else { // Reverse swap since this message will be reprocessed. if (needByteSwap) { - ats_swap32((uint32_t *) p); // length - ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code + ats_swap32((uint32_t *)p); // length + ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code } - break; // End of set_data messages + break; // End of set_data messages } } - read.msg.control_data_offset = p - (char *) &read.msg.descriptor[read.msg.count]; + read.msg.control_data_offset = p - (char *)&read.msg.descriptor[read.msg.count]; read.msg.did_small_control_set_data = 1; } ///////////////////////////////////////////// @@ -899,31 +839,29 @@ ClusterHandler::process_set_data_msgs() while (ic) { if (needByteSwap) { - ats_swap32((uint32_t *) ic->data); // function code + ats_swap32((uint32_t *)ic->data); // function code } - cluster_function_index = *((uint32_t *) ic->data); - - if ((cluster_function_index < (uint32_t) SIZE_clusterFunction) - && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) { + cluster_function_index = *((uint32_t *)ic->data); + if ((cluster_function_index < (uint32_t)SIZE_clusterFunction) && + (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) { char *p = ic->data; - clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, - (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t)); + clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, (void *)(p + sizeof(int32_t)), ic->len - sizeof(int32_t)); // Reverse swap since this will be processed again for deallocation. if (needByteSwap) { - ats_swap32((uint32_t *) p); // length - ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code + ats_swap32((uint32_t *)p); // length + ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code } // Mark message as processed. // Defer dellocation until entire read is complete. - *((uint32_t *) p) = ~*((uint32_t *) p); + *((uint32_t *)p) = ~*((uint32_t *)p); - ic = (IncomingControl *) ic->link.next; + ic = (IncomingControl *)ic->link.next; } else { // Reverse swap action this message will be reprocessed. if (needByteSwap) { - ats_swap32((uint32_t *) ic->data); // function code + ats_swap32((uint32_t *)ic->data); // function code } break; } @@ -942,8 +880,8 @@ ClusterHandler::process_small_control_msgs() } ink_hrtime now = ink_get_hrtime(); - char *p = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_data_offset; - char *endp = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_bytes; + char *p = (char *)&read.msg.descriptor[read.msg.count] + read.msg.control_data_offset; + char *endp = (char *)&read.msg.descriptor[read.msg.count] + read.msg.control_bytes; while (p < endp) { ///////////////////////////////////////////////////////////////// @@ -951,15 +889,15 @@ ClusterHandler::process_small_control_msgs() // incoming queue for processing by callout threads. ///////////////////////////////////////////////////////////////// if (needByteSwap) { - ats_swap32((uint32_t *) p); // length - ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code + ats_swap32((uint32_t *)p); // length + ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code } - int len = *(int32_t *) p; + int len = *(int32_t *)p; p += sizeof(int32_t); - uint32_t cluster_function_index = *(uint32_t *) p; + uint32_t cluster_function_index = *(uint32_t *)p; ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION); - if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) { + if (cluster_function_index >= (uint32_t)SIZE_clusterFunction) { Warning("1Bad cluster function index (small control)"); p += len; @@ -980,11 +918,11 @@ ClusterHandler::process_small_control_msgs() ic->len = len; ic->alloc_data(); memcpy(ic->data, p, ic->len); - SetHighBit(&ic->len); // mark as small cntl - ink_atomiclist_push(&external_incoming_control, (void *) ic); + SetHighBit(&ic->len); // mark as small cntl + ink_atomiclist_push(&external_incoming_control, (void *)ic); p += len; } - p = (char *) DOUBLE_ALIGN(p); + p = (char *)DOUBLE_ALIGN(p); } } @@ -1006,12 +944,12 @@ ClusterHandler::process_large_control_msgs() while ((ic = incoming_control.dequeue())) { if (needByteSwap) { - ats_swap32((uint32_t *) ic->data); // function code + ats_swap32((uint32_t *)ic->data); // function code } - cluster_function_index = *((uint32_t *) ic->data); + cluster_function_index = *((uint32_t *)ic->data); ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION); - if (cluster_function_index == (uint32_t) ~ SET_CHANNEL_DATA_CLUSTER_FUNCTION) { + if (cluster_function_index == (uint32_t)~SET_CHANNEL_DATA_CLUSTER_FUNCTION) { // SET_CHANNEL_DATA_CLUSTER_FUNCTION already processed. // Just do memory deallocation. @@ -1020,14 +958,13 @@ ClusterHandler::process_large_control_msgs() continue; } - if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) { + if (cluster_function_index >= (uint32_t)SIZE_clusterFunction) { Warning("Bad cluster function index (large control)"); ic->freeall(); } else if (clusterFunction[cluster_function_index].ClusterFunc) { // Cluster message, process in ET_CLUSTER thread - clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)), - ic->len - sizeof(int32_t)); + clusterFunction[cluster_function_index].pfn(this, (void *)(ic->data + sizeof(int32_t)), ic->len - sizeof(int32_t)); // Deallocate memory if (!clusterFunction[cluster_function_index].fMalloced) @@ -1035,7 +972,7 @@ ClusterHandler::process_large_control_msgs() } else { // Non Cluster message, process in non ET_CLUSTER thread - ink_atomiclist_push(&external_incoming_control, (void *) ic); + ink_atomiclist_push(&external_incoming_control, (void *)ic); } } } @@ -1072,7 +1009,7 @@ ClusterHandler::process_freespace_msgs() } void -ClusterHandler::add_to_byte_bank(ClusterVConnection * vc) +ClusterHandler::add_to_byte_bank(ClusterVConnection *vc) { ByteBankDescriptor *bb_desc = ByteBankDescriptor::ByteBankDescriptor_alloc(vc->read_block); bool pending_byte_bank_completion = vc->byte_bank_q.head ? true : false; @@ -1110,31 +1047,29 @@ ClusterHandler::update_channels_read() for (i = 0; i < read.msg.count; i++) { if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { ClusterVConnection *vc = channels[read.msg.descriptor[i].channel]; - if (VALID_CHANNEL(vc) && - (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { - vc->last_activity_time = current_time; // note activity time + if (VALID_CHANNEL(vc) && (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { + vc->last_activity_time = current_time; // note activity time len = read.msg.descriptor[i].length; if (!len) { continue; } - if (!vc->pending_remote_fill && vc_ok_read(vc) - && (!((ProxyMutex *) vc->read_locked) || vc->byte_bank_q.head)) { + if (!vc->pending_remote_fill && vc_ok_read(vc) && (!((ProxyMutex *)vc->read_locked) || vc->byte_bank_q.head)) { // // Byte bank active or unable to acquire lock on VC. // Move data into the byte bank and attempt delivery // at the next periodic event. // - vc->read_block->fill(len); // note bytes received + vc->read_block->fill(len); // note bytes received add_to_byte_bank(vc); } else { - if (vc->pending_remote_fill || ((ProxyMutex *) vc->read_locked && vc_ok_read(vc))) { - vc->read_block->fill(len); // note bytes received + if (vc->pending_remote_fill || ((ProxyMutex *)vc->read_locked && vc_ok_read(vc))) { + vc->read_block->fill(len); // note bytes received if (!vc->pending_remote_fill) { vc->read.vio.buffer.writer()->append_block(vc->read_block->clone()); - vc->read_block->consume(len); // note bytes moved to user + vc->read_block->consume(len); // note bytes moved to user } complete_channel_read(len, vc); } @@ -1156,7 +1091,7 @@ ClusterHandler::update_channels_read() // for message processing which cannot be done with a ET_CLUSTER thread. // int -ClusterHandler::process_incoming_callouts(ProxyMutex * m) +ClusterHandler::process_incoming_callouts(ProxyMutex *m) { ProxyMutex *mutex = m; ink_hrtime now; @@ -1170,13 +1105,12 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m) IncomingControl *ic_ext; while (true) { - ic_ext = (IncomingControl *) - ink_atomiclist_popall(&external_incoming_control); + ic_ext = (IncomingControl *)ink_atomiclist_popall(&external_incoming_control); if (!ic_ext) break; while (ic_ext) { - ic_ext_next = (IncomingControl *) ic_ext->link.next; + ic_ext_next = (IncomingControl *)ic_ext->link.next; ic_ext->link.next = NULL; local_incoming_control.push(ic_ext); ic_ext = ic_ext_next; @@ -1191,15 +1125,15 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m) // Determine if this a small control message small_control_msg = IsHighBitSet(&ic->len); - ClearHighBit(&ic->len); // Clear small msg flag bit + ClearHighBit(&ic->len); // Clear small msg flag bit if (small_control_msg) { int len = ic->len; char *p = ic->data; - uint32_t cluster_function_index = *(uint32_t *) p; + uint32_t cluster_function_index = *(uint32_t *)p; p += sizeof(uint32_t); - if (cluster_function_index < (uint32_t) SIZE_clusterFunction) { + if (cluster_function_index < (uint32_t)SIZE_clusterFunction) { //////////////////////////////// // Invoke processing function //////////////////////////////// @@ -1216,17 +1150,16 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m) } else { ink_assert(ic->len > 4); - uint32_t cluster_function_index = *(uint32_t *) ic->data; + uint32_t cluster_function_index = *(uint32_t *)ic->data; bool valid_index; - if (cluster_function_index < (uint32_t) SIZE_clusterFunction) { + if (cluster_function_index < (uint32_t)SIZE_clusterFunction) { valid_index = true; //////////////////////////////// // Invoke processing function //////////////////////////////// ink_assert(!clusterFunction[cluster_function_index].ClusterFunc); - clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)), - ic->len - sizeof(int32_t)); + clusterFunction[cluster_function_index].pfn(this, (void *)(ic->data + sizeof(int32_t)), ic->len - sizeof(int32_t)); now = ink_get_hrtime(); CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time); } else { @@ -1270,7 +1203,7 @@ ClusterHandler::update_channels_partial_read() if (already_read) { already_read -= iov_done[i]; if (already_read < 0) { - iov_done[i] = -already_read; // bytes remaining + iov_done[i] = -already_read; // bytes remaining already_read = 0; } else { iov_done[i] = 0; @@ -1295,10 +1228,9 @@ ClusterHandler::update_channels_partial_read() for (i = 0; i < read.msg.count; i++) { if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { ClusterVConnection *vc = channels[read.msg.descriptor[i].channel]; - if (VALID_CHANNEL(vc) && - (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { + if (VALID_CHANNEL(vc) && (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { if (vc->pending_remote_fill || (vc_ok_read(vc) && (vc->iov_map != CLUSTER_IOV_NONE))) { - vc->last_activity_time = current_time; // note activity time + vc->last_activity_time = current_time; // note activity time ClusterVConnState *s = &vc->read; ink_assert(vc->iov_map < read.n_iov); int len = iov_done[vc->iov_map]; @@ -1318,10 +1250,10 @@ ClusterHandler::update_channels_partial_read() read_all_large_control_msgs = 1; } iov_done[vc->iov_map] = 0; - vc->read_block->fill(len); // note bytes received + vc->read_block->fill(len); // note bytes received if (!vc->pending_remote_fill) { - if ((ProxyMutex *) vc->read_locked) { + if ((ProxyMutex *)vc->read_locked) { Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len); s->vio.buffer.writer()->append_block(vc->read_block->clone()); if (complete_channel_read(len, vc)) { @@ -1333,7 +1265,7 @@ ClusterHandler::update_channels_partial_read() // into the byte bank. Otherwise, do nothing since // we will resume the read at this VC. - if (len == (int) read.msg.descriptor[i].length) { + if (len == (int)read.msg.descriptor[i].length) { Debug("cluster_vc_xfer", "Partial read, byte bank move ch %d %p %d bytes", vc->channel, vc, len); add_to_byte_bank(vc); } @@ -1343,7 +1275,7 @@ ClusterHandler::update_channels_partial_read() complete_channel_read(len, vc); } read.msg.descriptor[i].length -= len; - ink_assert(((int) read.msg.descriptor[i].length) >= 0); + ink_assert(((int)read.msg.descriptor[i].length) >= 0); } Debug(CL_TRACE, "partial_channel_read chan=%d len=%d", vc->channel, len); } @@ -1352,7 +1284,8 @@ ClusterHandler::update_channels_partial_read() } } -bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc) +bool +ClusterHandler::complete_channel_read(int len, ClusterVConnection *vc) { // // We have processed a complete VC read request message for a channel, @@ -1363,22 +1296,21 @@ bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc) if (vc->pending_remote_fill) { Debug(CL_TRACE, "complete_channel_read chan=%d len=%d", vc->channel, len); vc->initial_data_bytes += len; - ++vc->pending_remote_fill; // Note completion + ++vc->pending_remote_fill; // Note completion return (vc->closed ? false : true); } if (vc->closed) - return false; // No action if already closed + return false; // No action if already closed - ink_assert((ProxyMutex *) s->vio.mutex == (ProxyMutex *) s->vio._cont->mutex); + ink_assert((ProxyMutex *)s->vio.mutex == (ProxyMutex *)s->vio._cont->mutex); Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len); s->vio.ndone += len; if (s->vio.ntodo() <= 0) { s->enabled = 0; - if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s) - == EVENT_DONE) + if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s) == EVENT_DONE) return false; } else { if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE) @@ -1401,7 +1333,7 @@ ClusterHandler::finish_delayed_reads() // ClusterVConnection *vc = NULL; DLL<ClusterVConnectionBase> l; - while ((vc = (ClusterVConnection *) delayed_reads.pop())) { + while ((vc = (ClusterVConnection *)delayed_reads.pop())) { MUTEX_TRY_LOCK_SPIN(lock, vc->read.vio.mutex, thread, READ_LOCK_SPIN_COUNT); if (lock.is_locked()) { if (vc_ok_read(vc)) { @@ -1414,8 +1346,8 @@ ClusterHandler::finish_delayed_reads() // remove our self to process another byte bank completion ClusterVC_remove_read(vc); } - Debug("cluster_vc_xfer", - "Delayed read, credit ch %d %p %" PRId64" bytes", vc->channel, vc, d->get_block()->read_avail()); + Debug("cluster_vc_xfer", "Delayed read, credit ch %d %p %" PRId64 " bytes", vc->channel, vc, + d->get_block()->read_avail()); vc->read.vio.buffer.writer()->append_block(d->get_block()); if (complete_channel_read(d->get_block()->read_avail(), vc)) { @@ -1447,17 +1379,13 @@ ClusterHandler::update_channels_written() if (write.msg.descriptor[i].type == CLUSTER_SEND_DATA) { if (write.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { ClusterVConnection *vc = channels[write.msg.descriptor[i].channel]; - if (VALID_CHANNEL(vc) && - (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { - + if (VALID_CHANNEL(vc) && (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { if (vc->pending_remote_fill) { - Debug(CL_TRACE, - "update_channels_written chan=%d seqno=%d len=%d", - write.msg.descriptor[i].channel, + Debug(CL_TRACE, "update_channels_written chan=%d seqno=%d len=%d", write.msg.descriptor[i].channel, write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length); vc->pending_remote_fill = 0; vc->remote_write_block = 0; // free data block - continue; // ignore remote write fill VC(s) + continue; // ignore remote write fill VC(s) } ClusterVConnState *s = &vc->write; @@ -1467,8 +1395,8 @@ ClusterHandler::update_channels_written() Debug(CL_PROTO, "(%d) data sent %d %" PRId64, write.msg.descriptor[i].channel, len, s->vio.ndone); if (vc_ok_write(vc)) { - vc->last_activity_time = current_time; // note activity time - int64_t ndone = vc->was_closed()? 0 : s->vio.ndone; + vc->last_activity_time = current_time; // note activity time + int64_t ndone = vc->was_closed() ? 0 : s->vio.ndone; if (ndone < vc->remote_free) { vcs_push(vc, VC_CLUSTER_WRITE); @@ -1497,7 +1425,7 @@ ClusterHandler::update_channels_written() invoke_remote_data_args *args; OutgoingControl *hdr_oc; while ((hdr_oc = write.msg.outgoing_callout.dequeue())) { - args = (invoke_remote_data_args *) (hdr_oc->data + sizeof(int32_t)); + args = (invoke_remote_data_args *)(hdr_oc->data + sizeof(int32_t)); ink_assert(args->magicno == invoke_remote_data_args::MagicNo); // Free data descriptor @@ -1524,7 +1452,7 @@ ClusterHandler::build_write_descriptors() // write (struct iovec system maximum). // int count_bucket = cur_vcs; - int tcount = write.msg.count + 2; // count + descriptor + int tcount = write.msg.count + 2; // count + descriptor int write_descriptors_built = 0; int valid; int list_len = 0; @@ -1536,7 +1464,7 @@ ClusterHandler::build_write_descriptors() vc = (ClusterVConnection *)ink_atomiclist_popall(&write_vcs_ready); while (vc) { enter_exit(&cls_build_writes_entered, &cls_writes_exited); - vc_next = (ClusterVConnection *) vc->ready_alink.next; + vc_next = (ClusterVConnection *)vc->ready_alink.next; vc->ready_alink.next = NULL; list_len++; if (VC_CLUSTER_CLOSED == vc->type) { @@ -1561,10 +1489,10 @@ ClusterHandler::build_write_descriptors() } tcount = write.msg.count + 2; - vc_next = (ClusterVConnection *) write_vcs[count_bucket].head; + vc_next = (ClusterVConnection *)write_vcs[count_bucket].head; while (vc_next) { vc = vc_next; - vc_next = (ClusterVConnection *) vc->write.link.next; + vc_next = (ClusterVConnection *)vc->write.link.next; if (VC_CLUSTER_CLOSED == vc->type) { vc->type = VC_NULL; @@ -1579,10 +1507,8 @@ ClusterHandler::build_write_descriptors() if (-1 == valid) { vcs_push(vc, VC_CLUSTER_WRITE); } else if (valid) { - ink_assert(vc->write_locked); // Acquired in valid_for_data_write() - if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes)) - && channels[vc->channel] == vc) { - + ink_assert(vc->write_locked); // Acquired in valid_for_data_write() + if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes)) && channels[vc->channel] == vc) { ink_assert(vc->write_list && vc->write_list_bytes); int d = write.msg.count; @@ -1630,7 +1556,7 @@ ClusterHandler::build_freespace_descriptors() // in the list. // int count_bucket = cur_vcs; - int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) + int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) int freespace_descriptors_built = 0; int s = 0; int list_len = 0; @@ -1642,7 +1568,7 @@ ClusterHandler::build_freespace_descriptors() vc = (ClusterVConnection *)ink_atomiclist_popall(&read_vcs_ready); while (vc) { enter_exit(&cls_build_reads_entered, &cls_reads_exited); - vc_next = (ClusterVConnection *) vc->ready_alink.next; + vc_next = (ClusterVConnection *)vc->ready_alink.next; vc->ready_alink.next = NULL; list_len++; if (VC_CLUSTER_CLOSED == vc->type) { @@ -1667,10 +1593,10 @@ ClusterHandler::build_freespace_descriptors() } tcount = write.msg.count + 2; - vc_next = (ClusterVConnection *) read_vcs[count_bucket].head; + vc_next = (ClusterVConnection *)read_vcs[count_bucket].head; while (vc_next) { vc = vc_next; - vc_next = (ClusterVConnection *) vc->read.link.next; + vc_next = (ClusterVConnection *)vc->read.link.next; if (VC_CLUSTER_CLOSED == vc->type) { vc->type = VC_NULL; @@ -1714,9 +1640,9 @@ ClusterHandler::build_controlmsg_descriptors() // write (struct iovec system maximum) and for elements already // in the list. // - int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) + int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) int control_msgs_built = 0; - bool compound_msg; // msg + chan data + bool compound_msg; // msg + chan data // // Build descriptors for control messages // @@ -1724,12 +1650,12 @@ ClusterHandler::build_controlmsg_descriptors() int control_bytes = 0; int q = 0; - while (tcount < (MAX_TCOUNT - 1)) { // -1 to allow for compound messages + while (tcount < (MAX_TCOUNT - 1)) { // -1 to allow for compound messages c = outgoing_control[q].pop(); if (!c) { // Move elements from global outgoing_control to local queue OutgoingControl *c_next; - c = (OutgoingControl *) ink_atomiclist_popall(&outgoing_control_al[q]); + c = (OutgoingControl *)ink_atomiclist_popall(&outgoing_control_al[q]); if (c == 0) { if (++q >= CLUSTER_CMSG_QUEUES) { break; @@ -1738,7 +1664,7 @@ ClusterHandler::build_controlmsg_descriptors() } } while (c) { - c_next = (OutgoingControl *) c->link.next; + c_next = (OutgoingControl *)c->link.next; c->link.next = NULL; outgoing_control[q].push(c); c = c_next; @@ -1746,17 +1672,17 @@ ClusterHandler::build_controlmsg_descriptors() continue; } else { - compound_msg = (*((int32_t *) c->data) == -1); // (msg+chan data)? + compound_msg = (*((int32_t *)c->data) == -1); // (msg+chan data)? } if (!compound_msg && c->len <= SMALL_CONTROL_MESSAGE && // check if the receiving cluster function will want to malloc'ed data - !clusterFunction[*(int32_t *) c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) { + !clusterFunction[*(int32_t *)c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) { write.msg.outgoing_small_control.enqueue(c); - control_bytes += c->len + sizeof(int32_t) * 2 + 7; // safe approximation + control_bytes += c->len + sizeof(int32_t) * 2 + 7; // safe approximation control_msgs_built++; - if (clusterFunction[*(int32_t *) c->data].post_pfn) { - clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len); + if (clusterFunction[*(int32_t *)c->data].post_pfn) { + clusterFunction[*(int32_t *)c->data].post_pfn(this, c->data + sizeof(int32_t), c->len); } continue; } @@ -1765,7 +1691,7 @@ ClusterHandler::build_controlmsg_descriptors() // if (compound_msg) { // Extract out components of compound message. - invoke_remote_data_args *cmhdr = (invoke_remote_data_args *) (c->data + sizeof(int32_t)); + invoke_remote_data_args *cmhdr = (invoke_remote_data_args *)(c->data + sizeof(int32_t)); OutgoingControl *oc_header = c; OutgoingControl *oc_msg = cmhdr->msg_oc; OutgoingControl *oc_data = cmhdr->data_oc; @@ -1831,7 +1757,7 @@ ClusterHandler::build_controlmsg_descriptors() oc_msg->freeall(); // Free data descriptor - oc_data->free_data(); // invoke memory free callback + oc_data->free_data(); // invoke memory free callback oc_data->mutex = 0; oc_data->freeall(); } @@ -1852,8 +1778,8 @@ ClusterHandler::build_controlmsg_descriptors() tcount++; control_msgs_built++; - if (clusterFunction[*(int32_t *) c->data].post_pfn) { - clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len); + if (clusterFunction[*(int32_t *)c->data].post_pfn) { + clusterFunction[*(int32_t *)c->data].post_pfn(this, c->data + sizeof(int32_t), c->len); } } } @@ -1866,11 +1792,11 @@ ClusterHandler::add_small_controlmsg_descriptors() // // Move small control message data to free space after descriptors // - char *p = (char *) &write.msg.descriptor[write.msg.count]; + char *p = (char *)&write.msg.descriptor[write.msg.count]; OutgoingControl *c = NULL; while ((c = write.msg.outgoing_small_control.dequeue())) { - *(int32_t *) p = c->len; + *(int32_t *)p = c->len; p += sizeof(int32_t); memcpy(p, c->data, c->len); c->free_data(); @@ -1880,9 +1806,9 @@ ClusterHandler::add_small_controlmsg_descriptors() CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - c->submit_time); LOG_EVENT_TIME(c->submit_time, cluster_send_time_dist, cluster_send_events); c->freeall(); - p = (char *) DOUBLE_ALIGN(p); + p = (char *)DOUBLE_ALIGN(p); } - write.msg.control_bytes = p - (char *) &write.msg.descriptor[write.msg.count]; + write.msg.control_bytes = p - (char *)&write.msg.descriptor[write.msg.count]; #ifdef CLUSTER_STATS _control_write_bytes += write.msg.control_bytes; @@ -1891,14 +1817,13 @@ ClusterHandler::add_small_controlmsg_descriptors() return 1; } -struct DestructorLock -{ - DestructorLock(EThread * thread) +struct DestructorLock { + DestructorLock(EThread *thread) { have_lock = false; t = thread; } - ~DestructorLock() + ~DestructorLock() { if (have_lock && m) { Mutex_unlock(m, t); @@ -1911,7 +1836,7 @@ struct DestructorLock }; int -ClusterHandler::valid_for_data_write(ClusterVConnection * vc) +ClusterHandler::valid_for_data_write(ClusterVConnection *vc) { // // Determine if writes are allowed on this VC @@ -1919,7 +1844,7 @@ ClusterHandler::valid_for_data_write(ClusterVConnection * vc) ClusterVConnState *s = &vc->write; ink_assert(!on_stolen_thread); - ink_assert((ProxyMutex *) ! vc->write_locked); + ink_assert((ProxyMutex *)!vc->write_locked); // // Attempt to get the lock, if we miss, push vc into the future @@ -1977,7 +1902,7 @@ retry: if (!lock.have_lock && s->vio.mutex && s->vio._cont) { goto retry; } else { - // No active VIO +// No active VIO #ifdef CLUSTER_STATS _dw_no_active_vio++; #endif @@ -2025,7 +1950,7 @@ retry: // // Calculate amount writable // - MIOBufferAccessor & buf = s->vio.buffer; + MIOBufferAccessor &buf = s->vio.buffer; int64_t towrite = buf.reader()->read_avail(); int64_t ntodo = s->vio.ntodo(); @@ -2061,8 +1986,7 @@ retry: if (towrite && bytes_to_fill) { consume_bytes = (towrite > bytes_to_fill) ? bytes_to_fill : towrite; - b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block, - s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail); + b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block, s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail); ink_assert(b_tail); // Append cloned IOBufferBlock list to VC write_list. @@ -2095,13 +2019,13 @@ retry: return 1; } else { if (!write_vc_signal && buf.writer()->write_avail() && towrite != ntodo) - cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s); + cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s); return 0; } } int -ClusterHandler::valid_for_freespace_write(ClusterVConnection * vc) +ClusterHandler::valid_for_freespace_write(ClusterVConnection *vc) { // // Determine if freespace messages are allowed on this VC @@ -2156,7 +2080,7 @@ retry: if (!lock.have_lock && s->vio.mutex && s->vio._cont) { goto retry; } else { - // No active VIO +// No active VIO #ifdef CLUSTER_STATS _fw_no_active_vio++; #endif @@ -2194,7 +2118,6 @@ retry: int64_t bytes_to_move = vc->initial_data_bytes; if (vc->read_block && bytes_to_move) { - // Push initial read data into VC if (ntodo >= bytes_to_move) { @@ -2231,8 +2154,7 @@ retry: return 0; } } - if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) - == EVENT_DONE) + if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE) return false; if (s->vio.ntodo() <= 0) @@ -2269,7 +2191,7 @@ retry: } void -ClusterHandler::vcs_push(ClusterVConnection * vc, int type) +ClusterHandler::vcs_push(ClusterVConnection *vc, int type) { if (vc->type <= VC_CLUSTER) vc->type = type; @@ -2284,7 +2206,7 @@ ClusterHandler::vcs_push(ClusterVConnection * vc, int type) } int -ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns) +ClusterHandler::remote_close(ClusterVConnection *vc, ClusterVConnState *ns) { if (ns->vio.op != VIO::NONE && !vc->closed) { ns->enabled = 0; @@ -2306,17 +2228,17 @@ ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns) } void -ClusterHandler::steal_thread(EThread * t) +ClusterHandler::steal_thread(EThread *t) { // // Attempt to push the control message now instead of waiting // for the periodic event to process it. // - if (t != thread && // different thread to steal - write.to_do <= 0 && // currently not trying to send data + if (t != thread && // different thread to steal + write.to_do <= 0 && // currently not trying to send data // nothing big outstanding !write.msg.count) { - mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *) t); + mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *)t); } } @@ -2333,29 +2255,28 @@ ClusterHandler::free_locks(bool read_flag, int i) i = write.msg.count; } } - ClusterState & s = (read_flag ? read : write); + ClusterState &s = (read_flag ? read : write); for (int j = 0; j < i; j++) { if (s.msg.descriptor[j].type == CLUSTER_SEND_DATA && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) { ClusterVConnection *vc = channels[s.msg.descriptor[j].channel]; if (VALID_CHANNEL(vc)) { if (read_flag) { - if ((ProxyMutex *) vc->read_locked) { + if ((ProxyMutex *)vc->read_locked) { MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread); vc->read_locked = NULL; } } else { - if ((ProxyMutex *) vc->write_locked) { + if ((ProxyMutex *)vc->write_locked) { MUTEX_UNTAKE_LOCK(vc->write_locked, thread); vc->write_locked = NULL; } } } - } else if (!read_flag && - s.msg.descriptor[j].type == CLUSTER_SEND_FREE && + } else if (!read_flag && s.msg.descriptor[j].type == CLUSTER_SEND_FREE && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) { ClusterVConnection *vc = channels[s.msg.descriptor[j].channel]; if (VALID_CHANNEL(vc)) { - if ((ProxyMutex *) vc->read_locked) { + if ((ProxyMutex *)vc->read_locked) { MUTEX_UNTAKE_LOCK(vc->read_locked, thread); vc->read_locked = NULL; } @@ -2401,7 +2322,7 @@ extern int CacheClusterMonitorIntervalSecs; // The main event for machine-machine link // int -ClusterHandler::mainClusterEvent(int event, Event * e) +ClusterHandler::mainClusterEvent(int event, Event *e) { // Set global time current_time = ink_get_hrtime(); @@ -2412,15 +2333,15 @@ ClusterHandler::mainClusterEvent(int event, Event * e) dump_internal_data(); } } - // - // Note: The caller always acquires the ClusterHandler mutex prior - // to the call. This guarantees single threaded access in - // mainClusterEvent() - // +// +// Note: The caller always acquires the ClusterHandler mutex prior +// to the call. This guarantees single threaded access in +// mainClusterEvent() +// - ///////////////////////////////////////////////////////////////////////// - // If cluster interconnect is overloaded, disable remote cluster ops. - ///////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////// +// If cluster interconnect is overloaded, disable remote cluster ops. +///////////////////////////////////////////////////////////////////////// #ifndef DEBUG if (clm && ClusterLoadMonitor::cf_monitor_enabled > 0) { #else @@ -2446,7 +2367,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e) bool io_callback = (event == EVENT_IMMEDIATE); if (on_stolen_thread) { - thread = (EThread *) e; + thread = (EThread *)e; } else { if (io_callback) { thread = this_ethread(); @@ -2499,7 +2420,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e) ///////////////////////////////////////// if (!on_stolen_thread) { if (do_open_local_requests()) - thread->signal_hook(thread); + thread->signal_hook(thread); } } @@ -2513,8 +2434,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e) return EVENT_CONT; } -int -ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) +int ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) { #ifdef CLUSTER_STATS _process_read_calls++; @@ -2528,9 +2448,8 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) /////////////////////////////// for (;;) { - switch (read.state) { - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_START: /////////////////////////////////////////////// { @@ -2546,7 +2465,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_HEADER: /////////////////////////////////////////////// { @@ -2561,7 +2480,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_AWAIT_HEADER: /////////////////////////////////////////////// { @@ -2589,11 +2508,9 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } } else { #ifdef MSG_TRACE - fprintf(t_fd, - "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n", - read.sequence_number, - read.msg.hdr()->count, read.msg.hdr()->control_bytes, - read.msg.hdr()->count_check, read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum); + fprintf(t_fd, "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n", read.sequence_number, + read.msg.hdr()->count, read.msg.hdr()->control_bytes, read.msg.hdr()->count_check, + read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum); fflush(t_fd); #endif CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did); @@ -2623,7 +2540,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) break; } } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_SETUP_DESCRIPTOR: /////////////////////////////////////////////// { @@ -2637,7 +2554,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_DESCRIPTOR: /////////////////////////////////////////////// { @@ -2652,7 +2569,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_AWAIT_DESCRIPTOR: /////////////////////////////////////////////// { @@ -2698,7 +2615,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) break; } } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_SETUP_DATA: /////////////////////////////////////////////// { @@ -2718,7 +2635,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_DATA: /////////////////////////////////////////////// { @@ -2734,7 +2651,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_AWAIT_DATA: /////////////////////////////////////////////// { @@ -2742,7 +2659,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) _n_read_await_data++; #endif if (!read.io_complete) { - return 0; // awaiting i/o complete + return 0; // awaiting i/o complete } else { if (read.io_complete > 0) { read.state = ClusterState::READ_POST_COMPLETE; @@ -2754,7 +2671,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_POST_COMPLETE: /////////////////////////////////////////////// { @@ -2784,7 +2701,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) break; } } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::READ_COMPLETE: /////////////////////////////////////////////// { @@ -2801,17 +2718,17 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) free_locks(CLUSTER_READ); read.state = ClusterState::READ_START; - break; // setup next read + break; // setup next read } - ////////////////// + ////////////////// default: ////////////////// { ink_release_assert(!"ClusterHandler::process_read invalid state"); } - } // end of switch - } // end of for + } // end of switch + } // end of for } int @@ -2824,9 +2741,8 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) // Cluster write state machine ///////////////////////////////// for (;;) { - switch (write.state) { - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::WRITE_START: /////////////////////////////////////////////// { @@ -2842,7 +2758,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) write.state = ClusterState::WRITE_SETUP; break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::WRITE_SETUP: /////////////////////////////////////////////// { @@ -2867,7 +2783,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) if (pw_freespace_descriptors_built) { pw_freespace_descriptors_built = build_freespace_descriptors(); } - add_small_controlmsg_descriptors(); // always last + add_small_controlmsg_descriptors(); // always last } else { ///////////////////////////////////////////////////////////// // Build a write descriptor only containing control data. @@ -2875,7 +2791,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) pw_write_descriptors_built = 0; pw_freespace_descriptors_built = 0; pw_controldata_descriptors_built = build_controlmsg_descriptors(); - add_small_controlmsg_descriptors(); // always last + add_small_controlmsg_descriptors(); // always last } // If nothing to write, post write completion @@ -2887,7 +2803,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) control_message_write = only_write_control_msgs; } - // Move required data into the message header +// Move required data into the message header #ifdef CLUSTER_MESSAGE_CKSUM write.msg.descriptor_cksum = write.msg.calc_descriptor_cksum(); write.msg.hdr()->descriptor_cksum = write.msg.descriptor_cksum; @@ -2905,7 +2821,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) write.state = ClusterState::WRITE_INITIATE; break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::WRITE_INITIATE: /////////////////////////////////////////////// { @@ -2920,7 +2836,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::WRITE_AWAIT_COMPLETION: /////////////////////////////////////////////// { @@ -2954,7 +2870,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) } break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::WRITE_POST_COMPLETE: /////////////////////////////////////////////// { @@ -2974,7 +2890,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) write.state = ClusterState::WRITE_COMPLETE; break; } - /////////////////////////////////////////////// + /////////////////////////////////////////////// case ClusterState::WRITE_COMPLETE: /////////////////////////////////////////////// { @@ -2990,8 +2906,8 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) // pw_time_expired = (curtime - now) > CLUSTER_MAX_RUN_TIME; - if (!control_message_write && !pw_write_descriptors_built - && !pw_freespace_descriptors_built && !pw_controldata_descriptors_built) { + if (!control_message_write && !pw_write_descriptors_built && !pw_freespace_descriptors_built && + !pw_controldata_descriptors_built) { // skip to the next bucket cur_vcs = (cur_vcs + 1) % CLUSTER_BUCKETS; } @@ -3018,24 +2934,24 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) } } if (pw_time_expired) { - return -1; // thread run time expired + return -1; // thread run time expired } else { if (pw_write_descriptors_built || pw_freespace_descriptors_built || pw_controldata_descriptors_built) { - break; // start another write + break; // start another write } else { - return 0; // no more data to write + return 0; // no more data to write } } } - ////////////////// + ////////////////// default: ////////////////// { ink_release_assert(!"ClusterHandler::process_write invalid state"); } - } // End of switch - } // End of for + } // End of switch + } // End of for } int @@ -3059,13 +2975,12 @@ ClusterHandler::do_open_local_requests() // move them to the local working queue while maintaining insertion order. // while (true) { - cvc_ext = (ClusterVConnection *) - ink_atomiclist_popall(&external_incoming_open_local); + cvc_ext = (ClusterVConnection *)ink_atomiclist_popall(&external_incoming_open_local); if (cvc_ext == 0) break; while (cvc_ext) { - cvc_ext_next = (ClusterVConnection *) cvc_ext->link.next; + cvc_ext_next = (ClusterVConnection *)cvc_ext->link.next; cvc_ext->link.next = NULL; local_incoming_open_local.push(cvc_ext); cvc_ext = cvc_ext_next; @@ -3089,7 +3004,7 @@ ClusterHandler::do_open_local_requests() // unable to get mutex, insert request back onto global queue. Debug(CL_TRACE, "do_open_local_requests() unable to acquire mutex (cvc=%p)", cvc); pending_request = 1; - ink_atomiclist_push(&external_incoming_open_local, (void *) cvc); + ink_atomiclist_push(&external_incoming_open_local, (void *)cvc); } } }
