It looks like some of the `loop` changes introduced compiler warnings:

../../../3rdparty/libprocess/src/io.cpp: In lambda function:
../../../3rdparty/libprocess/src/io.cpp:75:9: warning: control reaches end of non-void function [-Wreturn-type]
         }();
         ^
mv -f .deps/libprocess_la-io.Tpo .deps/libprocess_la-io.Plo
mv -f .deps/libprocess_la-metrics.Tpo .deps/libprocess_la-metrics.Plo
../../../3rdparty/libprocess/src/http.cpp: In lambda function:
../../../3rdparty/libprocess/src/http.cpp:1414:7: warning: control reaches end of non-void function [-Wreturn-type]
       },
       ^
../../../3rdparty/libprocess/src/http.cpp: In lambda function:
../../../3rdparty/libprocess/src/http.cpp:1646:13: warning: control reaches end of non-void function [-Wreturn-type]
             }()
             ^
On Sun, Jan 8, 2017 at 7:27 PM, <b...@apache.org> wrote: > Used `loop` in implementation of io::read and io::write. > > Review: https://reviews.apache.org/r/54841 > > > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d28c198 > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d28c198 > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d28c198 > > Branch: refs/heads/master > Commit: 2d28c198a5c09308825b2771483b70ac42839d16 > Parents: 9984449 > Author: Benjamin Hindman <benjamin.hind...@gmail.com> > Authored: Mon Dec 5 10:48:56 2016 -0800 > Committer: Benjamin Hindman <benjamin.hind...@gmail.com> > Committed: Sun Jan 8 19:27:02 2017 -0800 > > ---------------------------------------------------------------------- > 3rdparty/libprocess/src/io.cpp | 289 ++++++++++++++---------------------- > 1 file changed, 114 insertions(+), 175 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/2d28c198/ > 3rdparty/libprocess/src/io.cpp > ---------------------------------------------------------------------- > diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io. > cpp > index d0b3ba1..8aa3576 100644 > --- a/3rdparty/libprocess/src/io.cpp > +++ b/3rdparty/libprocess/src/io.cpp > @@ -42,132 +42,114 @@ enum ReadFlags > }; > > > -void read( > - int fd, > - void* data, > - size_t size, > - ReadFlags flags, > - const std::shared_ptr<Promise<size_t>>& promise, > - const Future<short>& future) > +Future<size_t> read(int fd, void* data, size_t size, ReadFlags flags = > NONE) > { > - // Ignore this function if the read operation has been discarded. > - if (promise->future().hasDiscard()) { > - CHECK(!future.isPending()); > - promise->discard(); > - return; > - } > - > + // TODO(benh): Let the system calls do what ever they're supposed to > + // rather than return 0 here? 
> if (size == 0) { > - promise->set(0); > - return; > + return 0; > } > > - if (future.isDiscarded()) { > - promise->fail("Failed to poll: discarded future"); > - } else if (future.isFailed()) { > - promise->fail(future.failure()); > - } else { > - ssize_t length; > - if (flags == NONE) { > - length = os::read(fd, data, size); > - } else { // PEEK. > - // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK > and the > - // error will be propagted out. > - // NOTE: We cast to `char*` here because the function prototypes on > - // Windows use `char*` instead of `void*`. > - length = net::recv(fd, (char*) data, size, MSG_PEEK); > - } > - > + return loop( > + None(), > + [=]() -> Future<Option<size_t>> { > + // Because the file descriptor is non-blocking, we call > + // read()/recv() immediately. If no data is available than > + // we'll call `poll` and block. We also observed that for some > + // combination of libev and Linux kernel versions, the poll > + // would block for non-deterministically long periods of > + // time. This may be fixed in a newer version of libev (we use > + // 3.8 at the time of writing this comment). > + ssize_t length = [=]() { > + switch (flags) { > + case PEEK: > + // In case `fd` is not a socket os::recv() will fail > + // with ENOTSOCK and the error will be returned. > + // > + // NOTE: We cast to `char*` here because the function > + // prototypes on Windows use `char*` instead of `void*`. > + return net::recv(fd, (char*) data, size, MSG_PEEK); > + case NONE: > + return os::read(fd, data, size); > + } > + }(); > + > + if (length < 0) { > #ifdef __WINDOWS__ > - int error = WSAGetLastError(); > + int error = WSAGetLastError(); > #else > - int error = errno; > + int error = errno; > #endif // __WINDOWS__ > > - if (length < 0) { > - if (net::is_restartable_error(error) || > net::is_retryable_error(error)) { > - // Restart the read operation. 
> - Future<short> future = > - io::poll(fd, process::io::READ).onAny( > - lambda::bind(&internal::read, > - fd, > - data, > - size, > - flags, > - promise, > - lambda::_1)); > - > - // Stop polling if a discard occurs on our future. > - promise->future().onDiscard( > - lambda::bind(&process::internal::discard<short>, > - WeakFuture<short>(future))); > - } else { > - // Error occurred. > - promise->fail(os::strerror(errno)); > - } > - } else { > - promise->set(length); > - } > - } > + if (!net::is_restartable_error(error) && > + !net::is_retryable_error(error)) { > + // TODO(benh): Confirm that `os::strerror` does the right > + // thing for `error` on Windows. > + return Failure(os::strerror(error)); > + } > + > + return None(); > + } > + > + return length; > + }, > + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> { > + // Restart/retry if we don't yet have a result. > + if (length.isNone()) { > + return io::poll(fd, io::READ) > + .then([](short event) -> ControlFlow<size_t> { > + CHECK_EQ(io::READ, event); > + return Continue(); > + }); > + } > + return Break(length.get()); > + }); > } > > > -void write( > - int fd, > - const void* data, > - size_t size, > - const std::shared_ptr<Promise<size_t>>& promise, > - const Future<short>& future) > +Future<size_t> write(int fd, const void* data, size_t size) > { > - // Ignore this function if the write operation has been discarded. > - if (promise->future().hasDiscard()) { > - promise->discard(); > - return; > - } > - > + // TODO(benh): Let the system calls do what ever they're supposed to > + // rather than return 0 here? 
> if (size == 0) { > - promise->set(0); > - return; > + return 0; > } > > - if (future.isDiscarded()) { > - promise->fail("Failed to poll: discarded future"); > - } else if (future.isFailed()) { > - promise->fail(future.failure()); > - } else { > - ssize_t length = os::write(fd, data, size); > + return loop( > + None(), > + [=]() -> Future<Option<size_t>> { > + ssize_t length = os::write(fd, data, size); > > + if (length < 0) { > #ifdef __WINDOWS__ > - int error = WSAGetLastError(); > + int error = WSAGetLastError(); > #else > - int error = errno; > + int error = errno; > #endif // __WINDOWS__ > > - if (length < 0) { > - if (net::is_restartable_error(error) || > net::is_retryable_error(error)) { > - // Restart the write operation. > - Future<short> future = > - io::poll(fd, process::io::WRITE).onAny( > - lambda::bind(&internal::write, > - fd, > - data, > - size, > - promise, > - lambda::_1)); > - > - // Stop polling if a discard occurs on our future. > - promise->future().onDiscard( > - lambda::bind(&process::internal::discard<short>, > - WeakFuture<short>(future))); > - } else { > - // Error occurred. > - promise->fail(os::strerror(errno)); > - } > - } else { > - // TODO(benh): Retry if 'length' is 0? > - promise->set(length); > - } > - } > + if (!net::is_restartable_error(error) && > + !net::is_retryable_error(error)) { > + // TODO(benh): Confirm that `os::strerror` does the right > + // thing for `error` on Windows. > + return Failure(os::strerror(error)); > + } > + > + return None(); > + } > + > + return length; > + }, > + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> { > + // Restart/retry if we don't yet have a result. 
> + if (length.isNone()) { > + return io::poll(fd, io::WRITE) > + .then([](short event) -> ControlFlow<size_t> { > + CHECK_EQ(io::WRITE, event); > + return Continue(); > + }); > + } > + return Break(length.get()); > + }); > } > > } // namespace internal { > @@ -177,32 +159,18 @@ Future<size_t> read(int fd, void* data, size_t size) > { > process::initialize(); > > - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>()); > - > // Check the file descriptor. > Try<bool> nonblock = os::isNonblock(fd); > if (nonblock.isError()) { > // The file descriptor is not valid (e.g., has been closed). > - promise->fail( > - "Failed to check if file descriptor was non-blocking: " + > - nonblock.error()); > - return promise->future(); > + return Failure("Failed to check if file descriptor was non-blocking: > " + > + nonblock.error()); > } else if (!nonblock.get()) { > // The file descriptor is not non-blocking. > - promise->fail("Expected a non-blocking file descriptor"); > - return promise->future(); > + return Failure("Expected a non-blocking file descriptor"); > } > > - // Because the file descriptor is non-blocking, we call read() > - // immediately. The read may in turn call poll if necessary, > - // avoiding unnecessary polling. We also observed that for some > - // combination of libev and Linux kernel versions, the poll would > - // block for non-deterministically long periods of time. This may be > - // fixed in a newer version of libev (we use 3.8 at the time of > - // writing this comment). > - internal::read(fd, data, size, internal::NONE, promise, io::READ); > - > - return promise->future(); > + return internal::read(fd, data, size); > } > > > @@ -210,32 +178,19 @@ Future<size_t> write(int fd, const void* data, > size_t size) > { > process::initialize(); > > - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>()); > - > // Check the file descriptor. 
> Try<bool> nonblock = os::isNonblock(fd); > if (nonblock.isError()) { > // The file descriptor is not valid (e.g., has been closed). > - promise->fail( > + return Failure( > "Failed to check if file descriptor was non-blocking: " + > nonblock.error()); > - return promise->future(); > } else if (!nonblock.get()) { > // The file descriptor is not non-blocking. > - promise->fail("Expected a non-blocking file descriptor"); > - return promise->future(); > + return Failure("Expected a non-blocking file descriptor"); > } > > - // Because the file descriptor is non-blocking, we call write() > - // immediately. The write may in turn call poll if necessary, > - // avoiding unnecessary polling. We also observed that for some > - // combination of libev and Linux kernel versions, the poll would > - // block for non-deterministically long periods of time. This may be > - // fixed in a newer version of libev (we use 3.8 at the time of > - // writing this comment). > - internal::write(fd, data, size, promise, io::WRITE); > - > - return promise->future(); > + return internal::write(fd, data, size); > } > > > @@ -280,43 +235,15 @@ Future<size_t> peek(int fd, void* data, size_t size, > size_t limit) > nonblock.error()); > } > > - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>()); > - > - // Because the file descriptor is non-blocking, we call read() > - // immediately. The read may in turn call poll if necessary, > - // avoiding unnecessary polling. We also observed that for some > - // combination of libev and Linux kernel versions, the poll would > - // block for non-deterministically long periods of time. This may be > - // fixed in a newer version of libev (we use 3.8 at the time of > - // writing this comment). > - internal::read(fd, data, limit, internal::PEEK, promise, io::READ); > - > - // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows. 
> - promise->future().onAny([fd]() { os::close(fd); }); > - > - return promise->future(); > + return internal::read(fd, data, limit, internal::PEEK) > + .onAny([fd]() { > + os::close(fd); > + }); > } > > > namespace internal { > > -Future<string> _read( > - int fd, > - const std::shared_ptr<string>& buffer, > - const boost::shared_array<char>& data, > - size_t length) > -{ > - return io::read(fd, data.get(), length) > - .then([=](size_t size) -> Future<string> { > - if (size == 0) { // EOF. > - return string(*buffer); > - } > - buffer->append(data.get(), size); > - return _read(fd, buffer, data, length); > - }); > -} > - > - > Future<Nothing> splice( > int from, > int to, > @@ -392,9 +319,21 @@ Future<string> read(int fd) > std::shared_ptr<string> buffer(new string()); > boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]); > > - // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows. > - return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE) > - .onAny([fd]() { os::close(fd); }); > + return loop( > + None(), > + [=]() { > + return io::read(fd, data.get(), BUFFERED_READ_SIZE); > + }, > + [=](size_t length) -> ControlFlow<string> { > + if (length == 0) { // EOF. > + return Break(std::move(*buffer)); > + } > + buffer->append(data.get(), length); > + return Continue(); > + }) > + .onAny([fd]() { > + os::close(fd); > + }); > } > > > >