Added tests for operation status reconciliation.

Review: https://reviews.apache.org/r/66468/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8cca436
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8cca436
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8cca436

Branch: refs/heads/master
Commit: b8cca436fb5646adad8174124efdb549f6988422
Parents: c39ef69
Author: Gaston Kleiman <[email protected]>
Authored: Mon Apr 23 13:43:58 2018 -0700
Committer: Greg Mann <[email protected]>
Committed: Mon Apr 23 13:50:43 2018 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   1 +
 src/tests/CMakeLists.txt                        |   1 +
 src/tests/mesos.hpp                             |   1 +
 src/tests/operation_reconciliation_tests.cpp    | 843 +++++++++++++++++++
 .../storage_local_resource_provider_tests.cpp   | 181 ++++
 5 files changed, 1027 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index e50e43b..7e91681 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2552,6 +2552,7 @@ mesos_tests_SOURCES =                                             \
   tests/mock_registrar.cpp                                     \
   tests/module.cpp                                             \
   tests/module_tests.cpp                                       \
+  tests/operation_reconciliation_tests.cpp                     \
   tests/operation_status_update_manager_tests.cpp              \
   tests/oversubscription_tests.cpp                             \
   tests/partition_tests.cpp                                    \

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 4eb8e23..1fef060 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -108,6 +108,7 @@ set(MESOS_TESTS_SRC
   http_fault_tolerance_tests.cpp
   master_maintenance_tests.cpp
   master_slave_reconciliation_tests.cpp
+  operation_reconciliation_tests.cpp
   operation_status_update_manager_tests.cpp
   partition_tests.cpp
   paths_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index dcbfb95..756a521 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2597,6 +2597,7 @@ public:
 namespace v1 {
 namespace scheduler {
 
+using APIResult = mesos::v1::scheduler::APIResult;
 using Call = mesos::v1::scheduler::Call;
 using Event = mesos::v1::scheduler::Event;
 using Mesos = mesos::v1::scheduler::Mesos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/operation_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
new file mode 100644
index 0000000..76c1695
--- /dev/null
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -0,0 +1,843 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gmock/gmock.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/http.hpp>
+#include <process/message.hpp>
+
+#include <stout/gtest.hpp>
+
+#include "master/master.hpp"
+
+#include "master/detector/standalone.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
+
+using process::Clock;
+using process::Future;
+using process::Message;
+using process::Owned;
+
+using testing::Eq;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+namespace v1 {
+
+class OperationReconciliationTest
+  : public MesosTest,
+    public WithParamInterface<ContentType> {}; // Param: HTTP content type.
+
+
+// Each test runs once per content type of the HTTP request (PROTOBUF, JSON).
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    OperationReconciliationTest,
+    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// This test ensures that the master responds with `OPERATION_PENDING` when a
+// framework reconciles an operation that never reached the agent.
+TEST_P(OperationReconciliationTest, PendingOperation)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); // Role for the reservation.
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(scheduler::DeclineOffers()); // Decline subsequent offers.
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const Offer& offer = offers->offers(0);
+  const AgentID& agentId = offer.agent_id();
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  const Resources reservedResources =
+    Resources(offer.resources())
+      .pushReservation(createDynamicReservationInfo(
+          frameworkInfo.roles(0), frameworkInfo.principal()));
+
+  // Drop the `ApplyOperationMessage` so the agent never sees the operation.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+
+  mesos.send(createCallAccept(
+      frameworkId,
+      offer,
+      {RESERVE(reservedResources, operationId.value())}));
+
+  AWAIT_READY(applyOperationMessage);
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_PENDING, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an unknown operation that belongs
+// to an agent that has been recovered from the registry after master failover
+// but has not yet registered, results in `OPERATION_RECOVERING`.
+//
+// TODO(gkleiman): Enable this test on Windows once Windows supports the
+// replicated log.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    OperationReconciliationTest, UnknownOperationRecoveredAgent)
+{
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry = "replicated_log";
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance to trigger registration. NOTE(review): clock is never paused here.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  // Stop the master.
+  master->reset();
+
+  // Stop the agent, so it cannot re-register with the restarted master.
+  slave.get()->terminate();
+  slave->reset();
+
+  // Restart the master with the same flags, i.e., the same registry.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  OperationID operationId;
+  operationId.set_value("operation"); // No such operation was ever created.
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_RECOVERING, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an operation that was never
+// created, on an agent that has registered, results in `OPERATION_UNKNOWN`.
+TEST_P(OperationReconciliationTest, UnknownOperationKnownAgent)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  OperationID operationId;
+  operationId.set_value("operation"); // No such operation was ever created.
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_UNKNOWN, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an operation that was never
+// created, on an agent that stopped answering pings, results in
+TEST_P(OperationReconciliationTest, UnknownOperationUnreachableAgent)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Allow the master to PING the agent, but drop all PONG messages
+  // from the agent. Note that we don't match on the master / agent
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping =
+    FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  // Now, induce a partition of the agent by having the master
+  // timeout the agent.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout); // Time out the final ping.
+  Clock::settle();
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  OperationID operationId;
+  operationId.set_value("operation"); // No such operation was ever created.
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_UNREACHABLE, operationStatus.state()); // Expected state.
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an operation that was never created,
+// on an agent marked gone, results in `OPERATION_GONE_BY_OPERATOR`.
+TEST_P(OperationReconciliationTest, UnknownOperationAgentMarkedGone)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  ContentType contentType = GetParam();
+
+  { // Mark the agent as gone via the master's `api/v1` endpoint.
+    master::Call call;
+    call.set_type(master::Call::MARK_AGENT_GONE);
+
+    call.mutable_mark_agent_gone()->mutable_agent_id()->CopyFrom(agentId);
+
+    Future<process::http::Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+  }
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  OperationID operationId;
+  operationId.set_value("operation"); // No such operation was ever created.
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_GONE_BY_OPERATOR, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an operation that was never
+// created, on an agent ID that never registered, yields `OPERATION_UNKNOWN`.
+TEST_P(OperationReconciliationTest, UnknownOperationUnknownAgent)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  AgentID agentId;
+  agentId.set_value("agent"); // No agent is started in this test.
+
+  OperationID operationId;
+  operationId.set_value("operation"); // No such operation was ever created.
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_UNKNOWN, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that, after a master failover, both explicit and implicit
+// reconciliation of an operation pending on an agent yield `OPERATION_PENDING`.
+TEST_P(OperationReconciliationTest, AgentPendingOperationAfterMasterFailover)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
+
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a resource provider.
+
+  ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Resource disk =
+    createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+
+  Owned<MockResourceProvider> resourceProvider(
+      new MockResourceProvider(
+          resourceProviderInfo,
+          Resources(disk)));
+
+  // We override the mock resource provider's default action, so the operation
+  // will stay in `OPERATION_PENDING`.
+  Future<resource_provider::Event::ApplyOperation> applyOperation;
+  EXPECT_CALL(*resourceProvider, applyOperation(_))
+    .WillOnce(FutureArg<0>(&applyOperation));
+
+  Owned<EndpointDetector> endpointDetector(
+      mesos::internal::tests::resource_provider::createEndpointDetector(
+          slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // fully register.
+  Clock::resume();
+
+  ContentType contentType = GetParam();
+
+  resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Start a v1 framework.
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline offers that do not contain wanted resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  Future<scheduler::Event::Offers> offers;
+
+  auto isRaw = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+  };
+
+  EXPECT_CALL(*scheduler, offers(_, scheduler::OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(scheduler::DeclineOffers()); // Decline successive offers.
+
+  scheduler::TestMesos mesos(
+      master.get()->pid, contentType, scheduler, detector);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  // NOTE: If the framework has not declined an unwanted offer yet when
+  // the master updates the agent with the RAW disk resource, the new
+  // allocation triggered by this update won't generate an allocatable
+  // offer due to no CPU and memory resources. So here we first settle
+  // the clock to ensure that the unwanted offer has been declined, then
+  // advance the clock to trigger another allocation.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const Offer& offer = offers->offers(0);
+  const AgentID& agentId = offer.agent_id();
+
+  Option<Resource> source;
+  Option<ResourceProviderID> resourceProviderId;
+  foreach (const Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+
+      ASSERT_TRUE(resource.has_provider_id());
+      resourceProviderId = resource.provider_id();
+
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+  ASSERT_SOME(resourceProviderId);
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(createCallAccept(
+      frameworkId,
+      offer,
+      {CREATE_VOLUME(
+          source.get(),
+          Resource::DiskInfo::Source::MOUNT,
+          operationId.value())}));
+
+  AWAIT_READY(applyOperation);
+
+  // Simulate master failover.
+
+  detector->appoint(None());
+
+  master->reset();
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Settle the clock to ensure the master finishes recovering the registry.
+  Clock::settle();
+
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo, frameworkId));
+
+  Future<scheduler::Event::Subscribed> frameworkResubscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&frameworkResubscribed));
+
+  // Simulate a new master detected event to the agent and the scheduler.
+  detector->appoint(master.get()->pid);
+
+  // Advance the clock, so that the agent re-registers.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the framework and agent to re-register.
+  AWAIT_READY(slaveReregistered);
+  AWAIT_READY(frameworkResubscribed);
+
+  // Test explicit reconciliation: the operation is named in the call.
+  {
+    scheduler::Call::ReconcileOperations::Operation operation;
+    operation.mutable_operation_id()->CopyFrom(operationId);
+    operation.mutable_agent_id()->CopyFrom(agentId);
+
+    const Future<scheduler::APIResult> result =
+      mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+    AWAIT_READY(result);
+
+    // The master should respond with '200 OK' and with a `scheduler::Response`.
+    ASSERT_EQ(process::http::Status::OK, result->status_code());
+    ASSERT_TRUE(result->has_response());
+
+    const scheduler::Response response = result->response();
+    ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+    ASSERT_TRUE(response.has_reconcile_operations());
+
+    const scheduler::Response::ReconcileOperations& reconcile =
+      response.reconcile_operations();
+    ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+    const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+    EXPECT_EQ(operationId, operationStatus.operation_id());
+    EXPECT_EQ(OPERATION_PENDING, operationStatus.state());
+    EXPECT_FALSE(operationStatus.has_uuid());
+  }
+
+  // Test implicit reconciliation: no operations are listed in the call.
+  {
+    const Future<scheduler::APIResult> result =
+      mesos.call({createCallReconcileOperations(frameworkId, {})});
+
+    AWAIT_READY(result);
+
+    // The master should respond with '200 OK' and with a `scheduler::Response`.
+    ASSERT_EQ(process::http::Status::OK, result->status_code());
+    ASSERT_TRUE(result->has_response());
+
+    const scheduler::Response response = result->response();
+    ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+    ASSERT_TRUE(response.has_reconcile_operations());
+
+    const scheduler::Response::ReconcileOperations& reconcile =
+      response.reconcile_operations();
+    ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+    const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+    EXPECT_EQ(operationId, operationStatus.operation_id());
+    EXPECT_EQ(OPERATION_PENDING, operationStatus.state());
+    EXPECT_FALSE(operationStatus.has_uuid());
+  }
+}
+
+} // namespace v1 {
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 2872f1a..ccb114a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -3305,6 +3305,187 @@ TEST_F(
   Clock::settle();
 }
 
+
+// Checks explicit reconciliation of an operation that is terminal at the
+// master but unacknowledged by the framework: once `CREATE_VOLUME` reports
+// `OPERATION_FINISHED`, reconciling it must return that state with a UUID.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    ROOT_ReconcileUnacknowledgedTerminalOperation)
+{
+  Clock::pause();
+
+  loadUriDiskProfileAdaptorModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileMapping();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Subscribe a framework under the "storage" role to exercise an operation.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Decline offers that do not contain the wanted RAW disk resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  auto isRaw = [](const v1::Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW;
+  };
+
+  EXPECT_CALL(*scheduler, offers(_, v1::scheduler::OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // NOTE: If the framework has not declined an unwanted offer yet when
+  // the master updates the agent with the RAW disk resource, the new
+  // allocation triggered by this update won't generate an allocatable
+  // offer due to no CPU and memory resources. So here we first settle
+  // the clock to ensure that the unwanted offer has been declined, then
+  // advance the clock to trigger another allocation.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::UpdateOperationStatus> update;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Option<v1::Resource> source;
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+
+      ASSERT_TRUE(resource.has_provider_id());
+      resourceProviderId = resource.provider_id();
+
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+  ASSERT_SOME(resourceProviderId);
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_VOLUME(
+          source.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          operationId.value())}));
+
+  AWAIT_READY(update);
+
+  ASSERT_EQ(operationId, update->status().operation_id());
+  ASSERT_EQ(v1::OperationState::OPERATION_FINISHED, update->status().state());
+  ASSERT_TRUE(update->status().has_uuid());
+
+  v1::scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<v1::scheduler::APIResult> result =
+    mesos.call({v1::createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const v1::scheduler::Response response = result->response();
+  ASSERT_EQ(v1::scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const v1::scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const v1::OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  ASSERT_EQ(operationId, operationStatus.operation_id());
+  ASSERT_EQ(v1::OPERATION_FINISHED, operationStatus.state());
+  ASSERT_TRUE(operationStatus.has_uuid());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to