Repository: mesos Updated Branches: refs/heads/master 7d1667333 -> 388fc834c
Added a recordio::Reader for wrapping an http::Pipe::Reader. This is helpful for the scheduler library and tests. Since we currently do not have a generalized abstraction in libprocess for "streams" of asynchronous data (e.g. process::Stream<T>), we have to create a one-off wrapper here. Review: https://reviews.apache.org/r/37327 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3689a15b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3689a15b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3689a15b Branch: refs/heads/master Commit: 3689a15b9bb6c586c05a3c93704c1919f2b9a4db Parents: 4c3da3d Author: Benjamin Mahler <[email protected]> Authored: Mon Aug 10 16:09:24 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Aug 10 17:41:19 2015 -0700 ---------------------------------------------------------------------- src/Makefile.am | 2 + src/common/recordio.hpp | 217 +++++++++++++++++++++++++++++++ src/tests/common/recordio_tests.cpp | 163 +++++++++++++++++++++++ 3 files changed, 382 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3689a15b/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 85c9160..07502f0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -654,6 +654,7 @@ libmesos_no_3rdparty_la_SOURCES += \ common/http.hpp \ common/parse.hpp \ common/protobuf_utils.hpp \ + common/recordio.hpp \ common/resources_utils.hpp \ common/status_utils.hpp \ credentials/credentials.hpp \ @@ -1612,6 +1613,7 @@ mesos_tests_SOURCES = \ tests/values_tests.cpp \ tests/zookeeper_url_tests.cpp \ tests/common/http_tests.cpp \ + tests/common/recordio_tests.cpp \ tests/containerizer/composing_containerizer_tests.cpp \ tests/containerizer/docker_containerizer_tests.cpp \ tests/containerizer/docker_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/3689a15b/src/common/recordio.hpp ---------------------------------------------------------------------- diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp new file mode 100644 index 0000000..64d2afb --- /dev/null +++ b/src/common/recordio.hpp @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __COMMON_RECORDIO_HPP__ +#define __COMMON_RECORDIO_HPP__ + +#include <queue> +#include <string> +#include <utility> + +#include <mesos/mesos.hpp> + +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/http.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> + +#include <stout/lambda.hpp> +#include <stout/recordio.hpp> +#include <stout/result.hpp> + +namespace mesos { +namespace internal { +namespace recordio { + +namespace internal { +template <typename T> +class ReaderProcess; +} // namespace internal { + + +/** + * Provides RecordIO decoding on top of an http::Pipe::Reader. + * The caller is responsible for closing the http::Pipe::Reader + * when a failure is encountered or end-of-file is reached. + * + * TODO(bmahler): Since we currently do not have a generalized + * abstraction in libprocess for "streams" of asynchronous data + * (e.g. process::Stream<T>), we have to create a one-off wrapper + * here. In the future, this would be better expressed as "piping" + * data from a stream of raw bytes into a decoder, which yields a + * stream of typed data. + */ +template <typename T> +class Reader +{ +public: + Reader(::recordio::Decoder<T>&& decoder, + process::http::Pipe::Reader reader) + : process(new internal::ReaderProcess<T>(std::move(decoder), reader)) + { + process::spawn(process.get()); + } + + virtual ~Reader() + { + process::terminate(process.get()); + process::wait(process.get()); + } + + /** + * Returns the next piece of decoded data from the pipe. + * Returns error if an individual record could not be decoded. + * Returns none when end-of-file is reached. + * Returns failure when the pipe or decoder has failed. + */ + process::Future<Result<T>> read() + { + return process::dispatch(process.get(), &internal::ReaderProcess<T>::read); + } + +private: + process::Owned<internal::ReaderProcess<T>> process; +}; + + +namespace internal { + +template <typename T> +class ReaderProcess : public process::Process<ReaderProcess<T>> +{ +public: + ReaderProcess( + ::recordio::Decoder<T>&& _decoder, + process::http::Pipe::Reader _reader) + : decoder(_decoder), + reader(_reader), + done(false) {} + + virtual ~ReaderProcess() {} + + process::Future<Result<T>> read() + { + if (!records.empty()) { + Result<T> record = std::move(records.front()); + records.pop(); + return record; + } + + if (error.isSome()) { + return process::Failure(error.get().message); + } + + if (done) { + return None(); + } + + auto waiter = process::Owned<process::Promise<Result<T>>>( + new process::Promise<Result<T>>()); + waiters.push(std::move(waiter)); + return waiters.back()->future(); + } + +protected: + virtual void initialize() override + { + consume(); + } + + virtual void finalize() override + { + // Fail any remaining waiters. + fail("Reader is terminating"); + } + +private: + void fail(const std::string& message) + { + error = Error(message); + + while (!waiters.empty()) { + waiters.front()->fail(message); + waiters.pop(); + } + } + + void complete() + { + done = true; + + while (!waiters.empty()) { + waiters.front()->set(Result<T>::none()); + waiters.pop(); + } + } + + void consume() + { + reader.read() + .onAny(process::defer(this, &ReaderProcess::_consume, lambda::_1)); + } + + void _consume(const process::Future<std::string>& read) + { + if (!read.isReady()) { + fail("Pipe::Reader failure: " + + (read.isFailed() ? read.failure() : "discarded")); + return; + } + + // Have we reached EOF? + if (read.get().empty()) { + complete(); + return; + } + + Try<std::deque<Try<T>>> decode = decoder.decode(read.get()); + + if (decode.isError()) { + fail("Decoder failure: " + decode.error()); + return; + } + + foreach (const Try<T>& record, decode.get()) { + if (!waiters.empty()) { + waiters.front()->set(Result<T>(std::move(record))); + waiters.pop(); + } else { + records.push(std::move(record)); + } + } + + consume(); + } + + ::recordio::Decoder<T> decoder; + process::http::Pipe::Reader reader; + + std::queue<process::Owned<process::Promise<Result<T>>>> waiters; + std::queue<Result<T>> records; + + bool done; + Option<Error> error; +}; + +} // namespace internal { +} // namespace recordio { +} // namespace internal { +} // namespace mesos { + +#endif // __COMMON_RECORDIO_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/3689a15b/src/tests/common/recordio_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/common/recordio_tests.cpp b/src/tests/common/recordio_tests.cpp new file mode 100644 index 0000000..db5e5c9 --- /dev/null +++ b/src/tests/common/recordio_tests.cpp @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gtest/gtest.h> + +#include <ostream> +#include <string> + +#include <process/gtest.hpp> + +#include <stout/recordio.hpp> +#include <stout/result.hpp> +#include <stout/strings.hpp> + +#include "common/recordio.hpp" + +using process::Future; + +using std::string; + +using namespace mesos; +using namespace mesos::internal; + + +template <typename T> +bool operator==(const Result<T>& lhs, const Result<T>& rhs) +{ + if (lhs.isNone()) { + return rhs.isNone(); + } + + if (lhs.isError()) { + return rhs.isError() && rhs.error() == lhs.error(); + } + + return rhs.isSome() && lhs.get() == rhs.get(); +} + + +template <typename T> +std::ostream& operator<<(std::ostream& out, const Result<T>& r) +{ + if (r.isNone()) { + return out << "none"; + } + + if (r.isError()) { + return out << "error(\"" << r.error() << "\")"; + } + + return out << r.get(); +} + + +TEST(RecordIOReaderTest, EndOfFile) +{ + // Write some data to the pipe so that records + // are available before any reads occur. + ::recordio::Encoder<string> encoder(strings::upper); + + string data; + + data += encoder.encode("hello"); + data += encoder.encode("world!"); + + process::http::Pipe pipe; + pipe.writer().write(data); + + internal::recordio::Reader<string> reader( + ::recordio::Decoder<string>(strings::lower), + pipe.reader()); + + AWAIT_EXPECT_EQ(Result<string>::some("hello"), reader.read()); + AWAIT_EXPECT_EQ(Result<string>::some("world!"), reader.read()); + + // Have multiple outstanding reads before we close the pipe. + Future<Result<string>> read1 = reader.read(); + Future<Result<string>> read2 = reader.read(); + + EXPECT_TRUE(read1.isPending()); + EXPECT_TRUE(read2.isPending()); + + pipe.writer().write(encoder.encode("goodbye")); + pipe.writer().close(); + + AWAIT_EXPECT_EQ(Result<string>::some("goodbye"), read1); + AWAIT_EXPECT_EQ(Result<string>::none(), read2); + + // Subsequent reads should return EOF. + AWAIT_EXPECT_EQ(Result<string>::none(), reader.read()); +} + + +TEST(RecordIOReaderTest, DecodingFailure) +{ + ::recordio::Encoder<string> encoder(strings::upper); + process::http::Pipe pipe; + + internal::recordio::Reader<string> reader( + ::recordio::Decoder<string>(strings::lower), + pipe.reader()); + + // Have multiple outstanding reads before we fail the decoder. + Future<Result<string>> read1 = reader.read(); + Future<Result<string>> read2 = reader.read(); + Future<Result<string>> read3 = reader.read(); + + // Write non-encoded data to the pipe so that the decoder fails. + pipe.writer().write(encoder.encode("encoded")); + pipe.writer().write("not encoded!\n"); + + AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1); + AWAIT_EXPECT_FAILED(read2); + AWAIT_EXPECT_FAILED(read3); + + // The reader is now in a failed state, subsequent + // writes will be dropped and all reads will fail. + pipe.writer().write(encoder.encode("encoded")); + + AWAIT_EXPECT_FAILED(reader.read()); +} + + +TEST(RecordIOReaderTest, PipeFailure) +{ + ::recordio::Encoder<string> encoder(strings::upper); + process::http::Pipe pipe; + + internal::recordio::Reader<string> reader( + ::recordio::Decoder<string>(strings::lower), + pipe.reader()); + + // Have multiple outstanding reads before we fail the writer. + Future<Result<string>> read1 = reader.read(); + Future<Result<string>> read2 = reader.read(); + Future<Result<string>> read3 = reader.read(); + + // Write a record, then fail the pipe writer! + pipe.writer().write(encoder.encode("encoded")); + pipe.writer().fail("failure"); + + AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1); + AWAIT_EXPECT_FAILED(read2); + AWAIT_EXPECT_FAILED(read3); + + // Subsequent reads should return a failure. + AWAIT_EXPECT_FAILED(reader.read()); +}
