Updated Branches: refs/heads/master f47c6be30 -> 3a903f2bc
TS-2201: Split drainIncomingChannel into two threads: one handles broadcast messages and the other handles Reliable (TCP) requests. Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3a903f2b Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3a903f2b Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3a903f2b Branch: refs/heads/master Commit: 3a903f2bc69c0fb5623fa212f2f1ef64b5455c8b Parents: f47c6be Author: Chen Bin <[email protected]> Authored: Tue Sep 17 11:51:11 2013 +0800 Committer: Chen Bin <[email protected]> Committed: Tue Sep 17 11:51:11 2013 +0800 ---------------------------------------------------------------------- CHANGES | 3 ++ mgmt/cluster/ClusterCom.cc | 109 +++++++++++++++++++++++++++------------- 2 files changed, 77 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e9c056d..f2e9fd4 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,9 @@ -*- coding: utf-8 -*- Changes with Apache Traffic Server 4.1.0 + *) [TS-2201] Split drainIncomingChannel into two threads: one handles broadcast messages and the other handles Reliable (TCP) + requests, to support large clusters. + *) [TS-2144] Avoid race on e.g. "traffic_server -Cclear" which would crash the process intermittently. 
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/mgmt/cluster/ClusterCom.cc ---------------------------------------------------------------------- diff --git a/mgmt/cluster/ClusterCom.cc b/mgmt/cluster/ClusterCom.cc index 430e01b..fa173cc 100644 --- a/mgmt/cluster/ClusterCom.cc +++ b/mgmt/cluster/ClusterCom.cc @@ -48,6 +48,76 @@ int MultiCastMessages = 0; long LastHighestDelta = -1L; + +void * +drainIncomingChannel_broadcast(void *arg) +{ + char message[61440]; + fd_set fdlist; + void *ret = arg; + + time_t t; + time_t last_multicast_receive_time = time(NULL); + struct timeval tv; + + /* Avert race condition, thread spun during constructor */ + while (!lmgmt->ccom || !lmgmt->ccom->init) { + mgmt_sleep_sec(1); + } + + lmgmt->syslogThrInit(); + + for (;;) { /* Loop draining mgmt network channels */ + // linux: set tv.tv_set in select() loop, since linux's select() + // will update tv with the amount of time not slept (most other + // implementations do not do this) + tv.tv_sec = lmgmt->ccom->mc_poll_timeout; // interface not-responding timeout + tv.tv_usec = 0; + + memset(message, 0, 61440); + FD_ZERO(&fdlist); + + if (lmgmt->ccom->cluster_type != NO_CLUSTER) { + if (lmgmt->ccom->receive_fd > 0) { + FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */ + } + } + + mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv); + + if (lmgmt->ccom->cluster_type != NO_CLUSTER) { + // Multicast timeout considerations + if ((lmgmt->ccom->receive_fd < 0) || !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) { + t = time(NULL); + if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) { + // Timeout on multicast receive channel, reset channel. 
+ if (lmgmt->ccom->receive_fd > 0) { + close(lmgmt->ccom->receive_fd); + } + lmgmt->ccom->receive_fd = -1; + Debug("ccom", "Timeout, resetting multicast receive channel"); + if (lmgmt->ccom->establishReceiveChannel(0)) { + Debug("ccom", "establishReceiveChannel failed"); + lmgmt->ccom->receive_fd = -1; + } + last_multicast_receive_time = t; // next action at next interval + } + } else { + last_multicast_receive_time = time(NULL); // valid multicast msg + } + } + + /* Broadcast message */ + if (lmgmt->ccom->cluster_type != NO_CLUSTER && + lmgmt->ccom->receive_fd > 0 && + FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) && + (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) { + lmgmt->ccom->handleMultiCastMessage(message); + } + } + return ret; +} /* End drainIncomingChannel */ + /* * drainIncomingChannel * This function is blocking, it never returns. It is meant to allow for @@ -89,8 +159,6 @@ drainIncomingChannel(void *arg) // to reopen the channel (e.g. opening the socket would fail if the // interface was down). In this case, the ccom->receive_fd is set // to '-1' and the open is retried until it succeeds. - time_t t; - time_t last_multicast_receive_time = time(NULL); struct timeval tv; /* Avert race condition, thread spun during constructor */ @@ -111,43 +179,12 @@ drainIncomingChannel(void *arg) FD_ZERO(&fdlist); if (lmgmt->ccom->cluster_type != NO_CLUSTER) { - if (lmgmt->ccom->receive_fd > 0) { - FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */ - } FD_SET(lmgmt->ccom->reliable_server_fd, &fdlist); /* TCP Server fd */ } mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv); - if (lmgmt->ccom->cluster_type != NO_CLUSTER) { - // Multicast timeout considerations - if ((lmgmt->ccom->receive_fd < 0) || !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) { - t = time(NULL); - if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) { - // Timeout on multicast receive channel, reset channel. 
- if (lmgmt->ccom->receive_fd > 0) { - close(lmgmt->ccom->receive_fd); - } - lmgmt->ccom->receive_fd = -1; - Debug("ccom", "Timeout, resetting multicast receive channel"); - if (lmgmt->ccom->establishReceiveChannel(0)) { - Debug("ccom", "establishReceiveChannel failed"); - lmgmt->ccom->receive_fd = -1; - } - last_multicast_receive_time = t; // next action at next interval - } - } else { - last_multicast_receive_time = time(NULL); // valid multicast msg - } - } - - /* Broadcast message */ - if (lmgmt->ccom->cluster_type != NO_CLUSTER && - lmgmt->ccom->receive_fd > 0 && - FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) && - (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) { - lmgmt->ccom->handleMultiCastMessage(message); - } else if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) { + if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) { /* Reliable(TCP) request */ int clilen = sizeof(cli_addr); int req_fd = mgmt_accept(lmgmt->ccom->reliable_server_fd, (struct sockaddr *) &cli_addr, &clilen); @@ -442,8 +479,10 @@ ClusterCom::ClusterCom(unsigned long oip, char *host, int mcport, char *group, i peers = ink_hash_table_create(InkHashTableKeyType_String); mismatchLog = ink_hash_table_create(InkHashTableKeyType_String); - if (cluster_type != NO_CLUSTER) + if (cluster_type != NO_CLUSTER) { + ink_thread_create(drainIncomingChannel_broadcast, 0); /* Spin drainer thread */ ink_thread_create(drainIncomingChannel, 0); /* Spin drainer thread */ + } return; } /* End ClusterCom::ClusterCom */
