Repository: mesos
Updated Branches:
  refs/heads/master 7d1667333 -> 388fc834c


Added a recordio::Reader for wrapping an http::Pipe::Reader.

This is helpful for the scheduler library and tests. Since we
currently do not have a generalized abstraction in libprocess
for "streams" of asynchronous data (e.g. process::Stream<T>),
we have to create a one-off wrapper here.

Review: https://reviews.apache.org/r/37327


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3689a15b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3689a15b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3689a15b

Branch: refs/heads/master
Commit: 3689a15b9bb6c586c05a3c93704c1919f2b9a4db
Parents: 4c3da3d
Author: Benjamin Mahler <[email protected]>
Authored: Mon Aug 10 16:09:24 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Mon Aug 10 17:41:19 2015 -0700

----------------------------------------------------------------------
 src/Makefile.am                     |   2 +
 src/common/recordio.hpp             | 217 +++++++++++++++++++++++++++++++
 src/tests/common/recordio_tests.cpp | 163 +++++++++++++++++++++++
 3 files changed, 382 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3689a15b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 85c9160..07502f0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -654,6 +654,7 @@ libmesos_no_3rdparty_la_SOURCES +=                          
        \
        common/http.hpp                                                 \
        common/parse.hpp                                                \
        common/protobuf_utils.hpp                                       \
+       common/recordio.hpp                                             \
        common/resources_utils.hpp                                      \
        common/status_utils.hpp                                         \
        credentials/credentials.hpp                                     \
@@ -1612,6 +1613,7 @@ mesos_tests_SOURCES =                                     
        \
   tests/values_tests.cpp                                       \
   tests/zookeeper_url_tests.cpp                                        \
   tests/common/http_tests.cpp                                  \
+  tests/common/recordio_tests.cpp                              \
   tests/containerizer/composing_containerizer_tests.cpp                \
   tests/containerizer/docker_containerizer_tests.cpp           \
   tests/containerizer/docker_tests.cpp                         \

http://git-wip-us.apache.org/repos/asf/mesos/blob/3689a15b/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
new file mode 100644
index 0000000..64d2afb
--- /dev/null
+++ b/src/common/recordio.hpp
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMMON_RECORDIO_HPP__
+#define __COMMON_RECORDIO_HPP__
+
+#include <queue>
+#include <string>
+#include <utility>
+
+#include <mesos/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/recordio.hpp>
+#include <stout/result.hpp>
+
+namespace mesos {
+namespace internal {
+namespace recordio {
+
+namespace internal {
+template <typename T>
+class ReaderProcess;
+} // namespace internal {
+
+
+/**
+ * Provides RecordIO decoding on top of an http::Pipe::Reader.
+ * The caller is responsible for closing the http::Pipe::Reader
+ * when a failure is encountered or end-of-file is reached.
+ *
+ * TODO(bmahler): Since we currently do not have a generalized
+ * abstraction in libprocess for "streams" of asynchronous data
+ * (e.g. process::Stream<T>), we have to create a one-off wrapper
+ * here. In the future, this would be better expressed as "piping"
+ * data from a stream of raw bytes into a decoder, which yields a
+ * stream of typed data.
+ */
+template <typename T>
+class Reader
+{
+public:
+  // Hands the decoder and the pipe reader over to a freshly created
+  // `ReaderProcess` and spawns it; the process begins consuming from
+  // the pipe as part of its initialization.
+  Reader(::recordio::Decoder<T>&& decoder,
+         process::http::Pipe::Reader reader)
+    : process(new internal::ReaderProcess<T>(std::move(decoder), reader))
+  {
+    internal::ReaderProcess<T>* actor = process.get();
+    process::spawn(actor);
+  }
+
+  // Terminates the backing process and blocks until it has exited,
+  // so that it is no longer running when it gets deleted.
+  virtual ~Reader()
+  {
+    internal::ReaderProcess<T>* actor = process.get();
+    process::terminate(actor);
+    process::wait(actor);
+  }
+
+  /**
+   * Asynchronously yields the next decoded record from the pipe:
+   *   - some(record) for a successfully decoded record,
+   *   - error        if an individual record could not be decoded,
+   *   - none         once end-of-file has been reached.
+   * The returned future fails when the pipe or the decoder failed.
+   */
+  process::Future<Result<T>> read()
+  {
+    internal::ReaderProcess<T>* actor = process.get();
+    return process::dispatch(actor, &internal::ReaderProcess<T>::read);
+  }
+
+private:
+  // The actor that performs the reading and decoding.
+  process::Owned<internal::ReaderProcess<T>> process;
+};
+
+
+namespace internal {
+
+// Process backing `Reader<T>`: it repeatedly reads raw chunks from
+// the pipe, feeds them to the RecordIO decoder and hands the decoded
+// records to callers of `read()` in FIFO order.
+//
+// Records decoded before anybody asked for them are buffered in
+// `records`; reads issued before data is available are parked in
+// `waiters`. After end-of-file (`done`) or a failure (`error`), any
+// buffered records are still served first and every subsequent read
+// observes the terminal state.
+template <typename T>
+class ReaderProcess : public process::Process<ReaderProcess<T>>
+{
+public:
+  ReaderProcess(
+      ::recordio::Decoder<T>&& _decoder,
+      process::http::Pipe::Reader _reader)
+    // NOTE: `_decoder` is a named rvalue reference and hence an
+    // lvalue in this expression; without the explicit `std::move`
+    // the decoder would be copied rather than moved.
+    : decoder(std::move(_decoder)),
+      reader(_reader),
+      done(false) {}
+
+  virtual ~ReaderProcess() {}
+
+  // Returns the next record. In order of precedence:
+  //   1. a record that was already decoded and buffered,
+  //   2. a failure if the pipe or the decoder has failed,
+  //   3. none if end-of-file was reached,
+  //   4. otherwise a pending future that is completed by
+  //      `_consume()`, `complete()` or `fail()`.
+  process::Future<Result<T>> read()
+  {
+    if (!records.empty()) {
+      Result<T> record = std::move(records.front());
+      records.pop();
+      return record;
+    }
+
+    if (error.isSome()) {
+      return process::Failure(error.get().message);
+    }
+
+    if (done) {
+      return None();
+    }
+
+    // Nothing is available yet, park a promise for this caller.
+    auto waiter = process::Owned<process::Promise<Result<T>>>(
+        new process::Promise<Result<T>>());
+    waiters.push(std::move(waiter));
+    return waiters.back()->future();
+  }
+
+protected:
+  virtual void initialize() override
+  {
+    // Start the read loop as soon as the process is spawned.
+    consume();
+  }
+
+  virtual void finalize() override
+  {
+    // Fail any remaining waiters.
+    fail("Reader is terminating");
+  }
+
+private:
+  // Records the failure and fails every parked read with `message`.
+  // No further pipe reads are issued by the callers of this function.
+  void fail(const std::string& message)
+  {
+    error = Error(message);
+
+    while (!waiters.empty()) {
+      waiters.front()->fail(message);
+      waiters.pop();
+    }
+  }
+
+  // Marks end-of-file and completes every parked read with none.
+  void complete()
+  {
+    done = true;
+
+    while (!waiters.empty()) {
+      waiters.front()->set(Result<T>::none());
+      waiters.pop();
+    }
+  }
+
+  // Issues the next read on the pipe; the result is handled in
+  // `_consume()` within the context of this process.
+  void consume()
+  {
+    reader.read()
+      .onAny(process::defer(this, &ReaderProcess::_consume, lambda::_1));
+  }
+
+  // Handles the outcome of a single pipe read: a failed or discarded
+  // read and a decoder failure put the reader into the failed state,
+  // an empty read is treated as end-of-file, and otherwise the
+  // decoded records are distributed before the next read is issued.
+  void _consume(const process::Future<std::string>& read)
+  {
+    if (!read.isReady()) {
+      fail("Pipe::Reader failure: " +
+           (read.isFailed() ? read.failure() : "discarded"));
+      return;
+    }
+
+    // Have we reached EOF?
+    if (read.get().empty()) {
+      complete();
+      return;
+    }
+
+    Try<std::deque<Try<T>>> decode = decoder.decode(read.get());
+
+    if (decode.isError()) {
+      fail("Decoder failure: " + decode.error());
+      return;
+    }
+
+    // Satisfy parked reads first (oldest first) and buffer whatever
+    // is left over. `record` is a const reference, so `std::move`
+    // on it would silently degrade to a copy; copy explicitly.
+    foreach (const Try<T>& record, decode.get()) {
+      if (!waiters.empty()) {
+        waiters.front()->set(Result<T>(record));
+        waiters.pop();
+      } else {
+        records.push(Result<T>(record));
+      }
+    }
+
+    consume();
+  }
+
+  ::recordio::Decoder<T> decoder;
+  process::http::Pipe::Reader reader;
+
+  // Reads that could not be satisfied yet, oldest first.
+  std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
+
+  // Decoded records that nobody has asked for yet, oldest first.
+  std::queue<Result<T>> records;
+
+  // Set once end-of-file was observed on the pipe.
+  bool done;
+
+  // Set once the pipe or the decoder failed, or on termination.
+  Option<Error> error;
+};
+
+} // namespace internal {
+} // namespace recordio {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __COMMON_RECORDIO_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/3689a15b/src/tests/common/recordio_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/recordio_tests.cpp 
b/src/tests/common/recordio_tests.cpp
new file mode 100644
index 0000000..db5e5c9
--- /dev/null
+++ b/src/tests/common/recordio_tests.cpp
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <ostream>
+#include <string>
+
+#include <process/gtest.hpp>
+
+#include <stout/recordio.hpp>
+#include <stout/result.hpp>
+#include <stout/strings.hpp>
+
+#include "common/recordio.hpp"
+
+using process::Future;
+
+using std::string;
+
+using namespace mesos;
+using namespace mesos::internal;
+
+
+// Equality for `Result<T>`, needed by the AWAIT_EXPECT_EQ checks
+// below: two results are equal when they are in the same state and,
+// for the error / some states, carry equal error messages / values.
+template <typename T>
+bool operator==(const Result<T>& lhs, const Result<T>& rhs)
+{
+  if (lhs.isSome()) {
+    return rhs.isSome() && lhs.get() == rhs.get();
+  }
+
+  if (lhs.isError()) {
+    return rhs.isError() && rhs.error() == lhs.error();
+  }
+
+  // `lhs` is none.
+  return rhs.isNone();
+}
+
+
+// Streams a `Result<T>` so that failed expectations are readable:
+// `none`, `error("<message>")`, or the contained value itself.
+template <typename T>
+std::ostream& operator<<(std::ostream& out, const Result<T>& r)
+{
+  if (r.isError()) {
+    return out << "error(\"" << r.error() << "\")";
+  }
+
+  if (r.isNone()) {
+    return out << "none";
+  }
+
+  return out << r.get();
+}
+
+
+// Verifies that records are delivered in order and that both
+// outstanding and subsequent reads observe end-of-file once the
+// writer end of the pipe has been closed.
+TEST(RecordIOReaderTest, EndOfFile)
+{
+  ::recordio::Encoder<string> encoder(strings::upper);
+
+  // Pre-populate the pipe with two records so that they
+  // are available before any reads occur.
+  string data = encoder.encode("hello");
+  data += encoder.encode("world!");
+
+  process::http::Pipe pipe;
+  pipe.writer().write(data);
+
+  internal::recordio::Reader<string> reader(
+      ::recordio::Decoder<string>(strings::lower),
+      pipe.reader());
+
+  AWAIT_EXPECT_EQ(Result<string>::some("hello"), reader.read());
+  AWAIT_EXPECT_EQ(Result<string>::some("world!"), reader.read());
+
+  // Issue two reads while the pipe is drained but still open;
+  // neither of them can be satisfied yet.
+  Future<Result<string>> goodbye = reader.read();
+  Future<Result<string>> eof = reader.read();
+
+  EXPECT_TRUE(goodbye.isPending());
+  EXPECT_TRUE(eof.isPending());
+
+  // One more record followed by closing the writer: the first
+  // outstanding read gets the record, the second one gets EOF.
+  process::http::Pipe::Writer writer = pipe.writer();
+  writer.write(encoder.encode("goodbye"));
+  writer.close();
+
+  AWAIT_EXPECT_EQ(Result<string>::some("goodbye"), goodbye);
+  AWAIT_EXPECT_EQ(Result<string>::none(), eof);
+
+  // Subsequent reads should return EOF.
+  AWAIT_EXPECT_EQ(Result<string>::none(), reader.read());
+}
+
+
+// Verifies that a decoder failure fails all outstanding reads that
+// could not be satisfied, as well as every read issued afterwards.
+TEST(RecordIOReaderTest, DecodingFailure)
+{
+  ::recordio::Encoder<string> encoder(strings::upper);
+  process::http::Pipe pipe;
+
+  internal::recordio::Reader<string> reader(
+      ::recordio::Decoder<string>(strings::lower),
+      pipe.reader());
+
+  // Have multiple outstanding reads before we fail the decoder.
+  Future<Result<string>> first = reader.read();
+  Future<Result<string>> second = reader.read();
+  Future<Result<string>> third = reader.read();
+
+  // A valid record followed by non-encoded data, the
+  // latter of which makes the decoder fail.
+  process::http::Pipe::Writer writer = pipe.writer();
+  writer.write(encoder.encode("encoded"));
+  writer.write("not encoded!\n");
+
+  AWAIT_EXPECT_EQ(Result<string>::some("encoded"), first);
+  AWAIT_EXPECT_FAILED(second);
+  AWAIT_EXPECT_FAILED(third);
+
+  // The reader is now in a failed state, subsequent
+  // writes will be dropped and all reads will fail.
+  writer.write(encoder.encode("encoded"));
+
+  AWAIT_EXPECT_FAILED(reader.read());
+}
+
+
+// Verifies that failing the writer end of the pipe fails all
+// outstanding reads that could not be satisfied, as well as
+// every read issued afterwards.
+TEST(RecordIOReaderTest, PipeFailure)
+{
+  ::recordio::Encoder<string> encoder(strings::upper);
+  process::http::Pipe pipe;
+
+  internal::recordio::Reader<string> reader(
+      ::recordio::Decoder<string>(strings::lower),
+      pipe.reader());
+
+  // Have multiple outstanding reads before we fail the writer.
+  Future<Result<string>> first = reader.read();
+  Future<Result<string>> second = reader.read();
+  Future<Result<string>> third = reader.read();
+
+  // Write a record, then fail the pipe writer!
+  process::http::Pipe::Writer writer = pipe.writer();
+  writer.write(encoder.encode("encoded"));
+  writer.fail("failure");
+
+  AWAIT_EXPECT_EQ(Result<string>::some("encoded"), first);
+  AWAIT_EXPECT_FAILED(second);
+  AWAIT_EXPECT_FAILED(third);
+
+  // Subsequent reads should return a failure.
+  AWAIT_EXPECT_FAILED(reader.read());
+}

Reply via email to