Introduced 'recordio' encoding facilities to stout. Note that most "Record-IO" encodings are used for file I/O and consequently use a fixed-size header to encode the record length. However, decoding a base-10 integer is more straightforward to implement in most languages, so a newline-delimited base-10 length header was chosen instead. (Note that the Twitter streaming API uses the same technique for portability.)
Review: https://reviews.apache.org/r/36677 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d9a81ede Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d9a81ede Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d9a81ede Branch: refs/heads/master Commit: d9a81edee140fc43f56c4370cae1696b726fd66e Parents: 1e83bda Author: Benjamin Mahler <[email protected]> Authored: Tue Jul 21 22:54:02 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Jul 24 14:44:37 2015 -0700 ---------------------------------------------------------------------- .../3rdparty/stout/include/Makefile.am | 1 + .../3rdparty/stout/include/stout/protobuf.hpp | 5 + .../3rdparty/stout/include/stout/recordio.hpp | 168 +++++++++++++++++++ .../3rdparty/stout/tests/recordio_tests.cpp | 161 ++++++++++++++++++ 4 files changed, 335 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am index 2394b95..5c19e3e 100644 --- a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am +++ b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am @@ -73,6 +73,7 @@ nobase_include_HEADERS = \ stout/preprocessor.hpp \ stout/proc.hpp \ stout/protobuf.hpp \ + stout/recordio.hpp \ stout/result.hpp \ stout/set.hpp \ stout/some.hpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp index 8c75f6b..a7de91f 100644 --- 
a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp +++ b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp @@ -45,6 +45,11 @@ namespace protobuf { +// TODO(bmahler): Re-use stout's 'recordio' facilities here. Note +// that these use a fixed size length header, whereas stout's +// currently uses a base-10 newline delimited header for language +// portability, which makes changing these a bit tricky. + // Write out the given protobuf to the specified file descriptor by // first writing out the length of the protobuf followed by the // contents. http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp new file mode 100644 index 0000000..e8a6217 --- /dev/null +++ b/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp @@ -0,0 +1,168 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __STOUT_RECORDIO_HPP__ +#define __STOUT_RECORDIO_HPP__ + +#include <stdlib.h> + +#include <deque> +#include <functional> +#include <string> + +#include <stout/check.hpp> +#include <stout/foreach.hpp> +#include <stout/numify.hpp> +#include <stout/option.hpp> +#include <stout/stringify.hpp> +#include <stout/try.hpp> + +/** + * Provides facilities for "Record-IO" encoding of data. + * "Record-IO" encoding allows one to encode a sequence + * of variable-length records by prefixing each record + * with its size in bytes: + * + * 5\n + * hello + * 6\n + * world! + * + * Note that this currently only supports record lengths + * encoded as base 10 integer values with newlines as a + * delimiter. This is to provide better language + * portability: parsing a base 10 integer is simple. Most + * other "Record-IO" implementations use a fixed-size header + * of 4 bytes to directly encode an unsigned 32 bit length. + * + * TODO(bmahler): Move this to libprocess and support async + * consumption of data. + */ +namespace recordio { + +/** + * Given an encoding function for individual records, this + * provides encoding from typed records into "Record-IO" data. + */ +template <typename T> +class Encoder +{ +public: + Encoder(std::function<std::string(const T&)> _serialize) + : serialize(_serialize) {} + + /** + * Returns the "Record-IO" encoded record. + */ + std::string encode(const T& record) const + { + std::string s = serialize(record); + return stringify(s.size()) + "\n" + s; + } + +private: + std::function<std::string(const T&)> serialize; +}; + + +/** + * Given a decoding function for individual records, this + * provides decoding from "Record-IO" data into typed records.
+ */ +template <typename T> +class Decoder +{ +public: + Decoder(std::function<Try<T>(const std::string&)> _deserialize) + : state(HEADER), deserialize(_deserialize) {} + + /** + * Decodes another chunk of data from the "Record-IO" stream + * and returns the attempted decoding of any additional + * complete records. + * + * Returns an Error if the data contains an invalid length + * header, at which point the decoder will return Error for + * all subsequent calls. + */ + Try<std::deque<Try<T>>> decode(const std::string& data) + { + if (state == FAILED) { + return Error("Decoder is in a FAILED state"); + } + + std::deque<Try<T>> records; + + foreach (char c, data) { + if (state == HEADER) { + // Keep reading until we have the entire header. + if (c != '\n') { + buffer += c; + continue; + } + + Try<size_t> numify = ::numify<size_t>(buffer); + + // If we were unable to decode the length header, do not + // continue decoding since we cannot determine where to + // pick up the next length header! + if (numify.isError()) { + state = FAILED; + return Error("Failed to decode length '" + buffer + "': " + + numify.error()); + } + + length = numify.get(); + buffer.clear(); + state = RECORD; + + // Note that for 0 length records, we immediately decode. + if (numify.get() <= 0) { + records.push_back(deserialize(buffer)); + state = HEADER; + } + } else if (state == RECORD) { + CHECK_SOME(length); + CHECK_LT(buffer.size(), length.get()); + + buffer += c; + + if (buffer.size() == length.get()) { + records.push_back(deserialize(buffer)); + buffer.clear(); + state = HEADER; + } + } + } + + return records; + } + +private: + enum { + HEADER, + RECORD, + FAILED + } state; + + // TODO(bmahler): Avoid string here as it will not free + // its underlying memory allocation when we clear it. 
+ std::string buffer; + Option<size_t> length; + + std::function<Try<T>(const std::string&)> deserialize; +}; + +} // namespace recordio { + +#endif // __STOUT_RECORDIO_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp new file mode 100644 index 0000000..d036785 --- /dev/null +++ b/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp @@ -0,0 +1,161 @@ +/** +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License +*/ + +#include <deque> +#include <string> + +#include <gtest/gtest.h> + +#include <stout/error.hpp> +#include <stout/gtest.hpp> +#include <stout/recordio.hpp> +#include <stout/some.hpp> +#include <stout/strings.hpp> +#include <stout/try.hpp> + +using std::deque; +using std::string; + + +template <typename T> +bool operator == (Try<T> lhs, Try<T> rhs) +{ + if (lhs.isSome() != rhs.isSome()) { + return false; + } + + if (lhs.isSome()) { + return lhs.get() == rhs.get(); + } + + return lhs.error() == rhs.error(); +} + + +template <typename T> +bool operator != (Try<T> lhs, Try<T> rhs) +{ + return !(lhs == rhs); +} + + +template <typename T> +bool operator == (deque<T> rhs, deque<T> lhs) +{ + if (rhs.size() != lhs.size()) { + return false; + } + + auto it1 = rhs.begin(); + auto it2 = lhs.begin(); + + while (it1 != rhs.end()) { + if (*it1 != *it2) { + return false; + } + + ++it1; + ++it2; + } + + return true; +} + + +TEST(RecordIOTest, Encoder) +{ + recordio::Encoder<string> encoder(strings::upper); + + string data; + + data += encoder.encode("hello!"); + data += encoder.encode(""); + data += encoder.encode(" "); + data += encoder.encode("13 characters"); + + EXPECT_EQ( + "6\nHELLO!" + "0\n" + "1\n " + "13\n13 CHARACTERS", + data); + + // Make sure these can be decoded. + recordio::Decoder<string> decoder( + [=](const string& data) { + return Try<string>(strings::lower(data)); + }); + + deque<Try<string>> records; + records.push_back("hello!"); + records.push_back(""); + records.push_back(" "); + records.push_back("13 characters"); + + EXPECT_SOME_EQ(records, decoder.decode(data)); +} + + +TEST(RecordIOTest, Decoder) +{ + // Deserializing brings to lower case, but add an + // error case to test deserialization failures. 
+ auto deserialize = [](const string& data) -> Try<string> { + if (data == "error") { + return Error("error"); + } + return strings::lower(data); + }; + + recordio::Decoder<string> decoder(deserialize); + + deque<Try<string>> records; + + // Empty data should not result in an error. + records.clear(); + + EXPECT_SOME_EQ(records, decoder.decode("")); + + // Should decode more than 1 record when possible. + records.clear(); + records.push_back("hello!"); + records.push_back(""); + records.push_back(" "); + + EXPECT_SOME_EQ(records, decoder.decode("6\nHELLO!0\n1\n ")); + + // An entry which cannot be decoded should not + // fail the decoder permanently. + records.clear(); + records.push_back(Error("error")); + + EXPECT_SOME_EQ(records, decoder.decode("5\nerror")); + + // Record should only be decoded once complete. + records.clear(); + + EXPECT_SOME_EQ(records, decoder.decode("1")); + EXPECT_SOME_EQ(records, decoder.decode("3")); + EXPECT_SOME_EQ(records, decoder.decode("\n")); + EXPECT_SOME_EQ(records, decoder.decode("13 CHARACTER")); + + records.clear(); + records.push_back("13 characters"); + + EXPECT_SOME_EQ(records, decoder.decode("S")); + + // If the format is bad, the decoder should fail permanently. + EXPECT_ERROR(decoder.decode("not a number\n")); + EXPECT_ERROR(decoder.decode("1\n")); +}
