This is an automated email from the ASF dual-hosted git repository.

jpeach pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit cd4d6b4ddf475c2b019cb57100765cdbde9ca786
Author: James Peach <[email protected]>
AuthorDate: Sun May 21 11:09:08 2017 -0700

    Refactor ProcessManager to stop the poll thread.
    
    Clean up the ProcessManager to remove use of mgmt_log. Refactor to improve
    comment and code legibility. Move the code to tear down the message queue
    from the destructor into the stop() function, and capture the poll thread
    so that we can join it and stop the manager relatively gracefully.
---
 mgmt/ProcessManager.cc | 199 +++++++++++++++++++++++++++++++++----------------
 mgmt/ProcessManager.h  |  50 ++++---------
 2 files changed, 151 insertions(+), 98 deletions(-)

diff --git a/mgmt/ProcessManager.cc b/mgmt/ProcessManager.cc
index 128711f..7c5c4ea 100644
--- a/mgmt/ProcessManager.cc
+++ b/mgmt/ProcessManager.cc
@@ -36,19 +36,56 @@
  */
 inkcoreapi ProcessManager *pmgmt = nullptr;
 
+void
+ProcessManager::start(std::function<void()> const &cb)
+{
+  Debug("pmgmt", "starting process manager");
+
+  init = cb;
+
+  ink_release_assert(running == 0);
+  ink_atomic_increment(&running, 1);
+  poll_thread = ink_thread_create(processManagerThread, NULL, 0, 0, NULL);
+}
+
+void
+ProcessManager::stop()
+{
+  Debug("pmgmt", "stopping process manager");
+
+  ink_release_assert(running == 1);
+  ink_atomic_decrement(&running, 1);
+
+  int tmp = local_manager_sockfd;
+
+  local_manager_sockfd = -1;
+  close_socket(tmp);
+
+  ink_thread_join(poll_thread);
+  poll_thread = ink_thread_null();
+
+  while (!queue_is_empty(mgmt_signal_queue)) {
+    char *sig = (char *)dequeue(mgmt_signal_queue);
+    ats_free(sig);
+  }
+
+  ats_free(mgmt_signal_queue);
+}
+
 /*
- * startProcessManager(...)
+ * processManagerThread(...)
  *   The start function and thread loop for the process manager.
  */
 void *
-startProcessManager(void *arg)
+ProcessManager::processManagerThread(void *arg)
 {
   void *ret = arg;
 
   while (!pmgmt) { /* Avert race condition, thread spun during constructor */
-    Debug("pmgmt", "[startProcessManager] Waiting for initialization of 
object...");
+    Debug("pmgmt", "waiting for initialization");
     mgmt_sleep_sec(1);
   }
+
   if (pmgmt->require_lm) { /* Allow p. process to run w/o a lm */
     pmgmt->initLMConnection();
   }
@@ -57,22 +94,21 @@ startProcessManager(void *arg)
     pmgmt->init();
   }
 
-  for (;;) {
-    if (unlikely(shutdown_event_system == true)) {
-      return nullptr;
-    }
+  while (pmgmt->running) {
     if (pmgmt->require_lm) {
       pmgmt->pollLMConnection();
     }
+
     pmgmt->processEventQueue();
     pmgmt->processSignalQueue();
     mgmt_sleep_sec(pmgmt->timeout);
   }
+
   return ret;
-} /* End startProcessManager */
+}
 
 ProcessManager::ProcessManager(bool rlm)
-  : BaseManager(), require_lm(rlm), local_manager_sockfd(0), cbtable(nullptr), 
max_msgs_in_a_row(1)
+  : BaseManager(), require_lm(rlm), pid(getpid()), local_manager_sockfd(0), 
cbtable(nullptr), max_msgs_in_a_row(1)
 {
   mgmt_signal_queue = create_queue();
 
@@ -80,19 +116,25 @@ ProcessManager::ProcessManager(bool rlm)
   // Making the process_manager thread a spinning thread to start traffic 
server
   // as quickly as possible. Will reset this timeout when reconfigure()
   timeout = 0;
-  pid     = getpid();
-} /* End ProcessManager::ProcessManager */
+}
+
+ProcessManager::~ProcessManager()
+{
+  if (running) {
+    stop();
+  }
+}
 
 void
 ProcessManager::reconfigure()
 {
-  bool found;
   max_msgs_in_a_row = MAX_MSGS_IN_A_ROW;
-  timeout           = REC_readInteger("proxy.config.process_manager.timeout", 
&found);
-  ink_assert(found);
 
-  return;
-} /* End ProcessManager::reconfigure */
+  if (RecGetRecordInt("proxy.config.process_manager.timeout", &timeout) != 
REC_ERR_OKAY) {
+    // Default to 5sec if the timeout is unspecified.
+    timeout = 5;
+  }
+}
 
 void
 ProcessManager::signalConfigFileChild(const char *parent, const char *child, 
unsigned int options)
@@ -100,8 +142,9 @@ ProcessManager::signalConfigFileChild(const char *parent, 
const char *child, uns
   static const MgmtMarshallType fields[] = {MGMT_MARSHALL_STRING, 
MGMT_MARSHALL_STRING, MGMT_MARSHALL_INT};
 
   MgmtMarshallInt mgmtopt = options;
-  size_t len              = mgmt_message_length(fields, countof(fields), 
&parent, &child, &mgmtopt);
-  void *buffer            = ats_malloc(len);
+
+  size_t len   = mgmt_message_length(fields, countof(fields), &parent, &child, 
&mgmtopt);
+  void *buffer = ats_malloc(len);
 
   mgmt_message_marshall(buffer, len, fields, countof(fields), &parent, &child, 
&mgmtopt);
   signalManager(MGMT_SIGNAL_CONFIG_FILE_CHILD, (const char *)buffer, len);
@@ -113,8 +156,7 @@ void
 ProcessManager::signalManager(int msg_id, const char *data_str)
 {
   signalManager(msg_id, data_str, strlen(data_str) + 1);
-  return;
-} /* End ProcessManager::signalManager */
+}
 
 void
 ProcessManager::signalManager(int msg_id, const char *data_raw, int data_len)
@@ -125,10 +167,9 @@ ProcessManager::signalManager(int msg_id, const char 
*data_raw, int data_len)
   mh->msg_id   = msg_id;
   mh->data_len = data_len;
   memcpy((char *)mh + sizeof(MgmtMessageHdr), data_raw, data_len);
-  ink_assert(enqueue(mgmt_signal_queue, mh));
-  return;
 
-} /* End ProcessManager::signalManager */
+  ink_release_assert(enqueue(mgmt_signal_queue, mh));
+}
 
 bool
 ProcessManager::processEventQueue()
@@ -138,20 +179,24 @@ ProcessManager::processEventQueue()
   while (!queue_is_empty(mgmt_event_queue)) {
     MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_event_queue);
 
-    Debug("pmgmt", "[ProcessManager] ==> Processing event id '%d' payload=%d", 
mh->msg_id, mh->data_len);
+    Debug("pmgmt", "processing event id '%d' payload=%d", mh->msg_id, 
mh->data_len);
     if (mh->data_len > 0) {
       executeMgmtCallback(mh->msg_id, (char *)mh + sizeof(MgmtMessageHdr), 
mh->data_len);
     } else {
       executeMgmtCallback(mh->msg_id, nullptr, 0);
     }
+
+    // A shutdown message is a normal exit, so Alert rather than Fatal.
     if (mh->msg_id == MGMT_EVENT_SHUTDOWN) {
-      mgmt_fatal(0, "[ProcessManager::processEventQueue] Shutdown msg 
received, exiting\n");
-    } /* Exit on shutdown */
+      Alert("exiting on shutdown message");
+    }
+
     ats_free(mh);
     ret = true;
   }
+
   return ret;
-} /* End ProcessManager::processEventQueue */
+}
 
 bool
 ProcessManager::processSignalQueue()
@@ -161,11 +206,10 @@ ProcessManager::processSignalQueue()
   while (!queue_is_empty(mgmt_signal_queue)) {
     MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_signal_queue);
 
-    Debug("pmgmt", "[ProcessManager] ==> Signalling local manager '%d'", 
mh->msg_id);
+    Debug("pmgmt", "signaling local manager with message ID %d", mh->msg_id);
 
     if (require_lm && mgmt_write_pipe(local_manager_sockfd, (char *)mh, 
sizeof(MgmtMessageHdr) + mh->data_len) <= 0) {
-      mgmt_fatal(errno, "[ProcessManager::processSignalQueue] Error writing 
message!");
-      // ink_assert(enqueue(mgmt_signal_queue, mh));
+      Fatal("error writing message: %s", strerror(errno));
     } else {
       ats_free(mh);
       ret = true;
@@ -173,7 +217,7 @@ ProcessManager::processSignalQueue()
   }
 
   return ret;
-} /* End ProcessManager::processSignalQueue */
+}
 
 void
 ProcessManager::initLMConnection()
@@ -197,16 +241,17 @@ ProcessManager::initLMConnection()
 #else
   servlen = strlen(serv_addr.sun_path) + sizeof(serv_addr.sun_family);
 #endif
+
   if ((local_manager_sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
-    mgmt_fatal(errno, "[ProcessManager::initLMConnection] Unable to create 
socket\n");
+    Fatal("Unable to create socket '%s': %s", (const char *)sockpath, 
strerror(errno));
   }
 
   if (fcntl(local_manager_sockfd, F_SETFD, FD_CLOEXEC) < 0) {
-    mgmt_fatal(errno, "[ProcessManager::initLMConnection] Unable to set 
close-on-exec\n");
+    Fatal("unable to set close-on-exec flag: %s", strerror(errno));
   }
 
   if ((connect(local_manager_sockfd, (struct sockaddr *)&serv_addr, servlen)) 
< 0) {
-    mgmt_fatal(errno, "[ProcessManager::initLMConnection] failed to connect 
management socket '%s'\n", (const char *)sockpath);
+    Fatal("failed to connect management socket '%s': %s", (const char 
*)sockpath, strerror(errno));
   }
 
   data_len          = sizeof(pid_t);
@@ -214,63 +259,89 @@ ProcessManager::initLMConnection()
   mh_full->msg_id   = MGMT_SIGNAL_PID;
   mh_full->data_len = data_len;
   memcpy((char *)mh_full + sizeof(MgmtMessageHdr), &(pid), data_len);
+
   if (mgmt_write_pipe(local_manager_sockfd, (char *)mh_full, 
sizeof(MgmtMessageHdr) + data_len) <= 0) {
-    mgmt_fatal(errno, "[ProcessManager::initLMConnection] Error writing 
message!\n");
+    Fatal("error writing message: %s", strerror(errno));
   }
-
-} /* End ProcessManager::initLMConnection */
+}
 
 void
 ProcessManager::pollLMConnection()
 {
-  int res;
-
   MgmtMessageHdr mh_hdr;
-  MgmtMessageHdr *mh_full;
   char *data_raw;
 
   // Avoid getting stuck enqueuing too many requests in a row, limit to 
MAX_MSGS_IN_A_ROW.
   int count;
-  for (count = 0; count < max_msgs_in_a_row; ++count) {
+  for (count = 0; running && count < max_msgs_in_a_row; ++count) {
     int num;
 
     num = mgmt_read_timeout(local_manager_sockfd, 1 /* sec */, 0 /* usec */);
-    if (num == 0) { /* Have nothing */
+    if (num == 0) {
+      // Nothing but a timeout. We are done for now.
       break;
-    } else if (num > 0) { /* We have a message */
-      if ((res = mgmt_read_pipe(local_manager_sockfd, (char *)&mh_hdr, 
sizeof(MgmtMessageHdr))) > 0) {
-        size_t mh_full_size = sizeof(MgmtMessageHdr) + mh_hdr.data_len;
-        mh_full             = (MgmtMessageHdr *)ats_malloc(mh_full_size);
+    }
 
-        memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr));
-        data_raw = (char *)mh_full + sizeof(MgmtMessageHdr);
+    if (num < 0) {
+      // Socket read error.
+      Debug("pmgmt", "socket select failed: %s", strerror(errno));
+      continue;
+    }
 
-        if ((res = mgmt_read_pipe(local_manager_sockfd, data_raw, 
mh_hdr.data_len)) > 0) {
-          Debug("pmgmt", "[ProcessManager::pollLMConnection] Message: '%d'", 
mh_full->msg_id);
-          handleMgmtMsgFromLM(mh_full);
-        } else if (res < 0) {
-          mgmt_fatal(errno, "[ProcessManager::pollLMConnection] Error in 
read!");
+    if (num > 0) {
+      // We have a message, try to read the message header.
+      int res = mgmt_read_pipe(local_manager_sockfd, (char *)&mh_hdr, 
sizeof(MgmtMessageHdr));
+
+      if (res < 0) {
+        if (running) {
+          // If we are no longer running, the socket might have been closed out
+          // from under us. At any rate, we don't care any more.
+          Fatal("socket read error: %s", strerror(errno));
         }
 
-        ats_free(mh_full);
-      } else if (res < 0) {
-        mgmt_fatal(errno, "[ProcessManager::pollLMConnection] Error in read!");
+        break;
       }
 
-      // handle EOF
+      // Handle EOF from the manager. This is normal, so we log an Alert
+      // rather than a Fatal.
       if (res == 0) {
         close_socket(local_manager_sockfd);
-        if (!shutdown_event_system) {
-          mgmt_fatal(0, "[ProcessManager::pollLMConnection] Lost Manager 
EOF!");
+        Alert("exiting with EOF from process manager");
+      }
+
+      if (res > 0) {
+        size_t mh_full_size     = sizeof(MgmtMessageHdr) + mh_hdr.data_len;
+        MgmtMessageHdr *mh_full = (MgmtMessageHdr *)ats_malloc(mh_full_size);
+
+        memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr));
+        data_raw = (char *)mh_full + sizeof(MgmtMessageHdr);
+
+        res = mgmt_read_pipe(local_manager_sockfd, data_raw, mh_hdr.data_len);
+        if (res > 0) {
+          Debug("pmgmt", "received message ID %d", mh_full->msg_id);
+          handleMgmtMsgFromLM(mh_full);
+        }
+
+        if (res == 0) {
+          close_socket(local_manager_sockfd);
+          Alert("exiting with EOF from process manager");
+        }
+
+        if (res < 0) {
+          if (running) {
+            // If we are no longer running, the socket might have been closed 
out
+            // from under us. At any rate, we don't care any more.
+            Fatal("socket read error: %s", strerror(errno));
+          }
         }
+
+        ats_free(mh_full);
       }
-    } else if (num < 0) { /* Error */
-      mgmt_log("[ProcessManager::pollLMConnection] select failed or was 
interrupted (%d)\n", errno);
     }
   }
 
-  Debug("pmgmt", "[ProcessManager::pollLMConnection] enqueued %d of max %d 
messages in a row", count, max_msgs_in_a_row);
-} /* End ProcessManager::pollLMConnection */
+  Debug("pmgmt", "[%s] enqueued %d of max %d messages in a row", __func__, 
count, max_msgs_in_a_row);
+}
 
 void
 ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh)
@@ -322,7 +393,7 @@ ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh)
     signalMgmtEntity(MGMT_EVENT_LIFECYCLE_MESSAGE, data_raw, mh->data_len);
     break;
   default:
-    mgmt_log("[ProcessManager::pollLMConnection] unknown type %d\n", 
mh->msg_id);
+    Warning("received unknown message ID %d\n", mh->msg_id);
     break;
   }
 }
diff --git a/mgmt/ProcessManager.h b/mgmt/ProcessManager.h
index 7e728d4..e25b4ef 100644
--- a/mgmt/ProcessManager.h
+++ b/mgmt/ProcessManager.h
@@ -45,38 +45,19 @@
 
 class ConfigUpdateCbTable;
 
-void *startProcessManager(void *arg);
 class ProcessManager : public BaseManager
 {
 public:
   ProcessManager(bool rlm);
-  ~ProcessManager()
-  {
-    close_socket(local_manager_sockfd);
-    while (!queue_is_empty(mgmt_signal_queue)) {
-      char *sig = (char *)dequeue(mgmt_signal_queue);
-      ats_free(sig);
-    }
-    ats_free(mgmt_signal_queue);
-  }
-
-  /** Start a thread for the process manager.
+  ~ProcessManager();
 
-      If @a cb is set then it is called after the thread is started and before 
any messages are processed.
-  */
-  void
-  start(std::function<void()> const &cb = std::function<void()>())
-  {
-    init = cb;
-    ink_thread_create(startProcessManager, NULL, 0, 0, NULL);
-  }
+  // Start a thread for the process manager. If @a cb is set then it
+  // is called after the thread is started and before any messages are
+  // processed.
+  void start(std::function<void()> const &cb = std::function<void()>());
 
-  void
-  stop()
-  {
-    mgmt_log("[ProcessManager::stop] Bringing down connection\n");
-    close_socket(local_manager_sockfd);
-  }
+  // Stop the process manager, dropping any unprocessed messages.
+  void stop();
 
   inkcoreapi void signalConfigFileChild(const char *parent, const char *child, 
unsigned int options);
   inkcoreapi void signalManager(int msg_id, const char *data_str);
@@ -96,25 +77,26 @@ public:
     cbtable = _cbtable;
   }
 
+private:
   bool require_lm;
-  time_t timeout;
-
+  RecInt timeout;
   LLQ *mgmt_signal_queue;
-
   pid_t pid;
 
+  ink_thread poll_thread = ink_thread_null();
+  volatile int running   = 0;
+
   /// Thread initialization callback.
   /// This allows @c traffic_server and @c traffic_manager to perform 
different initialization in the thread.
   std::function<void()> init;
 
   int local_manager_sockfd;
-
-private:
-  static const int MAX_MSGS_IN_A_ROW = 10000;
-
   ConfigUpdateCbTable *cbtable;
   int max_msgs_in_a_row;
-}; /* End class ProcessManager */
+
+  static const int MAX_MSGS_IN_A_ROW = 10000;
+  static void *processManagerThread(void *arg);
+};
 
 inkcoreapi extern ProcessManager *pmgmt;
 

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to