Updated scheduler driver to send ACKNOWLEDGE call.

Review: https://reviews.apache.org/r/36467


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5717eabf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5717eabf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5717eabf

Branch: refs/heads/master
Commit: 5717eabf93dc47430492858d4f781f5ac7bb5373
Parents: e3bbdf8
Author: Vinod Kone <[email protected]>
Authored: Fri Jul 10 15:48:48 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Fri Jul 17 10:44:00 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                       | 41 +++++++++++++++++---------
 src/tests/fault_tolerance_tests.cpp       |  6 ++--
 src/tests/reconciliation_tests.cpp        |  5 +++-
 src/tests/scheduler_event_call_tests.cpp  |  8 +++--
 src/tests/scheduler_tests.cpp             | 37 ++++++++++++++++-------
 src/tests/slave_recovery_tests.cpp        |  9 ++++--
 src/tests/slave_tests.cpp                 |  2 ++
 src/tests/status_update_manager_tests.cpp |  5 +++-
 8 files changed, 78 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 2dfa5c5..613e40b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -883,12 +883,19 @@ protected:
         VLOG(2) << "Sending ACK for status update " << update
             << " to " << master.get();
 
-        StatusUpdateAcknowledgementMessage message;
-        message.mutable_framework_id()->MergeFrom(framework.id());
-        message.mutable_slave_id()->MergeFrom(update.slave_id());
-        message.mutable_task_id()->MergeFrom(update.status().task_id());
-        message.set_uuid(update.uuid());
-        send(master.get(), message);
+        Call call;
+
+        CHECK(framework.has_id());
+        call.mutable_framework_id()->CopyFrom(framework.id());
+        call.set_type(Call::ACKNOWLEDGE);
+
+        Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+        acknowledge->mutable_slave_id()->CopyFrom(update.slave_id());
+        acknowledge->mutable_task_id()->CopyFrom(update.status().task_id());
+        acknowledge->set_uuid(update.uuid());
+
+        CHECK_SOME(master);
+        send(master.get(), call);
       }
     }
   }
@@ -1191,8 +1198,6 @@ protected:
       return;
     }
 
-    CHECK_SOME(master);
-
     // NOTE: By ignoring the volatile 'running' here, we ensure that
     // all acknowledgements requested before the driver was stopped
     // or aborted are processed. Any acknowledgement that is requested
@@ -1204,17 +1209,25 @@ protected:
     // ensures that master-generated and driver-generated updates
     // will not have a 'uuid' set.
     if (status.has_uuid() && status.has_slave_id()) {
+      CHECK_SOME(master);
+
       VLOG(2) << "Sending ACK for status update " << status.uuid()
               << " of task " << status.task_id()
               << " on slave " << status.slave_id()
               << " to " << master.get();
 
-      StatusUpdateAcknowledgementMessage message;
-      message.mutable_framework_id()->CopyFrom(framework.id());
-      message.mutable_slave_id()->CopyFrom(status.slave_id());
-      message.mutable_task_id()->CopyFrom(status.task_id());
-      message.set_uuid(status.uuid());
-      send(master.get(), message);
+      Call call;
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+      call.set_type(Call::ACKNOWLEDGE);
+
+      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+      acknowledge->mutable_slave_id()->CopyFrom(status.slave_id());
+      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+      acknowledge->set_uuid(status.uuid());
+
+      send(master.get(), call);
     } else {
       VLOG(2) << "Received ACK for status update"
               << (status.has_uuid() ? " " + status.uuid() : "")

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index f218dc6..f64c797 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1296,8 +1296,8 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
     .WillOnce(FutureArg<1>(&status));
 
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
-    = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<mesos::scheduler::Call> acknowledgeCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
 
   ExecutorDriver* execDriver;
   EXPECT_CALL(exec, registered(_, _, _, _))
@@ -1314,7 +1314,7 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
   // Wait for the status update acknowledgement to be sent. This
   // ensures the slave doesn't resend the TASK_RUNNING update to the
   // failed over scheduler (below).
-  AWAIT_READY(statusUpdateAcknowledgementMessage);
+  AWAIT_READY(acknowledgeCall);
 
   // Now start the second failed over scheduler.
   MockScheduler sched2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index aef245a..6940b6a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -745,7 +745,10 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask)
 
   // Drop the status update acknowledgements to ensure that the
   // task remains terminal and unacknowledged in the master.
-  DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+  DROP_CALLS(mesos::scheduler::Call(),
+             mesos::scheduler::Call::ACKNOWLEDGE,
+             _,
+             master.get());
 
   driver.start();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp
index cf6aa19..fe15f05 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -20,6 +20,8 @@
 
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -152,13 +154,13 @@ TEST_F(SchedulerDriverEventTest, Update)
   // Generate an update that requires acknowledgement.
   event.mutable_update()->mutable_status()->set_uuid(UUID::random().toBytes());
 
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgement =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<mesos::scheduler::Call> acknowledgement = DROP_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
 
   process::post(master.get(), frameworkPid, event);
 
   AWAIT_READY(statusUpdate2);
-  AWAIT_READY(statusUpdateAcknowledgement);
+  AWAIT_READY(acknowledgement);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 2ce280a..13fecb2 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -27,6 +27,8 @@
 #include <mesos/scheduler.hpp>
 #include <mesos/type_utils.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
@@ -1087,8 +1089,11 @@ TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
 
   // Ensure no status update acknowledgements are sent from the driver
   // to the master.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   EXPECT_CALL(exec, registered(_, _, _, _));
 
@@ -1144,8 +1149,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
 
   // Ensure no status update acknowledgements are sent from the driver
   // to the master until the explicit acknowledgement is sent.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   EXPECT_CALL(exec, registered(_, _, _, _));
 
@@ -1166,8 +1174,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
   Clock::settle();
 
   // Now send the acknowledgement.
-  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _ , master.get());
+  Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _,
+      master.get());
 
   driver.acknowledgeStatusUpdate(status.get());
 
@@ -1204,8 +1215,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   // Ensure no status update acknowledgements are sent to the master.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   driver.start();
 
@@ -1266,8 +1280,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
     .WillOnce(FutureSatisfy(&registered));
 
   // Ensure no status update acknowledgements are sent to the master.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   driver.start();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 2f882cf..7708cf6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -27,6 +27,8 @@
 #include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/dispatch.hpp>
 #include <process/gmock.hpp>
 #include <process/owned.hpp>
@@ -211,8 +213,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
   Future<StatusUpdateMessage> update =
     FUTURE_PROTOBUF(StatusUpdateMessage(), Eq(master.get()), _);
 
-  Future<StatusUpdateAcknowledgementMessage> ack =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<mesos::scheduler::Call> ack = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
 
   Future<Nothing> _ack =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
@@ -313,7 +315,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
                 .executors[executorId]
                 .runs[containerId.get()]
                 .tasks[task.task_id()]
-                .acks.contains(UUID::fromBytes(ack.get().uuid())));
+                .acks.contains(
+                    UUID::fromBytes(ack.get().acknowledge().uuid())));
 
   // Shut down the executor manually so that it doesn't hang around
   // after the test finishes.

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 89cc7f6..a16e4f4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -29,6 +29,8 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 440b074..0224e50 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -420,7 +420,10 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
   // Drop the ACKs, so that status update manager
   // retries the update.
-  DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+  DROP_CALLS(mesos::scheduler::Call(),
+             mesos::scheduler::Call::ACKNOWLEDGE,
+             _,
+             master.get());
 
   driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
 

Reply via email to