http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 9d29d1a..3f01c06 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -18,32 +18,29 @@
 
 #include <gmock/gmock.h>
 
-#include <memory>
 #include <string>
 #include <queue>
-#include <vector>
 
 #include <mesos/executor.hpp>
-#include <mesos/scheduler.hpp>
-#include <mesos/type_utils.hpp>
 
-#include <mesos/scheduler/scheduler.hpp>
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
 
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
-#include <process/http.hpp>
-#include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/queue.hpp>
 
-#include <process/metrics/metrics.hpp>
-
-#include <stout/json.hpp>
 #include <stout/lambda.hpp>
 #include <stout/try.hpp>
-#include <stout/uuid.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
 
 #include "master/allocator/mesos/allocator.hpp"
 
@@ -59,23 +56,16 @@ using mesos::internal::master::Master;
 using mesos::internal::slave::Containerizer;
 using mesos::internal::slave::Slave;
 
-using mesos::scheduler::Call;
-using mesos::scheduler::Event;
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
 
 using process::Clock;
 using process::Future;
-using process::Owned;
 using process::PID;
-using process::Promise;
 using process::Queue;
 
-using process::http::OK;
-
-using process::metrics::internal::MetricsProcess;
-
 using std::string;
-using std::queue;
-using std::vector;
 
 using testing::_;
 using testing::AtMost;
@@ -126,9 +116,9 @@ TEST_F(SchedulerTest, Subscribe)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -145,7 +135,7 @@ TEST_F(SchedulerTest, Subscribe)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
     subscribe->set_force(true);
 
     mesos.send(call);
@@ -155,7 +145,7 @@ TEST_F(SchedulerTest, Subscribe)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   // Resubscribe with the same framework id.
   {
@@ -164,7 +154,7 @@ TEST_F(SchedulerTest, Subscribe)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
     subscribe->mutable_framework_info()->mutable_id()->CopyFrom(id);
     subscribe->set_force(true);
 
@@ -198,9 +188,9 @@ TEST_F(SchedulerTest, TaskRunning)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -217,7 +207,7 @@ TEST_F(SchedulerTest, TaskRunning)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -226,7 +216,7 @@ TEST_F(SchedulerTest, TaskRunning)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -245,14 +235,14 @@ TEST_F(SchedulerTest, TaskRunning)
                     Return(Nothing())))
     .WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
 
-  TaskInfo taskInfo;
+  v1::TaskInfo taskInfo;
   taskInfo.set_name("");
   taskInfo.mutable_task_id()->set_value("1");
-  taskInfo.mutable_slave_id()->CopyFrom(
-      event.get().offers().offers(0).slave_id());
+  taskInfo.mutable_agent_id()->CopyFrom(
+      event.get().offers().offers(0).agent_id());
   taskInfo.mutable_resources()->CopyFrom(
       event.get().offers().offers(0).resources());
-  taskInfo.mutable_executor()->CopyFrom(DEFAULT_EXECUTOR_INFO);
+  taskInfo.mutable_executor()->CopyFrom(DEFAULT_V1_EXECUTOR_INFO);
 
   // TODO(benh): Enable just running a task with a command in the tests:
   //   taskInfo.mutable_command()->set_value("sleep 10");
@@ -265,8 +255,8 @@ TEST_F(SchedulerTest, TaskRunning)
     Call::Accept* accept = call.mutable_accept();
     accept->add_offer_ids()->CopyFrom(event.get().offers().offers(0).id());
 
-    Offer::Operation* operation = accept->add_operations();
-    operation->set_type(Offer::Operation::LAUNCH);
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
     operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
@@ -275,9 +265,9 @@ TEST_F(SchedulerTest, TaskRunning)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
   EXPECT_TRUE(event.get().update().status().has_executor_id());
-  EXPECT_EQ(exec.id, event.get().update().status().executor_id());
+  EXPECT_EQ(exec.id, devolve(event.get().update().status().executor_id()));
 
   AWAIT_READY(update);
 
@@ -306,9 +296,9 @@ TEST_F(SchedulerTest, ReconcileTask)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -325,7 +315,7 @@ TEST_F(SchedulerTest, ReconcileTask)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -334,7 +324,7 @@ TEST_F(SchedulerTest, ReconcileTask)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -347,8 +337,10 @@ TEST_F(SchedulerTest, ReconcileTask)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  Offer offer = event.get().offers().offers(0);
-  TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+  v1::Offer offer = event.get().offers().offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
 
   {
     Call call;
@@ -358,8 +350,8 @@ TEST_F(SchedulerTest, ReconcileTask)
     Call::Accept* accept = call.mutable_accept();
     accept->add_offer_ids()->CopyFrom(offer.id());
 
-    Offer::Operation* operation = accept->add_operations();
-    operation->set_type(Offer::Operation::LAUNCH);
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
     operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
@@ -368,7 +360,7 @@ TEST_F(SchedulerTest, ReconcileTask)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
 
   {
     Call call;
@@ -385,8 +377,8 @@ TEST_F(SchedulerTest, ReconcileTask)
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
   EXPECT_FALSE(event.get().update().status().has_uuid());
-  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
-  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(v1::TaskStatus::REASON_RECONCILIATION,
             event.get().update().status().reason());
 
   EXPECT_CALL(exec, shutdown(_))
@@ -414,9 +406,9 @@ TEST_F(SchedulerTest, KillTask)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -433,7 +425,7 @@ TEST_F(SchedulerTest, KillTask)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -442,7 +434,7 @@ TEST_F(SchedulerTest, KillTask)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -455,8 +447,10 @@ TEST_F(SchedulerTest, KillTask)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  Offer offer = event.get().offers().offers(0);
-  TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+  v1::Offer offer = event.get().offers().offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
 
   {
     Call call;
@@ -466,8 +460,8 @@ TEST_F(SchedulerTest, KillTask)
     Call::Accept* accept = call.mutable_accept();
     accept->add_offer_ids()->CopyFrom(offer.id());
 
-    Offer::Operation* operation = accept->add_operations();
-    operation->set_type(Offer::Operation::LAUNCH);
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
     operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
@@ -476,7 +470,7 @@ TEST_F(SchedulerTest, KillTask)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
 
   {
     // Acknowledge TASK_RUNNING update.
@@ -486,7 +480,7 @@ TEST_F(SchedulerTest, KillTask)
 
     Call::Acknowledge* acknowledge = call.mutable_acknowledge();
     acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
-    acknowledge->mutable_slave_id()->CopyFrom(offer.slave_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
     acknowledge->set_uuid(event.get().update().status().uuid());
 
     mesos.send(call);
@@ -502,7 +496,7 @@ TEST_F(SchedulerTest, KillTask)
 
     Call::Kill* kill = call.mutable_kill();
     kill->mutable_task_id()->CopyFrom(taskInfo.task_id());
-    kill->mutable_slave_id()->CopyFrom(offer.slave_id());
+    kill->mutable_agent_id()->CopyFrom(offer.agent_id());
 
     mesos.send(call);
   }
@@ -510,7 +504,7 @@ TEST_F(SchedulerTest, KillTask)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_KILLED, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_KILLED, event.get().update().status().state());
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
@@ -537,9 +531,9 @@ TEST_F(SchedulerTest, ShutdownExecutor)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -556,7 +550,7 @@ TEST_F(SchedulerTest, ShutdownExecutor)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -565,7 +559,7 @@ TEST_F(SchedulerTest, ShutdownExecutor)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -578,8 +572,10 @@ TEST_F(SchedulerTest, ShutdownExecutor)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
 
-  Offer offer = event.get().offers().offers(0);
-  TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+  v1::Offer offer = event.get().offers().offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
 
   {
     Call call;
@@ -589,8 +585,8 @@ TEST_F(SchedulerTest, ShutdownExecutor)
     Call::Accept* accept = call.mutable_accept();
     accept->add_offer_ids()->CopyFrom(offer.id());
 
-    Offer::Operation* operation = accept->add_operations();
-    operation->set_type(Offer::Operation::LAUNCH);
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
     operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
@@ -599,7 +595,7 @@ TEST_F(SchedulerTest, ShutdownExecutor)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_FINISHED, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_FINISHED, event.get().update().status().state());
 
   Future<Nothing> shutdown;
   EXPECT_CALL(exec, shutdown(_))
@@ -611,20 +607,20 @@ TEST_F(SchedulerTest, ShutdownExecutor)
     call.set_type(Call::SHUTDOWN);
 
     Call::Shutdown* shutdown = call.mutable_shutdown();
-    shutdown->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
-    shutdown->mutable_slave_id()->CopyFrom(offer.slave_id());
+    shutdown->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
+    shutdown->mutable_agent_id()->CopyFrom(offer.agent_id());
 
     mesos.send(call);
   }
 
   AWAIT_READY(shutdown);
-  containerizer.destroy(id, DEFAULT_EXECUTOR_ID);
+  containerizer.destroy(devolve(id), DEFAULT_EXECUTOR_ID);
 
   // Executor termination results in a 'FAILURE' event.
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::FAILURE, event.get().type());
-  ExecutorID executorId(DEFAULT_EXECUTOR_ID);
+  v1::ExecutorID executorId(DEFAULT_V1_EXECUTOR_ID);
   EXPECT_EQ(executorId, event.get().failure().executor_id());
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
@@ -649,9 +645,9 @@ TEST_F(SchedulerTest, Teardown)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -668,7 +664,7 @@ TEST_F(SchedulerTest, Teardown)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -677,7 +673,7 @@ TEST_F(SchedulerTest, Teardown)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -690,8 +686,10 @@ TEST_F(SchedulerTest, Teardown)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  Offer offer = event.get().offers().offers(0);
-  TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+  v1::Offer offer = event.get().offers().offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
 
   {
     Call call;
@@ -701,8 +699,8 @@ TEST_F(SchedulerTest, Teardown)
     Call::Accept* accept = call.mutable_accept();
     accept->add_offer_ids()->CopyFrom(offer.id());
 
-    Offer::Operation* operation = accept->add_operations();
-    operation->set_type(Offer::Operation::LAUNCH);
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
     operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
@@ -711,7 +709,7 @@ TEST_F(SchedulerTest, Teardown)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
 
   Future<Nothing> shutdown;
   EXPECT_CALL(exec, shutdown(_))
@@ -745,9 +743,9 @@ TEST_F(SchedulerTest, Decline)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -764,7 +762,7 @@ TEST_F(SchedulerTest, Decline)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -773,14 +771,14 @@ TEST_F(SchedulerTest, Decline)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::OFFERS, event.get().type());
   ASSERT_EQ(1, event.get().offers().offers().size());
 
-  Offer offer = event.get().offers().offers(0);
+  v1::Offer offer = event.get().offers().offers(0);
   {
     Call call;
     call.mutable_framework_id()->CopyFrom(id);
@@ -790,7 +788,7 @@ TEST_F(SchedulerTest, Decline)
     decline->add_offer_ids()->CopyFrom(offer.id());
 
     // Set 0s filter to immediately get another offer.
-    Filters filters;
+    v1::Filters filters;
     filters.set_refuse_seconds(0);
     decline->mutable_filters()->CopyFrom(filters);
 
@@ -823,9 +821,9 @@ TEST_F(SchedulerTest, Revive)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -842,7 +840,7 @@ TEST_F(SchedulerTest, Revive)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -851,14 +849,14 @@ TEST_F(SchedulerTest, Revive)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::OFFERS, event.get().type());
   EXPECT_NE(0, event.get().offers().offers().size());
 
-  Offer offer = event.get().offers().offers(0);
+  v1::Offer offer = event.get().offers().offers(0);
   {
     Call call;
     call.mutable_framework_id()->CopyFrom(id);
@@ -868,7 +866,7 @@ TEST_F(SchedulerTest, Revive)
     decline->add_offer_ids()->CopyFrom(offer.id());
 
     // Set 1hr filter to not immediately get another offer.
-    Filters filters;
+    v1::Filters filters;
     filters.set_refuse_seconds(Hours(1).secs());
     decline->mutable_filters()->CopyFrom(filters);
 
@@ -921,9 +919,9 @@ TEST_F(SchedulerTest, Message)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -940,7 +938,7 @@ TEST_F(SchedulerTest, Message)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -949,7 +947,7 @@ TEST_F(SchedulerTest, Message)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -962,8 +960,10 @@ TEST_F(SchedulerTest, Message)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  Offer offer = event.get().offers().offers(0);
-  TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+  v1::Offer offer = event.get().offers().offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
 
   {
     Call call;
@@ -973,8 +973,8 @@ TEST_F(SchedulerTest, Message)
     Call::Accept* accept = call.mutable_accept();
     accept->add_offer_ids()->CopyFrom(offer.id());
 
-    Offer::Operation* operation = accept->add_operations();
-    operation->set_type(Offer::Operation::LAUNCH);
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
     operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
@@ -983,7 +983,7 @@ TEST_F(SchedulerTest, Message)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
-  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
 
   Future<string> data;
   EXPECT_CALL(exec, frameworkMessage(_, _))
@@ -995,8 +995,8 @@ TEST_F(SchedulerTest, Message)
     call.set_type(Call::MESSAGE);
 
     Call::Message* message = call.mutable_message();
-    message->mutable_slave_id()->CopyFrom(offer.slave_id());
-    message->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+    message->mutable_agent_id()->CopyFrom(offer.agent_id());
+    message->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
     message->set_data("hello world");
 
     mesos.send(call);
@@ -1019,9 +1019,9 @@ TEST_F(SchedulerTest, Request)
   EXPECT_CALL(callbacks, connected())
     .WillOnce(FutureSatisfy(&connected));
 
-  scheduler::Mesos mesos(
+  Mesos mesos(
       master.get(),
-      DEFAULT_CREDENTIAL,
+      DEFAULT_V1_CREDENTIAL,
       lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
       lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
@@ -1038,7 +1038,7 @@ TEST_F(SchedulerTest, Request)
     call.set_type(Call::SUBSCRIBE);
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
 
     mesos.send(call);
   }
@@ -1047,7 +1047,7 @@ TEST_F(SchedulerTest, Request)
   AWAIT_READY(event);
   EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().subscribed().framework_id());
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
   Future<Nothing> requestResources =
     FUTURE_DISPATCH(_, &MesosAllocatorProcess::requestResources);
@@ -1073,326 +1073,6 @@ TEST_F(SchedulerTest, Request)
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 
-
-class MesosSchedulerDriverTest : public MesosTest {};
-
-
-TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
-{
-  Try<PID<Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  Future<Nothing> registered;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureSatisfy(&registered));
-
-  ASSERT_EQ(DRIVER_RUNNING, driver.start());
-
-  AWAIT_READY(registered);
-
-  Future<process::http::Response> response =
-    process::http::get(MetricsProcess::instance()->self(), "/snapshot");
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-
-  EXPECT_SOME_EQ(
-      "application/json",
-      response.get().headers.get("Content-Type"));
-
-  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
-
-  ASSERT_SOME(parse);
-
-  JSON::Object metrics = parse.get();
-
-  EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_messages"));
-  EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_dispatches"));
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// This action calls driver stop() followed by abort().
-ACTION(StopAndAbort)
-{
-  arg0->stop();
-  arg0->abort();
-}
-
-
-// This test verifies that when the scheduler calls stop() before
-// abort(), no pending acknowledgements are sent.
-TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
-{
-  Try<PID<Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-  Try<PID<Slave>> slave = StartSlave(&containerizer);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*"))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  // When an update is received, stop the driver and then abort it.
-  Future<Nothing> statusUpdate;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(StopAndAbort(),
-                    FutureSatisfy(&statusUpdate)));
-
-  // Ensure no status update acknowledgements are sent from the driver
-  // to the master.
-  EXPECT_NO_FUTURE_CALLS(
-      mesos::scheduler::Call(),
-      mesos::scheduler::Call::ACKNOWLEDGE,
-      _ ,
-      master.get());
-
-  EXPECT_CALL(exec, registered(_, _, _, _));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.start();
-
-  AWAIT_READY(statusUpdate);
-
-  // Settle the clock to ensure driver finishes processing the status
-  // update and sends acknowledgement if necessary. In this test it
-  // shouldn't send an acknowledgement.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// Ensures that when a scheduler enables explicit acknowledgements
-// on the driver, there are no implicit acknowledgements sent, and
-// the call to 'acknowledgeStatusUpdate' sends the ack to the master.
-TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
-{
-  Try<PID<Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-  Try<PID<Slave>> slave = StartSlave(&containerizer);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*"))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Ensure no status update acknowledgements are sent from the driver
-  // to the master until the explicit acknowledgement is sent.
-  EXPECT_NO_FUTURE_CALLS(
-      mesos::scheduler::Call(),
-      mesos::scheduler::Call::ACKNOWLEDGE,
-      _ ,
-      master.get());
-
-  EXPECT_CALL(exec, registered(_, _, _, _));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.start();
-
-  AWAIT_READY(status);
-
-  // Settle the clock to ensure driver finishes processing the status
-  // update, we want to ensure that no implicit acknowledgement gets
-  // sent.
-  Clock::pause();
-  Clock::settle();
-
-  // Now send the acknowledgement.
-  Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL(
-      mesos::scheduler::Call(),
-      mesos::scheduler::Call::ACKNOWLEDGE,
-      _,
-      master.get());
-
-  driver.acknowledgeStatusUpdate(status.get());
-
-  AWAIT_READY(acknowledgement);
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// This test ensures that when explicit acknowledgements are enabled,
-// acknowledgements for master-generated updates are dropped by the
-// driver. We test this by creating an invalid task that uses no
-// resources.
-TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
-{
-  Try<PID<Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  Try<PID<Slave>> slave = StartSlave();
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  // Ensure no status update acknowledgements are sent to the master.
-  EXPECT_NO_FUTURE_CALLS(
-      mesos::scheduler::Call(),
-      mesos::scheduler::Call::ACKNOWLEDGE,
-      _ ,
-      master.get());
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  // Launch a task using no resources.
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  driver.launchTasks(offers.get()[0].id(), tasks);
-
-  AWAIT_READY(status);
-  ASSERT_EQ(TASK_ERROR, status.get().state());
-  ASSERT_EQ(TaskStatus::SOURCE_MASTER, status.get().source());
-  ASSERT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
-
-  // Now send the acknowledgement.
-  driver.acknowledgeStatusUpdate(status.get());
-
-  // Settle the clock to ensure driver processes the acknowledgement,
-  // which should get dropped due to having come from the master.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// This test ensures that the driver handles an empty slave id
-// in an acknowledgement message by dropping it. The driver will
-// log an error in this case (but we don't test for that). We
-// generate a status with no slave id by performing reconciliation.
-TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
-{
-  Try<PID<Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL);
-
-  Future<Nothing> registered;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureSatisfy(&registered));
-
-  // Ensure no status update acknowledgements are sent to the master.
-  EXPECT_NO_FUTURE_CALLS(
-      mesos::scheduler::Call(),
-      mesos::scheduler::Call::ACKNOWLEDGE,
-      _ ,
-      master.get());
-
-  driver.start();
-
-  AWAIT_READY(registered);
-
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  // Peform reconciliation without using a slave id.
-  vector<TaskStatus> statuses;
-
-  TaskStatus status;
-  status.mutable_task_id()->set_value("foo");
-  status.set_state(TASK_RUNNING);
-
-  statuses.push_back(status);
-
-  driver.reconcileTasks(statuses);
-
-  AWAIT_READY(update);
-  ASSERT_EQ(TASK_LOST, update.get().state());
-  ASSERT_EQ(TaskStatus::SOURCE_MASTER, update.get().source());
-  ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason());
-  ASSERT_FALSE(update.get().has_slave_id());
-
-  // Now send the acknowledgement.
-  driver.acknowledgeStatusUpdate(update.get());
-
-  // Settle the clock to ensure driver processes the acknowledgement,
-  // which should get dropped due to the missing slave id.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index cb5a01e..d55e9dd 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -29,8 +29,6 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
-#include <mesos/scheduler/scheduler.hpp>
-
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>

http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/v1/attributes.cpp
----------------------------------------------------------------------
diff --git a/src/v1/attributes.cpp b/src/v1/attributes.cpp
new file mode 100644
index 0000000..37085be
--- /dev/null
+++ b/src/v1/attributes.cpp
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/v1/attributes.hpp>
+#include <mesos/v1/values.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/strings.hpp>
+
+using std::ostream;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace v1 {
+
+std::ostream& operator << (std::ostream& stream, const Attribute& attribute)
+{
+  stream << attribute.name() << "=";
+  switch (attribute.type()) {
+    case Value::SCALAR: stream << attribute.scalar(); break;
+    case Value::RANGES: stream << attribute.ranges(); break;
+    case Value::SET:    stream << attribute.set();    break;
+    case Value::TEXT:   stream << attribute.text();   break;
+    default:
+      LOG(FATAL) << "Unexpected Value type: " << attribute.type();
+      break;
+  }
+
+  return stream;
+}
+
+
+bool Attributes::operator == (const Attributes& that) const
+{
+  if (size() != that.size()) {
+    return false;
+  }
+
+  foreach (const Attribute& attribute, attributes) {
+    Option<Attribute> maybeAttribute = that.get(attribute);
+    if (maybeAttribute.isNone()) {
+        return false;
+    }
+    const Attribute& thatAttribute = maybeAttribute.get();
+    switch (attribute.type()) {
+    case Value::SCALAR:
+      if (!(attribute.scalar() == thatAttribute.scalar())) {
+        return false;
+      }
+      break;
+    case Value::RANGES:
+      if (!(attribute.ranges() == thatAttribute.ranges())) {
+        return false;
+      }
+      break;
+    case Value::TEXT:
+      if (!(attribute.text() == thatAttribute.text())) {
+        return false;
+      }
+      break;
+    case Value::SET:
+      LOG(FATAL) << "Sets not supported for attributes";
+    }
+  }
+
+  return true;
+}
+
+
+const Option<Attribute> Attributes::get(const Attribute& thatAttribute) const
+{
+  foreach (const Attribute& attribute, attributes) {
+    if (attribute.name() == thatAttribute.name() &&
+        attribute.type() == thatAttribute.type()) {
+      return attribute;
+    }
+  }
+
+  return None();
+}
+
+
+Attribute Attributes::parse(const string& name, const string& text)
+{
+  Attribute attribute;
+  Try<Value> result = internal::values::parse(text);
+
+  if (result.isError()) {
+    LOG(FATAL) << "Failed to parse attribute " << name
+               << " text " << text
+               << " error " << result.error();
+  } else {
+    Value value = result.get();
+    attribute.set_name(name);
+
+    if (value.type() == Value::RANGES) {
+      attribute.set_type(Value::RANGES);
+      attribute.mutable_ranges()->MergeFrom(value.ranges());
+    } else if (value.type() == Value::TEXT) {
+      attribute.set_type(Value::TEXT);
+      attribute.mutable_text()->MergeFrom(value.text());
+    } else if (value.type() == Value::SCALAR) {
+      attribute.set_type(Value::SCALAR);
+      attribute.mutable_scalar()->MergeFrom(value.scalar());
+    } else {
+      LOG(FATAL) << "Bad type for attribute " << name
+                 << " text " << text
+                 << " type " << value.type();
+    }
+  }
+
+  return attribute;
+}
+
+
+Attributes Attributes::parse(const string& s)
+{
+  // Tokenize and parse the value of "attributes".
+  Attributes attributes;
+
+  vector<string> tokens = strings::tokenize(s, ";\n");
+
+  for (size_t i = 0; i < tokens.size(); i++) {
+    const vector<string>& pairs = strings::split(tokens[i], ":", 2);
+    if (pairs.size() != 2 || pairs[0].empty() || pairs[1].empty()) {
+      LOG(FATAL) << "Invalid attribute key:value pair '" << tokens[i] << "'";
+    }
+
+    attributes.add(parse(pairs[0], pairs[1]));
+  }
+
+  return attributes;
+}
+
+
+bool Attributes::isValid(const Attribute& attribute)
+{
+  if (!attribute.has_name() ||
+      attribute.name() == "" ||
+      !attribute.has_type() ||
+      !Value::Type_IsValid(attribute.type())) {
+    return false;
+    }
+
+  if (attribute.type() == Value::SCALAR) {
+    return attribute.has_scalar();
+  } else if (attribute.type() == Value::RANGES) {
+    return attribute.has_ranges();
+  } else if (attribute.type() == Value::TEXT) {
+    return attribute.has_text();
+  } else if (attribute.type() == Value::SET) {
+    // Attributes doesn't support set.
+    return false;
+  }
+
+    return false;
+}
+
+
+template <>
+Value::Scalar Attributes::get(
+    const string& name,
+    const Value::Scalar& scalar) const
+{
+  foreach (const Attribute& attribute, attributes) {
+    if (attribute.name() == name &&
+        attribute.type() == Value::SCALAR) {
+      return attribute.scalar();
+    }
+  }
+
+  return scalar;
+}
+
+
+template <>
+Value::Ranges Attributes::get(
+    const string& name,
+    const Value::Ranges& ranges) const
+{
+  foreach (const Attribute& attribute, attributes) {
+    if (attribute.name() == name &&
+        attribute.type() == Value::RANGES) {
+      return attribute.ranges();
+    }
+  }
+
+  return ranges;
+}
+
+
+template <>
+Value::Text Attributes::get(
+    const string& name,
+    const Value::Text& text) const
+{
+  foreach (const Attribute& attribute, attributes) {
+    if (attribute.name() == name &&
+        attribute.type() == Value::TEXT) {
+      return attribute.text();
+    }
+  }
+
+  return text;
+}
+
+} // namespace v1 {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
new file mode 100644
index 0000000..e42d04c
--- /dev/null
+++ b/src/v1/mesos.cpp
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <mesos/v1/attributes.hpp>
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+
+namespace mesos {
+namespace v1 {
+
+// TODO(vinod): Ensure that these operators do not go out of sync
+// when new fields are added to the protobufs (MESOS-2487).
+
+bool operator == (const CommandInfo& left, const CommandInfo& right)
+{
+  if (left.uris().size() != right.uris().size()) {
+    return false;
+  }
+
+  // TODO(vinod): Factor out the comparison for repeated fields.
+  for (int i = 0; i < left.uris().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.uris().size(); j++) {
+      if (left.uris().Get(i) == right.uris().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  if (left.arguments().size() != right.arguments().size()) {
+    return false;
+  }
+
+  // The order of argv is important.
+  for (int i = 0; i < left.arguments().size(); i++) {
+    if (left.arguments().Get(i) != right.arguments().Get(i)) {
+      return false;
+    }
+  }
+
+  // NOTE: We are not validating CommandInfo::ContainerInfo here
+  // because it is being deprecated in favor of ContainerInfo.
+  // TODO(vinod): Kill the above comment when
+  // CommandInfo::ContainerInfo is removed.
+  return left.environment() == right.environment() &&
+    left.value() == right.value() &&
+    left.user() == right.user() &&
+    left.shell() == right.shell();
+}
+
+
+bool operator == (const CommandInfo::URI& left, const CommandInfo::URI& right)
+{
+  return left.value() == right.value() &&
+    left.executable() == right.executable() &&
+    left.extract() == right.extract();
+}
+
+
+bool operator == (const Credential& left, const Credential& right)
+{
+  return left.principal() == right.principal() &&
+    left.secret() == right.secret();
+}
+
+
+bool operator == (
+    const Environment::Variable& left,
+    const Environment::Variable& right)
+{
+  return left.name() == right.name() && left.value() == right.value();
+}
+
+
+bool operator == (const Environment& left, const Environment& right)
+{
+  // Order of variables is not important.
+  if (left.variables().size() != right.variables().size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.variables().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.variables().size(); j++) {
+      if (left.variables().Get(i) == right.variables().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+bool operator == (const Volume& left, const Volume& right)
+{
+  return left.container_path() == right.container_path() &&
+    left.host_path() == right.host_path() &&
+    left.mode() == right.mode();
+}
+
+
+// TODO(bmahler): Leverage process::http::URL for equality.
+bool operator == (const URL& left, const URL& right)
+{
+  return left.SerializeAsString() == right.SerializeAsString();
+}
+
+
+bool operator == (
+    const ContainerInfo::DockerInfo::PortMapping& left,
+    const ContainerInfo::DockerInfo::PortMapping& right)
+{
+  return left.host_port() == right.host_port() &&
+    left.container_port() == right.container_port() &&
+    left.protocol() == right.protocol();
+}
+
+
+bool operator == (const Parameter& left, const Parameter& right)
+{
+  return left.key() == right.key() && left.value() == right.value();
+}
+
+
+bool operator == (
+    const ContainerInfo::DockerInfo& left,
+    const ContainerInfo::DockerInfo& right)
+{
+  // Order of port mappings is not important.
+  if (left.port_mappings().size() != right.port_mappings().size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.port_mappings().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.port_mappings().size(); j++) {
+      if (left.port_mappings().Get(i) == right.port_mappings().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  // Order of parameters is not important.
+  if (left.parameters().size() != right.parameters().size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.parameters().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.parameters().size(); j++) {
+      if (left.parameters().Get(i) == right.parameters().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  return left.image() == right.image() &&
+    left.network() == right.network() &&
+    left.privileged() == right.privileged() &&
+    left.force_pull_image() == right.force_pull_image();
+}
+
+
+bool operator == (const ContainerInfo& left, const ContainerInfo& right)
+{
+  // Order of volumes is not important.
+  if (left.volumes().size() != right.volumes().size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.volumes().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.volumes().size(); j++) {
+      if (left.volumes().Get(i) == right.volumes().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  return left.type() == right.type() &&
+    left.hostname() == right.hostname() &&
+    left.docker() == right.docker();
+}
+
+
+bool operator == (const Port& left, const Port& right)
+{
+  return left.number() == right.number() &&
+    left.name() == right.name() &&
+    left.protocol() == right.protocol();
+}
+
+
+bool operator == (const Ports& left, const Ports& right)
+{
+  // Order of ports is not important.
+  if (left.ports().size() != right.ports().size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.ports().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.ports().size(); j++) {
+      if (left.ports().Get(i) == right.ports().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+bool operator == (const Label& left, const Label& right)
+{
+  return left.key() == right.key() && left.value() == right.value();
+}
+
+
+bool operator == (const Labels& left, const Labels& right)
+{
+  // Order of labels is not important.
+  if (left.labels().size() != right.labels().size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.labels().size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.labels().size(); j++) {
+      if (left.labels().Get(i) == right.labels().Get(j)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+bool operator == (const DiscoveryInfo& left, const DiscoveryInfo& right)
+{
+  return left.visibility() == right.visibility() &&
+    left.name() == right.name() &&
+    left.environment() == right.environment() &&
+    left.location() == right.location() &&
+    left.version() == right.version() &&
+    left.ports() == right.ports() &&
+    left.labels() == right.labels();
+}
+
+
+bool operator == (const ExecutorInfo& left, const ExecutorInfo& right)
+{
+  return left.executor_id() == right.executor_id() &&
+    left.data() == right.data() &&
+    Resources(left.resources()) == Resources(right.resources()) &&
+    left.command() == right.command() &&
+    left.framework_id() == right.framework_id() &&
+    left.name() == right.name() &&
+    left.source() == right.source() &&
+    left.container() == right.container() &&
+    left.discovery() == right.discovery();
+}
+
+
+bool operator == (const MasterInfo& left, const MasterInfo& right)
+{
+  return left.id() == right.id() &&
+    left.ip() == right.ip() &&
+    left.port() == right.port() &&
+    left.pid() == right.pid() &&
+    left.hostname() == right.hostname() &&
+    left.version() == right.version();
+}
+
+
+bool operator == (
+    const ResourceStatistics& left,
+    const ResourceStatistics& right)
+{
+  return left.SerializeAsString() == right.SerializeAsString();
+}
+
+
+bool operator == (const AgentInfo& left, const AgentInfo& right)
+{
+  return left.hostname() == right.hostname() &&
+    Resources(left.resources()) == Resources(right.resources()) &&
+    Attributes(left.attributes()) == Attributes(right.attributes()) &&
+    left.id() == right.id() &&
+    left.port() == right.port();
+}
+
+
+// TODO(bmahler): Use SerializeToString here?
+bool operator == (const TaskStatus& left, const TaskStatus& right)
+{
+  return left.task_id() == right.task_id() &&
+    left.state() == right.state() &&
+    left.data() == right.data() &&
+    left.message() == right.message() &&
+    left.agent_id() == right.agent_id() &&
+    left.timestamp() == right.timestamp() &&
+    left.executor_id() == right.executor_id() &&
+    left.healthy() == right.healthy() &&
+    left.source() == right.source() &&
+    left.reason() == right.reason() &&
+    left.uuid() == right.uuid();
+}
+
+
+bool operator != (const TaskStatus& left, const TaskStatus& right)
+{
+  return !(left == right);
+}
+
+
+} // namespace v1 {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/v1/resources.cpp
----------------------------------------------------------------------
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
new file mode 100644
index 0000000..fd8df51
--- /dev/null
+++ b/src/v1/resources.cpp
@@ -0,0 +1,1264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdint.h>
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/values.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/lambda.hpp>
+#include <stout/strings.hpp>
+
+using std::map;
+using std::ostream;
+using std::set;
+using std::string;
+using std::vector;
+
+
+namespace mesos {
+namespace v1 {
+
+/////////////////////////////////////////////////
+// Helper functions.
+/////////////////////////////////////////////////
+
+bool operator == (
+    const Resource::ReservationInfo& left,
+    const Resource::ReservationInfo& right)
+{
+  return left.principal() == right.principal();
+}
+
+
+bool operator != (
+    const Resource::ReservationInfo& left,
+    const Resource::ReservationInfo& right)
+{
+  return !(left == right);
+}
+
+
+bool operator == (
+    const Resource::DiskInfo& left,
+    const Resource::DiskInfo& right)
+{
+  // NOTE: We ignore 'volume' inside DiskInfo when doing comparison
+  // because it describes how this resource will be used which has
+  // nothing to do with the Resource object itself. A framework can
+  // use this resource and specify different 'volume' every time it
+  // uses it.
+  if (left.has_persistence() != right.has_persistence()) {
+    return false;
+  }
+
+  if (left.has_persistence()) {
+    return left.persistence().id() == right.persistence().id();
+  }
+
+  return true;
+}
+
+
+bool operator != (
+    const Resource::DiskInfo& left,
+    const Resource::DiskInfo& right)
+{
+  return !(left == right);
+}
+
+
+bool operator == (const Resource& left, const Resource& right)
+{
+  if (left.name() != right.name() ||
+      left.type() != right.type() ||
+      left.role() != right.role()) {
+    return false;
+  }
+
+  // Check ReservationInfo.
+  if (left.has_reservation() != right.has_reservation()) {
+    return false;
+  }
+
+  if (left.has_reservation() && left.reservation() != right.reservation()) {
+    return false;
+  }
+
+  // Check DiskInfo.
+  if (left.has_disk() != right.has_disk()) {
+    return false;
+  }
+
+  if (left.has_disk() && left.disk() != right.disk()) {
+    return false;
+  }
+
+  // Check RevocableInfo.
+  if (left.has_revocable() != right.has_revocable()) {
+    return false;
+  }
+
+  if (left.type() == Value::SCALAR) {
+    return left.scalar() == right.scalar();
+  } else if (left.type() == Value::RANGES) {
+    return left.ranges() == right.ranges();
+  } else if (left.type() == Value::SET) {
+    return left.set() == right.set();
+  } else {
+    return false;
+  }
+}
+
+
+bool operator != (const Resource& left, const Resource& right)
+{
+  return !(left == right);
+}
+
+
+namespace internal {
+
+// Tests if we can add two Resource objects together resulting in one
+// valid Resource object. For example, two Resource objects with
+// different name, type or role are not addable.
+static bool addable(const Resource& left, const Resource& right)
+{
+  if (left.name() != right.name() ||
+      left.type() != right.type() ||
+      left.role() != right.role()) {
+    return false;
+  }
+
+  // Check ReservationInfo.
+  if (left.has_reservation() != right.has_reservation()) {
+    return false;
+  }
+
+  if (left.has_reservation() && left.reservation() != right.reservation()) {
+    return false;
+  }
+
+  // Check DiskInfo.
+  if (left.has_disk() != right.has_disk()) {
+    return false;
+  }
+
+  if (left.has_disk() && left.disk() != right.disk()) {
+    return false;
+  }
+
+  // TODO(jieyu): Even if two Resource objects with DiskInfo have the
+  // same persistence ID, they cannot be added together. In fact, this
+  // shouldn't happen if we do not add resources from different
+  // namespaces (e.g., across slave). Consider adding a warning.
+  if (left.has_disk() && left.disk().has_persistence()) {
+    return false;
+  }
+
+  // Check RevocableInfo.
+  if (left.has_revocable() != right.has_revocable()) {
+    return false;
+  }
+
+  return true;
+}
+
+
+// Tests if we can subtract "right" from "left" resulting in one valid
+// Resource object. For example, two Resource objects with different
+// name, type or role are not subtractable.
+// NOTE: Set subtraction is always well defined, it does not require
+// 'right' to be contained within 'left'. For example, assuming that
+// "left = {1, 2}" and "right = {2, 3}", "left" and "right" are
+// subtractable because "left - right = {1}". However, "left" does not
+// contain "right".
+static bool subtractable(const Resource& left, const Resource& right)
+{
+  if (left.name() != right.name() ||
+      left.type() != right.type() ||
+      left.role() != right.role()) {
+    return false;
+  }
+
+  // Check ReservationInfo.
+  if (left.has_reservation() != right.has_reservation()) {
+    return false;
+  }
+
+  if (left.has_reservation() && left.reservation() != right.reservation()) {
+    return false;
+  }
+
+  // Check DiskInfo.
+  if (left.has_disk() != right.has_disk()) {
+    return false;
+  }
+
+  if (left.has_disk() && left.disk() != right.disk()) {
+    return false;
+  }
+
+  // NOTE: For Resource objects that have DiskInfo, we can only do
+  // subtraction if they are equal.
+  if (left.has_disk() && left.disk().has_persistence() && left != right) {
+    return false;
+  }
+
+  // Check RevocableInfo.
+  if (left.has_revocable() != right.has_revocable()) {
+    return false;
+  }
+
+  return true;
+}
+
+
+// Tests if "right" is contained in "left".
+static bool contains(const Resource& left, const Resource& right)
+{
+  // NOTE: This is a necessary condition for 'contains'.
+  // 'subtractable' will verify name, role, type, ReservationInfo,
+  // DiskInfo and RevocableInfo compatibility.
+  if (!subtractable(left, right)) {
+    return false;
+  }
+
+  if (left.type() == Value::SCALAR) {
+    return right.scalar() <= left.scalar();
+  } else if (left.type() == Value::RANGES) {
+    return right.ranges() <= left.ranges();
+  } else if (left.type() == Value::SET) {
+    return right.set() <= left.set();
+  } else {
+    return false;
+  }
+}
+
+} // namespace internal {
+
+
+Resource& operator += (Resource& left, const Resource& right)
+{
+  if (left.type() == Value::SCALAR) {
+    *left.mutable_scalar() += right.scalar();
+  } else if (left.type() == Value::RANGES) {
+    *left.mutable_ranges() += right.ranges();
+  } else if (left.type() == Value::SET) {
+    *left.mutable_set() += right.set();
+  }
+
+  return left;
+}
+
+
+Resource operator + (const Resource& left, const Resource& right)
+{
+  Resource result = left;
+  result += right;
+  return result;
+}
+
+
+Resource& operator -= (Resource& left, const Resource& right)
+{
+  if (left.type() == Value::SCALAR) {
+    *left.mutable_scalar() -= right.scalar();
+  } else if (left.type() == Value::RANGES) {
+    *left.mutable_ranges() -= right.ranges();
+  } else if (left.type() == Value::SET) {
+    *left.mutable_set() -= right.set();
+  }
+
+  return left;
+}
+
+
+Resource operator - (const Resource& left, const Resource& right)
+{
+  Resource result = left;
+  result -= right;
+  return result;
+}
+
+
+/////////////////////////////////////////////////
+// Public static functions.
+/////////////////////////////////////////////////
+
+
+Try<Resource> Resources::parse(
+    const string& name,
+    const string& value,
+    const string& role)
+{
+  Try<Value> result = internal::values::parse(value);
+  if (result.isError()) {
+    return Error(
+        "Failed to parse resource " + name +
+        " value " + value + " error " + result.error());
+  }
+
+  Resource resource;
+
+  Value _value = result.get();
+  resource.set_name(name);
+  resource.set_role(role);
+
+  if (_value.type() == Value::SCALAR) {
+    resource.set_type(Value::SCALAR);
+    resource.mutable_scalar()->CopyFrom(_value.scalar());
+  } else if (_value.type() == Value::RANGES) {
+    resource.set_type(Value::RANGES);
+    resource.mutable_ranges()->CopyFrom(_value.ranges());
+  } else if (_value.type() == Value::SET) {
+    resource.set_type(Value::SET);
+    resource.mutable_set()->CopyFrom(_value.set());
+  } else {
+    return Error(
+        "Bad type for resource " + name + " value " + value +
+        " type " + Value::Type_Name(_value.type()));
+  }
+
+  return resource;
+}
+
+
+// TODO(wickman) It is possible for Resources::ostream<< to produce
+// unparseable resources, i.e.  those with
+// ReservationInfo/DiskInfo/RevocableInfo.
+Try<Resources> Resources::parse(
+    const string& text,
+    const string& defaultRole)
+{
+  Resources resources;
+  hashmap<string, Value_Type> nameTypes;
+
+  foreach (const string& token, strings::tokenize(text, ";")) {
+    vector<string> pair = strings::tokenize(token, ":");
+    if (pair.size() != 2) {
+      return Error("Bad value for resources, missing or extra ':' in " + 
token);
+    }
+
+    string name;
+    string role;
+    size_t openParen = pair[0].find("(");
+    if (openParen == string::npos) {
+      name = strings::trim(pair[0]);
+      role = defaultRole;
+    } else {
+      size_t closeParen = pair[0].find(")");
+      if (closeParen == string::npos || closeParen < openParen) {
+        return Error(
+            "Bad value for resources, mismatched parentheses in " + token);
+      }
+
+      name = strings::trim(pair[0].substr(0, openParen));
+
+      role = strings::trim(pair[0].substr(
+          openParen + 1,
+          closeParen - openParen - 1));
+    }
+
+    Try<Resource> resource = Resources::parse(name, pair[1], role);
+    if (resource.isError()) {
+      return Error(resource.error());
+    }
+
+    if (nameTypes.contains(name) && nameTypes[name] != resource.get().type()) {
+      return Error(
+          "Resources with the same name ('" + name + "') but different types "
+          "are not allowed");
+    } else if (!nameTypes.contains(name)) {
+      nameTypes[name] = resource.get().type();
+    }
+
+    resources += resource.get();
+  }
+
+  return resources;
+}
+
+
+Option<Error> Resources::validate(const Resource& resource)
+{
+  if (resource.name().empty()) {
+    return Error("Empty resource name");
+  }
+
+  if (!Value::Type_IsValid(resource.type())) {
+    return Error("Invalid resource type");
+  }
+
+  if (resource.type() == Value::SCALAR) {
+    if (!resource.has_scalar() ||
+        resource.has_ranges() ||
+        resource.has_set()) {
+      return Error("Invalid scalar resource");
+    }
+
+    if (resource.scalar().value() < 0) {
+      return Error("Invalid scalar resource: value < 0");
+    }
+  } else if (resource.type() == Value::RANGES) {
+    if (resource.has_scalar() ||
+        !resource.has_ranges() ||
+        resource.has_set()) {
+      return Error("Invalid ranges resource");
+    }
+
+    for (int i = 0; i < resource.ranges().range_size(); i++) {
+      const Value::Range& range = resource.ranges().range(i);
+
+      // Ensure the range make sense (isn't inverted).
+      if (range.begin() > range.end()) {
+        return Error("Invalid ranges resource: begin > end");
+      }
+
+      // Ensure ranges don't overlap (but not necessarily coalesced).
+      for (int j = i + 1; j < resource.ranges().range_size(); j++) {
+        if (range.begin() <= resource.ranges().range(j).begin() &&
+            resource.ranges().range(j).begin() <= range.end()) {
+          return Error("Invalid ranges resource: overlapping ranges");
+        }
+      }
+    }
+  } else if (resource.type() == Value::SET) {
+    if (resource.has_scalar() ||
+        resource.has_ranges() ||
+        !resource.has_set()) {
+      return Error("Invalid set resource");
+    }
+
+    for (int i = 0; i < resource.set().item_size(); i++) {
+      const string& item = resource.set().item(i);
+
+      // Ensure no duplicates.
+      for (int j = i + 1; j < resource.set().item_size(); j++) {
+        if (item == resource.set().item(j)) {
+          return Error("Invalid set resource: duplicated elements");
+        }
+      }
+    }
+  } else {
+    // Resource doesn't support TEXT or other value types.
+    return Error("Unsupported resource type");
+  }
+
+  // Checks for 'disk' resource.
+  if (resource.has_disk() && resource.name() != "disk") {
+    return Error(
+        "DiskInfo should not be set for " + resource.name() + " resource");
+  }
+
+  // Checks for the invalid state of (role, reservation) pair.
+  if (resource.role() == "*" && resource.has_reservation()) {
+    return Error(
+        "Invalid reservation: role \"*\" cannot be dynamically reserved");
+  }
+
+  return None();
+}
+
+
+Option<Error> Resources::validate(
+    const google::protobuf::RepeatedPtrField<Resource>& resources)
+{
+  foreach (const Resource& resource, resources) {
+    Option<Error> error = validate(resource);
+    if (error.isSome()) {
+      return Error(
+          "Resource '" + stringify(resource) +
+          "' is invalid: " + error.get().message);
+    }
+  }
+
+  return None();
+}
+
+
+bool Resources::isEmpty(const Resource& resource)
+{
+  if (resource.type() == Value::SCALAR) {
+    return resource.scalar().value() == 0;
+  } else if (resource.type() == Value::RANGES) {
+    return resource.ranges().range_size() == 0;
+  } else if (resource.type() == Value::SET) {
+    return resource.set().item_size() == 0;
+  } else {
+    return false;
+  }
+}
+
+
+bool Resources::isPersistentVolume(const Resource& resource)
+{
+  return resource.has_disk() && resource.disk().has_persistence();
+}
+
+
+bool Resources::isReserved(
+    const Resource& resource,
+    const Option<std::string>& role)
+{
+  if (role.isSome()) {
+    return !isUnreserved(resource) && role.get() == resource.role();
+  } else {
+    return !isUnreserved(resource);
+  }
+}
+
+
+bool Resources::isUnreserved(const Resource& resource)
+{
+  return resource.role() == "*" && !resource.has_reservation();
+}
+
+
+bool Resources::isDynamicallyReserved(const Resource& resource)
+{
+  return resource.has_reservation();
+}
+
+
+bool Resources::isRevocable(const Resource& resource)
+{
+  return resource.has_revocable();
+}
+
+/////////////////////////////////////////////////
+// Public member functions.
+/////////////////////////////////////////////////
+
+
+Resources::Resources(const Resource& resource)
+{
+  // NOTE: Invalid and zero Resource object will be ignored.
+  *this += resource;
+}
+
+
+Resources::Resources(const vector<Resource>& _resources)
+{
+  foreach (const Resource& resource, _resources) {
+    // NOTE: Invalid and zero Resource objects will be ignored.
+    *this += resource;
+  }
+}
+
+
+Resources::Resources(
+    const google::protobuf::RepeatedPtrField<Resource>& _resources)
+{
+  foreach (const Resource& resource, _resources) {
+    // NOTE: Invalid and zero Resource objects will be ignored.
+    *this += resource;
+  }
+}
+
+
+bool Resources::contains(const Resources& that) const
+{
+  Resources remaining = *this;
+
+  foreach (const Resource& resource, that.resources) {
+    // NOTE: We use _contains because Resources only contain valid
+    // Resource objects, and we don't want the performance hit of the
+    // validity check.
+    if (!remaining._contains(resource)) {
+      return false;
+    }
+
+    remaining -= resource;
+  }
+
+  return true;
+}
+
+
+bool Resources::contains(const Resource& that) const
+{
+  // NOTE: We must validate 'that' because invalid resources can lead
+  // to false positives here (e.g., "cpus:-1" will return true). This
+  // is because 'contains' assumes resources are valid.
+  return validate(that).isNone() && _contains(that);
+}
+
+
+Resources Resources::filter(
+    const lambda::function<bool(const Resource&)>& predicate) const
+{
+  Resources result;
+  foreach (const Resource& resource, resources) {
+    if (predicate(resource)) {
+      result += resource;
+    }
+  }
+  return result;
+}
+
+
+hashmap<string, Resources> Resources::reserved() const
+{
+  hashmap<string, Resources> result;
+
+  foreach (const Resource& resource, resources) {
+    if (isReserved(resource)) {
+      result[resource.role()] += resource;
+    }
+  }
+
+  return result;
+}
+
+
+Resources Resources::reserved(const string& role) const
+{
+  return filter(lambda::bind(isReserved, lambda::_1, role));
+}
+
+
+Resources Resources::unreserved() const
+{
+  return filter(isUnreserved);
+}
+
+
+Resources Resources::persistentVolumes() const
+{
+  return filter(isPersistentVolume);
+}
+
+
+Resources Resources::revocable() const
+{
+  return filter(isRevocable);
+}
+
+
+Resources Resources::flatten(
+    const string& role,
+    const Option<Resource::ReservationInfo>& reservation) const
+{
+  Resources flattened;
+
+  foreach (Resource resource, resources) {
+    resource.set_role(role);
+    if (reservation.isNone()) {
+      resource.clear_reservation();
+    } else {
+      resource.mutable_reservation()->CopyFrom(reservation.get());
+    }
+    flattened += resource;
+  }
+
+  return flattened;
+}
+
+
+// A predicate that returns true for any resource.
+static bool any(const Resource&) { return true; }
+
+
+Option<Resources> Resources::find(const Resource& target) const
+{
+  Resources found;
+  Resources total = *this;
+  Resources remaining = Resources(target).flatten();
+
+  // First look in the target role, then unreserved, then any remaining role.
+  // TODO(mpark): Use a lambda for 'any' instead once we get full C++11.
+  vector<lambda::function<bool(const Resource&)>> predicates = {
+    lambda::bind(isReserved, lambda::_1, target.role()),
+    isUnreserved,
+    any
+  };
+
+  foreach (const auto& predicate, predicates) {
+    foreach (const Resource& resource, total.filter(predicate)) {
+      // Need to flatten to ignore the roles in contains().
+      Resources flattened = Resources(resource).flatten();
+
+      if (flattened.contains(remaining)) {
+        // Done!
+        if (!resource.has_reservation()) {
+          return found + remaining.flatten(resource.role());
+        } else {
+          return found +
+                 remaining.flatten(resource.role(), resource.reservation());
+        }
+      } else if (remaining.contains(flattened)) {
+        found += resource;
+        total -= resource;
+        remaining -= flattened;
+        break;
+      }
+    }
+  }
+
+  return None();
+}
+
+
+Option<Resources> Resources::find(const Resources& targets) const
+{
+  Resources total;
+
+  foreach (const Resource& target, targets) {
+    Option<Resources> found = find(target);
+
+    // Each target needs to be found!
+    if (found.isNone()) {
+      return None();
+    }
+
+    total += found.get();
+  }
+
+  return total;
+}
+
+
+Try<Resources> Resources::apply(const Offer::Operation& operation) const
+{
+  Resources result = *this;
+
+  switch (operation.type()) {
+    case Offer::Operation::LAUNCH:
+      // Launch operation does not alter the offered resources.
+      break;
+
+    case Offer::Operation::RESERVE: {
+      Option<Error> error = validate(operation.reserve().resources());
+      if (error.isSome()) {
+        return Error("Invalid RESERVE Operation: " + error.get().message);
+      }
+
+      foreach (const Resource& reserved, operation.reserve().resources()) {
+        if (!Resources::isReserved(reserved)) {
+          return Error("Invalid RESERVE Operation: Resource must be reserved");
+        } else if (!reserved.has_reservation()) {
+          return Error("Invalid RESERVE Operation: Missing 'reservation'");
+        }
+
+        Resources unreserved = Resources(reserved).flatten();
+
+        if (!result.contains(unreserved)) {
+          return Error("Invalid RESERVE Operation: " + stringify(result) +
+                       " does not contain " + stringify(unreserved));
+        }
+
+        result -= unreserved;
+        result += reserved;
+      }
+      break;
+    }
+
+    case Offer::Operation::UNRESERVE: {
+      Option<Error> error = validate(operation.unreserve().resources());
+      if (error.isSome()) {
+        return Error("Invalid UNRESERVE Operation: " + error.get().message);
+      }
+
+      foreach (const Resource& reserved, operation.unreserve().resources()) {
+        if (!Resources::isReserved(reserved)) {
+          return Error("Invalid UNRESERVE Operation: Resource is not 
reserved");
+        } else if (!reserved.has_reservation()) {
+          return Error("Invalid UNRESERVE Operation: Missing 'reservation'");
+        }
+
+        if (!result.contains(reserved)) {
+          return Error("Invalid UNRESERVE Operation: " + stringify(result) +
+                       " does not contain " + stringify(reserved));
+        }
+
+        Resources unreserved = Resources(reserved).flatten();
+
+        result -= reserved;
+        result += unreserved;
+      }
+      break;
+    }
+
+    case Offer::Operation::CREATE: {
+      Option<Error> error = validate(operation.create().volumes());
+      if (error.isSome()) {
+        return Error("Invalid CREATE Operation: " + error.get().message);
+      }
+
+      foreach (const Resource& volume, operation.create().volumes()) {
+        if (!volume.has_disk()) {
+          return Error("Invalid CREATE Operation: Missing 'disk'");
+        } else if (!volume.disk().has_persistence()) {
+          return Error("Invalid CREATE Operation: Missing 'persistence'");
+        }
+
+        // Strip the disk info so that we can subtract it from the
+        // original resources.
+        // TODO(jieyu): Non-persistent volumes are not supported for
+        // now. Persistent volumes can only be be created from regular
+        // disk resources. Revisit this once we start to support
+        // non-persistent volumes.
+        Resource stripped = volume;
+        stripped.clear_disk();
+
+        if (!result.contains(stripped)) {
+          return Error("Invalid CREATE Operation: Insufficient disk 
resources");
+        }
+
+        result -= stripped;
+        result += volume;
+      }
+      break;
+    }
+
+    case Offer::Operation::DESTROY: {
+      Option<Error> error = validate(operation.destroy().volumes());
+      if (error.isSome()) {
+        return Error("Invalid DESTROY Operation: " + error.get().message);
+      }
+
+      foreach (const Resource& volume, operation.destroy().volumes()) {
+        if (!volume.has_disk()) {
+          return Error("Invalid DESTROY Operation: Missing 'disk'");
+        } else if (!volume.disk().has_persistence()) {
+          return Error("Invalid DESTROY Operation: Missing 'persistence'");
+        }
+
+        if (!result.contains(volume)) {
+          return Error(
+              "Invalid DESTROY Operation: Persistent volume does not exist");
+        }
+
+        Resource stripped = volume;
+        stripped.clear_disk();
+
+        result -= volume;
+        result += stripped;
+      }
+      break;
+    }
+
+    default:
+      return Error("Unknown offer operation " + stringify(operation.type()));
+  }
+
+  // This is a sanity check to ensure the amount of each type of
+  // resource does not change.
+  // TODO(jieyu): Currently, we only check known resource types like
+  // cpus, mem, disk, ports, etc. We should generalize this.
+  CHECK(result.cpus() == cpus() &&
+        result.mem() == mem() &&
+        result.disk() == disk() &&
+        result.ports() == ports());
+
+  return result;
+}
+
+
+template <>
+Option<Value::Scalar> Resources::get(const string& name) const
+{
+  Value::Scalar total;
+  bool found = false;
+
+  foreach (const Resource& resource, resources) {
+    if (resource.name() == name &&
+        resource.type() == Value::SCALAR) {
+      total += resource.scalar();
+      found = true;
+    }
+  }
+
+  if (found) {
+    return total;
+  }
+
+  return None();
+}
+
+
+template <>
+Option<Value::Set> Resources::get(const string& name) const
+{
+  Value::Set total;
+  bool found = false;
+
+  foreach (const Resource& resource, resources) {
+    if (resource.name() == name &&
+        resource.type() == Value::SET) {
+      total += resource.set();
+      found = true;
+    }
+  }
+
+  if (found) {
+    return total;
+  }
+
+  return None();
+}
+
+
+template <>
+Option<Value::Ranges> Resources::get(const string& name) const
+{
+  Value::Ranges total;
+  bool found = false;
+
+  foreach (const Resource& resource, resources) {
+    if (resource.name() == name &&
+        resource.type() == Value::RANGES) {
+      total += resource.ranges();
+      found = true;
+    }
+  }
+
+  if (found) {
+    return total;
+  }
+
+  return None();
+}
+
+
+Resources Resources::get(const string& name) const
+{
+  return filter([=](const Resource& resource) {
+    return resource.name() == name;
+  });
+}
+
+
+Resources Resources::scalars() const
+{
+  return filter([=](const Resource& resource) {
+    return resource.type() == Value::SCALAR;
+  });
+}
+
+
+set<string> Resources::names() const
+{
+  set<string> result;
+  foreach(const Resource& resource, resources) {
+    result.insert(resource.name());
+  }
+
+  return result;
+}
+
+
+map<string, Value_Type> Resources::types() const
+{
+  map<string, Value_Type> result;
+  foreach(const Resource& resource, resources) {
+    result[resource.name()] = resource.type();
+  }
+
+  return result;
+}
+
+
+Option<double> Resources::cpus() const
+{
+  Option<Value::Scalar> value = get<Value::Scalar>("cpus");
+  if (value.isSome()) {
+    return value.get().value();
+  } else {
+    return None();
+  }
+}
+
+
+Option<Bytes> Resources::mem() const
+{
+  Option<Value::Scalar> value = get<Value::Scalar>("mem");
+  if (value.isSome()) {
+    return Megabytes(static_cast<uint64_t>(value.get().value()));
+  } else {
+    return None();
+  }
+}
+
+
+Option<Bytes> Resources::disk() const
+{
+  Option<Value::Scalar> value = get<Value::Scalar>("disk");
+  if (value.isSome()) {
+    return Megabytes(static_cast<uint64_t>(value.get().value()));
+  } else {
+    return None();
+  }
+}
+
+
+Option<Value::Ranges> Resources::ports() const
+{
+  Option<Value::Ranges> value = get<Value::Ranges>("ports");
+  if (value.isSome()) {
+    return value.get();
+  } else {
+    return None();
+  }
+}
+
+
+Option<Value::Ranges> Resources::ephemeral_ports() const
+{
+  Option<Value::Ranges> value = get<Value::Ranges>("ephemeral_ports");
+  if (value.isSome()) {
+    return value.get();
+  } else {
+    return None();
+  }
+}
+
+
+bool Resources::_contains(const Resource& that) const
+{
+  foreach (const Resource& resource, resources) {
+    if (internal::contains(resource, that)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+/////////////////////////////////////////////////
+// Overloaded operators.
+/////////////////////////////////////////////////
+
+
+Resources::operator const google::protobuf::RepeatedPtrField<Resource>& () 
const
+{
+  return resources;
+}
+
+
+bool Resources::operator == (const Resources& that) const
+{
+  return this->contains(that) && that.contains(*this);
+}
+
+
+bool Resources::operator != (const Resources& that) const
+{
+  return !(*this == that);
+}
+
+
+Resources Resources::operator + (const Resource& that) const
+{
+  Resources result = *this;
+  result += that;
+  return result;
+}
+
+
+Resources Resources::operator + (const Resources& that) const
+{
+  Resources result = *this;
+  result += that;
+  return result;
+}
+
+
+Resources& Resources::operator += (const Resource& that)
+{
+  if (validate(that).isNone() && !isEmpty(that)) {
+    bool found = false;
+    foreach (Resource& resource, resources) {
+      if (internal::addable(resource, that)) {
+        resource += that;
+        found = true;
+        break;
+      }
+    }
+
+    // Cannot be combined with any existing Resource object.
+    if (!found) {
+      resources.Add()->CopyFrom(that);
+    }
+  }
+
+  return *this;
+}
+
+
+Resources& Resources::operator += (const Resources& that)
+{
+  foreach (const Resource& resource, that.resources) {
+    *this += resource;
+  }
+
+  return *this;
+}
+
+
+Resources Resources::operator - (const Resource& that) const
+{
+  Resources result = *this;
+  result -= that;
+  return result;
+}
+
+
+Resources Resources::operator - (const Resources& that) const
+{
+  Resources result = *this;
+  result -= that;
+  return result;
+}
+
+
+Resources& Resources::operator -= (const Resource& that)
+{
+  if (validate(that).isNone() && !isEmpty(that)) {
+    for (int i = 0; i < resources.size(); i++) {
+      Resource* resource = resources.Mutable(i);
+
+      if (internal::subtractable(*resource, that)) {
+        *resource -= that;
+
+        // Remove the resource if it becomes invalid or zero. We need
+        // to do the validation because we want to strip negative
+        // scalar Resource object.
+        if (validate(*resource).isSome() || isEmpty(*resource)) {
+          resources.DeleteSubrange(i, 1);
+        }
+
+        break;
+      }
+    }
+  }
+
+  return *this;
+}
+
+
+Resources& Resources::operator -= (const Resources& that)
+{
+  foreach (const Resource& resource, that.resources) {
+    *this -= resource;
+  }
+
+  return *this;
+}
+
+
+ostream& operator << (ostream& stream, const Volume& volume) {
+  string volumeConfig = volume.container_path();
+
+  if (volume.has_host_path()) {
+    volumeConfig = volume.host_path() + ":" + volumeConfig;
+
+    if (volume.has_mode()) {
+      switch (volume.mode()) {
+        case Volume::RW: volumeConfig += ":rw"; break;
+        case Volume::RO: volumeConfig += ":ro"; break;
+        default:
+          LOG(FATAL) << "Unknown Volume mode: " << volume.mode();
+          break;
+      }
+    }
+  }
+
+  stream << volumeConfig;
+
+  return stream;
+}
+
+
+ostream& operator << (ostream& stream, const Resource::DiskInfo& disk) {
+  if (disk.has_persistence()) {
+    stream << disk.persistence().id();
+  }
+
+  if (disk.has_volume()) {
+    stream << ":" << disk.volume();
+  }
+
+  return stream;
+}
+
+
+ostream& operator << (ostream& stream, const Resource& resource)
+{
+  stream << resource.name();
+
+  stream << "(" << resource.role();
+
+  if (resource.has_reservation()) {
+    stream << ", " << resource.reservation().principal();
+  }
+
+  stream << ")";
+
+  if (resource.has_disk()) {
+    stream << "[" << resource.disk() << "]";
+  }
+
+  // Once extended revocable attributes are available, change this to a more
+  // meaningful value.
+  if (resource.has_revocable()) {
+    stream << "{REV}";
+  }
+
+  stream << ":";
+
+  switch (resource.type()) {
+    case Value::SCALAR: stream << resource.scalar(); break;
+    case Value::RANGES: stream << resource.ranges(); break;
+    case Value::SET:    stream << resource.set();    break;
+    default:
+      LOG(FATAL) << "Unexpected Value type: " << resource.type();
+      break;
+  }
+
+  return stream;
+}
+
+
+ostream& operator << (ostream& stream, const Resources& resources)
+{
+  Resources::const_iterator it = resources.begin();
+
+  while (it != resources.end()) {
+    stream << *it;
+    if (++it != resources.end()) {
+      stream << "; ";
+    }
+  }
+
+  return stream;
+}
+
+} // namespace v1 {
+} // namespace mesos {

Reply via email to