Updated Branches: refs/heads/master 3050ce9bd -> 7b2bc024a
TS-1751 when ts have high cpu usage, cluster thread isn't balance Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/7b2bc024 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/7b2bc024 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/7b2bc024 Branch: refs/heads/master Commit: 7b2bc024aace7515c9638063a9c434da40b80799 Parents: 3050ce9 Author: Chen Bin <[email protected]> Authored: Tue Apr 2 13:29:21 2013 +0800 Committer: Chen Bin <[email protected]> Committed: Tue Apr 2 13:29:21 2013 +0800 ---------------------------------------------------------------------- iocore/cluster/ClusterHandlerBase.cc | 70 ++++++++++++++++++++++++---- iocore/cluster/P_ClusterHandler.h | 3 + 2 files changed, 63 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b2bc024/iocore/cluster/ClusterHandlerBase.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/ClusterHandlerBase.cc b/iocore/cluster/ClusterHandlerBase.cc index ee8070c..1a4e2ef 100644 --- a/iocore/cluster/ClusterHandlerBase.cc +++ b/iocore/cluster/ClusterHandlerBase.cc @@ -984,7 +984,6 @@ ClusterHandler::startClusterEvent(int event, Event * e) { int proto_major = -1; int proto_minor = -1; - int failed = 0; clusteringVersion.AdjustByteOrder(); ///////////////////////////////////////////////////////////////////////// @@ -1026,6 +1025,64 @@ ClusterHandler::startClusterEvent(int event, Event * e) if (!connector) id = clusteringVersion._id & 0xffff; + machine->msg_proto_major = proto_major; + machine->msg_proto_minor = proto_minor; + + thread = eventProcessor.eventthread[ET_CLUSTER][id % eventProcessor.n_threads_for_type[ET_CLUSTER]]; + if (net_vc->thread == thread) { + cluster_connect_state = CLCON_CONN_BIND_OK; + break; + } else { + cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_CLEAR; + thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; + } + } + + case ClusterHandler::CLCON_CONN_BIND_CLEAR: + { + // + UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; + MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread); + MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread); + if (lock && lock1) { + vc->ep.stop(); + vc->nh->open_list.remove(vc); + vc->nh = NULL; + vc->thread = NULL; + cluster_connect_state = ClusterHandler::CLCON_CONN_BIND; + } else { + thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; + } + } + + case ClusterHandler::CLCON_CONN_BIND: + { + // + NetHandler *nh = get_NetHandler(e->ethread); + UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; + MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread); + MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread); + if (lock && lock1) { + vc->nh = nh; + vc->nh->open_list.enqueue(vc); + vc->thread = e->ethread; + PollDescriptor *pd = get_PollDescriptor(e->ethread); + if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) { + cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT; + break; // goto next state + } + } else { + thread->schedule_in(this, CLUSTER_PERIOD); + return EVENT_DONE; + } + } + + case ClusterHandler::CLCON_CONN_BIND_OK: + { + int failed = 0; + // include this node into the cluster configuration MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread()); MachineList *cc = the_cluster_config(); @@ -1068,8 +1125,6 @@ failed: } this->needByteSwap = !clusteringVersion.NativeByteOrder(); - machine->msg_proto_major = proto_major; - machine->msg_proto_minor = proto_minor; #ifdef NON_MODULAR machine_online_APIcallout(ip); #endif @@ -1083,7 +1138,7 @@ failed: Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d", DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor); #endif - thread = e->ethread; + read_vcs = NEW((new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS])); write_vcs = NEW((new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS])); SET_HANDLER((ClusterContHandler) & ClusterHandler::beginClusterEvent); @@ -1092,12 +1147,7 @@ failed: read.do_iodone_event = true; write.do_iodone_event = true; -#ifdef CLUSTER_IMMEDIATE_NETIO - e->schedule_every(-CLUSTER_PERIOD); // Negative event -#else - e->schedule_every(-CLUSTER_PERIOD); -#endif - cluster_periodic_event = e; + cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD); // Startup the periodic events to process entries in // external_incoming_control. http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b2bc024/iocore/cluster/P_ClusterHandler.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterHandler.h b/iocore/cluster/P_ClusterHandler.h index 979de00..ec0fb22 100644 --- a/iocore/cluster/P_ClusterHandler.h +++ b/iocore/cluster/P_ClusterHandler.h @@ -465,6 +465,9 @@ struct ClusterHandler:public ClusterHandlerBase CLCON_READ_MSG, CLCON_READ_MSG_COMPLETE, CLCON_VALIDATE_MSG, + CLCON_CONN_BIND_CLEAR, + CLCON_CONN_BIND, + CLCON_CONN_BIND_OK, CLCON_ABORT_CONNECT, CLCON_DELETE_CONNECT } clcon_state_t;
