This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f8f6720591931ee6a71e4aeffd50192f6fc96df1
Author: Joseph Wu <[email protected]>
AuthorDate: Tue Nov 19 15:01:44 2019 -0800

    SSL Socket: Implemented sendfile.

    This implements the SSL socket's sendfile method, which must read the
    file (unlike the zero-copy os::sendfile).

    This also moves a test exercising sendfile from process_tests.cpp
    into http_tests.cpp and parameterizes it for SSL and non-SSL.

    Review: https://reviews.apache.org/r/71790
---
 3rdparty/libprocess/src/ssl/openssl_socket.cpp  | 69 ++++++++++++++++-
 3rdparty/libprocess/src/tests/http_tests.cpp    | 71 +++++++++++++++++++++++++
 3rdparty/libprocess/src/tests/process_tests.cpp | 45 ----------------
 3 files changed, 139 insertions(+), 46 deletions(-)

diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
index 42a1918..74f9fe2 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
@@ -20,9 +20,12 @@
 #include <openssl/ssl.h>
 #include <openssl/err.h>
 
+#include <algorithm>
 #include <atomic>
 #include <queue>
 
+#include <boost/shared_array.hpp>
+
 #include <process/io.hpp>
 #include <process/loop.hpp>
 #include <process/owned.hpp>
@@ -35,6 +38,8 @@
 #include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 
+#include <stout/os/lseek.hpp>
+
 #include "openssl.hpp"
 #include "ssl/openssl_socket.hpp"
 
@@ -490,7 +495,69 @@ Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size)
 Future<size_t> OpenSSLSocketImpl::sendfile(
     int_fd fd, off_t offset, size_t size)
 {
-  UNIMPLEMENTED;
+  // Streams `size` bytes of `fd`, beginning at `offset`, over the SSL
+  // connection by reading the file in chunks and handing each to `send`.
+  if (dirty_shutdown) {
+    return Failure("Socket is shutdown");
+  }
+
+  // Hold a weak pointer since both read and write are not guaranteed to finish.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  Try<off_t> seek = os::lseek(fd, offset, SEEK_SET);
+  if (seek.isError()) {
+    return Failure("Failed to seek: " + seek.error());
+  }
+
+  Try<Nothing> async = io::prepare_async(fd);
+  if (async.isError()) {
+    return Failure("Failed to make FD asynchronous: " + async.error());
+  }
+
+  // Nothing to transfer; avoid issuing a zero-length read and send.
+  if (size == 0) {
+    return static_cast<size_t>(0);
+  }
+
+  // The loop keeps running after this function returns, so the countdown
+  // must live on the heap and be shared by both lambdas; a by-reference
+  // capture of a local would dangle, and a by-value copy would never see
+  // the decrement.
+  std::shared_ptr<size_t> remaining = std::make_shared<size_t>(size);
+  boost::shared_array<char> data(new char[io::BUFFERED_READ_SIZE]);
+
+  return process::loop(
+      compute_thread,
+      [weak_self, fd, remaining, data]() -> Future<size_t> {
+        // Never read past the requested range, even on the final chunk.
+        return io::read(
+            fd, data.get(), std::min(io::BUFFERED_READ_SIZE, *remaining))
+          .then([weak_self, data](size_t read_bytes) -> Future<size_t> {
+            // A zero-byte read with data still owed means the file ended
+            // early; fail instead of looping forever.
+            if (read_bytes == 0) {
+              return Failure("Unexpected EOF while sending file");
+            }
+
+            std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+            if (self == nullptr) {
+              return Failure("Socket destroyed while sending file");
+            }
+
+            // NOTE(review): assumes `send` writes all of `read_bytes` or
+            // fails; a short write would drop the rest of the buffer.
+            return self->send(data.get(), read_bytes);
+          });
+      },
+      [size, remaining](size_t written) -> Future<ControlFlow<size_t>> {
+        *remaining -= written;
+
+        if (*remaining > 0) {
+          return Continue();
+        }
+
+        return Break(size);
+      });
 }
 
 
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 1433f3d..b906b3c 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -51,6 +51,8 @@
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
 
+#include <stout/os/write.hpp>
+
 #include <stout/tests/utils.hpp>
 
 #include "encoder.hpp"
@@ -887,6 +889,75 @@ TEST_P(HTTPTest, StreamingGetFailure)
 }
 
 
+class FileServerProcess : public Process<FileServerProcess>
+{
+public:
+  explicit FileServerProcess(const string& _path)
+    : path(_path) {}
+
+protected:
+  void initialize() override
+  {
+    provide("", path);
+  }
+
+  const string path;
+};
+
+
+class FileServer
+{
+public:
+  FileServer(const string& path) : process(new FileServerProcess(path))
+  {
+    spawn(process.get());
+  }
+
+  ~FileServer()
+  {
+    terminate(process.get());
+    wait(process.get());
+  }
+
+  Owned<FileServerProcess> process;
+};
+
+
+TEST_P(HTTPTest, ProvideSendfile)
+{
+  // A file smaller than the buffered read size.
+  const string LOREM_IPSUM =
+      "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+      "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
+      "minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip "
+      "ex ea commodo consequat. Duis aute irure dolor in reprehenderit in "
+      "voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur "
+      "sint occaecat cupidatat non proident, sunt in culpa qui officia "
+      "deserunt mollit anim id est laborum.";
+
+  const string path = path::join(sandbox.get(), "lorem.txt");
+  ASSERT_SOME(os::write(path, LOREM_IPSUM));
+
+  FileServer server(path);
+
+  Future<http::Response> response =
+    http::get(server.process->self(), None(), None(), None(), GetParam());
+
+  AWAIT_READY(response);
+  ASSERT_EQ(LOREM_IPSUM, response->body);
+
+  // A file significantly larger than the buffered read size.
+  const string LOREM_IPSUM_AND_JUNK = LOREM_IPSUM + string(1024 * 1024, 'A');
+  ASSERT_SOME(os::write(path, LOREM_IPSUM_AND_JUNK));
+
+  response =
+    http::get(server.process->self(), None(), None(), None(), GetParam());
+
+  AWAIT_READY(response);
+  ASSERT_EQ(LOREM_IPSUM_AND_JUNK, response->body);
+}
+
+
 TEST_P(HTTPTest, PipeEquality)
 {
   // Pipes are shared objects, like Futures. Copies are considered
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 05dc5ec..42295a6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1657,51 +1657,6 @@ TEST_F(ProcessTest, Async)
 }
 
 
-class FileServer : public Process<FileServer>
-{
-public:
-  explicit FileServer(const string& _path)
-    : path(_path) {}
-
-  void initialize() override
-  {
-    provide("", path);
-  }
-
-  const string path;
-};
-
-
-TEST_F(ProcessTest, Provide)
-{
-  const string LOREM_IPSUM =
-      "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
-      "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
-      "minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip "
-      "ex ea commodo consequat. Duis aute irure dolor in reprehenderit in "
-      "voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur "
-      "sint occaecat cupidatat non proident, sunt in culpa qui officia "
-      "deserunt mollit anim id est laborum.";
-
-  const string path = path::join(sandbox.get(), "lorem.txt");
-  ASSERT_SOME(os::write(path, LOREM_IPSUM));
-
-  FileServer server(path);
-  PID<FileServer> pid = spawn(server);
-
-  Future<http::Response> response = http::get(pid);
-
-  AWAIT_READY(response);
-
-  ASSERT_EQ(LOREM_IPSUM, response->body);
-
-  terminate(server);
-  wait(server);
-
-  ASSERT_SOME(os::rmdir(path));
-}
-
-
 static int baz(string s) { return 42; }
 
 
