[ 
https://issues.apache.org/jira/browse/MESOS-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bernd Mathiske updated MESOS-3705:
----------------------------------
          Sprint: Mesosphere Sprint 21
    Story Points: 3

> HTTP Pipelining doesn't keep order of requests
> ----------------------------------------------
>
>                 Key: MESOS-3705
>                 URL: https://issues.apache.org/jira/browse/MESOS-3705
>             Project: Mesos
>          Issue Type: Bug
>          Components: libprocess
>    Affects Versions: 0.24.0
>            Reporter: Alexander Rojas
>            Assignee: Alexander Rojas
>              Labels: http, libprocess, mesosphere
>
> [HTTP 1.1 Pipelining|https://en.wikipedia.org/wiki/HTTP_pipelining] describes 
> a mechanism by which multiple HTTP requests can be sent over a single 
> socket. The requirement here is that responses must be sent in the same 
> order in which the requests were made.
> Libprocess has some mechanisms built in to deal with pipelining when multiple 
> HTTP requests are made. It is still possible, however, to create a situation 
> in which responses are scrambled with respect to the order in which the 
> requests arrived.
> Consider the situation in which there are two libprocess processes, 
> {{processA}} and {{processB}}, each running in a different thread, 
> {{thread2}} and {{thread3}} respectively. The 
> [{{ProcessManager}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L374]
>  runs in {{thread1}}.
> {{processA}} is of type {{ProcessA}} which looks roughly as follows:
> {code}
> class ProcessA : public Process<ProcessA>
> {
> public:
>   ProcessA() {}
>   Future<http::Response> foo(const http::Request&) {
>     // … Do something …
>     return http::OK();
>   }
> protected:
>   virtual void initialize() {
>     // Serve HTTP requests for "/foo" with ProcessA::foo.
>     route("/foo", None(), &ProcessA::foo);
>   }
> };
> {code}
> {{processB}} is of type {{ProcessB}}, which is just like {{ProcessA}} but 
> routes {{"/bar"}} instead of {{"/foo"}}.
> The situation in which the bug arises is the following:
> # Two requests, one for {{"http://server_uri/(1)/foo"}} and one for 
> {{"http://server_uri/(2)/bar"}}, are made over the same socket.
> # The first request arrives at 
> [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202]
>  which is running in {{thread1}}. It creates an {{HttpEvent}} and 
> delivers it to the handler, in this case {{processA}}.
> # 
> [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361]
>  enqueues the HTTP event into the {{processA}} queue. This happens in 
> {{thread1}}.
> # The second request arrives at 
> [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202]
>  which is still running in {{thread1}}. Another {{HttpEvent}} is created and 
> delivered to the handler, in this case {{processB}}.
> # 
> [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361]
>  enqueues the HTTP event into the {{processB}} queue. This happens in 
> {{thread1}}.
> # {{thread2}} is blocked, so {{processA}} cannot handle the first request, 
> which stays stuck in the queue.
> # {{thread3}} is idle, so it picks up the request to {{processB}} immediately.
> # 
> [{{ProcessBase::visit(HttpEvent)}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3073]
>  is called in {{thread3}}, which in turn 
> [dispatches|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3106]
>  the response's future to the {{HttpProxy}} associated with the socket on 
> which the request arrived.
> At this last step the bug becomes evident: the response from {{processB}} 
> will be sent before the response from {{processA}}, even if the {{processB}} 
> handler takes a long time and {{ProcessA::foo()}} actually finishes first. 
> The responses are not sent in the order in which the requests were made.
> h1. Reproducer
> The following is a test which successfully reproduces the issue:
> {code}
> class PipelineScramblerProcess : public Process<PipelineScramblerProcess>
> {
> public:
>   PipelineScramblerProcess()
>     : ProcessBase(ID::generate("PipelineScramblerProcess")) {}
>   void block(const Future<Nothing>& trigger)
>   {
>     trigger.await();
>   }
>   Future<http::Response> get(const http::Request& request)
>   {
>     if (promise_) {
>       promise_->set(Nothing());
>     }
>     return http::OK(self().id);
>   }
>   void setPromise(std::unique_ptr<Promise<Nothing>>& promise)
>   {
>     promise_ = std::move(promise);
>   }
> protected:
>   virtual void initialize()
>   {
>     route("/get", None(), &PipelineScramblerProcess::get);
>   }
> private:
>   std::unique_ptr<Promise<Nothing>> promise_;
> };
> TEST(HTTPConnectionTest, ComplexPipelining)
> {
>   PipelineScramblerProcess blocked;
>   spawn(blocked);
>   PipelineScramblerProcess unblocked;
>   spawn(unblocked);
>   ASSERT_EQ(blocked.self().address.ip, unblocked.self().address.ip);
>   ASSERT_EQ(blocked.self().address.port, unblocked.self().address.port);
>   std::unique_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
>   // Block the first process so it cannot process the first request until
>   // the second request is finished.
>   dispatch(blocked, &PipelineScramblerProcess::block, promise->future());
>   // Promise will be set once 'unblocked' serves the second request.
>   unblocked.setPromise(promise);
>   // Get connection for HTTP pipelining.
>   Future<http::Connection> connect =
>     http::connect(http::URL(
>         "http",
>         blocked.self().address.ip,
>         blocked.self().address.port));
>   AWAIT_READY(connect);
>   http::Connection connection = connect.get();
>   http::Request blockedRequest;
>   blockedRequest.method = "GET";
>   blockedRequest.url = http::URL(
>       "http",
>       blocked.self().address.ip,
>       blocked.self().address.port,
>       blocked.self().id + "/get");
>   blockedRequest.keepAlive = true;
>   Future<http::Response> blockedResponse = connection.send(blockedRequest);
>   http::Request unblockedRequest;
>   unblockedRequest.method = "GET";
>   unblockedRequest.url = http::URL(
>       "http",
>       unblocked.self().address.ip,
>       unblocked.self().address.port,
>       unblocked.self().id + "/get");
>   unblockedRequest.keepAlive = true;
>   Future<http::Response> unblockedResponse =
>     connection.send(unblockedRequest);
>   AWAIT_READY(blockedResponse);
>   AWAIT_READY(unblockedResponse);
>   EXPECT_EQ(blocked.self().id, blockedResponse->body);
>   EXPECT_EQ(unblocked.self().id, unblockedResponse->body);
>   AWAIT_READY(connection.disconnect());
>   AWAIT_READY(connection.disconnected());
>   terminate(blocked);
>   wait(blocked);
>   terminate(unblocked);
>   wait(unblocked);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
