Added `unreachable_time` to TaskStatus.

This field contains the time at which the master marked an agent as
unreachable. It will only be set for status updates (including
reconciliation updates) that describe tasks running on unreachable
(e.g., partitioned) agents.

The intent is to help frameworks decide how to handle tasks running on
partitioned agents: a framework might have missed the initial
TASK_LOST/TASK_UNREACHABLE status update (e.g., due to framework
downtime or failover), in which case it would otherwise be difficult to
determine how long an agent has been partitioned.
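
For illustration only (not part of this change), a framework's status
update handler might use the new field roughly as follows. The
tolerance value and helper name are hypothetical, and comparing
against the local clock assumes it is reasonably in sync with the
master's clock:

    #include <ctime>

    #include <mesos/mesos.hpp>

    // Sketch: decide whether a task's agent has been unreachable for
    // longer than the framework is willing to wait before acting
    // (e.g., rescheduling the task elsewhere).
    bool unreachableTooLong(const mesos::TaskStatus& status)
    {
      const double kToleranceSecs = 600; // Hypothetical policy knob.

      if (!status.has_unreachable_time()) {
        return false;
      }

      // `unreachable_time` is the master's wall-clock time, in
      // nanoseconds since the epoch, at which the agent was marked
      // unreachable.
      const double unreachableSecs =
        static_cast<double>(status.unreachable_time().nanoseconds()) / 1e9;

      return std::difftime(std::time(nullptr),
                           static_cast<time_t>(unreachableSecs)) >
        kToleranceSecs;
    }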

This unreachable timestamp is stored in the registry and loaded as part
of master recovery.
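
Because the timestamp is persisted in the registry, a framework that
has failed over (and therefore missed the original status update) can
still obtain it via explicit reconciliation. A rough sketch, assuming
`driver` is a MesosSchedulerDriver and the task ID is made up:

    // Sketch: ask the master for the latest state of a task. If the
    // task's agent is unreachable, the reply (delivered through
    // Scheduler::statusUpdate) is TASK_LOST with
    // REASON_RECONCILIATION and carries `unreachable_time`.
    std::vector<mesos::TaskStatus> statuses;

    mesos::TaskStatus status;
    status.mutable_task_id()->set_value("my-task-id"); // Hypothetical.
    status.set_state(mesos::TASK_STAGING); // Dummy value; required field.
    statuses.push_back(status);

    driver->reconcileTasks(statuses);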

Review: https://reviews.apache.org/r/50845/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac269193
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac269193
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac269193

Branch: refs/heads/master
Commit: ac26919329cb137e09b1a6c8ef500dea2aa908eb
Parents: 06506fa
Author: Neil Conway <neil.con...@gmail.com>
Authored: Mon Sep 19 15:47:26 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 15:47:26 2016 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto          |   6 ++
 include/mesos/v1/mesos.proto       |   6 ++
 src/common/protobuf_utils.cpp      |   7 +-
 src/common/protobuf_utils.hpp      |   3 +-
 src/master/master.cpp              |  53 ++++++++--
 src/master/master.hpp              |  11 ++-
 src/tests/partition_tests.cpp      |  32 ++++++
 src/tests/reconciliation_tests.cpp | 168 ++++++++++++++++++++++++++++++++
 src/tests/registrar_tests.cpp      |  19 +++-
 9 files changed, 289 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 7241c11..2209ea2 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1636,6 +1636,12 @@ message TaskStatus {
   // Container related information that is resolved dynamically such as
   // network address.
   optional ContainerStatus container_status = 13;
+
+  // The time (according to the master's clock) when the agent where
+  // this task was running became unreachable. This is only set on
+  // status updates for tasks running on agents that are unreachable
+  // (e.g., partitioned away from the master).
+  optional TimeInfo unreachable_time = 14;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 04eaeb3..00c6234 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1635,6 +1635,12 @@ message TaskStatus {
   // Container related information that is resolved dynamically such as
   // network address.
   optional ContainerStatus container_status = 13;
+
+  // The time (according to the master's clock) when the agent where
+  // this task was running became unreachable. This is only set on
+  // status updates for tasks running on agents that are unreachable
+  // (e.g., partitioned away from the master).
+  optional TimeInfo unreachable_time = 14;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index ed3ac7f..5db4be4 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -102,7 +102,8 @@ StatusUpdate createStatusUpdate(
     const Option<ExecutorID>& executorId,
     const Option<bool>& healthy,
     const Option<Labels>& labels,
-    const Option<ContainerStatus>& containerStatus)
+    const Option<ContainerStatus>& containerStatus,
+    const Option<TimeInfo> unreachableTime)
 {
   StatusUpdate update;
 
@@ -150,6 +151,10 @@ StatusUpdate createStatusUpdate(
     status->mutable_container_status()->CopyFrom(containerStatus.get());
   }
 
+  if (unreachableTime.isSome()) {
+    status->mutable_unreachable_time()->CopyFrom(unreachableTime.get());
+  }
+
   return update;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 3324838..c5e5a9a 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -74,7 +74,8 @@ StatusUpdate createStatusUpdate(
     const Option<ExecutorID>& executorId = None(),
     const Option<bool>& healthy = None(),
     const Option<Labels>& labels = None(),
-    const Option<ContainerStatus>& containerStatus = None());
+    const Option<ContainerStatus>& containerStatus = None(),
+    const Option<TimeInfo> unreachableTime = None());
 
 
 StatusUpdate createStatusUpdate(

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 89fde24..d5a194b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1569,6 +1569,11 @@ Future<Nothing> Master::_recover(const Registry& registry)
     slaves.recovered.insert(slave.info().id());
   }
 
+  foreach (const Registry::UnreachableSlave& unreachable,
+           registry.unreachable().slaves()) {
+    slaves.unreachable[unreachable.id()] = unreachable.timestamp();
+  }
+
   // Set up a timeout for slaves to re-register.
   slaves.recoveredTimer =
     delay(flags.agent_reregister_timeout,
@@ -1759,7 +1764,8 @@ Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave)
   if (flags.registry_strict) {
     slaves.markingUnreachable.insert(slave.info().id());
 
-    registrar->apply(Owned<Operation>(new MarkSlaveUnreachable(slave.info())))
+    registrar->apply(Owned<Operation>(
+        new MarkSlaveUnreachable(slave.info(), protobuf::getCurrentTime())))
       .onAny(defer(self(),
                    &Self::_markUnreachable,
                    slave.info(),
@@ -1774,7 +1780,8 @@ Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave)
     const string& message =
       "Failed to mark agent " + stringify(slave.info().id()) + " unreachable";
 
-    registrar->apply(Owned<Operation>(new MarkSlaveUnreachable(slave.info())))
+    registrar->apply(Owned<Operation>(
+        new MarkSlaveUnreachable(slave.info(), protobuf::getCurrentTime())))
       .onFailed(lambda::bind(fail, message, lambda::_1));
   }
 
@@ -5676,6 +5683,10 @@ void Master::markUnreachable(const SlaveID& slaveId)
   // the slave is already removed.
   allocator->removeSlave(slave->id);
 
+  // Use the same timestamp for all status updates sent below; we also
+  // use this timestamp when updating the registry.
+  TimeInfo unreachableTime = protobuf::getCurrentTime();
+
   // Transition the tasks to TASK_LOST and remove them, BUT do not
   // send updates yet. Rather, build up the updates so that we can
   // send them after the slave has been moved to the unreachable list
@@ -5698,7 +5709,11 @@ void Master::markUnreachable(const SlaveID& slaveId)
           "Slave " + slave->info.hostname() + " is unreachable",
           TaskStatus::REASON_SLAVE_REMOVED,
           (task->has_executor_id() ?
-              Option<ExecutorID>(task->executor_id()) : None()));
+              Option<ExecutorID>(task->executor_id()) : None()),
+          None(),
+          None(),
+          None(),
+          unreachableTime);
 
       updateTask(task, update);
       removeTask(task);
@@ -5737,6 +5752,7 @@ void Master::markUnreachable(const SlaveID& slaveId)
   // Mark the slave as being unreachable.
   slaves.registered.remove(slave);
   slaves.removed.put(slave->id, Nothing());
+  slaves.unreachable[slave->id] = unreachableTime;
   slaves.markingUnreachable.insert(slave->id);
   authenticated.erase(slave->pid);
 
@@ -5755,7 +5771,8 @@ void Master::markUnreachable(const SlaveID& slaveId)
   // Update the registry to move this slave from the list of admitted
   // slaves to the list of unreachable slaves. Once this is completed,
   // we can forward the TASK_LOST updates to the frameworks.
-  registrar->apply(Owned<Operation>(new MarkSlaveUnreachable(slave->info)))
+  registrar->apply(Owned<Operation>(
+          new MarkSlaveUnreachable(slave->info, unreachableTime)))
     .onAny(defer(self(),
                  &Self::_markUnreachable,
                  slave->info,
@@ -5955,9 +5972,10 @@ void Master::_reconcileTasks(
   //   (2) Task is known: send the latest state.
   //   (3) Task is unknown, slave is registered: TASK_LOST.
   //   (4) Task is unknown, slave is transitioning: no-op.
-  //   (5) Task is unknown, slave is unknown: TASK_LOST.
+  //   (5) Task is unknown, slave is unreachable: TASK_LOST.
+  //   (6) Task is unknown, slave is unknown: TASK_LOST.
   //
-  // When using a non-strict registry, case (5) may result in
+  // When using a non-strict registry, case (6) may result in
   // a TASK_LOST for a task that may later be non-terminal. This
   // is better than no reply at all because the framework can take
   // action for TASK_LOST. Later, if the task is running, the
@@ -6023,8 +6041,28 @@ void Master::_reconcileTasks(
       LOG(INFO) << "Dropping reconciliation of task " << status.task_id()
                 << " for framework " << *framework
                 << " because there are transitional agents";
+    } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
+      // (5) Slave is unreachable: TASK_LOST. The status update
+      // includes the time at which the slave was marked as
+      // unreachable.
+      TimeInfo unreachableTime = slaves.unreachable[slaveId.get()];
+
+      update = protobuf::createStatusUpdate(
+          framework->id(),
+          slaveId.get(),
+          status.task_id(),
+          TASK_LOST,
+          TaskStatus::SOURCE_MASTER,
+          None(),
+          "Reconciliation: Task is unreachable",
+          TaskStatus::REASON_RECONCILIATION,
+          None(),
+          None(),
+          None(),
+          None(),
+          unreachableTime);
     } else {
-      // (5) Task is unknown, slave is unknown: TASK_LOST.
+      // (6) Task is unknown, slave is unknown: TASK_LOST.
       update = protobuf::createStatusUpdate(
           framework->id(),
           slaveId,
@@ -7000,6 +7038,7 @@ void Master::addSlave(
   CHECK_NOTNULL(slave);
 
   slaves.removed.erase(slave->id);
+  slaves.unreachable.erase(slave->id);
   slaves.registered.put(slave);
 
   link(slave->pid);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b092de4..6aeb1b9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1697,6 +1697,11 @@ private:
     // TODO(bmahler): Ideally we could use a cache with set semantics.
     Cache<SlaveID, Nothing> removed;
 
+    // Slaves that have been marked unreachable. We recover this from the
+    // registry, so it includes slaves marked as unreachable by other
+    // instances of the master.
+    hashmap<SlaveID, TimeInfo> unreachable;
+
     // This rate limiter is used to limit the removal of slaves failing
     // health checks.
     // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
@@ -1942,7 +1947,8 @@ private:
 class MarkSlaveUnreachable : public Operation
 {
 public:
-  explicit MarkSlaveUnreachable(const SlaveInfo& _info) : info(_info) {
+  MarkSlaveUnreachable(const SlaveInfo& _info, TimeInfo _unreachableTime)
+    : info(_info), unreachableTime(_unreachableTime) {
     CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
   }
 
@@ -1970,7 +1976,7 @@ protected:
           registry->mutable_unreachable()->add_slaves();
 
         unreachable->mutable_id()->CopyFrom(info.id());
-        unreachable->mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
+        unreachable->mutable_timestamp()->CopyFrom(unreachableTime);
 
         return true; // Mutation.
       }
@@ -1982,6 +1988,7 @@ protected:
 
 private:
   const SlaveInfo info;
+  const TimeInfo unreachableTime;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index efdb49e..f962f84 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -402,6 +402,15 @@ TEST_P(PartitionTest, ReregisterSlaveNotPartitionAware)
   }
 
   Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unhealthy. We then advance the clock -- this shouldn't
+  // do anything, but it ensures that the `unreachable_time` we check
+  // below is computed at the right time.
+  TimeInfo partitionTime = protobuf::getCurrentTime();
+
+  Clock::advance(Milliseconds(100));
 
   // The scheduler should see TASK_LOST because it is not
   // PARTITION_AWARE.
@@ -410,6 +419,7 @@ TEST_P(PartitionTest, ReregisterSlaveNotPartitionAware)
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason());
   EXPECT_EQ(task.task_id(), lostStatus.get().task_id());
   EXPECT_EQ(slaveId, lostStatus.get().slave_id());
+  EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time());
 
   AWAIT_READY(slaveLost);
 
@@ -450,6 +460,7 @@ TEST_P(PartitionTest, ReregisterSlaveNotPartitionAware)
   AWAIT_READY(reconcileUpdate);
   EXPECT_EQ(TASK_LOST, reconcileUpdate.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate.get().reason());
+  EXPECT_FALSE(reconcileUpdate.get().has_unreachable_time());
 
   Clock::resume();
 
@@ -608,6 +619,15 @@ TEST_P(PartitionTest, PartitionedSlaveReregistrationMasterFailover)
   }
 
   Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unhealthy. We then advance the clock -- this shouldn't
+  // do anything, but it ensures that the `unreachable_time` we check
+  // below is computed at the right time.
+  TimeInfo partitionTime = protobuf::getCurrentTime();
+
+  Clock::advance(Milliseconds(100));
 
   // `sched1` should see TASK_LOST.
   AWAIT_READY(lostStatus);
@@ -615,6 +635,7 @@ TEST_P(PartitionTest, PartitionedSlaveReregistrationMasterFailover)
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason());
   EXPECT_EQ(task1.task_id(), lostStatus.get().task_id());
   EXPECT_EQ(slaveId, lostStatus.get().slave_id());
+  EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time());
 
   // `sched2` should see TASK_UNREACHABLE.
   AWAIT_READY(unreachableStatus);
@@ -622,6 +643,7 @@ TEST_P(PartitionTest, PartitionedSlaveReregistrationMasterFailover)
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason());
   EXPECT_EQ(task2.task_id(), unreachableStatus.get().task_id());
   EXPECT_EQ(slaveId, unreachableStatus.get().slave_id());
+  EXPECT_EQ(partitionTime, unreachableStatus.get().unreachable_time());
 
   // The master should notify both schedulers that the slave was lost.
   AWAIT_READY(slaveLost1);
@@ -797,12 +819,22 @@ TEST_P(PartitionTest, PartitionedSlaveOrphanedTask)
   }
 
   Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unhealthy. We then advance the clock -- this shouldn't
+  // do anything, but it ensures that the `unreachable_time` we check
+  // below is computed at the right time.
+  TimeInfo partitionTime = protobuf::getCurrentTime();
+
+  Clock::advance(Milliseconds(100));
 
   AWAIT_READY(unreachableStatus);
   EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state());
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason());
   EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id());
   EXPECT_EQ(slaveId, unreachableStatus.get().slave_id());
+  EXPECT_EQ(partitionTime, unreachableStatus.get().unreachable_time());
 
   AWAIT_READY(slaveLost);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 8e438bf..a043356 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -55,6 +55,7 @@ using mesos::master::detector::StandaloneMasterDetector;
 
 using process::Clock;
 using process::Future;
+using process::Message;
 using process::Owned;
 using process::PID;
 using process::Promise;
@@ -65,6 +66,7 @@ using testing::_;
 using testing::An;
 using testing::AtMost;
 using testing::DoAll;
+using testing::Eq;
 using testing::Return;
 using testing::SaveArg;
 
@@ -262,6 +264,7 @@ TEST_F(ReconciliationTest, UnknownSlave)
   AWAIT_READY(update);
   EXPECT_EQ(TASK_LOST, update.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason());
+  EXPECT_FALSE(update.get().has_unreachable_time());
 
   driver.stop();
   driver.join();
@@ -318,6 +321,7 @@ TEST_F(ReconciliationTest, UnknownTask)
   AWAIT_READY(update);
   EXPECT_EQ(TASK_LOST, update.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason());
+  EXPECT_FALSE(update.get().has_unreachable_time());
 
   driver.stop();
   driver.join();
@@ -359,6 +363,7 @@ TEST_F(ReconciliationTest, UnknownKillTask)
   AWAIT_READY(update);
   EXPECT_EQ(TASK_LOST, update.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason());
+  EXPECT_FALSE(update.get().has_unreachable_time());
 
   driver.stop();
   driver.join();
@@ -878,6 +883,169 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
   driver.join();
 }
 
+
+TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  StandaloneMasterDetector schedulerDetector(master.get()->pid);
+  TestingMesosSchedulerDriver driver(&sched, &schedulerDetector);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  // Launch `task` using `sched`.
+  TaskInfo task = createTask(offer, "sleep 60");
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+  EXPECT_EQ(task.task_id(), runningStatus.get().task_id());
+
+  const SlaveID slaveId = runningStatus.get().slave_id();
+
+  AWAIT_READY(statusUpdateAck);
+
+  // Now, induce a partition of the slave by having the master
+  // timeout the slave.
+  Future<TaskStatus> lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  Clock::pause();
+
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unhealthy. We then advance the clock -- this shouldn't
+  // do anything, but it ensures that the `unreachable_time` we check
+  // below is computed at the right time.
+  TimeInfo partitionTime = protobuf::getCurrentTime();
+
+  Clock::advance(Milliseconds(100));
+
+  AWAIT_READY(lostStatus);
+  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason());
+  EXPECT_EQ(task.task_id(), lostStatus.get().task_id());
+  EXPECT_EQ(slaveId, lostStatus.get().slave_id());
+  EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time());
+
+  AWAIT_READY(slaveLost);
+
+  // Do the first explicit reconciliation before restarting the master.
+  TaskStatus status1;
+  status1.mutable_task_id()->CopyFrom(task.task_id());
+  status1.mutable_slave_id()->CopyFrom(slaveId);
+  status1.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1));
+
+  driver.reconcileTasks({status1});
+
+  AWAIT_READY(reconcileUpdate1);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason());
+  EXPECT_EQ(partitionTime, reconcileUpdate1.get().unreachable_time());
+
+  // Drop all subsequent messages from the slave, since they aren't
+  // useful anymore.
+  DROP_MESSAGES(_, slave.get()->pid, _);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Simulate master failover by restarting the master.
+  master->reset();
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  // Re-register the framework. Note that we don't update the slave's
+  // master detector, so it does not try to reregister.
+  schedulerDetector.appoint(master.get()->pid);
+
+  AWAIT_READY(registered);
+
+  // Do a second explicit reconciliation; we expect to observe the same data.
+  TaskStatus status2;
+  status2.mutable_task_id()->CopyFrom(task.task_id());
+  status2.mutable_slave_id()->CopyFrom(slaveId);
+  status2.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status2});
+
+  AWAIT_READY(reconcileUpdate2);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason());
+  EXPECT_EQ(partitionTime, reconcileUpdate2.get().unreachable_time());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac269193/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index b04fc92..7ea00df 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -236,7 +236,9 @@ TEST_P(RegistrarTest, Recover)
   AWAIT_EXPECT_FAILED(
       registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
   AWAIT_EXPECT_FAILED(
-      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(slave))));
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
   AWAIT_EXPECT_FAILED(
       registrar.apply(Owned<Operation>(new MarkSlaveReachable(slave))));
   AWAIT_EXPECT_FAILED(
@@ -249,7 +251,8 @@ TEST_P(RegistrarTest, Recover)
   Future<bool> admit = registrar.apply(
       Owned<Operation>(new AdmitSlave(slave)));
   Future<bool> unreachable = registrar.apply(
-      Owned<Operation>(new MarkSlaveUnreachable(slave)));
+      Owned<Operation>(
+          new MarkSlaveUnreachable(slave, protobuf::getCurrentTime())));
   Future<bool> reachable = registrar.apply(
       Owned<Operation>(new MarkSlaveReachable(slave)));
   Future<bool> remove = registrar.apply(
@@ -334,18 +337,24 @@ TEST_P(RegistrarTest, MarkUnreachable)
   AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
 
   AWAIT_TRUE(
-      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1))));
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
 
   AWAIT_TRUE(
       registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1))));
 
   AWAIT_TRUE(
-      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1))));
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
 
   // If a slave is already unreachable, trying to mark it unreachable
   // again should fail.
   AWAIT_FALSE(
-      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1))));
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
 }
 
 
