Repository: mesos Updated Branches: refs/heads/master c8a8a0403 -> fcd966bbc
Added support for RepeatedPtrField to ::protobuf::read. Review: https://reviews.apache.org/r/30111 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7633982b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7633982b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7633982b Branch: refs/heads/master Commit: 7633982b3407621e69078282ca6887b5bb46d548 Parents: c8a8a04 Author: Michael Park <[email protected]> Authored: Sun Jan 25 14:12:13 2015 -0800 Committer: Jie Yu <[email protected]> Committed: Sun Jan 25 14:12:14 2015 -0800 ---------------------------------------------------------------------- .../3rdparty/stout/include/stout/protobuf.hpp | 188 ++++++++++++------- 1 file changed, 117 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7633982b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp index a4d48c9..2c020d9 100644 --- a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp +++ b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp @@ -139,102 +139,148 @@ inline Try<Nothing> append( } -// Read the next protobuf of type T from the file by first reading the +namespace internal { + +// Reads a single message of type T from the file by first reading the // "size" followed by the contents (as written by 'write' above). -// If 'ignorePartial' is true, None() is returned when we unexpectedly -// hit EOF while reading the protobuf (e.g., partial write). -// If 'undoFailed' is true, failed read attempts will restore the file -// read/write file offset towards the initial callup position. +// NOTE: This struct is used by the public 'read' function. +// See comments there for the reason why we need this. template <typename T> -inline Result<T> read( - int fd, - bool ignorePartial = false, - bool undoFailed = false) +struct Read { - off_t offset = 0; + Result<T> operator () (int fd, bool ignorePartial, bool undoFailed) + { + off_t offset = 0; - if (undoFailed) { - // Save the offset so we can re-adjust if something goes wrong. - offset = lseek(fd, 0, SEEK_CUR); - if (offset == -1) { - return ErrnoError("Failed to lseek to SEEK_CUR"); + if (undoFailed) { + // Save the offset so we can re-adjust if something goes wrong. + offset = lseek(fd, 0, SEEK_CUR); + if (offset == -1) { + return ErrnoError("Failed to lseek to SEEK_CUR"); + } } - } - uint32_t size; - Result<std::string> result = os::read(fd, sizeof(size)); + uint32_t size; + Result<std::string> result = os::read(fd, sizeof(size)); - if (result.isError()) { - if (undoFailed) { - lseek(fd, offset, SEEK_SET); - } - return Error("Failed to read size: " + result.error()); - } else if (result.isNone()) { - return None(); // No more protobufs to read. - } else if (result.get().size() < sizeof(size)) { - // Hit EOF unexpectedly. - if (undoFailed) { - // Restore the offset to before the size read. - lseek(fd, offset, SEEK_SET); - } - if (ignorePartial) { - return None(); + if (result.isError()) { + if (undoFailed) { + lseek(fd, offset, SEEK_SET); + } + return Error("Failed to read size: " + result.error()); + } else if (result.isNone()) { + return None(); // No more protobufs to read. + } else if (result.get().size() < sizeof(size)) { + // Hit EOF unexpectedly. + if (undoFailed) { + // Restore the offset to before the size read. + lseek(fd, offset, SEEK_SET); + } + if (ignorePartial) { + return None(); + } + return Error( + "Failed to read size: hit EOF unexpectedly, possible corruption"); } - return Error( - "Failed to read size: hit EOF unexpectedly, possible corruption"); - } - // Parse the size from the bytes. - memcpy((void*) &size, (void*) result.get().data(), sizeof(size)); + // Parse the size from the bytes. + memcpy((void*) &size, (void*) result.get().data(), sizeof(size)); - // NOTE: Instead of specifically checking for corruption in 'size', - // we simply try to read 'size' bytes. If we hit EOF early, it is an - // indication of corruption. - result = os::read(fd, size); + // NOTE: Instead of specifically checking for corruption in 'size', + // we simply try to read 'size' bytes. If we hit EOF early, it is an + // indication of corruption. + result = os::read(fd, size); - if (result.isError()) { - if (undoFailed) { - // Restore the offset to before the size read. - lseek(fd, offset, SEEK_SET); - } - return Error("Failed to read message: " + result.error()); - } else if (result.isNone() || result.get().size() < size) { - // Hit EOF unexpectedly. - if (undoFailed) { - // Restore the offset to before the size read. - lseek(fd, offset, SEEK_SET); + if (result.isError()) { + if (undoFailed) { + // Restore the offset to before the size read. + lseek(fd, offset, SEEK_SET); + } + return Error("Failed to read message: " + result.error()); + } else if (result.isNone() || result.get().size() < size) { + // Hit EOF unexpectedly. + if (undoFailed) { + // Restore the offset to before the size read. + lseek(fd, offset, SEEK_SET); + } + if (ignorePartial) { + return None(); + } + return Error("Failed to read message of size " + stringify(size) + + " bytes: hit EOF unexpectedly, possible corruption"); } - if (ignorePartial) { - return None(); + + // Parse the protobuf from the string. + // NOTE: We need to capture a const reference to the data because it + // must outlive the creation of ArrayInputStream. + const std::string& data = result.get(); + + T message; + google::protobuf::io::ArrayInputStream stream(data.data(), data.size()); + + if (!message.ParseFromZeroCopyStream(&stream)) { + if (undoFailed) { + // Restore the offset to before the size read. + lseek(fd, offset, SEEK_SET); + } + return Error("Failed to deserialize message"); } - return Error("Failed to read message of size " + stringify(size) + - " bytes: hit EOF unexpectedly, possible corruption"); - } - // Parse the protobuf from the string. - // NOTE: We need to capture a const reference to the data because it - // must outlive the creation of ArrayInputStream. - const std::string& data = result.get(); + return message; + } +}; - T message; - google::protobuf::io::ArrayInputStream stream(data.data(), data.size()); - if (!message.ParseFromZeroCopyStream(&stream)) { - if (undoFailed) { - // Restore the offset to before the size read. - lseek(fd, offset, SEEK_SET); +// Partial specialization for RepeatedPtrField<T> to read a sequence +// of protobuf messages from a given fd by repeatedly invoking +// Read<T> until None is reached, which we treat as EOF. +// NOTE: This struct is used by the public 'read' function. +// See comments there for the reason why we need this. +template <typename T> +struct Read<google::protobuf::RepeatedPtrField<T>> +{ + Result<google::protobuf::RepeatedPtrField<T>> operator () ( + int fd, bool ignorePartial, bool undoFailed) + { + google::protobuf::RepeatedPtrField<T> result; + for (;;) { + Result<T> message = Read<T>()(fd, ignorePartial, undoFailed); + if (message.isError()) { + return Error(message.error()); + } else if (message.isNone()) { + break; + } else { + result.Add()->CopyFrom(message.get()); + } } - return Error("Failed to deserialize message"); + return result; } +}; - return message; +} // namespace internal { + + +// Reads the protobuf message(s) from a given fd based on the format +// written by write() above. We use partial specialization of +// - internal::Read<T> vs +// - internal::Read<google::protobuf::RepeatedPtrField<T>> +// in order to determine whether T is a single protobuf message or +// a sequence of messages. +// If 'ignorePartial' is true, None() is returned when we unexpectedly +// hit EOF while reading the protobuf (e.g., partial write). +// If 'undoFailed' is true, failed read attempts will restore the file +// read/write file offset towards the initial callup position. +template <typename T> +Result<T> read(int fd, bool ignorePartial = false, bool undoFailed = false) +{ + return internal::Read<T>()(fd, ignorePartial, undoFailed); } // A wrapper function that wraps the above read() with open and // closing the file. template <typename T> -inline Result<T> read(const std::string& path) +Result<T> read(const std::string& path) { Try<int> fd = os::open( path,
