This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 90a56eb  Fixed a deadlock in libprocess.
90a56eb is described below

commit 90a56eb289d1dbd40cb0adcb8fd2f21eaf1ce0cf
Author: Benjamin Mahler <[email protected]>
AuthorDate: Mon Jun 3 20:19:35 2019 -0400

    Fixed a deadlock in libprocess.
    
    Per MESOS-9808, when we deliver a DispatchEvent to a TERMINATING
    Process, the DispatchEvent is destructed while holding a
    TERMINATING process reference. This can lead to a deadlock
    because the DispatchEvent destruction can run additional code
    (e.g. destructors of objects that have been bound into the
    dispatch) that ultimately invokes terminate (or dispatch, or send)
    while the original TERMINATING process reference is still held. If
    the referenced TERMINATING process is being cleaned up, we may
    deadlock between (1) `ProcessManager::cleanup()` holding the
    `processes_mutex` and spinning until all references go away and
    (2) the terminate (or dispatch, or send) call blocked inside
    `ProcessManager::use()`, which tries to resolve the destination
    Process by locking `processes_mutex`.
    
    Example from MESOS-9808:
    
      Thread 19:
        Running ReaderProcess
        ReaderProcess::_consume, sets a Promise
        Dispatches to mock API subscriber Process. **It's TERMINATING,
          so DispatchEvent gets deleted while holding a reference to
          the mock api subscriber!**
        Destructs the process::Loop in mock API subscriber Process
        Deletes the Reader, Reader terminates ReaderProcess
        **Trying to lock `processes_mutex` in `ProcessManager::use()`
          of ReaderProcess (deadlock)**
    
      Thread 5:
        ProcessManager::cleanup of Mock API Subscriber Process,
          looping until all references go away (spinning forever)
    
    Here, thread 5 is holding `processes_mutex` locked and spinning
    waiting for references to the Mock API Subscriber Process to go
    away, but thread 19 is blocked also trying to lock `processes_mutex`
    while holding a reference to the Mock API Subscriber Process!
    
    The fix here is to not allow arbitrary code to execute while a
    ProcessReference is held. This is already done in other locations
    but this spot was missed. The reason we hadn't hit this bug until
    now is that it's rather uncommon for a dispatch to have bound
    objects whose destructors lead to a `terminate()` call (today
    `terminate()` always resolves the process reference, even if
    it's already resolved, whereas `dispatch()` uses cached process
    references).
    
    Review: https://reviews.apache.org/r/70778
---
 3rdparty/libprocess/include/process/process.hpp |   6 +-
 3rdparty/libprocess/src/event_queue.hpp         |  21 ++---
 3rdparty/libprocess/src/process.cpp             | 109 ++++++++++++++++--------
 3 files changed, 90 insertions(+), 46 deletions(-)

diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 7c255ac..34989df 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -418,7 +418,11 @@ private:
   std::atomic<bool> termination = ATOMIC_VAR_INIT(false);
 
   // Enqueue the specified message, request, or function call.
-  void enqueue(Event* event);
+  // Returns false if not enqueued (i.e. the process is terminating).
+  // In this case the caller retains ownership of the event.
+  // Should not be called directly, callers should go through
+  // `ProcessManager::deliver(...)`.
+  bool enqueue(Event* event);
 
   // Delegates for messages.
   std::map<std::string, UPID> delegates;
diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
index 999d552..85252e6 100644
--- a/3rdparty/libprocess/src/event_queue.hpp
+++ b/3rdparty/libprocess/src/event_queue.hpp
@@ -73,7 +73,10 @@ public:
   class Producer
   {
   public:
-    void enqueue(Event* event) { queue->enqueue(event); }
+    // Returns false if not enqueued; this means the queue
+    // is decommissioned. In this case the caller retains
+    // ownership of the event.
+    bool enqueue(Event* event) { return queue->enqueue(event); }
 
   private:
     friend class EventQueue;
@@ -106,19 +109,16 @@ private:
   friend class Consumer;
 
 #ifndef LOCK_FREE_EVENT_QUEUE
-  void enqueue(Event* event)
+  bool enqueue(Event* event)
   {
-    bool enqueued = false;
     synchronized (mutex) {
       if (comissioned) {
         events.push_back(event);
-        enqueued = true;
+        return true;
       }
     }
 
-    if (!enqueued) {
-      delete event;
-    }
+    return false;
   }
 
   Event* dequeue()
@@ -185,13 +185,14 @@ private:
   std::deque<Event*> events;
   bool comissioned = true;
 #else // LOCK_FREE_EVENT_QUEUE
-  void enqueue(Event* event)
+  bool enqueue(Event* event)
   {
     if (comissioned.load()) {
       queue.enqueue(event);
-    } else {
-      delete event;
+      return true;
     }
+
+    return false;
   }
 
   Event* dequeue()
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a8d9151..799666f 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -369,13 +369,15 @@ public:
       const Socket& socket,
       Request* request);
 
+  // Returns whether the event was delivered to the destination's
+  // queue. This function takes ownership over `event` and will
+  // delete it if it was not delivered.
   bool deliver(
-      ProcessBase* receiver,
+      ProcessBase* destination,
       Event* event,
       ProcessBase* sender = nullptr);
-
   bool deliver(
-      const UPID& to,
+      const UPID& destination,
       Event* event,
       ProcessBase* sender = nullptr);
 
@@ -424,6 +426,8 @@ public:
   }
 
 private:
+  bool _deliver(ProcessBase* destination, Event* event, ProcessBase* sender);
+
   // Delegate process name to receive root HTTP requests.
   const Option<string> delegate;
 
@@ -1591,7 +1595,7 @@ void SocketManager::link(
           // for the linkee. At this point, we have not passed ownership of
           // this socket to the `SocketManager`, so there is only one possible
           // linkee to notify.
-          process->enqueue(new ExitedEvent(to));
+          process_manager->deliver(process, new ExitedEvent(to));
           return;
         }
         socket = create.get();
@@ -1624,7 +1628,7 @@ void SocketManager::link(
           // for the linkee. At this point, we have not passed ownership of
           // this socket to the `SocketManager`, so there is only one possible
           // linkee to notify.
-          process->enqueue(new ExitedEvent(to));
+          process_manager->deliver(process, new ExitedEvent(to));
           return;
         }
 
@@ -2260,7 +2264,7 @@ void SocketManager::exited(const Address& address)
       CHECK(links.linkers.contains(linkee));
 
       foreach (ProcessBase* linker, links.linkers[linkee]) {
-        linker->enqueue(new ExitedEvent(linkee));
+        process_manager->deliver(linker, new ExitedEvent(linkee));
 
         // Remove the linkee pid from the linker.
         CHECK(links.linkees.contains(linker));
@@ -2327,7 +2331,7 @@ void SocketManager::exited(ProcessBase* process)
     foreach (ProcessBase* linker, links.linkers[pid]) {
       CHECK(linker != process) << "Process linked with itself";
       Clock::update(linker, time);
-      linker->enqueue(new ExitedEvent(pid));
+      process_manager->deliver(linker, new ExitedEvent(pid));
 
       // Remove the linkee pid from the linker.
       CHECK(links.linkees.contains(linker));
@@ -2766,47 +2770,68 @@ void ProcessManager::handle(
 
 
 bool ProcessManager::deliver(
-    ProcessBase* receiver,
+    ProcessBase* destination,
     Event* event,
     ProcessBase* sender)
 {
   CHECK(event != nullptr);
 
-  // If we are using a manual clock then update the current time of
-  // the receiver using the sender if necessary to preserve the
-  // happens-before relationship between the sender and receiver. Note
-  // that the assumption is that the sender remains valid for at least
-  // the duration of this routine (so that we can look up its current
-  // time).
-  if (Clock::paused()) {
-    Clock::update(
-        receiver, Clock::now(sender != nullptr ? sender : __process__));
+  if (_deliver(destination, event, sender)) {
+    return true;
   }
 
-  receiver->enqueue(event);
-
-  return true;
+  delete event;
+  return false;
 }
 
 
 bool ProcessManager::deliver(
-    const UPID& to,
+    const UPID& destination,
     Event* event,
     ProcessBase* sender)
 {
   CHECK(event != nullptr);
 
-  if (ProcessReference receiver = use(to)) {
-    return deliver(receiver, event, sender);
+  if (ProcessReference reference = use(destination)) {
+    if (_deliver(reference, event, sender)) {
+      return true;
+    }
+  } else {
+    VLOG(2) << "Dropping event for process " << destination;
   }
 
-  VLOG(2) << "Dropping event for process " << to;
-
+  // Note that we must delete the event without holding the
+  // process reference, since deletion of a dispatch event
+  // may invoke other code via destructors of objects bound
+  // into the dispatched function and therefore can lead to
+  // deadlock. An example of such a deadlock is in MESOS-9808.
   delete event;
   return false;
 }
 
 
+bool ProcessManager::_deliver(
+    ProcessBase* destination,
+    Event* event,
+    ProcessBase* sender)
+{
+  CHECK(event != nullptr);
+
+  // If we are using a manual clock then update the current time of
+  // the receiver using the sender if necessary to preserve the
+  // happens-before relationship between the sender and receiver. Note
+  // that the assumption is that the sender remains valid for at least
+  // the duration of this routine (so that we can look up its current
+  // time).
+  if (Clock::paused()) {
+    Clock::update(
+        destination, Clock::now(sender != nullptr ? sender : __process__));
+  }
+
+  return destination->enqueue(event);
+}
+
+
 UPID ProcessManager::spawn(ProcessBase* process, bool manage)
 {
   CHECK_NOTNULL(process);
@@ -3141,7 +3166,7 @@ void ProcessManager::link(
     } else {
       // Since the pid isn't valid its process must have already died
       // (or hasn't been spawned yet) so send a process exit message.
-      process->enqueue(new ExitedEvent(to));
+      process_manager->deliver(process, new ExitedEvent(to));
     }
   }
 }
@@ -3158,11 +3183,11 @@ void ProcessManager::terminate(
           process, Clock::now(sender != nullptr ? sender : __process__));
     }
 
-    if (sender != nullptr) {
-      process->enqueue(new TerminateEvent(sender->self(), inject));
-    } else {
-      process->enqueue(new TerminateEvent(UPID(), inject));
-    }
+    process_manager->deliver(
+        process,
+        new TerminateEvent(
+            sender != nullptr ? sender->self() : UPID(),
+            inject));
   }
 }
 
@@ -3500,7 +3525,7 @@ size_t ProcessBase::eventCount<TerminateEvent>()
 }
 
 
-void ProcessBase::enqueue(Event* event)
+bool ProcessBase::enqueue(Event* event)
 {
   CHECK_NOTNULL(event);
 
@@ -3513,15 +3538,27 @@ void ProcessBase::enqueue(Event* event)
     event->is<TerminateEvent>() &&
     event->as<TerminateEvent>().inject;
 
+  bool enqueued = false;
+
   switch (old) {
     case State::BOTTOM:
     case State::READY:
     case State::BLOCKED:
-      events->producer.enqueue(event);
+      enqueued = events->producer.enqueue(event);
       break;
     case State::TERMINATING:
-      delete event;
-      return;
+      break;
+  }
+
+  // NOTE: It's the responsibility of the caller to delete the
+  // undelivered event. This is by design since the destruction
+  // of a dispatch event may invoke other code. Therefore, if
+  // the caller is holding a `ProcessReference` to this process
+  // it must be cleared prior to deleting the dispatch event.
+  if (!enqueued) {
+    // TODO(bmahler): Log the type of event being dropped.
+    VLOG(2) << "Dropping event for TERMINATING process " << pid;
+    return false;
   }
 
   // We need to store terminate _AFTER_ we enqueue the event because
@@ -3547,6 +3584,8 @@ void ProcessBase::enqueue(Event* event)
       process_manager->enqueue(this);
     }
   }
+
+  return true;
 }
 
 

Reply via email to