http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/include/mesos/v1/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto new file mode 100644 index 0000000..bd5e82a --- /dev/null +++ b/include/mesos/v1/scheduler/scheduler.proto @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import "mesos/v1/mesos.proto"; + +package mesos.v1.scheduler; + +option java_package = "org.apache.mesos.v1.scheduler"; +option java_outer_classname = "Protos"; + + +/** + * Scheduler event API. + * + * An event is described using the standard protocol buffer "union" + * trick, see: + * https://developers.google.com/protocol-buffers/docs/techniques#union. + */ +message Event { + // Possible event types, followed by message definitions if + // applicable. + enum Type { + SUBSCRIBED = 1; // See 'Subscribed' below. + OFFERS = 2; // See 'Offers' below. + RESCIND = 3; // See 'Rescind' below. + UPDATE = 4; // See 'Update' below. + MESSAGE = 5; // See 'Message' below. + FAILURE = 6; // See 'Failure' below. + ERROR = 7; // See 'Error' below. + + // Periodic message sent by the Mesos master according to + // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does + // not receive any events (including heartbeats) for an extended + // period of time (e.g., 5 x heartbeat_interval_seconds), there is + // likely a network partition. In such a case the scheduler should + // close the existing subscription connection and resubscribe + // using a backoff strategy. + HEARTBEAT = 8; + } + + // First event received when the scheduler subscribes. + message Subscribed { + required FrameworkID framework_id = 1; + + // This value will be set if the master is sending heartbeats. See + // the comment above on 'HEARTBEAT' for more details. + // TODO(vinod): Implement heartbeats in the master once the master + // can send HTTP events. + optional double heartbeat_interval_seconds = 2; + } + + // Received whenever there are new resources that are offered to the + // scheduler. Each offer corresponds to a set of resources on a + // agent. Until the scheduler accepts or declines an offer the + // resources are considered allocated to the scheduler. + message Offers { + repeated Offer offers = 1; + } + + // Received when a particular offer is no longer valid (e.g., the + // agent corresponding to the offer has been removed) and hence + // needs to be rescinded. Any future calls ('Accept' / 'Decline') made + // by the scheduler regarding this offer will be invalid. + message Rescind { + required OfferID offer_id = 1; + } + + // Received whenever there is a status update that is generated by + // the executor or agent or master. Status updates should be used by + // executors to reliably communicate the status of the tasks that + // they manage. It is crucial that a terminal update (see TaskState + // in v1/mesos.proto) is sent by the executor as soon as the task + // terminates, in order for Mesos to release the resources allocated + // to the task. It is also the responsibility of the scheduler to + // explicitly acknowledge the receipt of a status update. See + // 'Acknowledge' in the 'Call' section below for the semantics. + message Update { + required TaskStatus status = 1; + } + + // Received when a custom message generated by the executor is + // forwarded by the master. Note that this message is not + // interpreted by Mesos and is only forwarded (without reliability + // guarantees) to the scheduler. It is up to the executor to retry + // if the message is dropped for any reason. + message Message { + required AgentID agent_id = 1; + required ExecutorID executor_id = 2; + required bytes data = 3; + } + + // Received when a agent is removed from the cluster (e.g., failed + // health checks) or when an executor is terminated. Note that, this + // event coincides with receipt of terminal UPDATE events for any + // active tasks belonging to the agent or executor and receipt of + // 'Rescind' events for any outstanding offers belonging to the + // agent. Note that there is no guaranteed order between the + // 'Failure', 'Update' and 'Rescind' events when an agent or executor + // is removed. + // TODO(vinod): Consider splitting the lost agent and terminated + // executor into separate events and ensure it's reliably generated. + message Failure { + optional AgentID agent_id = 1; + + // If this was just a failure of an executor on an agent then + // 'executor_id' will be set and possibly 'status' (if we were + // able to determine the exit status). + optional ExecutorID executor_id = 2; + optional int32 status = 3; + } + + // Received when an invalid framework (e.g., unauthenticated, + // unauthorized) attempts to subscribe with the master. Error can + // also be received if scheduler sends invalid Calls (e.g., not + // properly initialized). + // TODO(vinod): Remove this once the old scheduler driver is no + // longer supported. With HTTP API all errors will be signaled via + // HTTP response codes. + message Error { + required string message = 1; + } + + // Type of the event, indicates which optional field below should be + // present if that type has a nested message definition. + required Type type = 1; + + optional Subscribed subscribed = 2; + optional Offers offers = 3; + optional Rescind rescind = 4; + optional Update update = 5; + optional Message message = 6; + optional Failure failure = 7; + optional Error error = 8; +} + + +/** + * Scheduler call API. + * + * Like Event, a Call is described using the standard protocol buffer + * "union" trick (see above). + */ +message Call { + // Possible call types, followed by message definitions if + // applicable. + enum Type { + SUBSCRIBE = 1; // See 'Subscribe' below. + TEARDOWN = 2; // Shuts down all tasks/executors and removes framework. + ACCEPT = 3; // See 'Accept' below. + DECLINE = 4; // See 'Decline' below. + REVIVE = 5; // Removes any previous filters set via ACCEPT or DECLINE. + KILL = 6; // See 'Kill' below. + SHUTDOWN = 7; // See 'Shutdown' below. + ACKNOWLEDGE = 8; // See 'Acknowledge' below. + RECONCILE = 9; // See 'Reconcile' below. + MESSAGE = 10; // See 'Message' below. + REQUEST = 11; // See 'Request' below. + + // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for + // already subscribed frameworks as a way of stopping offers from + // being generated and other events from being sent by the master. + // Note that this functionality existed originally to support + // SchedulerDriver::abort which was only necessary to handle + // exceptions getting thrown from within Scheduler callbacks, + // something that is not an issue with the Event/Call API. + } + + // Subscribes the scheduler with the master to receive events. A + // scheduler must send other calls only after it has received the + // SUBCRIBED event. + message Subscribe { + // See the comments below on 'framework_id' on the semantics for + // 'framework_info.id'. + required FrameworkInfo framework_info = 1; + + // 'force' field is only relevant when 'framework_info.id' is set. + // It tells the master what to do in case an instance of the + // scheduler attempts to subscribe when another instance of it is + // already connected (e.g., split brain due to network partition). + // If 'force' is true, this scheduler instance is allowed and the + // old connected scheduler instance is disconnected. If false, + // this scheduler instance is disallowed subscription in favor of + // the already connected scheduler instance. + // + // It is recommended to set this to true only when a newly elected + // scheduler instance is attempting to subscribe but not when a + // scheduler is retrying subscription (e.g., disconnection or + // master failover; see sched/sched.cpp for an example). + optional bool force = 2; + } + + // Accepts an offer, performing the specified operations + // in a sequential manner. + // + // E.g. Launch a task with a newly reserved persistent volume: + // + // Accept { + // offer_ids: [ ... ] + // operations: [ + // { type: RESERVE, + // reserve: { resources: [ disk(role):2 ] } } + // { type: CREATE, + // create: { volumes: [ disk(role):1+persistence ] } } + // { type: LAUNCH, + // launch: { task_infos ... disk(role):1;disk(role):1+persistence } } + // ] + // } + // + // Note that any of the offerâs resources not used in the 'Accept' + // call (e.g., to launch a task) are considered unused and might be + // reoffered to other frameworks. In other words, the same OfferID + // cannot be used in more than one 'Accept' call. + message Accept { + repeated OfferID offer_ids = 1; + repeated Offer.Operation operations = 2; + optional Filters filters = 3; + } + + // Declines an offer, signaling the master to potentially reoffer + // the resources to a different framework. Note that this is same + // as sending an Accept call with no operations. See comments on + // top of 'Accept' for semantics. + message Decline { + repeated OfferID offer_ids = 1; + optional Filters filters = 2; + } + + // Kills a specific task. If the scheduler has a custom executor, + // the kill is forwarded to the executor and it is up to the + // executor to kill the task and send a TASK_KILLED (or TASK_FAILED) + // update. Note that Mesos releases the resources for a task once it + // receives a terminal update (See TaskState in v1/mesos.proto) for + // it. If the task is unknown to the master, a TASK_LOST update is + // generated. + message Kill { + required TaskID task_id = 1; + optional AgentID agent_id = 2; + } + + // Shuts down a custom executor. When the executor gets a shutdown + // event, it is expected to kill all its tasks (and send TASK_KILLED + // updates) and terminate. If the executor doesnât terminate within + // a certain timeout (configurable via + // '--executor_shutdown_grace_period' agent flag), the agent will + // forcefully destroy the container (executor and its tasks) and + // transition its active tasks to TASK_LOST. + message Shutdown { + required ExecutorID executor_id = 1; + required AgentID agent_id = 2; + } + + // Acknowledges the receipt of status update. Schedulers are + // responsible for explicitly acknowledging the receipt of status + // updates that have 'Update.status().uuid()' field set. Such status + // updates are retried by the agent until they are acknowledged by + // the scheduler. + message Acknowledge { + required AgentID agent_id = 1; + required TaskID task_id = 2; + required bytes uuid = 3; + } + + // Allows the scheduler to query the status for non-terminal tasks. + // This causes the master to send back the latest task status for + // each task in 'tasks', if possible. Tasks that are no longer known + // will result in a TASK_LOST update. If 'statuses' is empty, then + // the master will send the latest status for each task currently + // known. + message Reconcile { + // TODO(vinod): Support arbitrary queries than just state of tasks. + message Task { + required TaskID task_id = 1; + optional AgentID agent_id = 2; + } + + repeated Task tasks = 1; + } + + // Sends arbitrary binary data to the executor. Note that Mesos + // neither interprets this data nor makes any guarantees about the + // delivery of this message to the executor. + message Message { + required AgentID agent_id = 1; + required ExecutorID executor_id = 2; + required bytes data = 3; + } + + // Requests a specific set of resources from Mesos's allocator. If + // the allocator has support for this, corresponding offers will be + // sent asynchronously via the OFFERS event(s). + // + // NOTE: The built-in hierarchical allocator doesn't have support + // for this call and hence simply ignores it. + message Request { + repeated mesos.v1.Request requests = 1; + } + + // Identifies who generated this call. Master assigns a framework id + // when a new scheduler subscribes for the first time. Once assigned, + // the scheduler must set the 'framework_id' here and within its + // FrameworkInfo (in any further 'Subscribe' calls). This allows the + // master to identify a scheduler correctly across disconnections, + // failovers, etc. + optional FrameworkID framework_id = 1; + + // Type of the call, indicates which optional field below should be + // present if that type has a nested message definition. + required Type type = 2; + + optional Subscribe subscribe = 3; + optional Accept accept = 4; + optional Decline decline = 5; + optional Kill kill = 6; + optional Shutdown shutdown = 7; + optional Acknowledge acknowledge = 8; + optional Reconcile reconcile = 9; + optional Message message = 10; + optional Request request = 11; +}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/include/mesos/v1/values.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/v1/values.hpp b/include/mesos/v1/values.hpp new file mode 100644 index 0000000..40fc167 --- /dev/null +++ b/include/mesos/v1/values.hpp @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __MESOS_V1_VALUES_HPP__ +#define __MESOS_V1_VALUES_HPP__ + +#include <mesos/v1/mesos.hpp> + +#include <stout/try.hpp> + +namespace mesos { +namespace v1 { + +std::ostream& operator << (std::ostream& stream, const Value::Scalar& scalar); +bool operator == (const Value::Scalar& left, const Value::Scalar& right); +bool operator <= (const Value::Scalar& left, const Value::Scalar& right); +Value::Scalar operator + (const Value::Scalar& left, + const Value::Scalar& right); +Value::Scalar operator - (const Value::Scalar& left, + const Value::Scalar& right); +Value::Scalar& operator += (Value::Scalar& left, const Value::Scalar& right); +Value::Scalar& operator -= (Value::Scalar& left, const Value::Scalar& right); + +std::ostream& operator << (std::ostream& stream, const Value::Ranges& ranges); +bool operator == (const Value::Ranges& left, const Value::Ranges& right); +bool operator <= (const Value::Ranges& left, const Value::Ranges& right); +Value::Ranges operator + (const Value::Ranges& left, + const Value::Ranges& right); +Value::Ranges operator - (const Value::Ranges& left, + const Value::Ranges& right); +Value::Ranges& operator += (Value::Ranges& left, const Value::Ranges& right); +Value::Ranges& operator -= (Value::Ranges& left, const Value::Ranges& right); + +std::ostream& operator << (std::ostream& stream, const Value::Set& set); +bool operator == (const Value::Set& left, const Value::Set& right); +bool operator <= (const Value::Set& left, const Value::Set& right); +Value::Set operator + (const Value::Set& left, const Value::Set& right); +Value::Set operator - (const Value::Set& left, const Value::Set& right); +Value::Set& operator += (Value::Set& left, const Value::Set& right); +Value::Set& operator -= (Value::Set& left, const Value::Set& right); + +std::ostream& operator << (std::ostream& stream, const Value::Text& value); +bool operator == (const Value::Text& left, const Value::Text& right); + +namespace internal { +namespace values { + +Try<Value> parse(const std::string& text); + +} // namespace values { +} // namespace internal { + +} // namespace v1 { +} // namespace mesos { + +#endif // __MESOS_V1_VALUES_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index c213ac7..9420031 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -136,6 +136,8 @@ endif MESOS_PROTO = $(top_srcdir)/include/mesos/mesos.proto +V1_MESOS_PROTO = $(top_srcdir)/include/mesos/v1/mesos.proto + AUTHENTICATION_PROTO = \ $(top_srcdir)/include/mesos/authentication/authentication.proto @@ -157,6 +159,9 @@ MODULE_PROTO = \ SCHEDULER_PROTO = \ $(top_srcdir)/include/mesos/scheduler/scheduler.proto +V1_SCHEDULER_PROTO = \ + $(top_srcdir)/include/mesos/v1/scheduler/scheduler.proto + ISOLATOR_PROTO = \ $(top_srcdir)/include/mesos/slave/isolator.proto @@ -183,7 +188,12 @@ CXX_PROTOS = \ slave/isolator.pb.cc \ ../include/mesos/slave/isolator.pb.h \ slave/oversubscription.pb.cc \ - ../include/mesos/slave/oversubscription.pb.h + ../include/mesos/slave/oversubscription.pb.h \ + v1/mesos.pb.cc \ + ../include/mesos/v1/mesos.pb.h \ + v1/scheduler/scheduler.pb.cc \ + ../include/mesos/v1/scheduler/scheduler.pb.h + JAVA_PROTOS = \ java/generated/org/apache/mesos/Protos.java \ @@ -224,10 +234,21 @@ CLEANFILES += $(REGISTRY_PROTOS) # Targets for generating protocol buffer code. # For the include headers, place the header files in the include # directory and leave the cc files in src. -%.pb.cc ../include/mesos/%.pb.h: $(top_srcdir)/include/mesos/%.proto +# +# TODO(benh): In certain circumstances below we need to do '-mv' +# instead of just 'mv' because the way we have our dependencies +# currently set up 'make' actually attempts to run the 'mv' command +# twice which causes the second invocation to fail because the file no +# longer exists (because it's already been moved). +mesos.pb.cc ../include/mesos/mesos.pb.h: $(MESOS_PROTO) $(MKDIR_P) $(@D) $(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^ - mv ../include/mesos/*.pb.cc . + -mv -f ../include/mesos/mesos.pb.cc $(@D) + +v1/mesos.pb.cc ../include/mesos/v1/mesos.pb.h: $(V1_MESOS_PROTO) + $(MKDIR_P) $(@D) + $(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^ + -mv -f ../include/mesos/v1/mesos.pb.cc $(@D) authentication/%.pb.cc ../include/mesos/authentication/%.pb.h: $(AUTHENTICATION_PROTO) $(MKDIR_P) $(@D) @@ -271,6 +292,12 @@ scheduler/%.pb.cc ../include/mesos/scheduler/%.pb.h: $(SCHEDULER_PROTO) $(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^ mv ../include/mesos/scheduler/*.pb.cc $(@D) +v1/scheduler/%.pb.cc ../include/mesos/v1/scheduler/%.pb.h: $(V1_SCHEDULER_PROTO) + $(MKDIR_P) $(@D) + $(MKDIR_P) ../include/mesos/v1/scheduler + $(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^ + mv ../include/mesos/v1/scheduler/*.pb.cc $(@D) + slave/%.pb.cc ../include/mesos/slave/%.pb.h: $(top_srcdir)/include/mesos/slave/%.proto $(MKDIR_P) $(@D) $(MKDIR_P) ../include/mesos/slave @@ -285,6 +312,10 @@ java/generated/org/apache/mesos/Protos.java: $(MESOS_PROTO) $(MKDIR_P) $(@D) $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^ +java/generated/org/apache/mesos/v1/Protos.java: $(V1_MESOS_PROTO) + $(MKDIR_P) $(@D) + $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^ + java/generated/org/apache/mesos/containerizer/Protos.java: \ $(CONTAINERIZER_PROTO) $(MKDIR_P) $(@D) @@ -298,6 +329,10 @@ java/generated/org/apache/mesos/scheduler/Protos.java: $(SCHEDULER_PROTO) $(MKDIR_P) $(@D) $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^ +java/generated/org/apache/mesos/v1/scheduler/Protos.java: $(V1_SCHEDULER_PROTO) + $(MKDIR_P) $(@D) + $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^ + java/generated/org/apache/mesos/executor/Protos.java: $(EXECUTOR_PROTO) $(MKDIR_P) $(@D) $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^ @@ -305,7 +340,12 @@ java/generated/org/apache/mesos/executor/Protos.java: $(EXECUTOR_PROTO) python/interface/src/mesos/interface/mesos_pb2.py: $(MESOS_PROTO) $(MKDIR_P) $(@D) $(PROTOC) -I$(top_srcdir)/include/mesos \ - $(PROTOCFLAGS) --python_out=python/interface/src/mesos/interface $^ + $(PROTOCFLAGS) --python_out=python/interface/src/mesos/interface $^ + +python/interface/src/mesos/v1/interface/mesos_pb2.py: $(V1_MESOS_PROTO) + $(MKDIR_P) $(@D) + $(PROTOC) -I$(top_srcdir)/include/mesos \ + $(PROTOCFLAGS) --python_out=python/interface/src/mesos/v1/interface $^ # All python protocol buffer code that imports from mesos.proto has # to get patched using sed, removing the leading 'mesos.' namespace @@ -322,15 +362,22 @@ python/interface/src/mesos/interface/containerizer_pb2.py: \ python/interface/src/mesos/interface/scheduler_pb2.py: $(SCHEDULER_PROTO) $(MKDIR_P) $(@D) $(PROTOC) -I$(top_srcdir)/include/mesos/scheduler \ - $(PROTOCFLAGS) \ - --python_out=python/interface/src/mesos/interface $^ + $(PROTOCFLAGS) \ + --python_out=python/interface/src/mesos/interface $^ + sed -e 's/mesos\.mesos_pb2/mesos_pb2/' <$@ >$@ + +python/interface/src/mesos/v1/interface/scheduler_pb2.py: $(V1_SCHEDULER_PROTO) + $(MKDIR_P) $(@D) + $(PROTOC) -I$(top_srcdir)/include/mesos/v1/scheduler \ + $(PROTOCFLAGS) \ + --python_out=python/interface/src/mesos/v1/interface $^ sed -e 's/mesos\.mesos_pb2/mesos_pb2/' <$@ >$@ python/interface/src/mesos/interface/executor_pb2.py: $(EXECUTOR_PROTO) $(MKDIR_P) $(@D) $(PROTOC) -I$(top_srcdir)/include/mesos/executor \ - $(PROTOCFLAGS) \ - --python_out=python/interface/src/mesos/interface $^ + $(PROTOCFLAGS) \ + --python_out=python/interface/src/mesos/interface $^ sed -e 's/mesos\.mesos_pb2/mesos_pb2/' <$@ >$@ # We even use a convenience library for most of Mesos so that we can @@ -366,6 +413,8 @@ libmesos_no_3rdparty_la_SOURCES = \ exec/exec.cpp \ files/files.cpp \ hook/manager.cpp \ + internal/devolve.cpp \ + internal/evolve.cpp \ local/local.cpp \ logging/flags.cpp \ logging/logging.cpp \ @@ -383,6 +432,8 @@ libmesos_no_3rdparty_la_SOURCES = \ master/validation.cpp \ master/allocator/allocator.cpp \ master/allocator/sorter/drf/sorter.cpp \ + messages/flags.proto \ + messages/messages.proto \ module/manager.cpp \ sched/constants.cpp \ sched/sched.cpp \ @@ -415,25 +466,28 @@ libmesos_no_3rdparty_la_SOURCES = \ slave/containerizer/provisioner.cpp \ slave/resource_estimators/noop.cpp \ usage/usage.cpp \ + v1/attributes.cpp \ + v1/mesos.cpp \ + v1/resources.cpp \ + v1/values.cpp \ watcher/whitelist_watcher.cpp \ zookeeper/contender.cpp \ zookeeper/detector.cpp \ zookeeper/zookeeper.cpp \ zookeeper/authentication.cpp \ - zookeeper/group.cpp \ - messages/flags.proto \ - messages/messages.proto + zookeeper/group.cpp + pkginclude_HEADERS = \ $(top_srcdir)/include/mesos/executor.hpp \ $(top_srcdir)/include/mesos/hook.hpp \ $(top_srcdir)/include/mesos/mesos.hpp \ + $(top_srcdir)/include/mesos/mesos.proto \ $(top_srcdir)/include/mesos/module.hpp \ $(top_srcdir)/include/mesos/resources.hpp \ $(top_srcdir)/include/mesos/scheduler.hpp \ $(top_srcdir)/include/mesos/type_utils.hpp \ - $(top_srcdir)/include/mesos/values.hpp \ - $(top_srcdir)/include/mesos/mesos.proto + $(top_srcdir)/include/mesos/values.hpp nodist_pkginclude_HEADERS = \ ../include/mesos/version.hpp \ @@ -519,6 +573,27 @@ nodist_slave_HEADERS = \ ../include/mesos/slave/isolator.pb.h \ ../include/mesos/slave/oversubscription.pb.h +v1dir = $(pkgincludedir)/v1 + +v1_HEADERS = \ + $(top_srcdir)/include/mesos/v1/attributes.hpp \ + $(top_srcdir)/include/mesos/v1/mesos.hpp \ + $(top_srcdir)/include/mesos/v1/mesos.proto \ + $(top_srcdir)/include/mesos/v1/resources.hpp \ + $(top_srcdir)/include/mesos/v1/scheduler.hpp \ + $(top_srcdir)/include/mesos/v1/values.hpp + +nodist_v1_HEADERS = \ + ../include/mesos/v1/mesos.pb.h + +v1schedulerdir = $(pkgincludedir)/v1/scheduler + +v1scheduler_HEADERS = \ + $(top_srcdir)/include/mesos/v1/scheduler/scheduler.hpp \ + $(top_srcdir)/include/mesos/v1/scheduler/scheduler.proto + +nodist_v1scheduler_HEADERS = ../include/mesos/v1/scheduler/scheduler.pb.h + if OS_LINUX libmesos_no_3rdparty_la_SOURCES += linux/cgroups.cpp libmesos_no_3rdparty_la_SOURCES += linux/fs.cpp @@ -595,6 +670,8 @@ libmesos_no_3rdparty_la_SOURCES += \ files/files.hpp \ hdfs/hdfs.hpp \ hook/manager.hpp \ + internal/devolve.hpp \ + internal/evolve.hpp \ linux/cgroups.hpp \ linux/fs.hpp \ linux/ns.hpp \ @@ -794,7 +871,9 @@ libmesos_la_SOURCES = \ $(SCHEDULER_PROTO) \ $(EXECUTOR_PROTO) \ $(ISOLATOR_PROTO) \ - $(OVERSUBSCRIPTION_PROTO) + $(OVERSUBSCRIPTION_PROTO) \ + $(V1_MESOS_PROTO) \ + $(V1_SCHEDULER_PROTO) libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION) @@ -1093,7 +1172,11 @@ libjava_la_CPPFLAGS = $(MESOS_CPPFLAGS) libjava_la_CPPFLAGS += $(JAVA_CPPFLAGS) libjava_la_CPPFLAGS += -I$(srcdir)/java/jni -Ijava/jni -libjava_la_DEPENDENCIES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO) +libjava_la_DEPENDENCIES = \ + $(CONTAINERIZER_PROTO) \ + $(MESOS_PROTO) \ + $(V1_MESOS_PROTO) \ + $(V1_SCHEDULER_PROTO) # We don't add libjava.la to libmesos_no_3rdparty.la so we don't # include the JNI bindings in the Python egg (but we might want to @@ -1523,6 +1606,7 @@ mesos_tests_SOURCES = \ tests/resource_offers_tests.cpp \ tests/resources_tests.cpp \ tests/scheduler_tests.cpp \ + tests/scheduler_driver_tests.cpp \ tests/scheduler_event_call_tests.cpp \ tests/script.cpp \ tests/slave_recovery_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index 4de176b..6b28355 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -16,8 +16,6 @@ * limitations under the License. */ -#include <mesos/scheduler/scheduler.hpp> - #include <mesos/slave/isolator.hpp> #include <mesos/type_utils.hpp> @@ -35,8 +33,6 @@ using std::string; -using mesos::scheduler::Event; - using mesos::slave::ContainerLimitation; using mesos::slave::ContainerState; @@ -252,150 +248,6 @@ ContainerState createContainerState( } // namespace slave { -namespace scheduler { - -Event event(const FrameworkRegisteredMessage& message) -{ - Event event; - event.set_type(Event::SUBSCRIBED); - - Event::Subscribed* subscribed = event.mutable_subscribed(); - subscribed->mutable_framework_id()->CopyFrom(message.framework_id()); - - return event; -} - - -Event event(const FrameworkReregisteredMessage& message) -{ - Event event; - event.set_type(Event::SUBSCRIBED); - - Event::Subscribed* subscribed = event.mutable_subscribed(); - subscribed->mutable_framework_id()->CopyFrom(message.framework_id()); - - return event; -} - - -Event event(const ResourceOffersMessage& message) -{ - Event event; - event.set_type(Event::OFFERS); - - Event::Offers* offers = event.mutable_offers(); - offers->mutable_offers()->CopyFrom(message.offers()); - - return event; -} - - -Event event(const RescindResourceOfferMessage& message) -{ - Event event; - event.set_type(Event::RESCIND); - - Event::Rescind* rescind = event.mutable_rescind(); - rescind->mutable_offer_id()->CopyFrom(message.offer_id()); - - return event; -} - - -Event event(const StatusUpdateMessage& message) -{ - Event event; - event.set_type(Event::UPDATE); - - Event::Update* update = event.mutable_update(); - - update->mutable_status()->CopyFrom(message.update().status()); - - if (message.update().has_slave_id()) { - update->mutable_status()->mutable_slave_id()->CopyFrom( - message.update().slave_id()); - } - - if (message.update().has_executor_id()) { - update->mutable_status()->mutable_executor_id()->CopyFrom( - message.update().executor_id()); - } - - update->mutable_status()->set_timestamp(message.update().timestamp()); - - // If the update does not have a 'uuid', it does not need - // acknowledging. However, prior to 0.23.0, the update uuid - // was required and always set. In 0.24.0, we can rely on the - // update uuid check here, until then we must still check for - // this being sent from the driver (from == UPID()) or from - // the master (pid == UPID()). - // TODO(vinod): Get rid of this logic in 0.25.0 because master - // and slave correctly set task status in 0.24.0. - if (!message.update().has_uuid() || message.update().uuid() == "") { - update->mutable_status()->clear_uuid(); - } else if (UPID(message.pid()) == UPID()) { - update->mutable_status()->clear_uuid(); - } else { - update->mutable_status()->set_uuid(message.update().uuid()); - } - - return event; -} - - -Event event(const LostSlaveMessage& message) -{ - Event event; - event.set_type(Event::FAILURE); - - Event::Failure* failure = event.mutable_failure(); - failure->mutable_slave_id()->CopyFrom(message.slave_id()); - - return event; -} - - -Event event(const ExitedExecutorMessage& message) -{ - Event event; - event.set_type(Event::FAILURE); - - Event::Failure* failure = event.mutable_failure(); - failure->mutable_slave_id()->CopyFrom(message.slave_id()); - failure->mutable_executor_id()->CopyFrom(message.executor_id()); - failure->set_status(message.status()); - - return event; -} - - -Event event(const ExecutorToFrameworkMessage& message) -{ - Event event; - event.set_type(Event::MESSAGE); - - Event::Message* message_ = event.mutable_message(); - message_->mutable_slave_id()->CopyFrom(message.slave_id()); - message_->mutable_executor_id()->CopyFrom(message.executor_id()); - message_->set_data(message.data()); - - return event; -} - - -Event event(const FrameworkErrorMessage& message) -{ - Event event; - event.set_type(Event::ERROR); - - Event::Error* error = event.mutable_error(); - error->set_message(message.message()); - - return event; -} - -} // namespace scheduler { - } // namespace protobuf { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 312bc61..63eeb77 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -21,7 +21,7 @@ #include <string> -#include <mesos/scheduler/scheduler.hpp> +#include <mesos/mesos.hpp> #include <mesos/slave/isolator.hpp> @@ -92,22 +92,6 @@ mesos::slave::ContainerState createContainerState( } // namespace slave { -namespace scheduler { - -// Helper functions that create scheduler::Event from a message that -// is sent to the scheduler. -mesos::scheduler::Event event(const FrameworkRegisteredMessage& message); -mesos::scheduler::Event event(const FrameworkReregisteredMessage& message); -mesos::scheduler::Event event(const ResourceOffersMessage& message); -mesos::scheduler::Event event(const RescindResourceOfferMessage& message); -mesos::scheduler::Event event(const StatusUpdateMessage& message); -mesos::scheduler::Event event(const LostSlaveMessage& message); -mesos::scheduler::Event event(const ExitedExecutorMessage& message); -mesos::scheduler::Event event(const ExecutorToFrameworkMessage& message); -mesos::scheduler::Event event(const FrameworkErrorMessage& message); - -} // namespace scheduler { - } // namespace protobuf { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/examples/event_call_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/event_call_framework.cpp b/src/examples/event_call_framework.cpp index 0093363..02c24c2 100644 --- a/src/examples/event_call_framework.cpp +++ b/src/examples/event_call_framework.cpp @@ -22,9 +22,9 @@ #include <boost/lexical_cast.hpp> -#include <mesos/resources.hpp> -#include <mesos/scheduler.hpp> -#include <mesos/type_utils.hpp> +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/resources.hpp> +#include <mesos/v1/scheduler.hpp> #include <process/delay.hpp> #include <process/process.hpp> @@ -49,7 +49,7 @@ #include "logging/flags.hpp" #include "logging/logging.hpp" -using namespace mesos; +using namespace mesos::v1; using std::cerr; using std::cout; @@ -60,9 +60,8 @@ using std::vector; using boost::lexical_cast; -using mesos::Resources; -using mesos::scheduler::Call; -using mesos::scheduler::Event; +using mesos::v1::scheduler::Call; +using mesos::v1::scheduler::Event; const int32_t CPUS_PER_TASK = 1; const int32_t MEM_PER_TASK = 128; @@ -161,9 +160,9 @@ public: cout << "Executor '" << event.failure().executor_id().value() << "' terminated"; - if (event.failure().has_slave_id()) { - cout << " on Slave '" - << event.failure().slave_id().value() << "'"; + if (event.failure().has_agent_id()) { + cout << " on Agent '" + << event.failure().agent_id().value() << "'"; } if (event.failure().has_status()) { @@ -171,9 +170,9 @@ public: } cout << endl; - } else if (event.failure().has_slave_id()) { - // Slave failed. - cout << "Slave '" << event.failure().slave_id().value() + } else if (event.failure().has_agent_id()) { + // Agent failed. + cout << "Agent '" << event.failure().agent_id().value() << "' terminated" << endl; } break; @@ -197,7 +196,8 @@ private: void resourceOffers(const vector<Offer>& offers) { foreach (const Offer& offer, offers) { - cout << "Received offer " << offer.id() << " with " << offer.resources() + cout << "Received offer " << offer.id() << " with " + << Resources(offer.resources()) << endl; static const Resources TASK_RESOURCES = Resources::parse( @@ -219,14 +219,16 @@ private: task.set_name("Task " + lexical_cast<string>(taskId)); task.mutable_task_id()->set_value( lexical_cast<string>(taskId)); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_agent_id()->MergeFrom(offer.agent_id()); task.mutable_executor()->MergeFrom(executor); Option<Resources> resources = remaining.find(TASK_RESOURCES.flatten(framework.role())); CHECK_SOME(resources); - task.mutable_resources()->MergeFrom(resources.get()); + + task.mutable_resources()->CopyFrom(resources.get()); + remaining -= resources.get(); tasks.push_back(task); @@ -266,7 +268,7 @@ private: call.set_type(Call::ACKNOWLEDGE); Call::Acknowledge* ack = call.mutable_acknowledge(); - ack->mutable_slave_id()->CopyFrom(status.slave_id()); + ack->mutable_agent_id()->CopyFrom(status.agent_id()); ack->mutable_task_id ()->CopyFrom(status.task_id ()); ack->set_uuid(status.uuid()); @@ -393,7 +395,7 @@ int main(int argc, char** argv) } process::initialize(); - internal::logging::initialize(argv[0], flags, true); // Catch signals. + mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals. FrameworkInfo framework; framework.set_name("Event Call Scheduler using libprocess (C++)"); http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/internal/devolve.cpp ---------------------------------------------------------------------- diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp new file mode 100644 index 0000000..be74a26 --- /dev/null +++ b/src/internal/devolve.cpp @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stout/check.hpp> + +#include "internal/devolve.hpp" + +using std::string; + +namespace mesos { +namespace internal { + +template <typename T> +static T devolve(const google::protobuf::Message& message) +{ + T t; + + string data; + + // NOTE: We need to use 'SerializePartialToString' instead of + // 'SerializeToString' because some required fields might not be set + // and we don't want an exception to get thrown. + CHECK(message.SerializePartialToString(&data)) + << "Failed to serialize " << message.GetTypeName() + << " while devolving to " << t.GetTypeName(); + + // NOTE: We need to use 'ParsePartialFromString' instead of + // 'ParsePartialFromString' because some required fields might not + // be set and we don't want an exception to get thrown. + CHECK(t.ParsePartialFromString(data)) + << "Failed to parse " << t.GetTypeName() + << " while devolving from " << message.GetTypeName(); + + return t; +} + + +SlaveID devolve(const v1::AgentID& agentId) +{ + // NOTE: Not using 'devolve<v1::AgentID, SlaveID>(agentId)' since + // this will be a common 'devolve' call and we wanted to speed up + // performance. + + SlaveID id; + id.set_value(agentId.value()); + return id; +} + + +SlaveInfo devolve(const v1::AgentInfo& agentInfo) +{ + SlaveInfo info = devolve<SlaveInfo>(agentInfo); + + // We set 'checkpoint' to 'true' since the v1::AgentInfo doesn't + // have 'checkpoint' but all "slaves" were checkpointing by default + // when v1:;AgentInfo was introduced. See MESOS-2317. + info.set_checkpoint(true); + + return info; +} + + +FrameworkID devolve(const v1::FrameworkID& frameworkId) +{ + return devolve<FrameworkID>(frameworkId); +} + + +ExecutorID devolve(const v1::ExecutorID& executorId) +{ + return devolve<ExecutorID>(executorId); +} + + +Offer devolve(const v1::Offer& offer) +{ + return devolve<Offer>(offer); +} + + +Credential devolve(const v1::Credential& credential) +{ + return devolve<Credential>(credential); +} + + +scheduler::Call devolve(const v1::scheduler::Call& call) +{ + return devolve<scheduler::Call>(call); +} + + +scheduler::Event devolve(const v1::scheduler::Event& event) +{ + return devolve<scheduler::Event>(event); +} + +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/internal/devolve.hpp ---------------------------------------------------------------------- diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp new file mode 100644 index 0000000..b9a854a --- /dev/null +++ b/src/internal/devolve.hpp @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __INTERNAL_DEVOLVE_HPP__ +#define __INTERNAL_DEVOLVE_HPP__ + +#include <google/protobuf/message.h> + +#include <mesos/mesos.hpp> + +#include <mesos/scheduler/scheduler.hpp> + +#include <mesos/v1/mesos.hpp> + +#include <mesos/v1/scheduler/scheduler.hpp> + +#include <stout/foreach.hpp> + +namespace mesos { +namespace internal { + +// Helpers for devolving types between versions. Please add as necessary! +SlaveID devolve(const v1::AgentID& agentId); +SlaveInfo devolve(const v1::AgentInfo& agentInfo); +FrameworkID devolve(const v1::FrameworkID& frameworkId); +ExecutorID devolve(const v1::ExecutorID& executorId); +Offer devolve(const v1::Offer& offer); +Credential devolve(const v1::Credential& credential); + +scheduler::Call devolve(const v1::scheduler::Call& call); +scheduler::Event devolve(const v1::scheduler::Event& event); + + +// Helper for repeated field devolving to 'T1' from 'T2'. +template <typename T1, typename T2> +google::protobuf::RepeatedPtrField<T1> devolve( + google::protobuf::RepeatedPtrField<T2> t2s) +{ + google::protobuf::RepeatedPtrField<T1> t1s; + + foreach (const T2& t2, t2s) { + t1s.Add()->CopyFrom(devolve(t2)); + } + + return t1s; +} + +} // namespace internal { +} // namespace mesos { + +#endif // __INTERNAL_DEVOLVE_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/internal/evolve.cpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp new file mode 100644 index 0000000..4678d67 --- /dev/null +++ b/src/internal/evolve.cpp @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <process/pid.hpp> + +#include <stout/check.hpp> + +#include "internal/evolve.hpp" + +using std::string; + +using process::UPID; + +namespace mesos { +namespace internal { + +// Helper for evolving a type by serializing/parsing when the types +// have not changed across versions. +template <typename T> +static T evolve(const google::protobuf::Message& message) +{ + T t; + + string data; + + // NOTE: We need to use 'SerializePartialToString' instead of + // 'SerializeToString' because some required fields might not be set + // and we don't want an exception to get thrown. + CHECK(message.SerializePartialToString(&data)) + << "Failed to serialize " << message.GetTypeName() + << " while evolving to " << t.GetTypeName(); + + // NOTE: We need to use 'ParsePartialFromString' instead of + // 'ParsePartialFromString' because some required fields might not + // be set and we don't want an exception to get thrown. + CHECK(t.ParsePartialFromString(data)) + << "Failed to parse " << t.GetTypeName() + << " while evolving from " << message.GetTypeName(); + + return t; +} + + +v1::AgentID evolve(const SlaveID& slaveId) +{ + // NOTE: Not using 'evolve<SlaveID, v1::AgentID>(slaveId)' since + // this will be a common 'evolve' call and we wanted to speed up + // performance. + + v1::AgentID id; + id.set_value(slaveId.value()); + return id; +} + + +v1::AgentInfo evolve(const SlaveInfo& slaveInfo) +{ + return evolve<v1::AgentInfo>(slaveInfo); +} + + +v1::FrameworkID evolve(const FrameworkID& frameworkId) +{ + return evolve<v1::FrameworkID>(frameworkId); +} + + +v1::ExecutorID evolve(const ExecutorID& executorId) +{ + return evolve<v1::ExecutorID>(executorId); +} + + +v1::Offer evolve(const Offer& offer) +{ + return evolve<v1::Offer>(offer); +} + + +v1::OfferID evolve(const OfferID& offerId) +{ + return evolve<v1::OfferID>(offerId); +} + + +v1::TaskInfo evolve(const TaskInfo& taskInfo) +{ + return evolve<v1::TaskInfo>(taskInfo); +} + + +v1::TaskStatus evolve(const TaskStatus& status) +{ + return evolve<v1::TaskStatus>(status); +} + + +v1::scheduler::Call evolve(const scheduler::Call& call) +{ + return evolve<v1::scheduler::Call>(call); +} + + +v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::SUBSCRIBED); + + v1::scheduler::Event::Subscribed* subscribed = event.mutable_subscribed(); + subscribed->mutable_framework_id()->CopyFrom(evolve(message.framework_id())); + + return event; +} + + +v1::scheduler::Event evolve(const FrameworkReregisteredMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::SUBSCRIBED); + + v1::scheduler::Event::Subscribed* subscribed = event.mutable_subscribed(); + subscribed->mutable_framework_id()->CopyFrom(evolve(message.framework_id())); + + return event; +} + + +v1::scheduler::Event evolve(const ResourceOffersMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::OFFERS); + + v1::scheduler::Event::Offers* offers = event.mutable_offers(); + offers->mutable_offers()->CopyFrom(evolve<v1::Offer>(message.offers())); + + return event; +} + + +v1::scheduler::Event evolve(const RescindResourceOfferMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::RESCIND); + + v1::scheduler::Event::Rescind* rescind = event.mutable_rescind(); + rescind->mutable_offer_id()->CopyFrom(evolve(message.offer_id())); + + return event; +} + + +v1::scheduler::Event evolve(const StatusUpdateMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::UPDATE); + + v1::scheduler::Event::Update* update = event.mutable_update(); + + update->mutable_status()->CopyFrom(evolve(message.update().status())); + + if (message.update().has_slave_id()) { + update->mutable_status()->mutable_agent_id()->CopyFrom( + evolve(message.update().slave_id())); + } + + if (message.update().has_executor_id()) { + update->mutable_status()->mutable_executor_id()->CopyFrom( + evolve(message.update().executor_id())); + } + + update->mutable_status()->set_timestamp(message.update().timestamp()); + + // If the update does not have a 'uuid', it does not need + // acknowledging. However, prior to 0.23.0, the update uuid + // was required and always set. In 0.24.0, we can rely on the + // update uuid check here, until then we must still check for + // this being sent from the driver (from == UPID()) or from + // the master (pid == UPID()). + // TODO(vinod): Get rid of this logic in 0.25.0 because master + // and slave correctly set task status in 0.24.0. + if (!message.update().has_uuid() || message.update().uuid() == "") { + update->mutable_status()->clear_uuid(); + } else if (UPID(message.pid()) == UPID()) { + update->mutable_status()->clear_uuid(); + } else { + update->mutable_status()->set_uuid(message.update().uuid()); + } + + return event; +} + + +v1::scheduler::Event evolve(const LostSlaveMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::FAILURE); + + v1::scheduler::Event::Failure* failure = event.mutable_failure(); + failure->mutable_agent_id()->CopyFrom(evolve(message.slave_id())); + + return event; +} + + +v1::scheduler::Event evolve(const ExitedExecutorMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::FAILURE); + + v1::scheduler::Event::Failure* failure = event.mutable_failure(); + failure->mutable_agent_id()->CopyFrom(evolve(message.slave_id())); + failure->mutable_executor_id()->CopyFrom(evolve(message.executor_id())); + failure->set_status(message.status()); + + return event; +} + + +v1::scheduler::Event evolve(const ExecutorToFrameworkMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::MESSAGE); + + v1::scheduler::Event::Message* message_ = event.mutable_message(); + message_->mutable_agent_id()->CopyFrom(evolve(message.slave_id())); + message_->mutable_executor_id()->CopyFrom(evolve(message.executor_id())); + message_->set_data(message.data()); + + return event; +} + + +v1::scheduler::Event evolve(const FrameworkErrorMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::ERROR); + + v1::scheduler::Event::Error* error = event.mutable_error(); + error->set_message(message.message()); + + return event; +} + +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/internal/evolve.hpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp new file mode 100644 index 0000000..2e03559 --- /dev/null +++ b/src/internal/evolve.hpp @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __INTERNAL_EVOLVE_HPP__ +#define __INTERNAL_EVOLVE_HPP__ + +#include <google/protobuf/message.h> + +#include <mesos/mesos.hpp> + +#include <mesos/scheduler/scheduler.hpp> + +#include <mesos/v1/mesos.hpp> + +#include <mesos/v1/scheduler/scheduler.hpp> + +#include <stout/foreach.hpp> + +#include "messages/messages.hpp" + +namespace mesos { +namespace internal { + +// Helpers for evolving types between versions. Please add as necessary! +v1::AgentID evolve(const SlaveID& slaveId); +v1::AgentInfo evolve(const SlaveInfo& slaveInfo); +v1::FrameworkID evolve(const FrameworkID& frameworkId); +v1::ExecutorID evolve(const ExecutorID& executorId); +v1::Offer evolve(const Offer& offer); +v1::OfferID evolve(const OfferID& offerId); +v1::TaskInfo evolve(const TaskInfo& taskInfo); +v1::TaskStatus evolve(const TaskStatus& status); + +v1::scheduler::Call evolve(const scheduler::Call& call); + + +// Helper for repeated field evolving to 'T1' from 'T2'. +template <typename T1, typename T2> +google::protobuf::RepeatedPtrField<T1> evolve( + google::protobuf::RepeatedPtrField<T2> t2s) +{ + google::protobuf::RepeatedPtrField<T1> t1s; + + foreach (const T2& t2, t2s) { + t1s.Add()->CopyFrom(evolve(t2)); + } + + return t1s; +} + + +// Helper functions that evolve old style internal messages to a +// v1::scheduler::Event. +v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message); +v1::scheduler::Event evolve(const FrameworkReregisteredMessage& message); +v1::scheduler::Event evolve(const ResourceOffersMessage& message); +v1::scheduler::Event evolve(const RescindResourceOfferMessage& message); +v1::scheduler::Event evolve(const StatusUpdateMessage& message); +v1::scheduler::Event evolve(const LostSlaveMessage& message); +v1::scheduler::Event evolve(const ExitedExecutorMessage& message); +v1::scheduler::Event evolve(const ExecutorToFrameworkMessage& message); +v1::scheduler::Event evolve(const FrameworkErrorMessage& message); + +} // namespace internal { +} // namespace mesos { + +#endif // __INTERNAL_EVOLVE_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/master/contender.hpp ---------------------------------------------------------------------- diff --git a/src/master/contender.hpp b/src/master/contender.hpp index 62bcff4..927601c 100644 --- a/src/master/contender.hpp +++ b/src/master/contender.hpp @@ -49,6 +49,8 @@ class ZooKeeperMasterContenderProcess; // An abstraction for contending to be a leading master. +// +// TODO(benh): Support contending with a v1::MasterInfo. class MasterContender { public: http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index b288b8a..2f2de4f 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -60,6 +60,9 @@ #include "files/files.hpp" +#include "internal/devolve.hpp" +#include "internal/evolve.hpp" + #include "master/constants.hpp" #include "master/contender.hpp" #include "master/detector.hpp" @@ -1250,7 +1253,12 @@ struct HttpConnection // Converts the message to an Event before sending. template <typename Message> bool send(const Message& message) { - return writer.write(encoder.encode(protobuf::scheduler::event(message))); + // We need to evolve the internal "message" into a + // 'v1::scheduler::Event' which we then devolve back to an + // pre-versioned 'scheduler::Event' which we use internally. + // + // TODO(benh): This should only support v1! + return writer.write(encoder.encode(devolve(evolve(message)))); } bool close() @@ -1360,8 +1368,6 @@ struct Framework } if (http.isSome()) { - const scheduler::Event event = protobuf::scheduler::event(message); - if (!http.get().send(message)) { LOG(WARNING) << "Unable to send event to framework " << *this << ":" << " connection closed"; http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index a0df048..a8699a7 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -29,14 +29,13 @@ #include <string> #include <sstream> -#include <mesos/mesos.hpp> -#include <mesos/scheduler.hpp> -#include <mesos/type_utils.hpp> - #include <mesos/authentication/authenticatee.hpp> #include <mesos/module/authenticatee.hpp> +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/scheduler.hpp> + #include <process/async.hpp> #include <process/defer.hpp> #include <process/delay.hpp> @@ -61,16 +60,17 @@ #include "authentication/cram_md5/authenticatee.hpp" -#include "common/protobuf_utils.hpp" - -#include "master/detector.hpp" -#include "master/validation.hpp" +#include "internal/devolve.hpp" +#include "internal/evolve.hpp" #include "local/local.hpp" #include "logging/flags.hpp" #include "logging/logging.hpp" +#include "master/detector.hpp" +#include "master/validation.hpp" + #include "messages/messages.hpp" using namespace mesos; @@ -86,6 +86,7 @@ using std::vector; using process::wait; // Necessary on some OS's to disambiguate. namespace mesos { +namespace v1 { namespace scheduler { // The process (below) is responsible for receiving messages @@ -192,7 +193,7 @@ public: return; } - Option<Error> error = validation::scheduler::call::validate(call); + Option<Error> error = validation::scheduler::call::validate(devolve(call)); if (error.isSome()) { drop(call, error.get().message); @@ -202,7 +203,7 @@ public: // TODO(vinod): Add support for sending MESSAGE calls directly // to the slave, instead of relaying it through the master, as // the scheduler driver does. - send(master.get(), call); + send(master.get(), devolve(call)); } protected: @@ -223,7 +224,7 @@ protected: .onAny(defer(self(), &MesosProcess::detected, lambda::_1)); } - void detected(const Future<Option<MasterInfo> >& future) + void detected(const Future<Option<mesos::MasterInfo>>& future) { CHECK(!future.isDiscarded()); @@ -313,9 +314,11 @@ protected: // 'Authenticatee'. // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'. // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade. - authenticating = - authenticatee->authenticate(master.get(), self(), credential.get()) - .onAny(defer(self(), &Self::_authenticate)); + authenticating = authenticatee->authenticate( + master.get(), + self(), + devolve(credential.get())) + .onAny(defer(self(), &Self::_authenticate)); delay(Seconds(5), self(), @@ -433,47 +436,47 @@ protected: void receive(const UPID& from, const FrameworkRegisteredMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const FrameworkReregisteredMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const ResourceOffersMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const RescindResourceOfferMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const StatusUpdateMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const LostSlaveMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const ExitedExecutorMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const ExecutorToFrameworkMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } void receive(const UPID& from, const FrameworkErrorMessage& message) { - receive(from, protobuf::scheduler::event(message)); + receive(from, evolve(message)); } // Helper for injecting an ERROR event. @@ -563,6 +566,6 @@ void Mesos::send(const Call& call) dispatch(process, &MesosProcess::send, call); } - } // namespace scheduler { +} // namespace v1 { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 20418d4..8b48614 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -50,6 +50,7 @@ #include <stout/lambda.hpp> #include <stout/none.hpp> #include <stout/option.hpp> +#include <stout/os.hpp> #include <stout/stringify.hpp> #include <stout/try.hpp> #include <stout/uuid.hpp> @@ -347,6 +348,13 @@ protected: executor; }) +#define DEFAULT_V1_EXECUTOR_INFO \ + ({ v1::ExecutorInfo executor; \ + executor.mutable_executor_id()->set_value("default"); \ + executor.mutable_command()->set_value("exit 1"); \ + executor; }) + + #define CREATE_EXECUTOR_INFO(executorId, command) \ ({ ExecutorInfo executor; \ executor.mutable_executor_id()->set_value(executorId); \ @@ -361,10 +369,25 @@ protected: credential; }) +#define DEFAULT_V1_CREDENTIAL \ + ({ v1::Credential credential; \ + credential.set_principal("test-principal"); \ + credential.set_secret("test-secret"); \ + credential; }) + + #define DEFAULT_FRAMEWORK_INFO \ ({ FrameworkInfo framework; \ framework.set_name("default"); \ - framework.set_user(""); \ + framework.set_user(os::user().get()); \ + framework.set_principal(DEFAULT_CREDENTIAL.principal()); \ + framework; }) + + +#define DEFAULT_V1_FRAMEWORK_INFO \ + ({ v1::FrameworkInfo framework; \ + framework.set_name("default"); \ + framework.set_user(os::user().get()); \ framework.set_principal(DEFAULT_CREDENTIAL.principal()); \ framework; }) @@ -373,6 +396,10 @@ protected: DEFAULT_EXECUTOR_INFO.executor_id() +#define DEFAULT_V1_EXECUTOR_ID \ + DEFAULT_V1_EXECUTOR_INFO.executor_id() + + #define DEFAULT_CONTAINER_ID \ ({ ContainerID containerId; \ containerId.set_value("container"); \ http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/tests/scheduler_driver_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_driver_tests.cpp b/src/tests/scheduler_driver_tests.cpp new file mode 100644 index 0000000..4963f5d --- /dev/null +++ b/src/tests/scheduler_driver_tests.cpp @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gmock/gmock.h> + +#include <string> +#include <vector> + +#include <mesos/executor.hpp> +#include <mesos/scheduler.hpp> +#include <mesos/type_utils.hpp> + +#include <mesos/scheduler/scheduler.hpp> + +#include <process/clock.hpp> +#include <process/future.hpp> +#include <process/gmock.hpp> +#include <process/gtest.hpp> +#include <process/http.hpp> +#include <process/pid.hpp> + +#include <process/metrics/metrics.hpp> + +#include <stout/json.hpp> +#include <stout/lambda.hpp> +#include <stout/try.hpp> +#include <stout/uuid.hpp> + +#include "master/allocator/mesos/allocator.hpp" + +#include "master/master.hpp" + +#include "tests/containerizer.hpp" +#include "tests/mesos.hpp" + +using mesos::internal::master::allocator::MesosAllocatorProcess; + +using mesos::internal::master::Master; + +using mesos::internal::slave::Containerizer; +using mesos::internal::slave::Slave; + +using process::Clock; +using process::Future; +using process::PID; + +using process::http::OK; + +using process::metrics::internal::MetricsProcess; + +using std::string; +using std::vector; + +using testing::_; +using testing::AtMost; +using testing::DoAll; +using testing::Return; + +namespace mesos { +namespace internal { +namespace tests { + +class MesosSchedulerDriverTest : public MesosTest {}; + + +TEST_F(MesosSchedulerDriverTest, MetricsEndpoint) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(®istered)); + + ASSERT_EQ(DRIVER_RUNNING, driver.start()); + + AWAIT_READY(registered); + + Future<process::http::Response> response = + process::http::get(MetricsProcess::instance()->self(), "/snapshot"); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + + EXPECT_SOME_EQ( + "application/json", + response.get().headers.get("Content-Type")); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); + + ASSERT_SOME(parse); + + JSON::Object metrics = parse.get(); + + EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_messages")); + EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_dispatches")); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// This action calls driver stop() followed by abort(). +ACTION(StopAndAbort) +{ + arg0->stop(); + arg0->abort(); +} + + +// This test verifies that when the scheduler calls stop() before +// abort(), no pending acknowledgements are sent. +TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + Try<PID<Slave>> slave = StartSlave(&containerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // When an update is received, stop the driver and then abort it. + Future<Nothing> statusUpdate; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(DoAll(StopAndAbort(), + FutureSatisfy(&statusUpdate))); + + // Ensure no status update acknowledgements are sent from the driver + // to the master. + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); + + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.start(); + + AWAIT_READY(statusUpdate); + + // Settle the clock to ensure driver finishes processing the status + // update and sends acknowledgement if necessary. In this test it + // shouldn't send an acknowledgement. + Clock::pause(); + Clock::settle(); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// Ensures that when a scheduler enables explicit acknowledgements +// on the driver, there are no implicit acknowledgements sent, and +// the call to 'acknowledgeStatusUpdate' sends the ack to the master. +TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + Try<PID<Slave>> slave = StartSlave(&containerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + // Ensure no status update acknowledgements are sent from the driver + // to the master until the explicit acknowledgement is sent. + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); + + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.start(); + + AWAIT_READY(status); + + // Settle the clock to ensure driver finishes processing the status + // update, we want to ensure that no implicit acknowledgement gets + // sent. + Clock::pause(); + Clock::settle(); + + // Now send the acknowledgement. + Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _, + master.get()); + + driver.acknowledgeStatusUpdate(status.get()); + + AWAIT_READY(acknowledgement); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// This test ensures that when explicit acknowledgements are enabled, +// acknowledgements for master-generated updates are dropped by the +// driver. We test this by creating an invalid task that uses no +// resources. +TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + Try<PID<Slave>> slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // Ensure no status update acknowledgements are sent to the master. + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + // Launch a task using no resources. + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(status); + ASSERT_EQ(TASK_ERROR, status.get().state()); + ASSERT_EQ(TaskStatus::SOURCE_MASTER, status.get().source()); + ASSERT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); + + // Now send the acknowledgement. + driver.acknowledgeStatusUpdate(status.get()); + + // Settle the clock to ensure driver processes the acknowledgement, + // which should get dropped due to having come from the master. + Clock::pause(); + Clock::settle(); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// This test ensures that the driver handles an empty slave id +// in an acknowledgement message by dropping it. The driver will +// log an error in this case (but we don't test for that). We +// generate a status with no slave id by performing reconciliation. +TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(®istered)); + + // Ensure no status update acknowledgements are sent to the master. + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); + + driver.start(); + + AWAIT_READY(registered); + + Future<TaskStatus> update; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&update)); + + // Peform reconciliation without using a slave id. + vector<TaskStatus> statuses; + + TaskStatus status; + status.mutable_task_id()->set_value("foo"); + status.set_state(TASK_RUNNING); + + statuses.push_back(status); + + driver.reconcileTasks(statuses); + + AWAIT_READY(update); + ASSERT_EQ(TASK_LOST, update.get().state()); + ASSERT_EQ(TaskStatus::SOURCE_MASTER, update.get().source()); + ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason()); + ASSERT_FALSE(update.get().has_slave_id()); + + // Now send the acknowledgement. + driver.acknowledgeStatusUpdate(update.get()); + + // Settle the clock to ensure driver processes the acknowledgement, + // which should get dropped due to the missing slave id. + Clock::pause(); + Clock::settle(); + + driver.stop(); + driver.join(); + + Shutdown(); +} + +} // namespace tests { +} // namespace internal { +} // namespace mesos {
