This is an automated email from the ASF dual-hosted git repository.

gilbert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit bf01659f5b064842d9a8d088f3fa925c20a660ed
Author: Andrei Budnik <[email protected]>
AuthorDate: Thu Jul 18 09:10:39 2019 -0700

    Added `PendingFutureTracker` class for tracking pending futures.
    
    This patch introduces a mechanism for tracking pending futures.
    This feature allows detection of hanging operations, which get
    stuck on a blocking operation or asynchronously. However, this
    feature does not provide any mechanism for tracking pending
    promises, because `Promise` objects might not be accessible in
    various cases. Thereby, we introduce a new class that can be
    used to track pending futures, so it might facilitate debugging
    of stuck issues.
    
    Review: https://reviews.apache.org/r/70887/
---
 src/Makefile.am               |   1 +
 src/common/future_tracker.hpp | 159 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 160 insertions(+)

diff --git a/src/Makefile.am b/src/Makefile.am
index 761dde1..ecdced4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1067,6 +1067,7 @@ libmesos_no_3rdparty_la_SOURCES +=                        
                \
   common/build.hpp                                                     \
   common/command_utils.cpp                                             \
   common/command_utils.hpp                                             \
+  common/future_tracker.hpp                                            \
   common/heartbeater.hpp                                               \
   common/http.cpp                                                      \
   common/http.hpp                                                      \
diff --git a/src/common/future_tracker.hpp b/src/common/future_tracker.hpp
new file mode 100644
index 0000000..a3f191a
--- /dev/null
+++ b/src/common/future_tracker.hpp
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __FUTURE_TRACKER_HPP__
+#define __FUTURE_TRACKER_HPP__
+
+#include <list>
+#include <map>
+#include <string>
+#include <vector>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+namespace mesos {
+namespace internal {
+
+struct FutureMetadata
+{
+  std::string operation;
+  std::string component;
+  std::map<std::string, std::string> args;
+
+  inline bool operator==(const FutureMetadata& that) const
+  {
+    return operation == that.operation &&
+           component == that.component &&
+           args == that.args;
+  }
+};
+
+
+class PendingFutureTrackerProcess
+  : public process::Process<PendingFutureTrackerProcess>
+{
+public:
+  PendingFutureTrackerProcess()
+    : ProcessBase(process::ID::generate("pending-future-tracker")) {}
+
+  template <typename T>
+  void addFuture(const process::Future<T>& future, FutureMetadata&& metadata)
+  {
+    auto it = pending.emplace(pending.end(), std::move(metadata));
+
+    future
+      .onAny(process::defer(
+          self(), &PendingFutureTrackerProcess::eraseFuture, it))
+      .onAbandoned(process::defer(
+          self(), &PendingFutureTrackerProcess::eraseFuture, it))
+      .onDiscard(process::defer(
+          self(), &PendingFutureTrackerProcess::eraseFuture, it));
+  }
+
+  void eraseFuture(typename std::list<FutureMetadata>::iterator it)
+  {
+    pending.erase(it);
+  }
+
+  process::Future<std::vector<FutureMetadata>> pendingFutures()
+  {
+    return std::vector<FutureMetadata>(pending.begin(), pending.end());
+  }
+
+private:
+  std::list<FutureMetadata> pending;
+};
+
+
+class PendingFutureTracker
+{
+public:
+  static Try<PendingFutureTracker*> create()
+  {
+    return new 
PendingFutureTracker(process::Owned<PendingFutureTrackerProcess>(
+        new PendingFutureTrackerProcess));
+  }
+
+  ~PendingFutureTracker()
+  {
+    terminate(process.get());
+    process::wait(process.get());
+  }
+
+  /**
+   * This method subscribes on state transitions of the `future` to keep track
+   * of pending operations/promises associated with this future.
+   *
+   * @param operation Operation's name identifies the place in the code related
+   * to this future. E.g., "some/isolator::prepare".
+   *
+   * @param component Component is used to distinguish pending futures
+   * related to different components so that they can be exposed by
+   * different API endpoints.
+   *
+   * @param args A list of pairs <argument name, argument value> representing
+   * arguments passed to the function that returned the given future.
+   *
+   * @return The same `future` which is passed as the first argument.
+   */
+  template <typename T>
+  process::Future<T> track(
+      const process::Future<T>& future,
+      const std::string& operation,
+      const std::string& component,
+      const std::map<std::string, std::string>& args = {})
+  {
+    process::dispatch(
+        process.get(),
+        &PendingFutureTrackerProcess::addFuture<T>,
+        future,
+        FutureMetadata{operation, component, args});
+
+    return future;
+  }
+
+  /**
+   * This method returns a list of pending futures represented as objects of
+   * `FutureMetadata` class, whose variables are initialized by the arguments
+   * passed to the `track` method.
+   */
+  process::Future<std::vector<FutureMetadata>> pendingFutures()
+  {
+    return process::dispatch(
+        process.get(),
+        &PendingFutureTrackerProcess::pendingFutures);
+  }
+
+private:
+  explicit PendingFutureTracker(
+      const process::Owned<PendingFutureTrackerProcess>& _process)
+    : process(_process)
+  {
+    spawn(process.get());
+  }
+
+  process::Owned<PendingFutureTrackerProcess> process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __FUTURE_TRACKER_HPP__

Reply via email to