This is an automated email from the ASF dual-hosted git repository.

bennoe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1915150c6a83cd95197e25a68a6adf9b3ef5fb11
Author: Benno Evers <[email protected]>
AuthorDate: Fri Mar 22 17:51:34 2019 +0100

    Added new example framework for operation feedback.
    
    This adds a new example framework showcasing a possible
    implementation of the newly added operation feedback API.
    
    Review: https://reviews.apache.org/r/70282
---
 src/Makefile.am                                |   6 +
 src/examples/CMakeLists.txt                    |   2 +
 src/examples/operation_feedback_framework.cpp  | 861 +++++++++++++++++++++++++
 src/tests/CMakeLists.txt                       |   1 +
 src/tests/examples_tests.cpp                   |   3 +
 src/tests/operation_feedback_framework_test.sh |  44 ++
 6 files changed, 917 insertions(+)

diff --git a/src/Makefile.am b/src/Makefile.am
index bcafe48..7c2131a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2274,6 +2274,11 @@ test_http_executor_SOURCES = 
examples/test_http_executor.cpp
 test_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 test_http_executor_LDADD = libmesos.la $(LDADD)
 
+check_PROGRAMS += operation-feedback-framework
+operation_feedback_framework_SOURCES = examples/operation_feedback_framework.cpp
+operation_feedback_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+operation_feedback_framework_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += long-lived-framework
 long_lived_framework_SOURCES = examples/long_lived_framework.cpp
 long_lived_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -2796,6 +2801,7 @@ dist_check_SCRIPTS +=                                     
        \
   tests/java_v0_framework_test.sh                              \
   tests/java_v1_framework_test.sh                              \
   tests/no_executor_framework_test.sh                          \
+  tests/operation_feedback_framework_test.sh                   \
   tests/persistent_volume_framework_test.sh                    \
   tests/python_framework_test.sh                               \
   tests/test_http_framework_test.sh                            \
diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt
index f477539..a811a07 100644
--- a/src/examples/CMakeLists.txt
+++ b/src/examples/CMakeLists.txt
@@ -45,6 +45,7 @@ if (NOT WIN32)
   add_executable(load-generator-framework      load_generator_framework.cpp)
   add_executable(long-lived-executor           long_lived_executor.cpp)
   add_executable(long-lived-framework          long_lived_framework.cpp)
+  add_executable(operation-feedback-framework  operation_feedback_framework.cpp)
   add_executable(no-executor-framework         no_executor_framework.cpp)
   add_executable(persistent-volume-framework   persistent_volume_framework.cpp)
   add_executable(test-executor                 test_executor.cpp)
@@ -84,6 +85,7 @@ if (NOT WIN32)
   target_link_libraries(load-generator-framework      PRIVATE mesos)
   target_link_libraries(long-lived-executor           PRIVATE mesos)
   target_link_libraries(long-lived-framework          PRIVATE mesos)
+  target_link_libraries(operation-feedback-framework  PRIVATE mesos)
   target_link_libraries(no-executor-framework         PRIVATE mesos)
   target_link_libraries(persistent-volume-framework   PRIVATE mesos)
   target_link_libraries(test-executor                 PRIVATE mesos)
diff --git a/src/examples/operation_feedback_framework.cpp 
b/src/examples/operation_feedback_framework.cpp
new file mode 100644
index 0000000..2480c34
--- /dev/null
+++ b/src/examples/operation_feedback_framework.cpp
@@ -0,0 +1,861 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <algorithm>
+#include <iostream>
+#include <list>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include <mesos/type_utils.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/authorizer/acls.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "examples/flags.hpp"
+
+// TODO(bevers): Write a version of `isTerminalState()` that is available
+// from public headers and can handle v1 protobufs.
+#include "internal/devolve.hpp"
+
+#include "logging/logging.hpp"
+
+using process::Owned;
+
+using std::string;
+using std::vector;
+
+using mesos::ACL;
+using mesos::ACLs;
+
+using mesos::v1::AgentID;
+using mesos::v1::Credential;
+using mesos::v1::FrameworkID;
+using mesos::v1::FrameworkInfo;
+using mesos::v1::Offer;
+using mesos::v1::OperationID;
+using mesos::v1::OperationState;
+using mesos::v1::OperationStatus;
+using mesos::v1::Resource;
+using mesos::v1::Resources;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::OPERATION_FINISHED;
+using mesos::v1::OPERATION_GONE_BY_OPERATOR;
+using mesos::v1::OPERATION_FAILED;
+using mesos::v1::OPERATION_ERROR;
+using mesos::v1::OPERATION_DROPPED;
+using mesos::v1::OPERATION_RECOVERING;
+using mesos::v1::OPERATION_PENDING;
+using mesos::v1::OPERATION_UNREACHABLE;
+using mesos::v1::OPERATION_UNKNOWN;
+using mesos::v1::OPERATION_UNSUPPORTED;
+
+using mesos::v1::TASK_DROPPED;
+using mesos::v1::TASK_LOST;
+using mesos::v1::TASK_ERROR;
+using mesos::v1::TASK_FAILED;
+using mesos::v1::TASK_FINISHED;
+using mesos::v1::TASK_GONE;
+using mesos::v1::TASK_GONE_BY_OPERATOR;
+using mesos::v1::TASK_KILLING;
+using mesos::v1::TASK_KILLED;
+using mesos::v1::TASK_RUNNING;
+using mesos::v1::TASK_STAGING;
+using mesos::v1::TASK_STARTING;
+using mesos::v1::TASK_UNREACHABLE;
+using mesos::v1::TASK_UNKNOWN;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
+
+namespace {
+
+// Name that is set on the `FrameworkInfo` sent to the master.
+constexpr char FRAMEWORK_NAME[] = "Operation Feedback Framework (C++)";
+// Delay between two consecutive runs of `reconcileOperations()`.
+constexpr Duration RECONCILIATION_INTERVAL = Seconds(30);
+// Delay between two `SUBSCRIBE` attempts while no framework ID is known.
+constexpr Duration RESUBSCRIPTION_INTERVAL = Seconds(2);
+
+} // namespace {
+
+
+// This framework will run a set of given tasks on a Mesos cluster.
+// For each task, the framework will attempt to `RESERVE` task resources,
+// then `LAUNCH` the task and finally `UNRESERVE` task resources.
+//
+// This is very similar to what the `DynamicReservationFramework` does, but
+// this one does so using the v1 scheduler API and offer operation feedback
+// to showcase both of these features.
+//
+// In particular, if everything works smoothly every task passes sequentially
+// through the following lifecycle stages:
+//
+//   1) AWAITING_RESERVE_OFFER
+//      The framework is waiting for a suitable offer containing
+//      resources to reserve.
+//
+//   2) AWAITING_RESERVE_ACK
+//      The framework attempted to reserve resources and is awaiting
+//      confirmation via offer operation feedback.
+//
+//   3) AWAITING_LAUNCH_OFFER
+//      The resources for this task have been successfully reserved, and it
+//      is awaiting a suitable offer to launch the task on the reservation.
+//
+//   4) AWAITING_TASK_FINISHED
+//      The task has been launched, and the framework is waiting for it to
+//      finish.
+//
+//   5) AWAITING_UNRESERVE_OFFER
+//      The task has finished successfully, and the framework is waiting for
+//      another offer containing this task's reservation so it can clean up.
+//
+//   6) AWAITING_UNRESERVE_ACK
+//      The framework attempted to unreserve the resources for this task.
+//
+//
+// Note that some failure conditions are not handled by this example framework:
+//  - If an agent containing the reservation for one of the tasks is permanently
+//    removed before the task finished, this is not detected and no attempt is
+//    made to move the reservation to another agent. Instead, the task will be
+//    left waiting for an offer from that agent forever.
+//
+//  - If the framework is killed or shut down before all reservations have been
+//    unreserved, the left-over reservations require manual cleanup.
+//
+//  - The framework does not currently suppress or revive offers.
+
+class OperationFeedbackScheduler
+  : public process::Process<OperationFeedbackScheduler>
+{
+  struct SchedulerTask;
+
+public:
+  // Stores the framework description, the master address, the role used
+  // for reservations and the optional credential. No connection to the
+  // master is opened here; that happens in `initialize()`.
+  OperationFeedbackScheduler(
+      const FrameworkInfo& framework,
+      const string& master,
+      const string& role,
+      const Option<Credential>& credential)
+    : framework_(framework),
+      master_(master),
+      role_(role),
+      credential_(credential) {}
+
+  // Nothing to release explicitly; all members clean up after themselves.
+  ~OperationFeedbackScheduler() override = default;
+
+  // Queues one more task that runs `command` (as a shell command) on
+  // `resources`.
+  //
+  // The new task starts in the `AWAITING_RESERVE_OFFER` stage. Its
+  // `taskResources` are `resources` allocated to the framework role; the
+  // resources in its `TaskInfo` additionally carry a dynamic reservation
+  // for that role and the framework principal.
+  void addTask(const string& command, const Resources& resources)
+  {
+    // Tasks are numbered consecutively, starting at 1.
+    static int taskIdCounter = 0;
+    const string taskNumber = stringify(++taskIdCounter);
+
+    SchedulerTask task;
+    task.stage = SchedulerTask::AWAITING_RESERVE_OFFER;
+
+    task.taskResources = resources;
+    task.taskResources.allocate(role_);
+
+    Resource::ReservationInfo reservation;
+    reservation.set_type(Resource::ReservationInfo::DYNAMIC);
+    reservation.set_role(role_);
+    reservation.set_principal(framework_.principal());
+
+    // The task will run on reserved resources.
+    Resources reserved = task.taskResources.pushReservation(reservation);
+
+    TaskInfo& info = task.taskInfo;
+    info.mutable_resources()->CopyFrom(reserved);
+    info.mutable_command()->set_shell(true);
+    info.mutable_command()->set_value(command);
+    info.mutable_task_id()->set_value("task-" + taskNumber);
+    info.set_name("Operation Feedback Task " + taskNumber);
+
+    tasks_.push_back(task);
+  }
+
+protected:
+  // Creates the `Mesos` scheduler library instance for `master_`, using
+  // protobuf as content type and passing on `credential_`. The three
+  // callbacks are deferred to this process: `connected`, `disconnected`
+  // and `received`.
+  void initialize() override
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos_.reset(
+        new Mesos(
+            master_,
+            mesos::ContentType::PROTOBUF,
+            process::defer(self(), &Self::connected),
+            process::defer(self(), &Self::disconnected),
+            process::defer(self(), &Self::received, lambda::_1),
+            credential_));
+  }
+
+  // Sends a `TEARDOWN` call for this framework, provided that a framework
+  // ID is known. Without an ID nothing is sent.
+  void finalize() override
+  {
+    if (!framework_.has_id()) {
+      return;
+    }
+
+    Call teardown;
+    teardown.mutable_framework_id()->CopyFrom(framework_.id());
+    teardown.set_type(Call::TEARDOWN);
+
+    mesos_->send(teardown);
+  }
+
+  // Callback registered with the `Mesos` library in `initialize()`.
+  // Kicks off the periodic operation reconciliation and the `SUBSCRIBE`
+  // attempts.
+  void connected()
+  {
+    LOG(INFO) << "Connected";
+    reconcileOperations();
+    subscribe();
+  }
+
+  // Sends a `SUBSCRIBE` call to the master and re-sends it every
+  // `RESUBSCRIPTION_INTERVAL` until a framework ID has been assigned.
+  void subscribe()
+  {
+    if (framework_.has_id()) {
+      // We are subscribed already; this ends the retry loop. Nothing is
+      // sent, so nothing is logged either.
+      return;
+    }
+
+    // The master can respond with an error to the `SUBSCRIBE` call, but the
+    // `Mesos` class will just silently swallow that error, leaving us hanging
+    // forever with no clue what's going on.
+    // In particular, running this with the `--master=local` flag frequently
+    // results in `503 Service Unavailable` responses.
+    // Therefore, we have to retry in a loop until we're actually registered.
+    LOG(INFO) << "Sending `SUBSCRIBE` call to Mesos master";
+    mesos_->send(SUBSCRIBE(framework_));
+    process::delay(RESUBSCRIPTION_INTERVAL, self(), &Self::subscribe);
+  }
+
+  // Callback registered with the `Mesos` library in `initialize()`.
+  // No reconnection is attempted; the framework exits with a failure
+  // status instead.
+  void disconnected()
+  {
+    EXIT(EXIT_FAILURE) << "Disconnected";
+  }
+
+  // Dispatches every queued scheduler event, in order, to its handler:
+  //   - `SUBSCRIBED` stores the assigned framework ID,
+  //   - `OFFERS` goes to `resourceOffers()`,
+  //   - `UPDATE` goes to `taskStatusUpdate()`,
+  //   - `UPDATE_OPERATION_STATUS` goes to `operationStatusUpdate()`,
+  //   - `ERROR` terminates the framework with a failure status.
+  // All other event types are ignored.
+  void received(std::queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      VLOG(1) << "Received " << event.type() << " event";
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          framework_.mutable_id()->CopyFrom(event.subscribed().framework_id());
+          LOG(INFO) << "Subscribed with ID '" << framework_.id() << "'";
+          break;
+        }
+
+        case Event::OFFERS: {
+          resourceOffers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::UPDATE: {
+          taskStatusUpdate(event.update().status());
+          break;
+        }
+
+        case Event::UPDATE_OPERATION_STATUS: {
+          operationStatusUpdate(event.update_operation_status().status());
+          break;
+        }
+
+        case Event::ERROR: {
+          EXIT(EXIT_FAILURE) << "Error: " << event.error().message();
+          break;
+        }
+
+        default:
+          break;
+      }
+    }
+  }
+
+  // Processes a batch of offers, answering each offer with its own `ACCEPT`
+  // call.
+  //
+  // For every offer, the tasks that are waiting for an offer are matched
+  // against the resources of that offer which are not yet claimed:
+  //   - tasks without a reservation get a `RESERVE` operation,
+  //   - tasks with a confirmed reservation on the offering agent get a
+  //     `LAUNCH` operation,
+  //   - finished tasks get an `UNRESERVE` operation for their reservation.
+  //
+  // Resources used for an operation are deducted from `remaining`, so that
+  // no two operations in the same `ACCEPT` call claim the same resources.
+  void resourceOffers(const vector<Offer>& offers)
+  {
+    foreach (const Offer& offer, offers) {
+      VLOG(1) << "Offer " << offer.id()
+              << " from agent " << offer.agent_id()
+              << " contained resources " << offer.resources();
+
+      Call call;
+      call.set_type(Call::ACCEPT);
+      call.mutable_framework_id()->CopyFrom(framework_.id());
+      Call::Accept* accept = call.mutable_accept();
+      accept->add_offer_ids()->CopyFrom(offer.id());
+
+      // Resources of this offer that no operation has claimed so far.
+      Resources remaining(offer.resources());
+      int reservations = 0, launches = 0, unreservations = 0;
+      bool havePendingTasks = false;  // Whether any task awaits an offer.
+
+      // For each pending task that does not yet have a reservation,
+      // check whether the offer contains enough unreserved resources
+      // and if so, attempt to reserve them.
+      foreachTaskInState(SchedulerTask::AWAITING_RESERVE_OFFER,
+          [&] (SchedulerTask& task) {
+        havePendingTasks = true;
+        if (remaining.contains(task.taskResources)) {
+          LOG(INFO) << "Reserving resources for task "
+                    << task.taskInfo.task_id();
+
+          Offer::Operation* operation = accept->add_operations();
+          operation->set_type(Offer::Operation::RESERVE);
+          std::string id = "reserve-" + id::UUID::random().toString();
+          operation->mutable_id()->set_value(id);
+          operation->mutable_reserve()->mutable_resources()->CopyFrom(
+              task.taskInfo.resources());
+
+          remaining -= task.taskResources;
+          task.taskInfo.mutable_agent_id()->CopyFrom(offer.agent_id());
+          task.reserveOperationId = operation->id();
+          task.stage = SchedulerTask::AWAITING_RESERVE_ACK;
+          ++reservations;
+        }
+      });
+
+      // For each pending task that had its resources reserved successfully,
+      // attempt to launch the task.
+      foreachTaskInState(SchedulerTask::AWAITING_LAUNCH_OFFER,
+          [&] (SchedulerTask& task) {
+        CHECK(task.taskInfo.has_agent_id());
+        havePendingTasks = true;
+        if (offer.agent_id() == task.taskInfo.agent_id() &&
+            remaining.contains(task.taskInfo.resources())) {
+          LOG(INFO) << "Launching task " << task.taskInfo.task_id();
+
+          Offer::Operation* operation = accept->add_operations();
+          operation->set_type(Offer::Operation::LAUNCH);
+          operation->mutable_launch()->add_task_infos()->CopyFrom(
+              task.taskInfo);
+
+          // The reservation is in use now; do not hand it to another task
+          // with an identical reservation or to an `UNRESERVE` below.
+          remaining -= Resources(task.taskInfo.resources());
+          task.stage = SchedulerTask::AWAITING_TASK_FINISHED;
+          ++launches;
+        }
+      });
+
+      // For each task that finished running, attempt to unreserve its
+      // resources. This requires the offer to actually contain the
+      // reservation; otherwise we keep waiting for a later offer.
+      foreachTaskInState(SchedulerTask::AWAITING_UNRESERVE_OFFER,
+          [&] (SchedulerTask& task) {
+        CHECK(task.taskInfo.has_agent_id());
+        havePendingTasks = true;
+        if (offer.agent_id() == task.taskInfo.agent_id() &&
+            remaining.contains(task.taskInfo.resources())) {
+          LOG(INFO) << "Unreserving resources for task "
+                    << task.taskInfo.task_id();
+
+          Offer::Operation* operation = accept->add_operations();
+          operation->set_type(Offer::Operation::UNRESERVE);
+          std::string id = "unreserve-" + id::UUID::random().toString();
+          operation->mutable_id()->set_value(id);
+          operation->mutable_unreserve()->mutable_resources()->CopyFrom(
+              task.taskInfo.resources());
+
+          remaining -= Resources(task.taskInfo.resources());
+          task.unreserveOperationId = operation->id();
+          task.stage = SchedulerTask::AWAITING_UNRESERVE_ACK;
+          ++unreservations;
+        }
+      });
+
+      if (havePendingTasks) {
+        LOG(INFO) << "Accepting offer with "
+          << reservations << " `RESERVE` operations, "
+          << launches << " `LAUNCH` operations and "
+          << unreservations << " `UNRESERVE` operations while having "
+          << tasks_.size() << " non-completed tasks";
+      }
+
+      // Each `ACCEPT` call must only contain offers with the same agent
+      // id, so we have to send one call per offer back to the master.
+      // We also want to send the `ACCEPT` call if we don't launch any
+      // operations on this offer, in order to decline the offer.
+      mesos_->send(call);
+    }
+  }
+
+  // Handles a task status update.
+  //
+  // Updates carrying a UUID are always acknowledged, even if the task is
+  // unknown or not in the `AWAITING_TASK_FINISHED` stage. The stage of a
+  // task is only changed while it is in `AWAITING_TASK_FINISHED`:
+  //   - `TASK_FINISHED` moves it on to `AWAITING_UNRESERVE_OFFER`,
+  //   - the failure states move it back to `AWAITING_LAUNCH_OFFER`,
+  //   - all remaining states leave it untouched.
+  void taskStatusUpdate(const TaskStatus& status)
+  {
+    VLOG(1) << "Received status update " << status;
+
+    // Acknowledge before anything else, so that updates we are not
+    // interested in are acknowledged as well; this mirrors what
+    // `operationStatusUpdate()` does for operation status updates.
+    if (status.has_uuid()) {
+      mesos_->send(ACKNOWLEDGE(status, framework_.id()));
+    }
+
+    const TaskID& taskId = status.task_id();
+
+    auto taskIterator = std::find_if(tasks_.begin(), tasks_.end(),
+      [&] (const SchedulerTask& tx) {
+        return tx.taskInfo.task_id() == taskId;
+      });
+
+    if (taskIterator == tasks_.end()) {
+      LOG(WARNING) << "Status update for unknown task " << taskId;
+      return;
+    }
+
+    if (taskIterator->stage != SchedulerTask::AWAITING_TASK_FINISHED) {
+      // We could get spurious updates due to the reconciliation process
+      // or because of retries; ignore them.
+      return;
+    }
+
+    switch (status.state()) {
+      case TASK_STAGING:
+      case TASK_KILLING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+      case TASK_UNREACHABLE: {
+        // Nothing to do yet, wait for further updates.
+        VLOG(1) << "Received " << status.state() << " for task " << taskId;
+        break;
+      }
+
+      case TASK_DROPPED:
+      case TASK_ERROR:
+      case TASK_KILLED:
+      case TASK_GONE:
+      case TASK_GONE_BY_OPERATOR:
+      case TASK_UNKNOWN:
+      case TASK_LOST:
+      case TASK_FAILED: {
+        LOG(INFO) << "Task " << taskId << " failed, attempting to relaunch"
+                  << " with the next offer";
+
+        taskIterator->stage = SchedulerTask::AWAITING_LAUNCH_OFFER;
+        break;
+      }
+
+      case TASK_FINISHED: {
+        LOG(INFO) << "Task " << taskId << " finished, attempting to unreserve"
+                  << " its resources with the next offer";
+
+        taskIterator->stage = SchedulerTask::AWAITING_UNRESERVE_OFFER;
+        break;
+      }
+    }
+  }
+
+  // Handles an operation status update: acknowledges it if it carries a
+  // UUID, then hands it to the handler of the task whose reserve or
+  // unreserve operation it belongs to. Reserve operation IDs are looked up
+  // first, unreserve operation IDs second.
+  void operationStatusUpdate(const OperationStatus& status)
+  {
+    VLOG(1) << "Received operation status update " << status;
+
+    if (status.has_uuid()) {
+      mesos_->send(ACKNOWLEDGE_OPERATION(status, framework_.id()));
+    }
+
+    const OperationID& operationId = status.operation_id();
+
+    // First pass: is this the feedback for a `RESERVE` operation?
+    for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
+      if (it->reserveOperationId == operationId) {
+        handleReserveOperationStatusUpdate(it, status);
+        return;
+      }
+    }
+
+    // Second pass: is this the feedback for an `UNRESERVE` operation?
+    for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
+      if (it->unreserveOperationId == operationId) {
+        handleUnreserveOperationStatusUpdate(it, status);
+        return;
+      }
+    }
+
+    LOG(WARNING) << "Status update for unknown operation " << operationId;
+  }
+
+  // Applies feedback for the `RESERVE` operation of `task`.
+  //
+  // Only has an effect while the task is in `AWAITING_RESERVE_ACK`:
+  //   - `OPERATION_FINISHED` moves it on to `AWAITING_LAUNCH_OFFER`,
+  //   - the failure states move it back to `AWAITING_RESERVE_OFFER`,
+  //   - all remaining states leave it untouched.
+  void handleReserveOperationStatusUpdate(
+      typename std::list<SchedulerTask>::iterator task,
+      const OperationStatus& status)
+  {
+    if (task->stage != SchedulerTask::AWAITING_RESERVE_ACK) {
+      // We could get spurious updates due to the reconciliation process
+      // or because of retries; ignore them.
+      return;
+    }
+
+    switch (status.state()) {
+      case OPERATION_PENDING:
+      case OPERATION_RECOVERING:
+      case OPERATION_UNKNOWN:
+      case OPERATION_UNREACHABLE: {
+        // Nothing to do but wait.
+        break;
+      }
+
+      case OPERATION_UNSUPPORTED: {
+        // Someone in the future invented an additional operation state
+        // and accidentally sent it to us.
+        break;
+      }
+
+      case OPERATION_FAILED:
+      case OPERATION_ERROR:
+      case OPERATION_DROPPED:
+      case OPERATION_GONE_BY_OPERATOR: {
+        LOG(INFO)
+          << "Received update " << status << " while attempting to reserve"
+          << " resources for task " << task->taskInfo.task_id()
+          << "; retrying";
+
+        task->stage = SchedulerTask::AWAITING_RESERVE_OFFER;
+        break;
+      }
+
+      case OPERATION_FINISHED: {
+        LOG(INFO) << "Successfully reserved resources for task "
+                  << task->taskInfo.task_id() << "; awaiting launch offer";
+
+        task->stage = SchedulerTask::AWAITING_LAUNCH_OFFER;
+        break;
+      }
+    }
+  }
+
+  // Applies feedback for the `UNRESERVE` operation of `task`.
+  //
+  // Only has an effect while the task is in `AWAITING_UNRESERVE_ACK`:
+  //   - `OPERATION_FINISHED` and `OPERATION_GONE_BY_OPERATOR` remove the
+  //     task; once no tasks are left, the scheduler process is terminated,
+  //   - the failure states move it back to `AWAITING_UNRESERVE_OFFER`,
+  //   - all remaining states leave it untouched.
+  //
+  // NOTE: `task` is invalidated when the task gets removed.
+  void handleUnreserveOperationStatusUpdate(
+      typename std::list<SchedulerTask>::iterator task,
+      const OperationStatus& status)
+  {
+    if (task->stage != SchedulerTask::AWAITING_UNRESERVE_ACK) {
+      // We could get spurious updates due to the reconciliation
+      // or because of retries; ignore them.
+      return;
+    }
+
+    switch (status.state()) {
+      case OPERATION_PENDING:
+      case OPERATION_RECOVERING:
+      case OPERATION_UNKNOWN:
+      case OPERATION_UNREACHABLE: {
+        // Nothing to do but wait.
+        break;
+      }
+
+      case OPERATION_UNSUPPORTED: {
+        // Someone in the future invented an additional operation state
+        // and accidentally sent it to us.
+        break;
+      }
+
+      case OPERATION_FAILED:
+      case OPERATION_ERROR:
+      case OPERATION_DROPPED: {
+        LOG(INFO)
+          << "Received update " << status << " while attempting to unreserve"
+          << " resources for task " << task->taskInfo.task_id()
+          << "; retrying";
+
+        task->stage = SchedulerTask::AWAITING_UNRESERVE_OFFER;
+        break;
+      }
+
+      case OPERATION_FINISHED:
+      case OPERATION_GONE_BY_OPERATOR: {
+        // We also count `GONE_BY_OPERATOR` as a success, because in that
+        // case there's nothing left to unreserve.
+        LOG(INFO) << "Task " << task->taskInfo.task_id() << " done; removing";
+        tasks_.erase(task);
+        if (tasks_.empty()) {
+          LOG(INFO) << "All tasks completed, shutting down the framework...";
+          process::terminate(self());
+        }
+        break;
+      }
+    }
+  }
+
+  // Sends a `RECONCILE_OPERATIONS` call listing every operation we are
+  // still awaiting feedback for, and re-schedules itself to run again after
+  // `RECONCILIATION_INTERVAL`.
+  //
+  // Nothing is sent while no framework ID is known. Once an ID is known the
+  // call is sent on every run, even if it lists no operations.
+  void reconcileOperations()
+  {
+    // Keep reconciling as long as the framework is running.
+    process::delay(
+        RECONCILIATION_INTERVAL, self(), &Self::reconcileOperations);
+
+    if (!framework_.has_id()) {
+      return;
+    }
+
+    Call call;
+    call.set_type(Call::RECONCILE_OPERATIONS);
+    call.mutable_framework_id()->CopyFrom(framework_.id());
+    Call::ReconcileOperations* reconcile = call.mutable_reconcile_operations();
+
+    for (const SchedulerTask& task : tasks_) {
+      switch (task.stage) {
+        case SchedulerTask::AWAITING_RESERVE_OFFER:
+        case SchedulerTask::AWAITING_UNRESERVE_OFFER:
+        case SchedulerTask::AWAITING_LAUNCH_OFFER:
+        case SchedulerTask::AWAITING_TASK_FINISHED: {
+          // Not currently waiting for any offer feedback.
+          break;
+        }
+
+        case SchedulerTask::AWAITING_RESERVE_ACK: {
+          Call::ReconcileOperations::Operation* operation =
+            reconcile->add_operations();
+          operation->mutable_agent_id()->MergeFrom(task.taskInfo.agent_id());
+          operation->mutable_operation_id()->MergeFrom(
+              task.reserveOperationId.get());
+
+          // Must not fall through: `unreserveOperationId` is not set yet
+          // while the task is still waiting for its reservation.
+          break;
+        }
+
+        case SchedulerTask::AWAITING_UNRESERVE_ACK: {
+          Call::ReconcileOperations::Operation* operation =
+            reconcile->add_operations();
+          operation->mutable_agent_id()->MergeFrom(task.taskInfo.agent_id());
+          operation->mutable_operation_id()->MergeFrom(
+              task.unreserveOperationId.get());
+          break;
+        }
+      }
+    }
+
+    mesos_->send(call);
+  }
+
+private:
+  // Static helper functions to deal with protobuf generation:
+
+  // Builds a `SUBSCRIBE` call carrying a copy of `framework`. The call's
+  // framework ID is only set if `framework` already has an ID.
+  static Call SUBSCRIBE(const FrameworkInfo& framework)
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    call.mutable_subscribe()->mutable_framework_info()->CopyFrom(framework);
+
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
+
+    return call;
+  }
+
+  // Builds an `ACKNOWLEDGE` call for the task status update `status`,
+  // copying its agent ID, task ID and UUID. `status` must have a UUID.
+  static Call ACKNOWLEDGE(
+      const TaskStatus& status,
+      const FrameworkID& frameworkId)
+  {
+    CHECK(status.has_uuid());
+
+    Call call;
+    call.set_type(Call::ACKNOWLEDGE);
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+
+    Call::Acknowledge& ack = *call.mutable_acknowledge();
+    ack.mutable_agent_id()->CopyFrom(status.agent_id());
+    ack.mutable_task_id()->CopyFrom(status.task_id());
+    ack.set_uuid(status.uuid());
+
+    return call;
+  }
+
+  // Builds an `ACKNOWLEDGE_OPERATION_STATUS` call for the operation status
+  // update `status`, copying its agent ID, operation ID and UUID value.
+  // `status` must have a UUID.
+  static Call ACKNOWLEDGE_OPERATION(
+      const OperationStatus& status,
+      const FrameworkID& frameworkId)
+  {
+    CHECK(status.has_uuid());
+
+    Call call;
+    call.set_type(Call::ACKNOWLEDGE_OPERATION_STATUS);
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+
+    Call::AcknowledgeOperationStatus& ack =
+      *call.mutable_acknowledge_operation_status();
+    ack.mutable_agent_id()->CopyFrom(status.agent_id());
+    ack.mutable_operation_id()->CopyFrom(status.operation_id());
+    ack.set_uuid(status.uuid().value());
+
+    return call;
+  }
+
+  Owned<Mesos> mesos_;
+  FrameworkInfo framework_;
+  string master_;
+  string role_;
+  Option<Credential> credential_;
+
+  // Tracks one task through its complete lifecycle as seen by the
+  // scheduler: a reserve operation, followed by a Mesos task, followed by
+  // an unreserve operation.
+  struct SchedulerTask
+  {
+    // The lifecycle stages, in the order a task passes through them when
+    // nothing goes wrong.
+    enum Stage
+    {
+      AWAITING_RESERVE_OFFER,
+      AWAITING_RESERVE_ACK,
+      AWAITING_LAUNCH_OFFER,
+      AWAITING_TASK_FINISHED,
+      AWAITING_UNRESERVE_OFFER,
+      AWAITING_UNRESERVE_ACK
+    };
+
+    // Stage the task is currently in.
+    Stage stage;
+
+    // Description of the task to launch; its resources carry the
+    // `ReservationInfo` of the reservation the task runs on.
+    TaskInfo taskInfo;
+
+    // Extra copy of the task resources without the `ReservationInfo`,
+    // kept as a convenience for offer matching.
+    Resources taskResources;
+
+    // IDs of the `RESERVE` and `UNRESERVE` operations sent for this task;
+    // unset until the respective operation has been sent.
+    Option<OperationID> reserveOperationId;
+    Option<OperationID> unreserveOperationId;
+  };
+
+  std::list<SchedulerTask> tasks_;
+
+  // Invokes `f` on every task in `tasks_` that is in `stage` at the moment
+  // it is visited.
+  //
+  // NOTE: This has to come after the definition of `SchedulerTask`, because
+  // the signature needs `SchedulerTask::Stage` to be defined already.
+  template <typename UnaryOperation>
+  void foreachTaskInState(SchedulerTask::Stage stage, UnaryOperation f)
+  {
+    for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
+      if (it->stage != stage) {
+        continue;
+      }
+
+      f(*it);
+    }
+  }
+};
+
+
+// Command line flags of this example framework, in addition to the flags
+// inherited from `mesos::internal::examples::Flags`.
+class Flags : public virtual mesos::internal::examples::Flags
+{
+public:
+  Flags()
+  {
+    add(&Flags::user,
+        "user",
+        "The username under which to run tasks.");
+
+    add(&Flags::command,
+        "command",
+        "The command to run for each task.",
+        "sleep 60");
+
+    add(&Flags::resources,
+        "resources",
+        "The resources to reserve for each task.",
+        "cpus:1;mem:32");
+
+    add(&Flags::num_tasks,
+        "num_tasks",
+        "The number of tasks to run.",
+        60);
+  }
+
+  // Shell command that every task executes.
+  string command;
+
+  // Resources of a single task, in the textual form that is handed to
+  // `Resources::parse()` in `main()`.
+  string resources;
+
+  // User to run the tasks as; has no default value.
+  Option<string> user;
+
+  // Number of tasks the framework runs before it shuts down.
+  int num_tasks;
+};
+
+
+// Entry point of the example framework.
+//
+// Parses the flags (command line and `MESOS_EXAMPLE_`-prefixed environment
+// variables), queues `--num_tasks` identical tasks on an
+// `OperationFeedbackScheduler` and blocks until the scheduler process has
+// terminated.
+//
+// Exits with a failure status if the flags cannot be loaded, if the role is
+// `*`, if the resources cannot be parsed, or if no `--user` was given and
+// the current user cannot be determined.
+int main(int argc, char** argv)
+{
+  Flags flags;
+  Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
+
+  if (flags.help) {
+    std::cout << flags.usage() << std::endl;
+    return EXIT_SUCCESS;
+  }
+
+  if (load.isError()) {
+    std::cerr << flags.usage(load.error()) << std::endl;
+    return EXIT_FAILURE;
+  }
+
+  mesos::internal::logging::initialize(argv[0], false);
+  foreach (const flags::Warning& warning, load->warnings) {
+    LOG(WARNING) << warning.message;
+  }
+
+  // Reservations are made for the framework role, which therefore must not
+  // be the default role.
+  if (flags.role == "*") {
+    EXIT(EXIT_FAILURE) << flags.usage(
+        "Role is incorrect; the default '*' role cannot be used");
+  }
+
+  Option<Credential> credential = None();
+
+  if (flags.authenticate) {
+    LOG(INFO) << "Enabling authentication for the framework";
+
+    Credential credential_;
+    credential_.set_principal(flags.principal);
+    if (flags.secret.isSome()) {
+      credential_.set_secret(flags.secret.get());
+    }
+    credential = credential_;
+  }
+
+  Try<Resources> parsedResources = Resources::parse(flags.resources);
+  if (parsedResources.isError()) {
+    EXIT(EXIT_FAILURE)
+      << "Failed to parse resources: "  << parsedResources.error();
+  }
+
+  FrameworkInfo framework;
+
+  if (flags.user.isSome()) {
+    framework.set_user(flags.user.get());
+  } else {
+    // Fall back to the user running this process. The lookup can fail, so
+    // do not blindly unwrap its result.
+    Result<string> user = os::user();
+    if (!user.isSome()) {
+      EXIT(EXIT_FAILURE)
+        << "Failed to determine the current user"
+        << (user.isError() ? ": " + user.error() : string())
+        << "; please specify `--user`";
+    }
+
+    framework.set_user(user.get());
+  }
+
+  framework.set_principal(flags.principal);
+  framework.set_name(FRAMEWORK_NAME);
+  framework.set_checkpoint(flags.checkpoint);
+  framework.add_roles(flags.role);
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::MULTI_ROLE);
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+
+  LOG(INFO) << "Starting OperationFeedbackScheduler for " << flags.num_tasks
+            << " tasks with command " << flags.command;
+
+  OperationFeedbackScheduler scheduler(
+      framework,
+      flags.master,
+      flags.role,
+      credential);
+
+  for (int i = 0; i < flags.num_tasks; ++i) {
+    scheduler.addTask(flags.command, parsedResources.get());
+  }
+
+  if (flags.master == "local") {
+    // Configure master. The constructor of `Mesos()` will load all
+    // environment variables prefixed by `MESOS_`.
+    os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate));
+
+    ACLs acls;
+    ACL::RegisterFramework* acl = acls.add_register_frameworks();
+    acl->mutable_principals()->set_type(ACL::Entity::ANY);
+    acl->mutable_roles()->add_values(flags.role);
+    os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
+  }
+
+  process::spawn(&scheduler);
+  process::wait(&scheduler);
+
+  return EXIT_SUCCESS;
+}
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index ab7f6c2..f3acd82 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -357,6 +357,7 @@ if (NOT WIN32)
     load-generator-framework
     long-lived-executor
     long-lived-framework
+    operation-feedback-framework
     no-executor-framework
     persistent-volume-framework
     test-executor
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index cadf1d0..9c7b28b 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -34,6 +34,9 @@ TEST_SCRIPT(ExamplesTest, DynamicReservationFramework,
 TEST_SCRIPT(ExamplesTest, DiskFullFramework,
             "disk_full_framework_test.sh")
 
+TEST_SCRIPT(ExamplesTest, OperationFeedbackFramework,
+            "operation_feedback_framework_test.sh")
+
 #ifdef MESOS_HAS_JAVA
 TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
diff --git a/src/tests/operation_feedback_framework_test.sh 
b/src/tests/operation_feedback_framework_test.sh
new file mode 100755
index 0000000..168b0db
--- /dev/null
+++ b/src/tests/operation_feedback_framework_test.sh
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+
+# Runs the operation feedback example framework with a single task against
+# a local master with one agent. The exit status of this script is the exit
+# status of the framework.
+#
+# Expecting MESOS_SOURCE_DIR, MESOS_BUILD_DIR and MESOS_HELPER_DIR to be in
+# environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+# The framework binary is looked up in MESOS_HELPER_DIR below.
+env | grep MESOS_HELPER_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_HELPER_DIR in environment" && \
+  exit 1
+
+source "${MESOS_SOURCE_DIR}/support/colors.sh"
+source "${MESOS_SOURCE_DIR}/support/atexit.sh"
+
+MESOS_WORK_DIR=$(mktemp -d -t mesos-XXXXXX)
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+MESOS_RUNTIME_DIR=$(mktemp -d -t mesos-XXXXXX)
+
+atexit "rm -rf ${MESOS_RUNTIME_DIR}"
+export MESOS_RUNTIME_DIR=${MESOS_RUNTIME_DIR}
+
+# Lower the authentication timeout to speed up the test (the master
+# may drop the authentication message while it is recovering).
+export MESOS_AUTHENTICATION_TIMEOUT=200ms
+
+# Set local Mesos runner to use 1 agent
+export MESOS_NUM_SLAVES=1
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the C++ test framework executes without crashing (returns 0).
+exec "${MESOS_HELPER_DIR}/operation-feedback-framework" \
+  --master=local \
+  --role=test \
+  --num_tasks=1 \
+  --command="echo hello"

Reply via email to