Repository: mesos
Updated Branches:
  refs/heads/master 957b4f280 -> 54e47b443


Added parameter validation to ReregisterSlaveMessage.

The ReregisterSlaveMessage carries a number of fields that have
internal consistency requirements. Add some simple validation checks
to ensure that we have a minimally consistent re-registration request
before proceeding.

Review: https://reviews.apache.org/r/58305/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b22c9e73
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b22c9e73
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b22c9e73

Branch: refs/heads/master
Commit: b22c9e73375b38c98a25c337e0d5d13e2c826573
Parents: 957b4f2
Author: James Peach <[email protected]>
Authored: Wed Apr 26 15:36:12 2017 -0400
Committer: Neil Conway <[email protected]>
Committed: Wed Apr 26 16:01:29 2017 -0400

----------------------------------------------------------------------
 src/master/master.cpp                 |  10 +++
 src/master/validation.cpp             | 135 +++++++++++++++++++++++++++--
 src/master/validation.hpp             |  16 ++++
 src/tests/master_validation_tests.cpp |  54 ++++++++++++
 4 files changed, 208 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d1cdc35..9814df2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5567,6 +5567,16 @@ void Master::reregisterSlave(
     return;
   }
 
+  Option<Error> error = validation::master::message::reregisterSlave(
+      slaveInfo, tasks, checkpointedResources, executorInfos, frameworks);
+
+  if (error.isSome()) {
+    LOG(WARNING) << "Dropping re-registration of agent at " << from
+                 << " because it sent an invalid re-registration: "
+                 << error->message;
+    return;
+  }
+
   MachineID machineId;
   machineId.set_hostname(slaveInfo.hostname());
   machineId.set_ip(stringify(from.address.ip));

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 3f70875..768fc35 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -228,6 +228,133 @@ Option<Error> validate(
 }
 
 } // namespace call {
+
+namespace message {
+
+// Validates the internal consistency of the fields an agent sends in a
+// ReregisterSlaveMessage, returning the first problem found or None().
+//
+// The checks, in order:
+//   * every checkpointed resource passes Resources::validate();
+//   * every framework passes framework::validate() and has a FrameworkID
+//     that is unique within the message;
+//   * every executor passes executor::validate(), has valid resources,
+//     belongs to one of the reported frameworks and has an ExecutorID
+//     that is unique within its framework;
+//   * every task has a valid TaskID and this agent's SlaveID, belongs to
+//     one of the reported frameworks and uses valid resources; a running
+//     task that names an executor must refer to an executor reported for
+//     the task's own framework.
+Option<Error> reregisterSlave(
+    const SlaveInfo& slaveInfo,
+    const vector<Task>& tasks,
+    const vector<Resource>& resources,
+    const vector<ExecutorInfo>& executorInfos,
+    const vector<FrameworkInfo>& frameworkInfos)
+{
+  hashset<FrameworkID> frameworkIDs;
+
+  // Executors are identified by (FrameworkID, ExecutorID): two frameworks
+  // on the same agent may each report an executor with the same ExecutorID.
+  // A single agent-wide set would reject such an agent on every
+  // re-registration attempt, so track the ExecutorIDs of each framework
+  // separately.
+  hashmap<FrameworkID, hashset<ExecutorID>> executorIDs;
+
+  foreach (const Resource& resource, resources) {
+    Option<Error> error = Resources::validate(resource);
+    if (error.isSome()) {
+      return error.get();
+    }
+  }
+
+  foreach (const FrameworkInfo& framework, frameworkInfos) {
+    Option<Error> error = validation::framework::validate(framework);
+    if (error.isSome()) {
+      return error.get();
+    }
+
+    if (frameworkIDs.contains(framework.id())) {
+      return Error("Framework has a duplicate FrameworkID: '" +
+                   stringify(framework.id()) + "'");
+    }
+
+    frameworkIDs.insert(framework.id());
+  }
+
+  foreach (const ExecutorInfo& executor, executorInfos) {
+    Option<Error> error = validation::executor::validate(executor);
+    if (error.isSome()) {
+      return error.get();
+    }
+
+    // We don't use internal::validateResources() here because
+    // that includes the validateAllocatedToSingleRole() check,
+    // which is not valid for agent re-registration.
+    error = Resources::validate(executor.resources());
+    if (error.isSome()) {
+      return error.get();
+    }
+
+    if (!frameworkIDs.contains(executor.framework_id())) {
+      return Error("Executor has an invalid FrameworkID '" +
+                   stringify(executor.framework_id()) + "'");
+    }
+
+    if (executor.has_executor_id()) {
+      hashset<ExecutorID>& frameworkExecutorIDs =
+        executorIDs[executor.framework_id()];
+
+      if (frameworkExecutorIDs.contains(executor.executor_id())) {
+        return Error("Executor has a duplicate ExecutorID '" +
+                     stringify(executor.executor_id()) +
+                     "' in framework '" +
+                     stringify(executor.framework_id()) + "'");
+      }
+
+      frameworkExecutorIDs.insert(executor.executor_id());
+    }
+  }
+
+  foreach (const Task& task, tasks) {
+    Option<Error> error = common::validation::validateTaskID(task.task_id());
+    if (error.isSome()) {
+      return Error("Task has an invalid TaskID: " + error->message);
+    }
+
+    if (task.slave_id() != slaveInfo.id()) {
+      return Error("Task has an invalid SlaveID '" +
+                   stringify(task.slave_id()) + "'");
+    }
+
+    if (!frameworkIDs.contains(task.framework_id())) {
+      return Error("Task has an invalid FrameworkID '" +
+                   stringify(task.framework_id()) + "'");
+    }
+
+    // Command Executors don't send the executor ID in the task because it
+    // is generated on the agent (see Slave::doReliableRegistration). Only
+    // running tasks ought to have executors. The executor must have been
+    // reported for the task's own framework, not merely for any framework.
+    if (task.has_executor_id() && task.state() == TASK_RUNNING) {
+      auto executors = executorIDs.find(task.framework_id());
+      if (executors == executorIDs.end() ||
+          !executors->second.contains(task.executor_id())) {
+        return Error("Task has an invalid ExecutorID '" +
+                     stringify(task.executor_id()) + "'");
+      }
+    }
+
+    error = resource::validate(task.resources());
+    if (error.isSome()) {
+      return Error("Task uses invalid resources: " + error->message);
+    }
+  }
+
+  return None();
+}
+
+} // namespace message {
 } // namespace master {
 
 
@@ -827,19 +926,20 @@ Option<Error> validate(
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
 
-  vector<lambda::function<Option<Error>()>> validators = {
-    lambda::bind(internal::validateType, executor),
-    lambda::bind(internal::validateExecutorID, executor),
+  Option<Error> error = executor::validate(executor);
+  if (error.isSome()) {
+    return error;
+  }
+
+  const vector<lambda::function<Option<Error>()>> executorValidators = {
     lambda::bind(internal::validateFrameworkID, executor, framework),
-    lambda::bind(internal::validateShutdownGracePeriod, executor),
     lambda::bind(internal::validateResources, executor),
     lambda::bind(
         internal::validateCompatibleExecutorInfo, executor, framework, slave),
-    lambda::bind(internal::validateCommandInfo, executor)
   };
 
-  foreach (const lambda::function<Option<Error>()>& validator, validators) {
-    Option<Error> error = validator();
+  foreach (const auto& validator, executorValidators) {
+    error = validator();
     if (error.isSome()) {
       return error;
     }
@@ -849,6 +949,31 @@ Option<Error> validate(
 }
 
 } // namespace internal {
+
+// Runs the ExecutorInfo checks that need no framework or agent context
+// (type, ExecutorID, shutdown grace period and CommandInfo, in that
+// order) and returns the first error reported, or None() if all pass.
+Option<Error> validate(const ExecutorInfo& executor)
+{
+  typedef lambda::function<Option<Error>(const ExecutorInfo&)> Check;
+
+  const vector<Check> checks = {
+    internal::validateType,
+    internal::validateExecutorID,
+    internal::validateShutdownGracePeriod,
+    internal::validateCommandInfo,
+  };
+
+  for (const Check& check : checks) {
+    const Option<Error> error = check(executor);
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  return None();
+}
+
 } // namespace executor {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index d96287d..5f01a67 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -17,6 +17,8 @@
 #ifndef __MASTER_VALIDATION_HPP__
 #define __MASTER_VALIDATION_HPP__
 
+#include <vector>
+
 #include <google/protobuf/repeated_field.h>
 
 #include <mesos/mesos.hpp>
@@ -52,6 +54,20 @@ Option<Error> validate(
     const Option<process::http::authentication::Principal>& principal = None());
 
 } // namespace call {
+
+namespace message {
+
+// Checks the internal consistency of an agent's re-registration request
+// (its SlaveInfo, tasks, checkpointed resources, executors and
+// frameworks). Returns the first inconsistency found, or None().
+Option<Error> reregisterSlave(
+    const SlaveInfo& slaveInfo,
+    const std::vector<Task>& tasks,
+    const std::vector<Resource>& resources,
+    const std::vector<ExecutorInfo>& executorInfos,
+    const std::vector<FrameworkInfo>& frameworkInfos);
+
+} // namespace message {
 } // namespace master {
 
 
@@ -130,6 +146,12 @@ Option<Error> validateType(const ExecutorInfo& executor);
 Option<Error> validateResources(const ExecutorInfo& executor);
 
 } // namespace internal {
+
+// Validates the parts of an ExecutorInfo that do not depend on the
+// framework or agent it runs on. Returns the first error found, or
+// None().
+Option<Error> validate(const ExecutorInfo& executor);
+
 } // namespace executor {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 5553808..1e0e764 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -3726,6 +3726,72 @@ TEST_F(FrameworkInfoValidationTest, RoleChangeWithMultiRoleMasterFailover)
   }
 }
 
+
+class RegisterSlaveValidationTest : public MesosTest {};
+
+
+// Verifies that the master drops a ReregisterSlaveMessage that fails
+// validation: a genuine message from the agent is captured, a task that
+// references an unknown SlaveID, FrameworkID and ExecutorID is appended
+// to it, and the master must not reply with a SlaveReregisteredMessage.
+TEST_F(RegisterSlaveValidationTest, DropInvalidReregistration)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  // Wait until the master acknowledges the slave registration.
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Drop and capture the slave's ReregisterSlaveMessage.
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    DROP_PROTOBUF(ReregisterSlaveMessage(), slave.get()->pid, _);
+
+  // Simulate a new master detected event on the slave,
+  // so that the slave will do a re-registration.
+  detector.appoint(master.get()->pid);
+
+  AWAIT_READY(reregisterSlaveMessage);
+
+  // Now that we have a valid ReregisterSlaveMessage, tweak it to
+  // fail validation.
+  ReregisterSlaveMessage message = reregisterSlaveMessage.get();
+
+  // Random IDs ensure that the task's SlaveID, FrameworkID and
+  // ExecutorID match nothing the agent reported, so the message fails
+  // validation.
+  Task* task = message.add_tasks();
+  task->set_name(UUID::random().toString());
+  task->mutable_slave_id()->set_value(UUID::random().toString());
+  task->mutable_task_id()->set_value(UUID::random().toString());
+  task->mutable_framework_id()->set_value(UUID::random().toString());
+  task->mutable_executor_id()->set_value(UUID::random().toString());
+  task->set_state(TASK_RUNNING);
+
+  // We expect the master to drop the ReregisterSlaveMessage, so it
+  // will not send any more SlaveReregisteredMessage responses.
+  EXPECT_NO_FUTURE_PROTOBUFS(SlaveReregisteredMessage(), _, _);
+
+  // Send the modified message to the master.
+  process::post(slave.get()->pid, master.get()->pid, message);
+
+  // Settle the clock to retire in-flight messages.
+  //
+  // NOTE(review): the clock is only paused here, so a registration retry
+  // the agent sends on the real clock before this point would not be
+  // dropped and could be answered with a SlaveReregisteredMessage.
+  // Confirm this cannot race, or pause the clock before appointing.
+  Clock::pause();
+  Clock::settle();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to