Added an http::serve abstraction.

Review: https://reviews.apache.org/r/54115


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c33ba209
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c33ba209
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c33ba209

Branch: refs/heads/master
Commit: c33ba209d226fb91874b00976298faf278a29369
Parents: 9b34363
Author: Benjamin Hindman <benjamin.hind...@gmail.com>
Authored: Sun Nov 27 14:22:45 2016 -0800
Committer: Benjamin Hindman <benjamin.hind...@gmail.com>
Committed: Thu Dec 1 00:49:07 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp  |  54 +++
 3rdparty/libprocess/include/process/queue.hpp |   7 +
 3rdparty/libprocess/src/http.cpp              | 433 +++++++++++++++++++++
 3rdparty/libprocess/src/tests/http_tests.cpp  | 217 +++++++++++
 4 files changed, 711 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c33ba209/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp 
b/3rdparty/libprocess/include/process/http.hpp
index 7894b31..f0f849c 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -870,6 +870,60 @@ Future<Connection> connect(const network::Address& 
address);
 Future<Connection> connect(const URL& url);
 
 
+namespace internal {
+
+Future<Nothing> serve(
+    network::Socket s,
+    std::function<Future<Response>(const Request&)>&& f);
+
+} // namespace internal {
+
+
+// Serves HTTP requests on the specified socket using the specified
+// handler.
+//
+// Returns `Nothing` after serving has completed, either because (1) a
+// failure occured receiving requests or sending responses or (2) the
+// HTTP connection was not persistent (i.e., a 'Connection: close'
+// header existed either on the request or the response) or (3)
+// serving was discarded.
+//
+// Doing a `discard()` on the Future returned from `serve` will
+// discard any current socket receiving and any current socket
+// sending and shutdown the socket in both directions.
+//
+// NOTE: HTTP pipelining is automatically performed. If you don't want
+// pipelining you must explicitly sequence/serialize the requests to
+// wait for previous responses yourself.
+template <typename F>
+Future<Nothing> serve(const network::Socket& s, F&& f)
+{
+  return internal::serve(s, std::function<Future<Response>(const 
Request&)>(f));
+}
+
+
+// TODO(benh): Eventually we probably want something like a `Server`
+// that will handle accepting new sockets and then calling `serve`. It
+// would also be valuable to introduce shutdown semantics that are
+// better than the current discard semantics on `serve`. For example:
+//
+// class Server
+// {
+//   struct ShutdownOptions
+//   {
+//     // During the grace period, no new connections will
+//     // be accepted. Existing connections will be closed
+//     // when currently received requests have been handled.
+//     // The server will shut down reads on each connection
+//     // to prevent new requests from arriving.
+//     Duration gracePeriod;
+//   };
+//
+//   // Shuts down the server.
+//   Future<Nothing> shutdown(Sever::ShutdownOptions options);
+// };
+
+
 // Create a http Request from the specified parameters.
 Request createRequest(
   const UPID& upid,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c33ba209/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp 
b/3rdparty/libprocess/include/process/queue.hpp
index 59f84a1..ab08e30 100644
--- a/3rdparty/libprocess/include/process/queue.hpp
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -69,6 +69,13 @@ public:
     return future;
   }
 
+  size_t size() const
+  {
+    synchronized (data->lock) {
+      return data->elements.size();
+    }
+  }
+
 private:
   struct Data
   {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c33ba209/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 8a10dca..5203fb3 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -31,13 +31,17 @@
 #include <tuple>
 #include <vector>
 
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/id.hpp>
+#include <process/io.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
+#include <process/queue.hpp>
 #include <process/socket.hpp>
 
 #include <stout/foreach.hpp>
@@ -52,6 +56,7 @@
 #include <stout/try.hpp>
 
 #include "decoder.hpp"
+#include "encoder.hpp"
 
 using std::deque;
 using std::istringstream;
@@ -1388,6 +1393,434 @@ Future<Connection> connect(const URL& url)
 }
 
 
+namespace internal {
+
+Future<Nothing> send(network::Socket socket, Encoder* encoder)
+{
+  size_t* size = new size_t();
+  return [=]() {
+    switch (encoder->kind()) {
+      case Encoder::DATA: {
+        const char* data = static_cast<DataEncoder*>(encoder)->next(size);
+        return socket.send(data, *size);
+      }
+      case Encoder::FILE: {
+        off_t offset = 0;
+        int fd = static_cast<FileEncoder*>(encoder)->next(&offset, size);
+        return socket.sendfile(fd, offset, *size);
+      }
+    }
+  }()
+  .then([=](size_t length) -> Future<Nothing> {
+    // Update the encoder with the amount sent.
+    encoder->backup(*size - length);
+
+    // See if there is any more of the message to send.
+    if (encoder->remaining() != 0) {
+      return send(socket, encoder);
+    }
+
+    return Nothing();
+  })
+  .onAny([=]() {
+    delete size;
+  });
+}
+
+
+Future<Nothing> send(
+    network::Socket socket,
+    const Response& response,
+    Request* request)
+{
+  CHECK(response.type == Response::BODY ||
+        response.type == Response::NONE);
+
+  Encoder* encoder = new HttpResponseEncoder(response, *request);
+
+  return send(socket, encoder)
+    .onAny([=]() {
+      delete encoder;
+    });
+}
+
+
+Future<Nothing> sendfile(
+    network::Socket socket,
+    Response response,
+    Request* request)
+{
+  CHECK(response.type == Response::PATH);
+
+  // Make sure no body is sent (this is really an error and
+  // should be reported and no response sent.
+  response.body.clear();
+
+  Try<int> fd = os::open(response.path, O_CLOEXEC | O_NONBLOCK | O_RDONLY);
+
+  if (fd.isError()) {
+    const string body = "Failed to open '" + response.path + "': " + 
fd.error();
+    // TODO(benh): VLOG(1)?
+    // TODO(benh): Don't send error back as part of InternalServiceError?
+    // TODO(benh): Copy headers from `response`?
+    return send(socket, InternalServerError(body), request);
+  }
+
+  struct stat s; // Need 'struct' because of function named 'stat'.
+  if (fstat(fd.get(), &s) != 0) {
+    const string body =
+      "Failed to fstat '" + response.path + "': " + os::strerror(errno);
+    // TODO(benh): VLOG(1)?
+    // TODO(benh): Don't send error back as part of InternalServiceError?
+    // TODO(benh): Copy headers from `response`?
+    os::close(fd.get());
+    return send(socket, InternalServerError(body), request);
+  } else if (S_ISDIR(s.st_mode)) {
+    const string body = "'" + response.path + "' is a directory";
+    // TODO(benh): VLOG(1)?
+    // TODO(benh): Don't send error back as part of InternalServiceError?
+    // TODO(benh): Copy headers from `response`?
+    os::close(fd.get());
+    return send(socket, InternalServerError(body), request);
+  }
+
+  // While the user is expected to properly set a 'Content-Type'
+  // header, we'll fill in (or overwrite) 'Content-Length' header.
+  response.headers["Content-Length"] = stringify(s.st_size);
+
+  // TODO(benh): If this is a TCP socket consider turning on TCP_CORK
+  // for both sends and then turning it off.
+  Encoder* encoder = new HttpResponseEncoder(response, *request);
+
+  return send(socket, encoder)
+    .onAny([=](const Future<Nothing>& future) {
+      delete encoder;
+
+      // Close file descriptor if we aren't doing any more sending.
+      if (future.isDiscarded() || future.isFailed()) {
+        os::close(fd.get());
+      }
+    })
+    .then([=]() mutable {
+      // NOTE: the file descriptor gets closed by FileEncoder.
+      Encoder* encoder = new FileEncoder(fd.get(), s.st_size);
+      return send(socket, encoder)
+        .onAny([=]() {
+          delete encoder;
+        });
+    });
+}
+
+
+Future<Nothing> stream(
+    const network::Socket& socket,
+    http::Pipe::Reader reader)
+{
+  return reader.read()
+    .then([=](const string& data) mutable {
+      bool finished = false;
+
+      ostringstream out;
+
+      if (data.empty()) {
+        // Finished reading.
+        out << "0\r\n" << "\r\n";
+        finished = true;
+      } else {
+        out << std::hex << data.size() << "\r\n";
+        out << data;
+        out << "\r\n";
+      }
+
+      Encoder* encoder = new DataEncoder(out.str());
+
+      return send(socket, encoder)
+        .onAny([=]() {
+          delete encoder;
+        })
+        .then([=]() mutable -> Future<Nothing> {
+          if (!finished) {
+            return stream(socket, reader);
+          }
+
+          return Nothing();
+        });
+    });
+}
+
+
+Future<Nothing> stream(
+    const network::Socket& socket,
+    Response response,
+    Request* request)
+{
+  CHECK(response.type == Response::PIPE);
+
+  // Make sure no body is sent (this is really an error and
+  // should be reported and no response sent).
+  response.body.clear();
+
+  if (response.reader.isNone()) {
+    // This is clearly a programmer error, we don't have a reader from
+    // which to stream! We return an `InternalServerError` rather than
+    // failing just as we do in `sendfile` when we get a malformed
+    // response.
+    const string body = "Missing data to stream";
+    // TODO(benh): VLOG(1)?
+    // TODO(benh): Don't send error back as part of InternalServiceError?
+    // TODO(benh): Copy headers from `response`?
+    return send(socket, InternalServerError(body), request);
+  }
+
+  // While the user is expected to properly set a 'Content-Type'
+  // header, we'll fill in (or overwrite) 'Transfer-Encoding' header.
+  response.headers["Transfer-Encoding"] = "chunked";
+
+  Encoder* encoder = new HttpResponseEncoder(response, *request);
+
+  return send(socket, encoder)
+    .onAny([=]() {
+      delete encoder;
+    })
+    .then([=]() {
+      return stream(socket, response.reader.get());
+    })
+    // Regardless of whether `send` or `stream` completed succesfully
+    // or failed we close the reader so any writers will be notified.
+    .onAny([=]() mutable {
+      response.reader->close();
+    });
+}
+
+
+struct Item
+{
+  Request* request;
+  Future<Response> response;
+};
+
+
+Future<Nothing> send(
+    network::Socket socket,
+    Queue<Option<Item>> pipeline)
+{
+  return loop(
+      [=]() mutable {
+        return pipeline.get();
+      },
+      [=](const Option<Item>& item) -> Future<bool> {
+        if (item.isNone()) {
+          return false;
+        }
+
+        Request* request = item->request;
+        Future<Response> response = item->response;
+
+        return response
+          // TODO(benh):
+          // .recover([]() {
+          //   return InternalServerError("Discarded response");
+          // })
+          .repair([](const Future<Response>& response) {
+            // TODO(benh): Is this severe enough that we should close
+            // the connection?
+            return InternalServerError(response.failure());
+          })
+          .then([=](const Response& response) {
+            // TODO(benh): Should any generated InternalServerError
+            // responses due to bugs in the Response passed to us cause
+            // us to return a Failure here rather than keep processing
+            // more requests/responses?
+            return [&]() {
+              switch (response.type) {
+                case Response::PATH: return sendfile(socket, response, 
request);
+                case Response::PIPE: return stream(socket, response, request);
+                case Response::BODY:
+                case Response::NONE: return send(socket, response, request);
+              }
+            }()
+            .then([=]() {
+              // Persist the connection if the request expects it and
+              // the response doesn't include 'Connection: close'.
+              bool persist = request->keepAlive;
+              if (response.headers.contains("Connection")) {
+                if (response.headers.at("Connection") == "close") {
+                  persist = false;
+                }
+              }
+              return persist;
+            });
+          })
+          .onAny([=]() {
+            delete request;
+          });
+      });
+}
+
+
+Future<Nothing> receive(
+    network::Socket socket,
+    std::function<Future<Response>(const Request&)>&& f,
+    Queue<Option<Item>> pipeline)
+{
+  // Get the peer address to augment any requests we receive.
+  Try<network::Address> address = socket.peer();
+
+  if (address.isError()) {
+    return Failure("Failed to get peer address: " + address.error());
+  }
+
+  const size_t size = io::BUFFERED_READ_SIZE;
+  char* data = new char[size];
+
+  DataDecoder* decoder = new DataDecoder();
+
+  return loop(
+      [=]() {
+        return socket.recv(data, size);
+      },
+      [=](size_t length) mutable -> Future<bool> {
+        if (length == 0) {
+          return false;
+        }
+
+        // Decode as much of the data as possible into HTTP requests.
+        const deque<Request*> requests = decoder->decode(data, length);
+
+        // NOTE: it's possible the decoder has failed but some
+        // requests might be available, i.e., `requests.empty()` is
+        // not true, so we wait to return a `Failure` until when there
+        // are no requests.
+
+        if (decoder->failed() && requests.empty()) {
+          return Failure("Decoder error while receiving");
+        }
+
+        foreach (Request* request, requests) {
+          request->client = address.get();
+          // TODO(benh): To support HTTP pipelining we invoke `f`
+          // regardless of whether the previous response has been
+          // completed. This can make handling of requests more
+          // difficult so we could consider supporting disabling HTTP
+          // pipelining via some sort of "options" initially passed
+          // in.
+          pipeline.put(Item{request, f(*request)});
+        }
+
+        return true; // Keep looping!
+      })
+    .onAny([=]() {
+      delete decoder;
+      delete[] data;
+    });
+}
+
+
+Future<Nothing> serve(
+    network::Socket socket,
+    std::function<Future<Response>(const Request&)>&& f)
+{
+  // HTTP serving is implemented by running two loops, a "receive"
+  // loop and a "send" loop. The receive loop passes the pipeline of
+  // request/responses via a `Queue` to the send loop that is
+  // responsible for sending the response back to the client. A `None`
+  // passed on the queue signifies that receiving has completed.
+  //
+  // TODO(benh): Replace this with something like `Stream` that can
+  // give us completion semantics without having to encode them with
+  // an `Option` like we do here.
+  Queue<Option<Item>> pipeline;
+
+  Future<Nothing> receiving =
+    receive(socket, std::move(f), pipeline)
+      .onAny([=]() mutable {
+        // Either:
+        //
+        //   (1) An EOF was received.
+        //   (2) A failure occured while receiving.
+        //   (3) Receiving was discarded (likely because serving was
+        //       discarded).
+        //
+        // In all cases the best course of action is to signify that
+        // no more items will be enqueued on the `pipeline` and in
+        // the case of (2) or (3) shutdown the read end of the
+        // socket so the client recognizes it can't send any more
+        // requests.
+        //
+        // Note that we don't look at the return value of
+        // `Socket::shutdown` because the socket might already be
+        // shutdown!
+        pipeline.put(None());
+        socket.shutdown(network::Socket::Shutdown::READ);
+      });
+
+  Future<Nothing> sending =
+    send(socket, pipeline)
+      .onAny([=]() mutable {
+        // Either:
+        //
+        //   (1) HTTP connection is not meant to be persistent or
+        //       there are no more items expected in the pipeline.
+        //   (2) A failure occured while sending.
+        //   (3) Sending was discarded (likely because serving was
+        //       discarded).
+        //
+        // In all cases the best course of action is to shutdown the
+        // socket which will also force receiving to complete.
+        //
+        // Note that we don't look at the return value of
+        // `Socket::shutdown` because the socket might already be
+        // shutdown!
+        socket.shutdown(network::Socket::Shutdown::READ_WRITE);
+      });
+
+  std::shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
+
+  promise->future().onDiscard([=]() mutable {
+    receiving.discard();
+    sending.discard();
+  });
+
+  await(sending, receiving)
+    .onAny([=]() mutable {
+      // Delete remaining requests and discard remaining responses.
+      if (pipeline.size() != 0) {
+        loop([=]() mutable {
+               return pipeline.get();
+             },
+             [=](Option<Item> item) {
+               if (item.isNone()) {
+                 return false;
+               }
+               delete item->request;
+               if (promise->future().hasDiscard()) {
+                 item->response.discard();
+               }
+               return true;
+             });
+      }
+
+      if (receiving.isReady() && sending.isReady()) {
+        promise->set(Nothing());
+      } else if (receiving.isFailed() && sending.isFailed()) {
+        promise->fail("Failed to receive (" + receiving.failure() +
+                      ") and send (" + sending.failure() + ")");
+      } else if (receiving.isFailed()) {
+        promise->fail("Failed to receive: " + receiving.failure());
+      } else if (sending.isFailed()) {
+        promise->fail("Failed to send: " + sending.failure());
+      } else {
+        CHECK(receiving.isDiscarded() || sending.isDiscarded());
+        promise->discard();
+      }
+    });
+
+  return promise->future();
+}
+
+} // namespace internal {
+
+
 Request createRequest(
     const URL& url,
     const string& method,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c33ba209/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp 
b/3rdparty/libprocess/src/tests/http_tests.cpp
index de22d20..4a0d852 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -44,11 +44,16 @@
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
 
+#include <stout/tests/utils.hpp>
+
 #include "encoder.hpp"
 
 namespace authentication = process::http::authentication;
 namespace ID = process::ID;
 namespace http = process::http;
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
 
 using authentication::Authenticator;
 using authentication::AuthenticationResult;
@@ -62,6 +67,7 @@ using process::Promise;
 
 using process::http::URL;
 
+using process::network::inet::Address;
 using process::network::inet::Socket;
 
 using std::string;
@@ -1759,3 +1765,214 @@ TEST_F(HttpAuthenticationTest, Basic)
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
   }
 }
+
+
+class HttpServeTest : public TemporaryDirectoryTest {};
+
+
+TEST_F(HttpServeTest, Pipelining)
+{
+  Try<Socket> server = Socket::create();
+  ASSERT_SOME(server);
+
+  ASSERT_SOME(server->bind(Address::ANY_ANY()));
+  ASSERT_SOME(server->listen(1));
+
+  Try<Address> address = server->address();
+  ASSERT_SOME(address);
+
+  Future<Socket> accept = server->accept();
+
+  Future<http::Connection> connect = http::connect(address.get());
+
+  AWAIT_READY(connect);
+  http::Connection connection = connect.get();
+
+  AWAIT_READY(accept);
+  Socket socket = accept.get();
+
+  class Handler
+  {
+  public:
+    MOCK_METHOD1(handle, Future<http::Response>(const http::Request&));
+  } handler;
+
+  Future<Nothing> serve = http::serve(
+    socket,
+    [&](const http::Request& request) {
+      return handler.handle(request);
+    });
+
+  Promise<http::Response> promise1;
+  Future<http::Request> request1;
+
+  Promise<http::Response> promise2;
+  Future<http::Request> request2;
+
+  Promise<http::Response> promise3;
+  Future<http::Request> request3;
+
+  EXPECT_CALL(handler, handle(_))
+    .WillOnce(DoAll(FutureArg<0>(&request1), Return(promise1.future())))
+    .WillOnce(DoAll(FutureArg<0>(&request2), Return(promise2.future())))
+    .WillOnce(DoAll(FutureArg<0>(&request3), Return(promise3.future())))
+    .WillRepeatedly(Return(http::OK()));
+
+  http::URL url("http", address->hostname().get(), address->port, "/");
+
+  http::Request request;
+  request.method = "GET";
+  request.url = url;
+  request.keepAlive = true;
+
+  Future<http::Response> response1 = connection.send(request);
+  Future<http::Response> response2 = connection.send(request);
+  Future<http::Response> response3 = connection.send(request);
+
+  AWAIT_READY(request1);
+  AWAIT_READY(request2);
+  AWAIT_READY(request3);
+
+  ASSERT_TRUE(response1.isPending());
+  ASSERT_TRUE(response2.isPending());
+  ASSERT_TRUE(response3.isPending());
+
+  promise3.set(http::OK("3"));
+
+  ASSERT_TRUE(response1.isPending());
+  ASSERT_TRUE(response2.isPending());
+  ASSERT_TRUE(response3.isPending());
+
+  promise1.set(http::OK("1"));
+
+  AWAIT_READY(response1);
+  EXPECT_EQ("1", response1->body);
+
+  ASSERT_TRUE(response2.isPending());
+  ASSERT_TRUE(response3.isPending());
+
+  promise2.set(http::OK("2"));
+
+  AWAIT_READY(response2);
+  EXPECT_EQ("2", response2->body);
+
+  AWAIT_READY(response3);
+  EXPECT_EQ("3", response3->body);
+
+  AWAIT_READY(connection.disconnect());
+
+  AWAIT_READY(serve);
+}
+
+
+TEST_F(HttpServeTest, Discard)
+{
+  Try<Socket> server = Socket::create();
+  ASSERT_SOME(server);
+
+  ASSERT_SOME(server->bind(Address::ANY_ANY()));
+  ASSERT_SOME(server->listen(1));
+
+  Try<Address> address = server->address();
+  ASSERT_SOME(address);
+
+  Future<Socket> accept = server->accept();
+
+  Future<http::Connection> connect = http::connect(address.get());
+
+  AWAIT_READY(connect);
+  http::Connection connection = connect.get();
+
+  AWAIT_READY(accept);
+  Socket socket = accept.get();
+
+  class Handler
+  {
+  public:
+    MOCK_METHOD1(handle, Future<http::Response>(const http::Request&));
+  } handler;
+
+  Future<Nothing> serve = http::serve(
+    socket,
+    [&](const http::Request& request) {
+      return handler.handle(request);
+    });
+
+  Promise<http::Response> promise1;
+  Future<http::Request> request1;
+
+  EXPECT_CALL(handler, handle(_))
+    .WillOnce(DoAll(FutureArg<0>(&request1), Return(promise1.future())));
+
+  http::URL url("http", address->hostname().get(), address->port, "/");
+
+  http::Request request;
+  request.method = "GET";
+  request.url = url;
+  request.keepAlive = true;
+
+  Future<http::Response> response = connection.send(request);
+
+  AWAIT_READY(request1);
+
+  promise1.future().onDiscard([&]() { promise1.discard(); });
+
+  serve.discard();
+
+  AWAIT_DISCARDED(serve);
+
+  EXPECT_TRUE(promise1.future().hasDiscard());
+
+  AWAIT_FAILED(response);
+
+  AWAIT_READY(connection.disconnected());
+}
+
+
+#ifndef __WINDOWS__
+TEST_F(HttpServeTest, Unix)
+{
+  Try<unix::Socket> server = unix::Socket::create();
+  ASSERT_SOME(server);
+
+  // Use a path in the temporary directory so it gets cleaned up.
+  string path = path::join(sandbox.get(), "socket");
+
+  Try<unix::Address> address = unix::Address::create(path);
+  ASSERT_SOME(address);
+
+  ASSERT_SOME(server->bind(address.get()));
+  ASSERT_SOME(server->listen(1));
+
+  Future<unix::Socket> accept = server->accept();
+
+  Future<http::Connection> connect = http::connect(address.get());
+
+  AWAIT_READY(connect);
+  http::Connection connection = connect.get();
+
+  AWAIT_READY(accept);
+  unix::Socket socket = accept.get();
+
+  Future<Nothing> serve = http::serve(
+    socket,
+    [](const http::Request& request) {
+      return http::OK(request.body);
+    });
+
+  http::Request request;
+  request.method = "GET";
+  request.url = http::URL("http", "", 80, "/");
+  request.keepAlive = true;
+  request.body = "Hello World!";
+
+  Future<http::Response> response = connection.send(request);
+
+  AWAIT_READY(response);
+  EXPECT_EQ(request.body, response->body);
+
+  AWAIT_READY(connection.disconnect());
+
+  AWAIT_READY(serve);
+}
+#endif // __WINDOWS__

Reply via email to