This is an automated email from the ASF dual-hosted git repository. jpeach pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit cd4d6b4ddf475c2b019cb57100765cdbde9ca786 Author: James Peach <[email protected]> AuthorDate: Sun May 21 11:09:08 2017 -0700 Refactor ProcessManager to stop the poll thread. Clean up the ProcessManager to remove use of mgmt_log. Refactor to improve comment and code legibility. Move the code to tear down the message queue from the destructor into the stop() function, and capture the poll thread so that we can join it and stop the manager relatively gracefully. --- mgmt/ProcessManager.cc | 199 +++++++++++++++++++++++++++++++++---------------- mgmt/ProcessManager.h | 50 ++++--------- 2 files changed, 151 insertions(+), 98 deletions(-) diff --git a/mgmt/ProcessManager.cc b/mgmt/ProcessManager.cc index 128711f..7c5c4ea 100644 --- a/mgmt/ProcessManager.cc +++ b/mgmt/ProcessManager.cc @@ -36,19 +36,56 @@ */ inkcoreapi ProcessManager *pmgmt = nullptr; +void +ProcessManager::start(std::function<void()> const &cb) +{ + Debug("pmgmt", "starting process manager"); + + init = cb; + + ink_release_assert(running == 0); + ink_atomic_increment(&running, 1); + poll_thread = ink_thread_create(processManagerThread, NULL, 0, 0, NULL); +} + +void +ProcessManager::stop() +{ + Debug("pmgmt", "stopping process manager"); + + ink_release_assert(running == 1); + ink_atomic_decrement(&running, 1); + + int tmp = local_manager_sockfd; + + local_manager_sockfd = -1; + close_socket(tmp); + + ink_thread_join(poll_thread); + poll_thread = ink_thread_null(); + + while (!queue_is_empty(mgmt_signal_queue)) { + char *sig = (char *)dequeue(mgmt_signal_queue); + ats_free(sig); + } + + ats_free(mgmt_signal_queue); +} + /* - * startProcessManager(...) + * processManagerThread(...) * The start function and thread loop for the process manager. 
*/ void * -startProcessManager(void *arg) +ProcessManager::processManagerThread(void *arg) { void *ret = arg; while (!pmgmt) { /* Avert race condition, thread spun during constructor */ - Debug("pmgmt", "[startProcessManager] Waiting for initialization of object..."); + Debug("pmgmt", "waiting for initialization"); mgmt_sleep_sec(1); } + if (pmgmt->require_lm) { /* Allow p. process to run w/o a lm */ pmgmt->initLMConnection(); } @@ -57,22 +94,21 @@ startProcessManager(void *arg) pmgmt->init(); } - for (;;) { - if (unlikely(shutdown_event_system == true)) { - return nullptr; - } + while (pmgmt->running) { if (pmgmt->require_lm) { pmgmt->pollLMConnection(); } + pmgmt->processEventQueue(); pmgmt->processSignalQueue(); mgmt_sleep_sec(pmgmt->timeout); } + return ret; -} /* End startProcessManager */ +} ProcessManager::ProcessManager(bool rlm) - : BaseManager(), require_lm(rlm), local_manager_sockfd(0), cbtable(nullptr), max_msgs_in_a_row(1) + : BaseManager(), require_lm(rlm), pid(getpid()), local_manager_sockfd(0), cbtable(nullptr), max_msgs_in_a_row(1) { mgmt_signal_queue = create_queue(); @@ -80,19 +116,25 @@ ProcessManager::ProcessManager(bool rlm) // Making the process_manager thread a spinning thread to start traffic server // as quickly as possible. Will reset this timeout when reconfigure() timeout = 0; - pid = getpid(); -} /* End ProcessManager::ProcessManager */ +} + +ProcessManager::~ProcessManager() +{ + if (running) { + stop(); + } +} void ProcessManager::reconfigure() { - bool found; max_msgs_in_a_row = MAX_MSGS_IN_A_ROW; - timeout = REC_readInteger("proxy.config.process_manager.timeout", &found); - ink_assert(found); - return; -} /* End ProcessManager::reconfigure */ + if (RecGetRecordInt("proxy.config.process_manager.timeout", &timeout) != REC_ERR_OKAY) { + // Default to 5sec if the timeout is unspecified. 
+ timeout = 5; + } +} void ProcessManager::signalConfigFileChild(const char *parent, const char *child, unsigned int options) @@ -100,8 +142,9 @@ ProcessManager::signalConfigFileChild(const char *parent, const char *child, uns static const MgmtMarshallType fields[] = {MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING, MGMT_MARSHALL_INT}; MgmtMarshallInt mgmtopt = options; - size_t len = mgmt_message_length(fields, countof(fields), &parent, &child, &mgmtopt); - void *buffer = ats_malloc(len); + + size_t len = mgmt_message_length(fields, countof(fields), &parent, &child, &mgmtopt); + void *buffer = ats_malloc(len); mgmt_message_marshall(buffer, len, fields, countof(fields), &parent, &child, &mgmtopt); signalManager(MGMT_SIGNAL_CONFIG_FILE_CHILD, (const char *)buffer, len); @@ -113,8 +156,7 @@ void ProcessManager::signalManager(int msg_id, const char *data_str) { signalManager(msg_id, data_str, strlen(data_str) + 1); - return; -} /* End ProcessManager::signalManager */ +} void ProcessManager::signalManager(int msg_id, const char *data_raw, int data_len) @@ -125,10 +167,9 @@ ProcessManager::signalManager(int msg_id, const char *data_raw, int data_len) mh->msg_id = msg_id; mh->data_len = data_len; memcpy((char *)mh + sizeof(MgmtMessageHdr), data_raw, data_len); - ink_assert(enqueue(mgmt_signal_queue, mh)); - return; -} /* End ProcessManager::signalManager */ + ink_release_assert(enqueue(mgmt_signal_queue, mh)); +} bool ProcessManager::processEventQueue() @@ -138,20 +179,24 @@ ProcessManager::processEventQueue() while (!queue_is_empty(mgmt_event_queue)) { MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_event_queue); - Debug("pmgmt", "[ProcessManager] ==> Processing event id '%d' payload=%d", mh->msg_id, mh->data_len); + Debug("pmgmt", "processing event id '%d' payload=%d", mh->msg_id, mh->data_len); if (mh->data_len > 0) { executeMgmtCallback(mh->msg_id, (char *)mh + sizeof(MgmtMessageHdr), mh->data_len); } else { executeMgmtCallback(mh->msg_id, nullptr, 0); } + + // A 
shutdown message is a normal exit, so Alert rather than Fatal. if (mh->msg_id == MGMT_EVENT_SHUTDOWN) { - mgmt_fatal(0, "[ProcessManager::processEventQueue] Shutdown msg received, exiting\n"); - } /* Exit on shutdown */ + Alert("exiting on shutdown message"); + } + ats_free(mh); ret = true; } + return ret; -} /* End ProcessManager::processEventQueue */ +} bool ProcessManager::processSignalQueue() @@ -161,11 +206,10 @@ ProcessManager::processSignalQueue() while (!queue_is_empty(mgmt_signal_queue)) { MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_signal_queue); - Debug("pmgmt", "[ProcessManager] ==> Signalling local manager '%d'", mh->msg_id); + Debug("pmgmt", "signaling local manager with message ID %d", mh->msg_id); if (require_lm && mgmt_write_pipe(local_manager_sockfd, (char *)mh, sizeof(MgmtMessageHdr) + mh->data_len) <= 0) { - mgmt_fatal(errno, "[ProcessManager::processSignalQueue] Error writing message!"); - // ink_assert(enqueue(mgmt_signal_queue, mh)); + Fatal("error writing message: %s", strerror(errno)); } else { ats_free(mh); ret = true; @@ -173,7 +217,7 @@ ProcessManager::processSignalQueue() } return ret; -} /* End ProcessManager::processSignalQueue */ +} void ProcessManager::initLMConnection() @@ -197,16 +241,17 @@ ProcessManager::initLMConnection() #else servlen = strlen(serv_addr.sun_path) + sizeof(serv_addr.sun_family); #endif + if ((local_manager_sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { - mgmt_fatal(errno, "[ProcessManager::initLMConnection] Unable to create socket\n"); + Fatal("Unable to create socket '%s': %s", (const char *)sockpath, strerror(errno)); } if (fcntl(local_manager_sockfd, F_SETFD, FD_CLOEXEC) < 0) { - mgmt_fatal(errno, "[ProcessManager::initLMConnection] Unable to set close-on-exec\n"); + Fatal("unable to set close-on-exec flag: %s", strerror(errno)); } if ((connect(local_manager_sockfd, (struct sockaddr *)&serv_addr, servlen)) < 0) { - mgmt_fatal(errno, "[ProcessManager::initLMConnection] failed to connect management 
socket '%s'\n", (const char *)sockpath); + Fatal("failed to connect management socket '%s': %s", (const char *)sockpath, strerror(errno)); } data_len = sizeof(pid_t); @@ -214,63 +259,89 @@ ProcessManager::initLMConnection() mh_full->msg_id = MGMT_SIGNAL_PID; mh_full->data_len = data_len; memcpy((char *)mh_full + sizeof(MgmtMessageHdr), &(pid), data_len); + if (mgmt_write_pipe(local_manager_sockfd, (char *)mh_full, sizeof(MgmtMessageHdr) + data_len) <= 0) { - mgmt_fatal(errno, "[ProcessManager::initLMConnection] Error writing message!\n"); + Fatal("error writing message: %s", strerror(errno)); } - -} /* End ProcessManager::initLMConnection */ +} void ProcessManager::pollLMConnection() { - int res; - MgmtMessageHdr mh_hdr; - MgmtMessageHdr *mh_full; char *data_raw; // Avoid getting stuck enqueuing too many requests in a row, limit to MAX_MSGS_IN_A_ROW. int count; - for (count = 0; count < max_msgs_in_a_row; ++count) { + for (count = 0; running && count < max_msgs_in_a_row; ++count) { int num; num = mgmt_read_timeout(local_manager_sockfd, 1 /* sec */, 0 /* usec */); - if (num == 0) { /* Have nothing */ + if (num == 0) { + // Nothing but a timeout. We are done for now. break; - } else if (num > 0) { /* We have a message */ - if ((res = mgmt_read_pipe(local_manager_sockfd, (char *)&mh_hdr, sizeof(MgmtMessageHdr))) > 0) { - size_t mh_full_size = sizeof(MgmtMessageHdr) + mh_hdr.data_len; - mh_full = (MgmtMessageHdr *)ats_malloc(mh_full_size); + } - memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr)); - data_raw = (char *)mh_full + sizeof(MgmtMessageHdr); + if (num < 0) { + // Socket read error. 
+ Debug("pmgmt", "socket select failed: %s", strerror(errno)); + continue; + } - if ((res = mgmt_read_pipe(local_manager_sockfd, data_raw, mh_hdr.data_len)) > 0) { - Debug("pmgmt", "[ProcessManager::pollLMConnection] Message: '%d'", mh_full->msg_id); - handleMgmtMsgFromLM(mh_full); - } else if (res < 0) { - mgmt_fatal(errno, "[ProcessManager::pollLMConnection] Error in read!"); + if (num > 0) { + // We have a message, try to read the message header. + int res = mgmt_read_pipe(local_manager_sockfd, (char *)&mh_hdr, sizeof(MgmtMessageHdr)); + + if (res < 0) { + if (running) { + // If we are no longer running, the socket might have been closed out + // from under us. At any rate, we don't care any more. + Fatal("socket read error: %s", strerror(errno)); } - ats_free(mh_full); - } else if (res < 0) { - mgmt_fatal(errno, "[ProcessManager::pollLMConnection] Error in read!"); + break; } - // handle EOF + // Handle EOF from the manager. This is normal, so we log an Alert + // rather than a Fatal. if (res == 0) { close_socket(local_manager_sockfd); - if (!shutdown_event_system) { - mgmt_fatal(0, "[ProcessManager::pollLMConnection] Lost Manager EOF!"); + Alert("exiting with EOF from process manager"); + } + + if (res > 0) { + size_t mh_full_size = sizeof(MgmtMessageHdr) + mh_hdr.data_len; + MgmtMessageHdr *mh_full = (MgmtMessageHdr *)ats_malloc(mh_full_size); + + memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr)); + data_raw = (char *)mh_full + sizeof(MgmtMessageHdr); + + res = mgmt_read_pipe(local_manager_sockfd, data_raw, mh_hdr.data_len); + if (res > 0) { + Debug("pmgmt", "received message ID %d", mh_full->msg_id); + handleMgmtMsgFromLM(mh_full); + } + + if (res == 0) { + close_socket(local_manager_sockfd); + Alert("exiting with EOF from process manager"); + } + + if (res < 0) { + if (running) { + // If we are no longer running, the socket might have been closed out + // from under us. At any rate, we don't care any more. 
+ Fatal("socket read error: %s", strerror(errno)); + } } + + ats_free(mh_full); } - } else if (num < 0) { /* Error */ - mgmt_log("[ProcessManager::pollLMConnection] select failed or was interrupted (%d)\n", errno); } } - Debug("pmgmt", "[ProcessManager::pollLMConnection] enqueued %d of max %d messages in a row", count, max_msgs_in_a_row); -} /* End ProcessManager::pollLMConnection */ + Debug("pmgmt", "[%s] enqueued %d of max %d messages in a row", __func__, count, max_msgs_in_a_row); +} void ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh) @@ -322,7 +393,7 @@ ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh) signalMgmtEntity(MGMT_EVENT_LIFECYCLE_MESSAGE, data_raw, mh->data_len); break; default: - mgmt_log("[ProcessManager::pollLMConnection] unknown type %d\n", mh->msg_id); + Warning("received unknown message ID %d\n", mh->msg_id); break; } } diff --git a/mgmt/ProcessManager.h b/mgmt/ProcessManager.h index 7e728d4..e25b4ef 100644 --- a/mgmt/ProcessManager.h +++ b/mgmt/ProcessManager.h @@ -45,38 +45,19 @@ class ConfigUpdateCbTable; -void *startProcessManager(void *arg); class ProcessManager : public BaseManager { public: ProcessManager(bool rlm); - ~ProcessManager() - { - close_socket(local_manager_sockfd); - while (!queue_is_empty(mgmt_signal_queue)) { - char *sig = (char *)dequeue(mgmt_signal_queue); - ats_free(sig); - } - ats_free(mgmt_signal_queue); - } - - /** Start a thread for the process manager. + ~ProcessManager(); - If @a cb is set then it is called after the thread is started and before any messages are processed. - */ - void - start(std::function<void()> const &cb = std::function<void()>()) - { - init = cb; - ink_thread_create(startProcessManager, NULL, 0, 0, NULL); - } + // Start a thread for the process manager. If @a cb is set then it + // is called after the thread is started and before any messages are + // processed. 
+ void start(std::function<void()> const &cb = std::function<void()>()); - void - stop() - { - mgmt_log("[ProcessManager::stop] Bringing down connection\n"); - close_socket(local_manager_sockfd); - } + // Stop the process manager, dropping any unprocessed messages. + void stop(); inkcoreapi void signalConfigFileChild(const char *parent, const char *child, unsigned int options); inkcoreapi void signalManager(int msg_id, const char *data_str); @@ -96,25 +77,26 @@ public: cbtable = _cbtable; } +private: bool require_lm; - time_t timeout; - + RecInt timeout; LLQ *mgmt_signal_queue; - pid_t pid; + ink_thread poll_thread = ink_thread_null(); + volatile int running = 0; + /// Thread initialization callback. /// This allows @c traffic_server and @c traffic_manager to perform different initialization in the thread. std::function<void()> init; int local_manager_sockfd; - -private: - static const int MAX_MSGS_IN_A_ROW = 10000; - ConfigUpdateCbTable *cbtable; int max_msgs_in_a_row; -}; /* End class ProcessManager */ + + static const int MAX_MSGS_IN_A_ROW = 10000; + static void *processManagerThread(void *arg); +}; inkcoreapi extern ProcessManager *pmgmt; -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
