Repository: mesos Updated Branches: refs/heads/master b16999a4c -> 7aede4ad4
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/fetcher_cache_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp new file mode 100644 index 0000000..99777f8 --- /dev/null +++ b/src/tests/fetcher_cache_tests.cpp @@ -0,0 +1,1359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <unistd.h> + +#include <gmock/gmock.h> + +#include <list> +#include <mutex> +#include <string> +#include <vector> + +#include <mesos/executor.hpp> +#include <mesos/scheduler.hpp> + +#include <process/check.hpp> +#include <process/clock.hpp> +#include <process/collect.hpp> +#include <process/future.hpp> +#include <process/gmock.hpp> +#include <process/message.hpp> +#include <process/owned.hpp> +#include <process/pid.hpp> +#include <process/process.hpp> +#include <process/queue.hpp> +#include <process/subprocess.hpp> + +#include <stout/option.hpp> +#include <stout/os.hpp> +#include <stout/try.hpp> + +#include "common/lock.hpp" + +#include "master/flags.hpp" +#include "master/master.hpp" + +#include "slave/constants.hpp" +#include "slave/gc.hpp" +#include "slave/flags.hpp" +#include "slave/paths.hpp" +#include "slave/slave.hpp" + +#include "slave/containerizer/fetcher.hpp" + +#include "tests/containerizer.hpp" +#include "tests/flags.hpp" +#include "tests/mesos.hpp" + +using mesos::internal::master::Master; + +using mesos::internal::slave::Slave; +using mesos::internal::slave::Containerizer; +using mesos::internal::slave::MesosContainerizer; +using mesos::internal::slave::MesosContainerizerProcess; +using mesos::internal::slave::Fetcher; +using mesos::internal::slave::FetcherProcess; + +using process::Future; +using process::HttpEvent; +using process::Owned; +using process::PID; +using process::Promise; +using process::Process; +using process::Queue; +using process::Subprocess; + +using std::list; +using std::string; +using std::vector; + +using testing::_; +using testing::DoAll; +using testing::DoDefault; +using testing::Eq; +using testing::Invoke; +using testing::InvokeWithoutArgs; +using testing::Return; + +namespace mesos { +namespace internal { +namespace tests { + +static const string ASSETS_DIRECTORY_NAME = "mesos-fetcher-test-assets"; +static const string COMMAND_NAME = "mesos-fetcher-test-cmd"; +static const string ARCHIVE_NAME = "mesos-fetcher-test-archive.tgz"; +static const string ARCHIVED_COMMAND_NAME = "mesos-fetcher-test-acmd"; + +// Every task executes one of these shell scripts, which create a +// file that includes the current task name in its name. The latter +// is expected to be passed in as a script argument. The existence +// of the file with that name is then used as proof that the task +// ran successfully. +static const string COMMAND_SCRIPT = "touch " + COMMAND_NAME + "$1"; +static const string ARCHIVED_COMMAND_SCRIPT = + "touch " + ARCHIVED_COMMAND_NAME + "$1"; + + +class FetcherCacheTest : public MesosTest +{ +public: + struct Task { + Path runDirectory; + Queue<TaskStatus> statusQueue; + }; + + void setupCommandFileAsset(); + +protected: + void setupArchiveAsset(); + + virtual void SetUp(); + virtual void TearDown(); + + // Sets up the slave and starts it. Calling this late in the test + // instead of having it included in SetUp() gives us the opportunity + // to manipulate values in 'flags', first. + void startSlave(); + + // Stops the slave, deleting the containerizer, for subsequent + // recovery testing. + void stopSlave(); + + Task launchTask(const CommandInfo& commandInfo, const size_t taskIndex); + + vector<Task> launchTasks(const vector<CommandInfo>& commandInfos); + + // Waits until FetcherProcess::run() has been called for all tasks. + void awaitFetchContention(); + + string assetsDirectory; + string commandPath; + string archivePath; + + slave::Flags flags; + MesosContainerizer* containerizer; + PID<Slave> slavePid; + SlaveID slaveId; + string cacheDirectory; + MockFetcherProcess* fetcherProcess; + MockScheduler scheduler; + MesosSchedulerDriver* driver; + +private: + Fetcher* fetcher; + + FrameworkID frameworkId; + + // Promises whose futures indicate that FetcherProcess::_fetch() has been + // called for a task with a given index. + vector<Owned<Promise<Nothing>>> fetchContentionWaypoints; +}; + + +void FetcherCacheTest::SetUp() +{ + MesosTest::SetUp(); + + flags = CreateSlaveFlags(); + flags.resources = + Some(stringify(Resources::parse("cpus:1000;mem:1000").get())); + + assetsDirectory = path::join(flags.work_dir, ASSETS_DIRECTORY_NAME); + ASSERT_SOME(os::mkdir(assetsDirectory)); + + setupCommandFileAsset(); + setupArchiveAsset(); + + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + fetcherProcess = new MockFetcherProcess(); + fetcher = new Fetcher(Owned<FetcherProcess>(fetcherProcess)); + + FrameworkInfo frameworkInfo; + frameworkInfo.set_name("default"); + frameworkInfo.set_checkpoint(true); + + driver = new MesosSchedulerDriver( + &scheduler, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(scheduler, registered(driver, _, _)) + .Times(1); +} + + +void FetcherCacheTest::TearDown() +{ + driver->stop(); + driver->join(); + delete driver; + + delete fetcher; + + MesosTest::TearDown(); +} + + +// TODO(bernd-mesos): Make this abstractions as generic and generally +// available for all testing as possible. +void FetcherCacheTest::startSlave() +{ + Try<MesosContainerizer*> create = MesosContainerizer::create( + flags, true, fetcher); + ASSERT_SOME(create); + containerizer = create.get(); + + Try<PID<Slave>> pid = StartSlave(containerizer, flags); + ASSERT_SOME(pid); + slavePid = pid.get(); + + // Obtain the slave ID. + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + AWAIT_READY(slaveRegisteredMessage); + slaveId = slaveRegisteredMessage.get().slave_id(); + + cacheDirectory = + slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId); +} + + +void FetcherCacheTest::stopSlave() +{ + Stop(slavePid); + delete containerizer; +} + + +void FetcherCacheTest::setupCommandFileAsset() +{ + commandPath = path::join(assetsDirectory, COMMAND_NAME); + ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT)); + + // Make the command file read-only, so we can discern the URI + // executable flag. + ASSERT_SOME(os::chmod(commandPath, S_IRUSR | S_IRGRP | S_IROTH)); +} + + +void FetcherCacheTest::setupArchiveAsset() +{ + string path = path::join(assetsDirectory, ARCHIVED_COMMAND_NAME); + ASSERT_SOME(os::write(path, ARCHIVED_COMMAND_SCRIPT)); + + // Make the archived command file executable before archiving it, + // since the executable flag for CommandInfo::URI has no effect on + // what comes out of an archive. + ASSERT_SOME(os::chmod(path, S_IRWXU | S_IRWXG | S_IRWXO)); + + const string cwd = os::getcwd(); + ASSERT_SOME(os::chdir(assetsDirectory)); + ASSERT_SOME(os::tar(ARCHIVED_COMMAND_NAME, ARCHIVE_NAME)); + ASSERT_SOME(os::chdir(cwd)); + archivePath = path::join(assetsDirectory, ARCHIVE_NAME); + + // Make the archive file read-only, so we can tell if it becomes + // executable by acccident. + ASSERT_SOME(os::chmod(archivePath, S_IRUSR | S_IRGRP | S_IROTH)); +} + + +static string taskName(int taskIndex) +{ + return stringify(taskIndex); +} + + +// TODO(bernd-mesos): Use Path, not string, create Path::executable(). +static bool isExecutable(const string& path) +{ + Try<bool> access = os::access(path, X_OK); + EXPECT_SOME(access); + return access.isSome() && access.get(); +} + + +// Create a future that indicates that the task observed by the given +// status queue is finished. +static Future<Nothing> awaitFinished(FetcherCacheTest::Task task) +{ + return task.statusQueue.get() + .then([=](const TaskStatus& status) -> Future<Nothing> { + if (status.state() == TASK_FINISHED) { + return Nothing(); + } + return awaitFinished(task); + }); +} + + +// Create a future that indicates that all tasks are finished. +// TODO(bernd-mesos): Make this abstractions as generic and generally +// available for all testing as possible. +static Future<list<Nothing>> awaitFinished( + vector<FetcherCacheTest::Task> tasks) +{ + list<Future<Nothing>> futures; + + foreach (FetcherCacheTest::Task task, tasks) { + futures.push_back(awaitFinished(task)); + } + + return collect(futures); +} + + +// Pushes the TaskStatus value in mock call argument #1 into the +// given queue, which later on shall be queried by awaitFinished(). +ACTION_P(PushTaskStatus, taskStatusQueue) +{ + TaskStatus taskStatus = arg1; + + // Input parameters of ACTION_P are const. We make a mutable copy + // so that we can use put(). + Queue<TaskStatus> queue = taskStatusQueue; + + queue.put(taskStatus); +} + + +// Launches a task as described by its CommandInfo and returns its sandbox +// run directory path. Its completion will be indicated by the result of +// awaitFinished(task), where `task` is the return value of this method.. +// TODO(bernd-mesos): Make this abstractions as generic and generally +// available for all testing as possible. +FetcherCacheTest::Task FetcherCacheTest::launchTask( + const CommandInfo& commandInfo, + const size_t taskIndex) +{ + Future<vector<Offer>> offers; + EXPECT_CALL(scheduler, resourceOffers(driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(DeclineOffers()); + + offers.await(Seconds(15)); + CHECK_READY(offers) << "Failed to wait for resource offers"; + + EXPECT_NE(0u, offers.get().size()); + const Offer offer = offers.get()[0]; + + TaskInfo task; + task.set_name(taskName(taskIndex)); + task.mutable_task_id()->set_value(taskName(taskIndex)); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + + // We don't care about resources in these tests. This small amount + // will always succeed. + task.mutable_resources()->CopyFrom( + Resources::parse("cpus:1;mem:1").get()); + + task.mutable_command()->CopyFrom(commandInfo); + + // Since we are always using a command executor here, the executor + // ID can be determined by copying the task ID. + ExecutorID executorId; + executorId.set_value(task.task_id().value()); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Queue<TaskStatus> taskStatusQueue; + + EXPECT_CALL(scheduler, statusUpdate(driver, _)) + .WillRepeatedly(PushTaskStatus(taskStatusQueue)); + + driver->launchTasks(offer.id(), tasks); + + const Path path = Path(slave::paths::getExecutorLatestRunPath( + flags.work_dir, + slaveId, + offer.framework_id(), + executorId)); + + return Task {path, taskStatusQueue}; +} + + +// Pushes the task status value of a task status update callback +// into the task status queue that corresponds to the task index/ID +// for which the status update is being reported. 'tasks' must be a +// 'vector<Task>>', where every slot index corresponds to a task +// index/ID. +// TODO(bernd-mesos): Make this abstractions as generic and generally +// available for all testing as possible. +ACTION_TEMPLATE(PushIndexedTaskStatus, + HAS_1_TEMPLATE_PARAMS(int, k), + AND_1_VALUE_PARAMS(tasks)) +{ + TaskStatus taskStatus = ::std::tr1::get<k>(args); + Try<int> taskId = numify<int>(taskStatus.task_id().value()); + ASSERT_SOME(taskId); + Queue<TaskStatus> queue = (tasks)[taskId.get()].statusQueue; + queue.put(taskStatus); +} + + +// Satisfies the first promise in the list that is not satisfied yet. +ACTION_P(SatisfyOne, promises) +{ + foreach (const Owned<Promise<Nothing>>& promise, *promises) { + if (promise->future().isPending()) { + promise->set(Nothing()); + return; + } + } + + FAIL() << "Tried to call FetcherProcess::_fetch() " + << "for more tasks than launched"; +} + + +// Launches the tasks described by the given CommandInfo and returns a +// vector holding the run directory paths. All these tasks run +// concurrently. Their completion will be indicated by the result of +// awaitFinished(tasks), where `tasks` is the return value of this +// method. +// TODO(bernd-mesos): Make this abstractions as generic and generally +// available for all testing as possible. +vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks( + const vector<CommandInfo>& commandInfos) +{ + vector<FetcherCacheTest::Task> result; + + // When _fetch() is called, notify us by satisfying a promise that + // a task has passed the code stretch in which it competes for cache + // entries. + EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _, _)) + .WillRepeatedly( + DoAll(SatisfyOne(&fetchContentionWaypoints), + Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch))); + + Future<vector<Offer>> offers; + EXPECT_CALL(scheduler, resourceOffers(driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(DeclineOffers()); + + offers.await(Seconds(15)); + CHECK_READY(offers) << "Failed to wait for resource offers"; + + EXPECT_NE(0u, offers.get().size()); + const Offer offer = offers.get()[0]; + + vector<TaskInfo> tasks; + foreach (const CommandInfo& commandInfo, commandInfos) { + size_t taskIndex = tasks.size(); + + // Grabbing the framework ID from somewhere. It should not matter + // if this happens several times, as we expect the framework ID to + // remain the same. + frameworkId = offer.framework_id(); + + TaskInfo task; + task.set_name(taskName(taskIndex)); + task.mutable_task_id()->set_value(taskName(taskIndex)); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + + // We don't care about resources in these tests. This small amount + // will always succeed. + task.mutable_resources()->CopyFrom( + Resources::parse("cpus:1;mem:1").get()); + + task.mutable_command()->CopyFrom(commandInfo); + + tasks.push_back(task); + + // Since we are always using a command executor here, the executor + // ID can be determined by copying the task ID. + ExecutorID executorId; + executorId.set_value(task.task_id().value()); + + Path runDirectory = Path(slave::paths::getExecutorLatestRunPath( + flags.work_dir, + slaveId, + frameworkId, + executorId)); + + // Grabbing task status futures to wait for. We make a queue of futures + // for each task. We can then wait until the front element indicates + // status TASK_FINISHED. We use a queue, because we never know which + // status update will be the one we have been waiting for. + Queue<TaskStatus> taskStatusQueue; + + result.push_back(Task {runDirectory, taskStatusQueue}); + + EXPECT_CALL(scheduler, statusUpdate(driver, _)) + .WillRepeatedly(PushIndexedTaskStatus<1>(result)); + + auto waypoint = Owned<Promise<Nothing>>(new Promise<Nothing>()); + fetchContentionWaypoints.push_back(waypoint); + } + + driver->launchTasks(offer.id(), tasks); + + return result; +} + + +// Ensure that FetcherProcess::_fetch() has been called for each task, +// which means that all tasks are competing for downloading the same URIs. +void FetcherCacheTest::awaitFetchContention() +{ + foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) { + AWAIT(waypoint->future()); + } +} + + +// Tests fetching from the local asset directory without cache. This +// gives us a baseline for the following tests and lets us debug our +// test infrastructure without extra complications. +TEST_F(FetcherCacheTest, LocalUncached) +{ + startSlave(); + driver->start(); + + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(commandPath); + uri.set_executable(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + EXPECT_EQ(0u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + const string path = path::join(task.runDirectory.value, COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + } +} + + +// Tests fetching from the local asset directory with simple caching. +// Only one download must occur. Fetching is serialized, to cover +// code areas without overlapping/concurrent fetch attempts. +TEST_F(FetcherCacheTest, LocalCached) +{ + startSlave(); + driver->start(); + + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(commandPath); + uri.set_executable(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + const string path = path::join(task.runDirectory.value, COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + } +} + + +// Tests falling back on bypassing the cache when fetching the download +// size of a URI that is supposed to be cached fails. +TEST_F(FetcherCacheTest, CachedFallback) +{ + startSlave(); + driver->start(); + + // Make sure the content-length request fails. + ASSERT_SOME(os::rm(commandPath)); + + CommandInfo::URI uri; + uri.set_value(commandPath); + uri.set_executable(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(0)); + commandInfo.add_uris()->CopyFrom(uri); + + // Bring back the asset just before running mesos-fetcher to fetch it. + Future<FetcherInfo> fetcherInfo; + EXPECT_CALL(*fetcherProcess, run(_, _, _)) + .WillOnce(DoAll(FutureArg<1>(&fetcherInfo), + InvokeWithoutArgs(this, + &FetcherCacheTest::setupCommandFileAsset), + Invoke(fetcherProcess, + &MockFetcherProcess::unmocked_run))); + + const Task task = launchTask(commandInfo, 0); + + AWAIT_READY(awaitFinished(task)); + + const string path = path::join(task.runDirectory.value, COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(0))); + + AWAIT_READY(fetcherInfo); + + EXPECT_EQ(1, fetcherInfo.get().items_size()); + EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE, + fetcherInfo.get().items(0).action()); + + EXPECT_EQ(0u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); +} + + +// Tests archive extraction without caching as a baseline for the +// subsequent test below. +TEST_F(FetcherCacheTest, LocalUncachedExtract) +{ + startSlave(); + driver->start(); + + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(archivePath); + uri.set_extract(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + EXPECT_TRUE(os::exists( + path::join(task.runDirectory.value, ARCHIVE_NAME))); + EXPECT_FALSE(isExecutable( + path::join(task.runDirectory.value, ARCHIVE_NAME))); + + const string path = + path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + + EXPECT_EQ(0u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + } +} + + +// Tests archive extraction in combination with caching. +TEST_F(FetcherCacheTest, LocalCachedExtract) +{ + startSlave(); + driver->start(); + + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(archivePath); + uri.set_extract(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + EXPECT_FALSE(os::exists( + path::join(task.runDirectory.value, ARCHIVE_NAME))); + + const string path = + path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + } +} + + +class FetcherCacheHttpTest : public FetcherCacheTest +{ +public: + // A minimal HTTP server (not intended as an actor) just reusing what + // is already implemented somewhere to serve some HTTP requests for + // file downloads. Plus counting how many requests are made. Plus the + // ability to pause answering requests, stalling them. + class HttpServer : public Process<HttpServer> + { + public: + HttpServer(FetcherCacheHttpTest* test) + : countRequests(0), + countCommandRequests(0), + countArchiveRequests(0) + { + provide(COMMAND_NAME, test->commandPath); + provide(ARCHIVE_NAME, test->archivePath); + + spawn(this); + } + + string url() + { + return "http://127.0.0.1:" + + stringify(self().address.port) + + "/" + self().id + "/"; + } + + // Stalls the execution of HTTP requests inside visit(). + void pause() + { + mutex.lock(); + } + + void resume() + { + mutex.unlock(); + } + + virtual void visit(const HttpEvent& event) + { + std::lock_guard<std::mutex> lock(mutex); + + countRequests++; + + if (strings::contains(event.request->path, COMMAND_NAME)) { + countCommandRequests++; + } + + if (strings::contains(event.request->path, ARCHIVE_NAME)) { + countArchiveRequests++; + } + + ProcessBase::visit(event); + } + + void resetCounts() + { + countRequests = 0; + countCommandRequests = 0; + countArchiveRequests = 0; + } + + size_t countRequests; + size_t countCommandRequests; + size_t countArchiveRequests; + + private: + std::mutex mutex; + }; + + + virtual void SetUp() + { + FetcherCacheTest::SetUp(); + + httpServer = new HttpServer(this); + } + + virtual void TearDown() + { + terminate(httpServer); + wait(httpServer); + delete httpServer; + + FetcherCacheTest::TearDown(); + } + + HttpServer* httpServer; +}; + + +// Tests fetching via HTTP with caching. Only one download must +// occur. Fetching is serialized, to cover code areas without +// overlapping/concurrent fetch attempts. +TEST_F(FetcherCacheHttpTest, HttpCachedSerialized) +{ + startSlave(); + driver->start(); + + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(httpServer->url() + COMMAND_NAME); + uri.set_executable(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + const string path = + path::join(task.runDirectory.value, COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + // 2 requests: 1 for content-length, 1 for download. + EXPECT_EQ(2u, httpServer->countCommandRequests); + } +} + + +// Tests multiple concurrent fetching efforts that require some +// concurrency control. One task must "win" and perform the size +// and download request for the URI alone. The others must reuse +// the result. +TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent) +{ + startSlave(); + driver->start(); + + // Causes fetch contention. No task can run yet until resume(). + httpServer->pause(); + + vector<CommandInfo> commandInfos; + const size_t countTasks = 5; + + for (size_t i = 0; i < countTasks; i++) { + CommandInfo::URI uri0; + uri0.set_value(httpServer->url() + COMMAND_NAME); + uri0.set_executable(true); + uri0.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri0); + + // Not always caching this URI causes that it will be downloaded + // some of the time. Thus we exercise code paths that eagerly fetch + // new assets while waiting for pending downloads of cached assets + // as well as code paths where no downloading occurs at all. + if (i % 2 == 1) { + CommandInfo::URI uri1; + uri1.set_value(httpServer->url() + ARCHIVE_NAME); + commandInfo.add_uris()->CopyFrom(uri1); + } + + commandInfos.push_back(commandInfo); + } + + vector<Task> tasks = launchTasks(commandInfos); + + CHECK_EQ(countTasks, tasks.size()); + + // Given pausing the HTTP server, this proves that fetch contention + // has happened. All tasks have passed the point where it occurs, + // but they are not running yet. + awaitFetchContention(); + + // Now let the tasks run. + httpServer->resume(); + + AWAIT_READY(awaitFinished(tasks)); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + // command content-length requests: 1 + // command downloads: 1 + // archive downloads: 2 + EXPECT_EQ(2u, httpServer->countCommandRequests); + EXPECT_EQ(2u, httpServer->countArchiveRequests); + + for (size_t i = 0; i < countTasks; i++) { + EXPECT_EQ(i % 2 == 1, os::exists( + path::join(tasks[i].runDirectory.value, ARCHIVE_NAME))); + EXPECT_TRUE(isExecutable( + path::join(tasks[i].runDirectory.value, COMMAND_NAME))); + EXPECT_TRUE(os::exists( + path::join(tasks[i].runDirectory.value, COMMAND_NAME + taskName(i)))); + } +} + + +// Tests using multiple URIs per command, variations of caching, +// setting the executable flag, and archive extraction. +TEST_F(FetcherCacheHttpTest, HttpMixed) +{ + startSlave(); + driver->start(); + + // Causes fetch contention. No task can run yet until resume(). + httpServer->pause(); + + vector<CommandInfo> commandInfos; + + // Task 0. + + CommandInfo::URI uri00; + uri00.set_value(httpServer->url() + ARCHIVE_NAME); + uri00.set_cache(true); + uri00.set_extract(false); + uri00.set_executable(false); + + CommandInfo::URI uri01; + uri01.set_value(httpServer->url() + COMMAND_NAME); + uri01.set_extract(false); + uri01.set_executable(true); + + CommandInfo commandInfo0; + commandInfo0.set_value("./" + COMMAND_NAME + " " + taskName(0)); + commandInfo0.add_uris()->CopyFrom(uri00); + commandInfo0.add_uris()->CopyFrom(uri01); + commandInfos.push_back(commandInfo0); + + // Task 1. + + CommandInfo::URI uri10; + uri10.set_value(httpServer->url() + ARCHIVE_NAME); + uri10.set_extract(true); + uri10.set_executable(false); + + CommandInfo::URI uri11; + uri11.set_value(httpServer->url() + COMMAND_NAME); + uri11.set_extract(true); + uri11.set_executable(false); + + CommandInfo commandInfo1; + commandInfo1.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(1)); + commandInfo1.add_uris()->CopyFrom(uri10); + commandInfo1.add_uris()->CopyFrom(uri11); + commandInfos.push_back(commandInfo1); + + // Task 2. + + CommandInfo::URI uri20; + uri20.set_value(httpServer->url() + ARCHIVE_NAME); + uri20.set_cache(true); + uri20.set_extract(true); + uri20.set_executable(false); + + CommandInfo::URI uri21; + uri21.set_value(httpServer->url() + COMMAND_NAME); + uri21.set_extract(false); + uri21.set_executable(false); + + CommandInfo commandInfo2; + commandInfo2.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(2)); + commandInfo2.add_uris()->CopyFrom(uri20); + commandInfo2.add_uris()->CopyFrom(uri21); + commandInfos.push_back(commandInfo2); + + vector<Task> tasks = launchTasks(commandInfos); + + CHECK_EQ(3u, tasks.size()); + + // Given pausing the HTTP server, this proves that fetch contention + // has happened. All tasks have passed the point where it occurs, + // but they are not running yet. + awaitFetchContention(); + + // Now let the tasks run. + httpServer->resume(); + + AWAIT_READY(awaitFinished(tasks)); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + // command content-length requests: 0 + // command downloads: 3 + // archive content-length requests: 1 + // archive downloads: 2 + EXPECT_EQ(3u, httpServer->countCommandRequests); + EXPECT_EQ(3u, httpServer->countArchiveRequests); + + // Task 0. + + EXPECT_FALSE(isExecutable( + path::join(tasks[0].runDirectory.value, ARCHIVE_NAME))); + EXPECT_FALSE(os::exists( + path::join(tasks[0].runDirectory.value, ARCHIVED_COMMAND_NAME))); + + EXPECT_TRUE(isExecutable( + path::join(tasks[0].runDirectory.value, COMMAND_NAME))); + EXPECT_TRUE(os::exists( + path::join(tasks[0].runDirectory.value, COMMAND_NAME + taskName(0)))); + + // Task 1. + + EXPECT_FALSE(isExecutable( + path::join(tasks[1].runDirectory.value, ARCHIVE_NAME))); + EXPECT_TRUE(isExecutable( + path::join(tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME))); + EXPECT_TRUE(os::exists(path::join( + tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(1)))); + + EXPECT_FALSE(isExecutable( + path::join(tasks[1].runDirectory.value, COMMAND_NAME))); + + // Task 2. + + EXPECT_FALSE(os::exists( + path::join(tasks[2].runDirectory.value, ARCHIVE_NAME))); + EXPECT_TRUE(isExecutable( + path::join(tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME))); + EXPECT_TRUE(os::exists(path::join( + tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(2)))); + + EXPECT_FALSE(isExecutable( + path::join(tasks[2].runDirectory.value, COMMAND_NAME))); +} + + +// Tests slave recovery of the fetcher cache. The cache must be +// wiped clean on recovery, causing renewed downloads. +TEST_F(FetcherCacheHttpTest, HttpCachedRecovery) +{ + startSlave(); + driver->start(); + + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(httpServer->url() + COMMAND_NAME); + uri.set_executable(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + const string path = path::join(task.runDirectory.value, COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + // content-length requests: 1 + // downloads: 1 + EXPECT_EQ(2u, httpServer->countCommandRequests); + } + + stopSlave(); + + // Start over. + httpServer->resetCounts(); + + // Don't reuse the old fetcher, which has stale state after + // stopping the slave. + Fetcher fetcher2; + + Try<MesosContainerizer*> c = + MesosContainerizer::create(flags, true, &fetcher2); + CHECK_SOME(c); + containerizer = c.get(); + + // Set up so we can wait until the new slave updates the container's + // resources (this occurs after the executor has re-registered). + Future<Nothing> update = + FUTURE_DISPATCH(_, &MesosContainerizerProcess::update); + + Try<PID<Slave>> pid = StartSlave(containerizer, flags); + CHECK_SOME(pid); + slavePid = pid.get(); + + // Wait until the containerizer is updated. + AWAIT_READY(update); + + // Recovery must have cleaned the cache by now. + EXPECT_FALSE(os::exists(cacheDirectory)); + + // Repeat of the above to see if it works the same. + for (size_t i = 0; i < 3; i++) { + CommandInfo::URI uri; + uri.set_value(httpServer->url() + COMMAND_NAME); + uri.set_executable(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + const string path = + path::join(task.runDirectory.value, COMMAND_NAME); + EXPECT_TRUE(isExecutable(path)); + EXPECT_TRUE(os::exists(path + taskName(i))); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + // content-length requests: 1 + // downloads: 1 + EXPECT_EQ(2u, httpServer->countCommandRequests); + } +} + + +// Tests cache eviction. Limits the available cache space then fetches +// more task scripts than fit into the cache and runs them all. We +// observe how the number of cache files rises and then stays constant. +TEST_F(FetcherCacheTest, SimpleEviction) +{ + const size_t countCacheEntries = 3; + + // Let only the first 'countCacheEntries' downloads fit in the cache. + flags.fetcher_cache_size = COMMAND_SCRIPT.size() * countCacheEntries; + + startSlave(); + driver->start(); + + for (size_t i = 0; i < countCacheEntries + 2; i++) { + string commandFilename = "cmd" + stringify(i); + string command = commandFilename + " " + taskName(i); + + commandPath = path::join(assetsDirectory, commandFilename); + ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT)); + + CommandInfo::URI uri; + uri.set_value(commandPath); + uri.set_executable(true); + uri.set_cache(true); + + CommandInfo commandInfo; + commandInfo.set_value("./" + command); + commandInfo.add_uris()->CopyFrom(uri); + + const Task task = launchTask(commandInfo, i); + + AWAIT_READY(awaitFinished(task)); + + // Check that the task succeeded. + EXPECT_TRUE(isExecutable( + path::join(task.runDirectory.value, commandFilename))); + EXPECT_TRUE(os::exists( + path::join(task.runDirectory.value, COMMAND_NAME + taskName(i)))); + + if (i < countCacheEntries) { + EXPECT_EQ(i + 1, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(i+1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + } else { + EXPECT_EQ(countCacheEntries, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(countCacheEntries, + fetcherProcess->cacheFiles(slaveId, flags).get().size()); + } + } +} + + +// Tests cache eviction fallback to bypassing the cache. A first task +// runs normally. Then a second succeeds using eviction. Then a third +// task fails to evict, but still gets executed bypassing the cache. +TEST_F(FetcherCacheTest, FallbackFromEviction) +{ + // The size by which every task's URI download is going to be larger + // than the previous one. + const size_t growth = 10; + + // Let only the first two downloads fit into the cache, one at a time, + // the second evicting the first. The third file won't fit any more, + // being larger than the entire cache. + flags.fetcher_cache_size = COMMAND_SCRIPT.size() + growth; + + startSlave(); + driver->start(); + + // We'll run 3 tasks and these are the task completion futures to wait + // for each time. + Future<FetcherInfo> fetcherInfo0; + Future<FetcherInfo> fetcherInfo1; + Future<FetcherInfo> fetcherInfo2; + EXPECT_CALL(*fetcherProcess, run(_, _, _)) + .WillOnce(DoAll(FutureArg<1>(&fetcherInfo0), + Invoke(fetcherProcess, + &MockFetcherProcess::unmocked_run))) + .WillOnce(DoAll(FutureArg<1>(&fetcherInfo1), + Invoke(fetcherProcess, + &MockFetcherProcess::unmocked_run))) + .WillOnce(DoAll(FutureArg<1>(&fetcherInfo2), + Invoke(fetcherProcess, + &MockFetcherProcess::unmocked_run))); + + + // Task 0: + + const string commandFilename0 = "cmd0"; + const string command0 = commandFilename0 + " " + taskName(0); + + commandPath = path::join(assetsDirectory, commandFilename0); + + // Write the command into the script that gets fetched. + ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT)); + + CommandInfo::URI uri0; + uri0.set_value(commandPath); + uri0.set_executable(true); + uri0.set_cache(true); + + CommandInfo commandInfo0; + commandInfo0.set_value("./" + command0); + commandInfo0.add_uris()->CopyFrom(uri0); + + const Task task0 = launchTask(commandInfo0, 0); + + AWAIT_READY(awaitFinished(task0)); + + // Check that the task succeeded. + EXPECT_TRUE(isExecutable( + path::join(task0.runDirectory.value, commandFilename0))); + EXPECT_TRUE(os::exists( + path::join(task0.runDirectory.value, COMMAND_NAME + taskName(0)))); + + AWAIT_READY(fetcherInfo0); + + EXPECT_EQ(1, fetcherInfo0.get().items_size()); + EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE, + fetcherInfo0.get().items(0).action()); + + // We have put a file of size 'COMMAND_SCRIPT.size()' in the cache + // with space 'COMMAND_SCRIPT.size() + growth'. So we must have 'growth' + // space left. + CHECK_EQ(Bytes(growth), fetcherProcess->availableCacheSpace()); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + + // Task 1: + + const string commandFilename1 = "cmd1"; + const string command1 = commandFilename1 + " " + taskName(1); + + commandPath = path::join(assetsDirectory, commandFilename1); + + // Write the command into the script that gets fetched. Add 'growth' + // extra characters so the cache will fill up to the last byte. + ASSERT_SOME(os::write( + commandPath, + COMMAND_SCRIPT + std::string(growth, '\n'))); + + CommandInfo::URI uri1; + uri1.set_value(commandPath); + uri1.set_executable(true); + uri1.set_cache(true); + + CommandInfo commandInfo1; + commandInfo1.set_value("./" + command1); + commandInfo1.add_uris()->CopyFrom(uri1); + + const Task task1 = launchTask(commandInfo1, 1); + + AWAIT_READY(awaitFinished(task1)); + + // Check that the task succeeded. + EXPECT_TRUE(isExecutable( + path::join(task1.runDirectory.value, commandFilename1))); + EXPECT_TRUE(os::exists( + path::join(task1.runDirectory.value, COMMAND_NAME + taskName(1)))); + + AWAIT_READY(fetcherInfo1); + + EXPECT_EQ(1, fetcherInfo1.get().items_size()); + EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE, + fetcherInfo1.get().items(0).action()); + + // The cache must now be full. + CHECK_EQ(Bytes(0u), fetcherProcess->availableCacheSpace()); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); + + + // Task 2: + + const string commandFilename2 = "cmd2"; + const string command2 = commandFilename2 + " " + taskName(2); + + commandPath = path::join(assetsDirectory, commandFilename2); + + // Write the command into the script that gets fetched. Add + // '2 * growth' now. Thus the file will be so big that it will not + // fit into the cache any more. + ASSERT_SOME(os::write( + commandPath, + COMMAND_SCRIPT + std::string(2 * growth, '\n'))); + + CommandInfo::URI uri2; + uri2.set_value(commandPath); + uri2.set_executable(true); + uri2.set_cache(true); + + CommandInfo commandInfo2; + commandInfo2.set_value("./" + command2); + commandInfo2.add_uris()->CopyFrom(uri2); + + const Task task2 = launchTask(commandInfo2, 2); + + AWAIT_READY(awaitFinished(task2)); + + // Check that the task succeeded. + EXPECT_TRUE(isExecutable( + path::join(task2.runDirectory.value, commandFilename2))); + EXPECT_TRUE(os::exists( + path::join(task2.runDirectory.value, COMMAND_NAME + taskName(2)))); + + AWAIT_READY(fetcherInfo2); + + EXPECT_EQ(1, fetcherInfo2.get().items_size()); + EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE, + fetcherInfo2.get().items(0).action()); + + EXPECT_EQ(1u, fetcherProcess->cacheSize()); + EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); + EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); +} + +} // namespace tests { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/fetcher_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp index 4549e6a..361d918 100644 --- a/src/tests/fetcher_tests.cpp +++ b/src/tests/fetcher_tests.cpp @@ -48,314 +48,154 @@ #include "tests/mesos.hpp" #include "tests/utils.hpp" +using namespace mesos::slave; + using namespace process; -using process::Subprocess; -using process::Future; +using mesos::fetcher::FetcherInfo; using mesos::internal::slave::Fetcher; -using std::string; +using process::Subprocess; +using process::Future; + using std::map; +using std::string; -using mesos::fetcher::FetcherInfo; namespace mesos { namespace internal { namespace tests { -class FetcherEnvironmentTest : public ::testing::Test {}; +class FetcherTest : public TemporaryDirectoryTest {}; -TEST_F(FetcherEnvironmentTest, Simple) +TEST_F(FetcherTest, FileURI) { - CommandInfo commandInfo; - CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("hdfs:///uri"); - uri->set_executable(false); + string fromDir = path::join(os::getcwd(), "from"); + ASSERT_SOME(os::mkdir(fromDir)); + string testFile = path::join(fromDir, "test"); + EXPECT_SOME(os::write(testFile, "data")); - string directory = "/tmp/directory"; - Option<string> user = "user"; + string localFile = path::join(os::getcwd(), "test"); + EXPECT_FALSE(os::exists(localFile)); slave::Flags flags; - flags.frameworks_home = "/tmp/frameworks"; - flags.hadoop_home = "/tmp/hadoop"; - - map<string, string> environment = - Fetcher::environment(commandInfo, directory, user, flags); - - EXPECT_EQ(2u, environment.size()); - - EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]); - - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); - - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); - - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_EQ(user.get(), fetcherInfo.get().user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); -} + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); + ContainerID containerId; + containerId.set_value(UUID::random().toString()); -TEST_F(FetcherEnvironmentTest, MultipleURIs) -{ CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri1"); - uri.set_executable(false); - commandInfo.add_uris()->MergeFrom(uri); - uri.set_value("hdfs:///uri2"); - uri.set_executable(true); - commandInfo.add_uris()->MergeFrom(uri); - - string directory = "/tmp/directory"; - Option<string> user("user"); - - slave::Flags flags; - flags.frameworks_home = "/tmp/frameworks"; - flags.hadoop_home = "/tmp/hadoop"; - - map<string, string> environment = - Fetcher::environment(commandInfo, directory, user, flags); - - EXPECT_EQ(2u, environment.size()); - - EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("file://" + testFile); - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); + Fetcher fetcher; + SlaveID slaveId; - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_READY(fetch); - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_EQ(user.get(), fetcherInfo.get().user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); + EXPECT_TRUE(os::exists(localFile)); } -TEST_F(FetcherEnvironmentTest, NoUser) +// Negative test: invalid user name. Copied from FileTest, so this +// normally would succeed, but here a bogus user name is specified. +// So we check for fetch failure. +TEST_F(FetcherTest, InvalidUser) { - CommandInfo commandInfo; - CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("hdfs:///uri"); - uri->set_executable(false); + string fromDir = path::join(os::getcwd(), "from"); + ASSERT_SOME(os::mkdir(fromDir)); + string testFile = path::join(fromDir, "test"); + EXPECT_SOME(os::write(testFile, "data")); - string directory = "/tmp/directory"; + string localFile = path::join(os::getcwd(), "test"); + EXPECT_FALSE(os::exists(localFile)); slave::Flags flags; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); flags.frameworks_home = "/tmp/frameworks"; - flags.hadoop_home = "/tmp/hadoop"; - - map<string, string> environment = - Fetcher::environment(commandInfo, directory, None(), flags); - - EXPECT_EQ(2u, environment.size()); - - EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]); - - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); - - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); - - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_FALSE(fetcherInfo.get().has_user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); -} + ContainerID containerId; + containerId.set_value(UUID::random().toString()); -TEST_F(FetcherEnvironmentTest, EmptyHadoop) -{ CommandInfo commandInfo; - CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("hdfs:///uri"); - uri->set_executable(false); + commandInfo.set_user(UUID::random().toString()); - string directory = "/tmp/directory"; - Option<string> user = "user"; - - slave::Flags flags; - flags.frameworks_home = "/tmp/frameworks"; - flags.hadoop_home = ""; - - map<string, string> environment = - Fetcher::environment(commandInfo, directory, user, flags); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("file://" + testFile); - EXPECT_EQ(0u, environment.count("HADOOP_HOME")); - EXPECT_EQ(1u, environment.size()); + Fetcher fetcher; + SlaveID slaveId; - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_FAILED(fetch); - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); + // See FetcherProcess::fetch(), the message must mention "chown" in + // this case. + EXPECT_TRUE(strings::contains(fetch.failure(), "chown")); - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_EQ(user.get(), fetcherInfo.get().user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); + EXPECT_FALSE(os::exists(localFile)); } -TEST_F(FetcherEnvironmentTest, NoHadoop) +// Negative test: URI leading to non-existing file. Copied from FileTest, +// but here the resource is missing. So we check for fetch failure. +TEST_F(FetcherTest, NonExistingFile) { - CommandInfo commandInfo; - CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("hdfs:///uri"); - uri->set_executable(false); - - string directory = "/tmp/directory"; - Option<string> user = "user"; + string fromDir = path::join(os::getcwd(), "from"); + ASSERT_SOME(os::mkdir(fromDir)); + string testFile = path::join(fromDir, "nonExistingFile"); slave::Flags flags; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); flags.frameworks_home = "/tmp/frameworks"; - map<string, string> environment = - Fetcher::environment(commandInfo, directory, user, flags); - - EXPECT_EQ(0u, environment.count("HADOOP_HOME")); - EXPECT_EQ(1u, environment.size()); - - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); - - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); - - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_EQ(user.get(), fetcherInfo.get().user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); -} - + ContainerID containerId; + containerId.set_value(UUID::random().toString()); -TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable) -{ CommandInfo commandInfo; CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("hdfs:///uri"); - uri->set_executable(false); - uri->set_extract(false); - - string directory = "/tmp/directory"; - Option<string> user = "user"; - - slave::Flags flags; - flags.frameworks_home = "/tmp/frameworks"; - flags.hadoop_home = "/tmp/hadoop"; - - map<string, string> environment = - Fetcher::environment(commandInfo, directory, user, flags); - - EXPECT_EQ(2u, environment.size()); - - EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]); + uri->set_value("file://" + testFile); - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); + Fetcher fetcher; + SlaveID slaveId; - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_FAILED(fetch); - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_EQ(user.get(), fetcherInfo.get().user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); + // See FetcherProcess::run(). + EXPECT_TRUE(strings::contains(fetch.failure(), "Failed to fetch")); } -TEST_F(FetcherEnvironmentTest, NoExtractExecutable) +// Negative test: malformed URI, missing path. +TEST_F(FetcherTest, MalformedURI) { - CommandInfo commandInfo; - CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("hdfs:///uri"); - uri->set_executable(true); - uri->set_extract(false); - - string directory = "/tmp/directory"; - Option<string> user = "user"; - slave::Flags flags; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); flags.frameworks_home = "/tmp/frameworks"; - flags.hadoop_home = "/tmp/hadoop"; - - map<string, string> environment = - Fetcher::environment(commandInfo, directory, user, flags); - - EXPECT_EQ(2u, environment.size()); - - EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]); - - Try<JSON::Object> parse = - JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]); - ASSERT_SOME(parse); - - Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get()); - ASSERT_SOME(fetcherInfo); - - EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), - stringify(JSON::Protobuf(fetcherInfo.get().command_info()))); - EXPECT_EQ(directory, fetcherInfo.get().work_directory()); - EXPECT_EQ(user.get(), fetcherInfo.get().user()); - EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home()); -} - - -class FetcherTest : public TemporaryDirectoryTest {}; - -TEST_F(FetcherTest, FileURI) -{ - string fromDir = path::join(os::getcwd(), "from"); - ASSERT_SOME(os::mkdir(fromDir)); - string testFile = path::join(fromDir, "test"); - EXPECT_FALSE(os::write(testFile, "data").isError()); - - string localFile = path::join(os::getcwd(), "test"); - EXPECT_FALSE(os::exists(localFile)); - - slave::Flags flags; + ContainerID containerId; + containerId.set_value(UUID::random().toString()); CommandInfo commandInfo; CommandInfo::URI* uri = commandInfo.add_uris(); - uri->set_value("file://" + testFile); - - map<string, string> environment = - Fetcher::environment(commandInfo, os::getcwd(), None(), flags); - - Try<Subprocess> fetcherSubprocess = - process::subprocess( - path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), - environment); + uri->set_value("lala://nopath"); - ASSERT_SOME(fetcherSubprocess); - Future<Option<int>> status = fetcherSubprocess.get().status(); + Fetcher fetcher; + SlaveID slaveId; - AWAIT_READY(status); - ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_FAILED(fetch); - EXPECT_TRUE(os::exists(localFile)); + // See Fetcher::basename(). + EXPECT_TRUE(strings::contains(fetch.failure(), "Malformed")); } @@ -364,31 +204,27 @@ TEST_F(FetcherTest, AbsoluteFilePath) string fromDir = path::join(os::getcwd(), "from"); ASSERT_SOME(os::mkdir(fromDir)); string testPath = path::join(fromDir, "test"); - EXPECT_FALSE(os::write(testPath, "data").isError()); + EXPECT_SOME(os::write(testPath, "data")); string localFile = path::join(os::getcwd(), "test"); EXPECT_FALSE(os::exists(localFile)); slave::Flags flags; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); + + ContainerID containerId; + containerId.set_value(UUID::random().toString()); CommandInfo commandInfo; CommandInfo::URI* uri = commandInfo.add_uris(); uri->set_value(testPath); - map<string, string> environment = - Fetcher::environment(commandInfo, os::getcwd(), None(), flags); - - Try<Subprocess> fetcherSubprocess = - process::subprocess( - path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), - environment); - - ASSERT_SOME(fetcherSubprocess); - Future<Option<int>> status = fetcherSubprocess.get().status(); + Fetcher fetcher; + SlaveID slaveId; - AWAIT_READY(status); - ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_READY(fetch); EXPECT_TRUE(os::exists(localFile)); } @@ -399,55 +235,38 @@ TEST_F(FetcherTest, RelativeFilePath) string fromDir = path::join(os::getcwd(), "from"); ASSERT_SOME(os::mkdir(fromDir)); string testPath = path::join(fromDir, "test"); - EXPECT_FALSE(os::write(testPath, "data").isError()); + EXPECT_SOME(os::write(testPath, "data")); string localFile = path::join(os::getcwd(), "test"); EXPECT_FALSE(os::exists(localFile)); slave::Flags flags; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); + + ContainerID containerId; + containerId.set_value(UUID::random().toString()); CommandInfo commandInfo; CommandInfo::URI* uri = commandInfo.add_uris(); uri->set_value("test"); - // The first run must fail, because we have not set frameworks_home yet. - - map<string, string> environment1 = - Fetcher::environment(commandInfo, os::getcwd(), None(), flags); - - Try<Subprocess> fetcherSubprocess1 = - process::subprocess( - path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), - environment1); - - ASSERT_SOME(fetcherSubprocess1); - Future<Option<int>> status1 = fetcherSubprocess1.get().status(); + Fetcher fetcher; + SlaveID slaveId; - AWAIT_READY(status1); - ASSERT_SOME(status1.get()); + // The first run must fail, because we have not set frameworks_home yet. - // mesos-fetcher always exits with EXIT(1) on failure. - EXPECT_EQ(1, WIFEXITED(status1.get().get())); + Future<Nothing> fetch1 = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_FAILED(fetch1); EXPECT_FALSE(os::exists(localFile)); // The next run must succeed due to this flag. flags.frameworks_home = fromDir; - map<string, string> environment2 = - Fetcher::environment(commandInfo, os::getcwd(), None(), flags); - - Try<Subprocess> fetcherSubprocess2 = - process::subprocess( - path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), - environment2); - - ASSERT_SOME(fetcherSubprocess2); - Future<Option<int>> status2 = fetcherSubprocess2.get().status(); - - AWAIT_READY(status2); - ASSERT_SOME(status2.get()); - EXPECT_EQ(0, status2.get().get()); + Future<Nothing> fetch2 = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_READY(fetch2); EXPECT_TRUE(os::exists(localFile)); } @@ -481,26 +300,22 @@ TEST_F(FetcherTest, OSNetUriTest) EXPECT_FALSE(os::exists(localFile)); slave::Flags flags; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); flags.frameworks_home = "/tmp/frameworks"; + ContainerID containerId; + containerId.set_value(UUID::random().toString()); + CommandInfo commandInfo; CommandInfo::URI* uri = commandInfo.add_uris(); uri->set_value(url); - map<string, string> environment = - Fetcher::environment(commandInfo, os::getcwd(), None(), flags); - - Try<Subprocess> fetcherSubprocess = - process::subprocess( - path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), - environment); - - ASSERT_SOME(fetcherSubprocess); - Future<Option<int>> status = fetcherSubprocess.get().status(); + Fetcher fetcher; + SlaveID slaveId; - AWAIT_READY(status); - ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_READY(fetch); EXPECT_TRUE(os::exists(localFile)); } @@ -511,32 +326,27 @@ TEST_F(FetcherTest, FileLocalhostURI) string fromDir = path::join(os::getcwd(), "from"); ASSERT_SOME(os::mkdir(fromDir)); string testFile = path::join(fromDir, "test"); - EXPECT_FALSE(os::write(testFile, "data").isError()); + EXPECT_SOME(os::write(testFile, "data")); string localFile = path::join(os::getcwd(), "test"); EXPECT_FALSE(os::exists(localFile)); slave::Flags flags; - flags.frameworks_home = "/tmp/frameworks"; + flags.launcher_dir = path::join(tests::flags.build_dir, "src"); + + ContainerID containerId; + containerId.set_value(UUID::random().toString()); CommandInfo commandInfo; CommandInfo::URI* uri = commandInfo.add_uris(); uri->set_value(path::join("file://localhost", testFile)); - map<string, string> environment = - Fetcher::environment(commandInfo, os::getcwd(), None(), flags); - - Try<Subprocess> fetcherSubprocess = - process::subprocess( - path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), - environment); - - ASSERT_SOME(fetcherSubprocess); - Future<Option<int>> status = fetcherSubprocess.get().status(); + Fetcher fetcher; + SlaveID slaveId; - AWAIT_READY(status); - ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + Future<Nothing> fetch = fetcher.fetch( + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); + AWAIT_READY(fetch); EXPECT_TRUE(os::exists(localFile)); } @@ -561,20 +371,11 @@ TEST_F(FetcherTest, NoExtractNotExecutable) slave::Flags flags; flags.launcher_dir = path::join(tests::flags.build_dir, "src"); - Option<int> stdout = None(); - Option<int> stderr = None(); - - // Redirect mesos-fetcher output if running the tests verbosely. - if (tests::flags.verbose) { - stdout = STDOUT_FILENO; - stderr = STDERR_FILENO; - } - Fetcher fetcher; + SlaveID slaveId; Future<Nothing> fetch = fetcher.fetch( - containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr); - + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); AWAIT_READY(fetch); Try<string> basename = os::basename(path.get()); @@ -611,19 +412,11 @@ TEST_F(FetcherTest, NoExtractExecutable) slave::Flags flags; flags.launcher_dir = path::join(tests::flags.build_dir, "src"); - Option<int> stdout = None(); - Option<int> stderr = None(); - - // Redirect mesos-fetcher output if running the tests verbosely. - if (tests::flags.verbose) { - stdout = STDOUT_FILENO; - stderr = STDERR_FILENO; - } - Fetcher fetcher; + SlaveID slaveId; Future<Nothing> fetch = fetcher.fetch( - containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr); + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); AWAIT_READY(fetch); @@ -669,19 +462,11 @@ TEST_F(FetcherTest, ExtractNotExecutable) slave::Flags flags; flags.launcher_dir = path::join(tests::flags.build_dir, "src"); - Option<int> stdout = None(); - Option<int> stderr = None(); - - // Redirect mesos-fetcher output if running the tests verbosely. - if (tests::flags.verbose) { - stdout = STDOUT_FILENO; - stderr = STDERR_FILENO; - } - Fetcher fetcher; + SlaveID slaveId; Future<Nothing> fetch = fetcher.fetch( - containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr); + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); AWAIT_READY(fetch); @@ -768,19 +553,11 @@ TEST_F(FetcherTest, HdfsURI) CommandInfo::URI* uri = commandInfo.add_uris(); uri->set_value(path::join("hdfs://localhost", testFile)); - Option<int> stdout = None(); - Option<int> stderr = None(); - - // Redirect mesos-fetcher output if running the tests verbosely. - if (tests::flags.verbose) { - stdout = STDOUT_FILENO; - stderr = STDERR_FILENO; - } - Fetcher fetcher; + SlaveID slaveId; Future<Nothing> fetch = fetcher.fetch( - containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr); + containerId, commandInfo, os::getcwd(), None(), slaveId, flags); AWAIT_READY(fetch); http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index 1d5639c..d7a3c06 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -48,6 +48,7 @@ #include "tests/flags.hpp" #include "tests/mesos.hpp" +using std::list; using std::shared_ptr; using std::string; using testing::_; @@ -140,6 +141,7 @@ slave::Flags MesosTest::CreateSlaveFlags() CHECK_SOME(directory) << "Failed to create temporary directory"; flags.work_dir = directory.get(); + flags.fetcher_cache_dir = path::join(directory.get(), "fetch"); flags.launcher_dir = path::join(tests::flags.build_dir, "src"); @@ -445,6 +447,47 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future) } +MockFetcherProcess::MockFetcherProcess() +{ + // Set up default behaviors, calling the original methods. + EXPECT_CALL(*this, _fetch(_, _, _, _, _, _, _)). + WillRepeatedly( + Invoke(this, &MockFetcherProcess::unmocked__fetch)); + EXPECT_CALL(*this, run(_, _, _)). + WillRepeatedly(Invoke(this, &MockFetcherProcess::unmocked_run)); +} + + +process::Future<Nothing> MockFetcherProcess::unmocked__fetch( + const list<Future<shared_ptr<Cache::Entry>>> futures, + const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>& + entries, + const ContainerID& containerId, + const string& sandboxDirectory, + const string& cacheDirectory, + const Option<string>& user, + const slave::Flags& flags) +{ + return slave::FetcherProcess::_fetch( + futures, + entries, + containerId, + sandboxDirectory, + cacheDirectory, + user, + flags); +} + + +process::Future<Nothing> MockFetcherProcess::unmocked_run( + const ContainerID& containerId, + const FetcherInfo& info, + const slave::Flags& flags) +{ + return slave::FetcherProcess::run(containerId, info, flags); +} + + slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags() { slave::Flags flags = MesosTest::CreateSlaveFlags(); http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index ac986a0..a1c6ae4 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -30,6 +30,8 @@ #include <mesos/master/allocator.hpp> +#include <mesos/fetcher/fetcher.hpp> + #include <mesos/slave/resource_estimator.hpp> #include <process/future.hpp> @@ -44,6 +46,7 @@ #include <stout/foreach.hpp> #include <stout/gtest.hpp> #include <stout/lambda.hpp> +#include <stout/memory.hpp> #include <stout/none.hpp> #include <stout/option.hpp> #include <stout/stringify.hpp> @@ -799,6 +802,52 @@ private: }; +// Definition of a mock FetcherProcess to be used in tests with gmock. +class MockFetcherProcess : public slave::FetcherProcess +{ +public: + MockFetcherProcess(); + + virtual ~MockFetcherProcess() {} + + MOCK_METHOD7(_fetch, process::Future<Nothing>( + const std::list<process::Future<std::shared_ptr<Cache::Entry>>> + futures, + const hashmap< + CommandInfo::URI, + Option<process::Future<std::shared_ptr<Cache::Entry>>>>& + entries, + const ContainerID& containerId, + const std::string& sandboxDirectory, + const std::string& cacheDirectory, + const Option<std::string>& user, + const slave::Flags& flags)); + + process::Future<Nothing> unmocked__fetch( + const std::list<process::Future<std::shared_ptr<Cache::Entry>>> + futures, + const hashmap< + CommandInfo::URI, + Option<process::Future<std::shared_ptr<Cache::Entry>>>>& + entries, + const ContainerID& containerId, + const std::string& sandboxDirectory, + const std::string& cacheDirectory, + const Option<std::string>& user, + const slave::Flags& flags); + + MOCK_METHOD3(run, process::Future<Nothing>( + const ContainerID& containerId, + const FetcherInfo& info, + const slave::Flags& flags)); + + process::Future<Nothing> unmocked_run( + const ContainerID& containerId, + const FetcherInfo& info, + const slave::Flags& flags); +}; + + // Definition of a MockAuthozier that can be used in tests with gmock. class MockAuthorizer : public Authorizer {
