This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ed39b42034254f6c2e8da65dfea3a35e4d6ddce3
Author: Benjamin Mahler <[email protected]>
AuthorDate: Mon Nov 25 17:37:36 2019 -0500

    Updated mesos code to work against recordio encoder/decoder changes.
    
    The recordio encoder and decoder were updated to operate on records
    as bytes instead of typed T records.
    
    Review: https://reviews.apache.org/r/71825
---
 src/checks/checker_process.cpp                   |  25 +++--
 src/common/http.hpp                              |  13 ++-
 src/common/recordio.hpp                          |  21 +++--
 src/executor/executor.cpp                        |   8 +-
 src/resource_provider/http_connection.hpp        |   5 +-
 src/resource_provider/manager.cpp                |  48 ++--------
 src/scheduler/scheduler.cpp                      |   8 +-
 src/slave/containerizer/mesos/io/switchboard.cpp |  15 +--
 src/slave/http.cpp                               |  25 +++--
 src/tests/api_tests.cpp                          | 111 ++++++++---------------
 src/tests/common/recordio_tests.cpp              |  52 ++++-------
 src/tests/containerizer/io_switchboard_tests.cpp |  39 ++++----
 src/tests/executor_http_api_tests.cpp            |  17 ++--
 src/tests/master/mock_master_api_subscriber.cpp  |   2 +-
 src/tests/resource_provider_manager_tests.cpp    |  15 +--
 src/tests/scheduler_http_api_tests.cpp           |  22 ++---
 16 files changed, 162 insertions(+), 264 deletions(-)

diff --git a/src/checks/checker_process.cpp b/src/checks/checker_process.cpp
index c214bd1..6a9c7fc 100644
--- a/src/checks/checker_process.cpp
+++ b/src/checks/checker_process.cpp
@@ -171,30 +171,29 @@ static Try<tuple<string, string>> decodeProcessIOData(const string& data)
   string stdoutReceived;
   string stderrReceived;
 
-  ::recordio::Decoder<v1::agent::ProcessIO> decoder(
-      lambda::bind(
-          deserialize<v1::agent::ProcessIO>,
-          ContentType::PROTOBUF,
-          lambda::_1));
+  ::recordio::Decoder decoder;
 
-  Try<std::deque<Try<v1::agent::ProcessIO>>> records = decoder.decode(data);
+  Try<std::deque<string>> records = decoder.decode(data);
 
   if (records.isError()) {
     return Error(records.error());
   }
 
   while (!records->empty()) {
-    Try<v1::agent::ProcessIO> record = records->front();
+    string record = std::move(records->front());
     records->pop_front();
 
-    if (record.isError()) {
-      return Error(record.error());
+    Try<v1::agent::ProcessIO> result = deserialize<v1::agent::ProcessIO>(
+        ContentType::PROTOBUF, record);
+
+    if (result.isError()) {
+      return Error(result.error());
     }
 
-    if (record->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
-      stdoutReceived += record->data().data();
-    } else if (record->data().type() == v1::agent::ProcessIO::Data::STDERR) {
-      stderrReceived += record->data().data();
+    if (result->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
+      stdoutReceived += result->data().data();
+    } else if (result->data().type() == v1::agent::ProcessIO::Data::STDERR) {
+      stderrReceived += result->data().data();
     }
   }
 
diff --git a/src/common/http.hpp b/src/common/http.hpp
index b9ab561..534fc26 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -153,15 +153,19 @@ struct StreamingHttpConnection
       id::UUID _streamId = id::UUID::random())
     : writer(_writer),
       contentType(_contentType),
-      encoder(lambda::bind(serialize, contentType, lambda::_1)),
       streamId(_streamId) {}
 
-  // Converts the message to the templated `Event`, via `evolve()`,
-  // before sending.
   template <typename Message>
   bool send(const Message& message)
   {
-    return writer.write(encoder.encode(evolve(message)));
+    // TODO(bmahler): Remove this evolve(). Could we still
+    // somehow assert that evolve(message) produces a result
+    // of type Event without calling evolve()?
+    Event e = evolve(message);
+
+    std::string record = serialize(contentType, e);
+
+    return writer.write(::recordio::encode(record));
   }
 
   bool close()
@@ -176,7 +180,6 @@ struct StreamingHttpConnection
 
   process::http::Pipe::Writer writer;
   ContentType contentType;
-  ::recordio::Encoder<Event> encoder;
   id::UUID streamId;
 };
 
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 8cb2e73..ee91a60 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -65,10 +65,10 @@ public:
   // We spawn `ReaderProcess` as a managed process to guarantee
   // that it does not wait on itself (this would cause a deadlock!).
   // See comments in `Connection::Data` for further details.
-  Reader(::recordio::Decoder<T>&& decoder,
+  Reader(std::function<Try<T>(const std::string&)> deserialize,
          process::http::Pipe::Reader reader)
     : process(process::spawn(
-        new internal::ReaderProcess<T>(std::move(decoder), reader),
+        new internal::ReaderProcess<T>(std::move(deserialize), reader),
         true)) {}
 
   virtual ~Reader()
@@ -149,10 +149,10 @@ class ReaderProcess : public process::Process<ReaderProcess<T>>
 {
 public:
   ReaderProcess(
-      ::recordio::Decoder<T>&& _decoder,
+      std::function<Try<T>(const std::string&)>&& _deserialize,
       process::http::Pipe::Reader _reader)
     : process::ProcessBase(process::ID::generate("__reader__")),
-      decoder(_decoder),
+      deserialize(_deserialize),
       reader(_reader),
       done(false) {}
 
@@ -235,26 +235,29 @@ private:
       return;
     }
 
-    Try<std::deque<Try<T>>> decode = decoder.decode(read.get());
+    Try<std::deque<std::string>> decode = decoder.decode(read.get());
 
     if (decode.isError()) {
       fail("Decoder failure: " + decode.error());
       return;
     }
 
-    foreach (const Try<T>& record, decode.get()) {
+    foreach (const std::string& record, decode.get()) {
+      Result<T> t = deserialize(record);
+
       if (!waiters.empty()) {
-        waiters.front()->set(Result<T>(std::move(record)));
+        waiters.front()->set(std::move(t));
         waiters.pop();
       } else {
-        records.push(std::move(record));
+        records.push(std::move(t));
       }
     }
 
     consume();
   }
 
-  ::recordio::Decoder<T> decoder;
+  ::recordio::Decoder decoder;
+  std::function<Try<T>(const std::string&)> deserialize;
   process::http::Pipe::Reader reader;
 
   std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index b412603..dfa5820 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -634,11 +634,9 @@ protected:
 
       Pipe::Reader reader = response->reader.get();
 
-      auto deserializer =
-        lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
-      Owned<Reader<Event>> decoder(
-          new Reader<Event>(Decoder<Event>(deserializer), reader));
+      Owned<Reader<Event>> decoder(new Reader<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1),
+          reader));
 
       subscribed = SubscribedResponse {reader, decoder};
 
diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp
index 05863aa..84a2610 100644
--- a/src/resource_provider/http_connection.hpp
+++ b/src/resource_provider/http_connection.hpp
@@ -368,12 +368,9 @@ protected:
 
       process::http::Pipe::Reader reader = response.reader.get();
 
-      auto deserializer =
-        lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
       process::Owned<recordio::Reader<Event>> decoder(
           new recordio::Reader<Event>(
-              ::recordio::Decoder<Event>(deserializer),
+              lambda::bind(deserialize<Event>, contentType, lambda::_1),
               reader));
 
       subscribed = SubscribedResponse(reader, std::move(decoder));
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 427ce70..5665167 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -113,48 +113,12 @@ createRegistryResourceProvider(const ResourceProviderInfo& resourceProviderInfo)
   return resourceProvider;
 }
 
-// Represents the streaming HTTP connection to a resource provider.
-struct HttpConnection
-{
-  HttpConnection(const http::Pipe::Writer& _writer,
-                 ContentType _contentType,
-                 id::UUID _streamId)
-    : writer(_writer),
-      contentType(_contentType),
-      streamId(_streamId),
-      encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
-
-  // Converts the message to an Event before sending.
-  template <typename Message>
-  bool send(const Message& message)
-  {
-    // We need to evolve the internal 'message' into a
-    // 'v1::resource_provider::Event'.
-    return writer.write(encoder.encode(evolve(message)));
-  }
-
-  bool close()
-  {
-    return writer.close();
-  }
-
-  Future<Nothing> closed() const
-  {
-    return writer.readerClosed();
-  }
-
-  http::Pipe::Writer writer;
-  ContentType contentType;
-  id::UUID streamId;
-  ::recordio::Encoder<v1::resource_provider::Event> encoder;
-};
-
 
 struct ResourceProvider
 {
   ResourceProvider(
       const ResourceProviderInfo& _info,
-      const HttpConnection& _http)
+      const StreamingHttpConnection<v1::resource_provider::Event>& _http)
     : info(_info),
       http(_http) {}
 
@@ -172,7 +136,7 @@ struct ResourceProvider
   }
 
   ResourceProviderInfo info;
-  HttpConnection http;
+  StreamingHttpConnection<v1::resource_provider::Event> http;
   hashmap<id::UUID, Owned<Promise<Nothing>>> publishes;
 };
 
@@ -203,7 +167,7 @@ public:
 
 private:
   void subscribe(
-      const HttpConnection& http,
+      const StreamingHttpConnection<v1::resource_provider::Event>& http,
       const Call::Subscribe& subscribe);
 
   void _subscribe(
@@ -394,7 +358,9 @@ Future<http::Response> ResourceProviderManagerProcess::api(
           id::UUID streamId = id::UUID::random();
           ok.headers["Mesos-Stream-Id"] = streamId.toString();
 
-          HttpConnection http(pipe.writer(), acceptType, streamId);
+          StreamingHttpConnection<v1::resource_provider::Event> http(
+              pipe.writer(), acceptType, streamId);
+
           this->subscribe(http, call.subscribe());
 
           return std::move(ok);
@@ -804,7 +770,7 @@ Future<Nothing> ResourceProviderManagerProcess::publishResources(
 
 
 void ResourceProviderManagerProcess::subscribe(
-    const HttpConnection& http,
+    const StreamingHttpConnection<v1::resource_provider::Event>& http,
     const Call::Subscribe& subscribe)
 {
   const ResourceProviderInfo& resourceProviderInfo =
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 674483a..48be291 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -655,11 +655,9 @@ protected:
 
       Pipe::Reader reader = response->reader.get();
 
-      auto deserializer =
-        lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
-      Owned<Reader<Event>> decoder(
-          new Reader<Event>(Decoder<Event>(deserializer), reader));
+      Owned<Reader<Event>> decoder(new Reader<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1),
+          reader));
 
       subscribed = SubscribedResponse {reader, decoder};
 
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 8e02e51..8d78f7c 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -977,18 +977,21 @@ public:
   Future<Nothing> unblock();
 
 private:
+  // TODO(bmahler): Replace this with the common StreamingHttpConnection.
   class HttpConnection
   {
   public:
     HttpConnection(
         const http::Pipe::Writer& _writer,
-        const ContentType& contentType)
+        const ContentType& _contentType)
       : writer(_writer),
-        encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+        contentType(_contentType) {}
 
     bool send(const agent::ProcessIO& message)
     {
-      return writer.write(encoder.encode(message));
+      string record = serialize(contentType, message);
+
+      return writer.write(::recordio::encode(record));
     }
 
     bool close()
@@ -1003,7 +1006,7 @@ private:
 
   private:
     http::Pipe::Writer writer;
-    ::recordio::Encoder<agent::ProcessIO> encoder;
+    ContentType contentType;
   };
 
   // Sit in a heartbeat loop forever.
@@ -1483,10 +1486,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
 
     Owned<recordio::Reader<agent::Call>> reader(
         new recordio::Reader<agent::Call>(
-            ::recordio::Decoder<agent::Call>(lambda::bind(
+            lambda::bind(
                 deserialize<agent::Call>,
                 messageContentType.get(),
-                lambda::_1)),
+                lambda::_1),
             request.reader.get()));
 
     return reader->read()
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4d68ce7..04ad0d8 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -498,8 +498,8 @@ Future<Response> Http::api(
     CHECK_SOME(mediaTypes.messageContent);
 
     Owned<Reader<mesos::agent::Call>> reader(new Reader<mesos::agent::Call>(
-        Decoder<mesos::agent::Call>(lambda::bind(
-            deserializer, lambda::_1, mediaTypes.messageContent.get())),
+        lambda::bind(
+            deserializer, lambda::_1, mediaTypes.messageContent.get()),
         request.reader.get()));
 
     return reader->read()
@@ -3191,10 +3191,8 @@ Future<Response> Http::_attachContainerInput(
 
   CHECK_SOME(mediaTypes.messageContent);
   auto encoder = [mediaTypes](const mesos::agent::Call& call) {
-    ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
-        serialize, mediaTypes.messageContent.get(), lambda::_1));
-
-    return encoder.encode(call);
+    string record = serialize(mediaTypes.messageContent.get(), call);
+    return ::recordio::encode(record);
   };
 
   // Write the first record. We had extracted it from the `decoder`
@@ -3728,17 +3726,18 @@ Future<Response> Http::_attachContainerOutput(
           CHECK_SOME(response.reader);
           Pipe::Reader reader = response.reader.get();
 
-          auto deserializer = lambda::bind(
-              deserialize<ProcessIO>, messageContentType, lambda::_1);
-
           Owned<Reader<ProcessIO>> decoder(new Reader<ProcessIO>(
-              Decoder<ProcessIO>(deserializer), reader));
+              lambda::bind(
+                  deserialize<ProcessIO>,
+                  messageContentType,
+                  lambda::_1),
+              reader));
 
           auto encoder = [messageContentType](const ProcessIO& processIO) {
-            ::recordio::Encoder<v1::agent::ProcessIO> encoder (lambda::bind(
-                serialize, messageContentType, lambda::_1));
+            v1::agent::ProcessIO evolved = evolve(processIO);
+            string record = serialize(messageContentType, evolved);
 
-            return encoder.encode(evolve(processIO));
+            return ::recordio::encode(record);
           };
 
           recordio::transform<ProcessIO>(std::move(decoder), encoder, writer)
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 393f9a2..8755016 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3073,8 +3073,7 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   {
     Future<Result<v1::master::Event>> event = decoder.read();
@@ -3334,8 +3333,7 @@ TEST_P(MasterAPITest, EventAuthorizationDelayed)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   Future<Result<v1::master::Event>> event = decoder.read();
   AWAIT_READY(event);
@@ -3525,8 +3523,7 @@ TEST_P(MasterAPITest, FrameworksEvent)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   Future<Result<v1::master::Event>> event = decoder.read();
   AWAIT_READY(event);
@@ -3697,8 +3694,7 @@ TEST_P(MasterAPITest, Heartbeat)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   Future<Result<v1::master::Event>> event = decoder.read();
   AWAIT_READY(event);
@@ -3785,8 +3781,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
   ASSERT_SOME(response1->reader);
   http::Pipe::Reader reader1 = response1->reader.get();
 
-  Reader<v1::master::Event> decoder1(
-      Decoder<v1::master::Event>(deserializer), reader1);
+  Reader<v1::master::Event> decoder1(deserializer, reader1);
 
   event = decoder1.read();
   AWAIT_READY(event);
@@ -3807,8 +3802,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
   ASSERT_SOME(response2->reader);
   http::Pipe::Reader reader2 = response2->reader.get();
 
-  Reader<v1::master::Event> decoder2(
-      Decoder<v1::master::Event>(deserializer), reader2);
+  Reader<v1::master::Event> decoder2(deserializer, reader2);
 
   event = decoder2.read();
   AWAIT_READY(event);
@@ -3854,8 +3848,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
     ASSERT_SOME(response3->reader);
     http::Pipe::Reader reader3 = response3->reader.get();
 
-    Reader<v1::master::Event> decoder3(
-        Decoder<v1::master::Event>(deserializer), reader3);
+    Reader<v1::master::Event> decoder3(deserializer, reader3);
 
     event = decoder3.read();
     AWAIT_READY(event);
@@ -3898,8 +3891,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
   ASSERT_SOME(response4->reader);
   http::Pipe::Reader reader4 = response4->reader.get();
 
-  Reader<v1::master::Event> decoder4(
-      Decoder<v1::master::Event>(deserializer), reader4);
+  Reader<v1::master::Event> decoder4(deserializer, reader4);
 
   event = decoder4.read();
   AWAIT_READY(event);
@@ -5675,29 +5667,30 @@ static Future<tuple<string, string>> getProcessIOData(
       string stdoutReceived;
       string stderrReceived;
 
-      ::recordio::Decoder<v1::agent::ProcessIO> decoder(lambda::bind(
-          deserialize<v1::agent::ProcessIO>, contentType, lambda::_1));
+      ::recordio::Decoder decoder;
 
-      Try<std::deque<Try<v1::agent::ProcessIO>>> records =
-        decoder.decode(data);
+      Try<std::deque<string>> records = decoder.decode(data);
 
       if (records.isError()) {
         return process::Failure(records.error());
       }
 
       while(!records->empty()) {
-        Try<v1::agent::ProcessIO> record = records->front();
+        string record = std::move(records->front());
         records->pop_front();
 
-        if (record.isError()) {
-          return process::Failure(record.error());
+        Try<v1::agent::ProcessIO> processIO =
+          deserialize<v1::agent::ProcessIO>(contentType, record);
+
+        if (processIO.isError()) {
+          return process::Failure(processIO.error());
         }
 
-        if (record->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
-          stdoutReceived += record->data().data();
-        } else if (record->data().type() ==
+        if (processIO->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
+          stdoutReceived += processIO->data().data();
+        } else if (processIO->data().type() ==
             v1::agent::ProcessIO::Data::STDERR) {
-          stderrReceived += record->data().data();
+          stderrReceived += processIO->data().data();
         }
       }
 
@@ -7968,9 +7961,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   http::Pipe::Writer writer = pipe.writer();
   http::Pipe::Reader reader = pipe.reader();
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, contentType, lambda::_1));
-
   {
     v1::agent::Call call;
     call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
@@ -7981,7 +7971,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
     attach->mutable_container_id()->CopyFrom(containerId);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(contentType, call)));
   }
 
   const std::string command = "pkill sleep\n";
@@ -8000,7 +7990,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data(command);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(contentType, call)));
   }
 
   {
@@ -8150,9 +8140,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
     http::Pipe::Writer writer = pipe.writer();
     http::Pipe::Reader reader = pipe.reader();
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, contentType, lambda::_1));
-
     {
       v1::agent::Call call;
       call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
@@ -8163,7 +8150,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
       attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
       attach->mutable_container_id()->CopyFrom(containerId);
 
-      writer.write(encoder.encode(call));
+      writer.write(::recordio::encode(serialize(contentType, call)));
     }
 
     size_t offset = 0;
@@ -8185,7 +8172,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
       processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
       processIO->mutable_data()->set_data(dataChunk);
 
-      writer.write(encoder.encode(call));
+      writer.write(::recordio::encode(serialize(contentType, call)));
     }
 
     // Signal `EOF` to the 'cat' command.
@@ -8203,7 +8190,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
       processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
       processIO->mutable_data()->set_data("");
 
-      writer.write(encoder.encode(call));
+      writer.write(::recordio::encode(serialize(contentType, call)));
     }
 
     writer.close();
@@ -8435,9 +8422,6 @@ TEST_F(AgentAPITest, AttachContainerInputFailure)
   http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
   headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, messageContentType, lambda::_1));
-
   EXPECT_CALL(containerizer, attach(_))
     .WillOnce(Return(process::Failure("Unsupported")));
 
@@ -8445,7 +8429,7 @@ TEST_F(AgentAPITest, AttachContainerInputFailure)
     slave.get()->pid,
     "api/v1",
     headers,
-    encoder.encode(call),
+    ::recordio::encode(serialize(messageContentType, call)),
     stringify(contentType));
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, response);
@@ -8573,9 +8557,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     ContentType contentType = ContentType::RECORDIO;
     ContentType messageContentType = ContentType::PROTOBUF;
 
-    ::recordio::Encoder<v1::agent::Call> encoder(
-        lambda::bind(serialize, messageContentType, lambda::_1));
-
     http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
     headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);
 
@@ -8583,7 +8564,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
       slave.get()->pid,
       "api/v1",
       headers,
-      encoder.encode(call),
+      ::recordio::encode(serialize(messageContentType, call)),
       stringify(contentType));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);
@@ -8624,14 +8605,11 @@ TEST_F(AgentAPITest, AttachContainerInputValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::CONTAINER_ID);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, messageContentType, lambda::_1));
-
     Future<http::Response> response = http::post(
         slave.get()->pid,
         "api/v1",
         headers,
-        encoder.encode(call),
+        ::recordio::encode(serialize(messageContentType, call)),
         stringify(contentType));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
@@ -8646,14 +8624,11 @@ TEST_F(AgentAPITest, AttachContainerInputValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::PROCESS_IO);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, messageContentType, lambda::_1));
-
     Future<http::Response> response = http::post(
         slave.get()->pid,
         "api/v1",
         headers,
-        encoder.encode(call),
+        ::recordio::encode(serialize(messageContentType, call)),
         stringify(contentType));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
@@ -8687,14 +8662,11 @@ TEST_F(AgentAPITest, HeaderValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::CONTAINER_ID);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, ContentType::PROTOBUF, lambda::_1));
-
     Future<http::Response> response = http::post(
         slave.get()->pid,
         "api/v1",
         createBasicAuthHeaders(DEFAULT_CREDENTIAL),
-        encoder.encode(call),
+        ::recordio::encode(serialize(ContentType::PROTOBUF, call)),
         stringify(ContentType::RECORDIO));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
@@ -8708,9 +8680,6 @@ TEST_F(AgentAPITest, HeaderValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::CONTAINER_ID);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, ContentType::PROTOBUF, lambda::_1));
-
     http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
     headers[MESSAGE_CONTENT_TYPE] = "unsupported/media-type";
 
@@ -8718,7 +8687,7 @@ TEST_F(AgentAPITest, HeaderValidation)
         slave.get()->pid,
         "api/v1",
         headers,
-        encoder.encode(call),
+        ::recordio::encode(serialize(ContentType::PROTOBUF, call)),
         stringify(ContentType::RECORDIO));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::UnsupportedMediaType().status,
@@ -9382,9 +9351,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
   http::Pipe::Writer writer = pipe.writer();
   http::Pipe::Reader reader = pipe.reader();
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, messageContentType, lambda::_1));
-
   // Prepare the data that needs to be streamed to the entrypoint
   // of the container.
 
@@ -9398,7 +9364,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
     attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
     attach->mutable_container_id()->CopyFrom(containerId);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   size_t offset = 0;
@@ -9420,7 +9386,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data(dataChunk);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   // Signal `EOT` to the terminal so that it sends `EOF` to `cat` command.
@@ -9438,7 +9404,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data("\x04");
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   writer.close();
@@ -9610,9 +9576,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   http::Pipe::Writer writer = pipe.writer();
   http::Pipe::Reader reader = pipe.reader();
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, messageContentType, lambda::_1));
-
   {
     v1::agent::Call call;
     call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
@@ -9623,7 +9586,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
     attach->mutable_container_id()->CopyFrom(containerId);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   size_t offset = 0;
@@ -9645,7 +9608,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data(dataChunk);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   // Signal `EOF` to the 'cat' command.
@@ -9663,7 +9626,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data("");
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   writer.close();
diff --git a/src/tests/common/recordio_tests.cpp b/src/tests/common/recordio_tests.cpp
index 5dd6880..a79f08b 100644
--- a/src/tests/common/recordio_tests.cpp
+++ b/src/tests/common/recordio_tests.cpp
@@ -69,19 +69,16 @@ TEST(RecordIOReaderTest, EndOfFile)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello");
-  data += encoder.encode("world!");
+  data += ::recordio::encode("HELLO");
+  data += ::recordio::encode("WORLD!");
 
   process::http::Pipe pipe;
   pipe.writer().write(data);
 
   mesos::internal::recordio::Reader<string> reader(
-      ::recordio::Decoder<string>(strings::lower),
-      pipe.reader());
+      strings::lower, pipe.reader());
 
   AWAIT_EXPECT_EQ(Result<string>::some("hello"), reader.read());
   AWAIT_EXPECT_EQ(Result<string>::some("world!"), reader.read());
@@ -93,7 +90,7 @@ TEST(RecordIOReaderTest, EndOfFile)
   EXPECT_TRUE(read1.isPending());
   EXPECT_TRUE(read2.isPending());
 
-  pipe.writer().write(encoder.encode("goodbye"));
+  pipe.writer().write(::recordio::encode("goodbye"));
   pipe.writer().close();
 
   AWAIT_EXPECT_EQ(Result<string>::some("goodbye"), read1);
@@ -106,12 +103,10 @@ TEST(RecordIOReaderTest, EndOfFile)
 
 TEST(RecordIOReaderTest, DecodingFailure)
 {
-  ::recordio::Encoder<string> encoder(strings::upper);
   process::http::Pipe pipe;
 
   mesos::internal::recordio::Reader<string> reader(
-      ::recordio::Decoder<string>(strings::lower),
-      pipe.reader());
+      strings::lower, pipe.reader());
 
   // Have multiple outstanding reads before we fail the decoder.
   Future<Result<string>> read1 = reader.read();
@@ -119,7 +114,7 @@ TEST(RecordIOReaderTest, DecodingFailure)
   Future<Result<string>> read3 = reader.read();
 
   // Write non-encoded data to the pipe so that the decoder fails.
-  pipe.writer().write(encoder.encode("encoded"));
+  pipe.writer().write(::recordio::encode("encoded"));
   pipe.writer().write("not encoded!\n");
 
   AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1);
@@ -128,7 +123,7 @@ TEST(RecordIOReaderTest, DecodingFailure)
 
   // The reader is now in a failed state, subsequent
   // writes will be dropped and all reads will fail.
-  pipe.writer().write(encoder.encode("encoded"));
+  pipe.writer().write(::recordio::encode("encoded"));
 
   AWAIT_EXPECT_FAILED(reader.read());
 }
@@ -136,12 +131,10 @@ TEST(RecordIOReaderTest, DecodingFailure)
 
 TEST(RecordIOReaderTest, PipeFailure)
 {
-  ::recordio::Encoder<string> encoder(strings::upper);
   process::http::Pipe pipe;
 
   mesos::internal::recordio::Reader<string> reader(
-      ::recordio::Decoder<string>(strings::lower),
-      pipe.reader());
+      strings::lower, pipe.reader());
 
   // Have multiple outstanding reads before we fail the writer.
   Future<Result<string>> read1 = reader.read();
@@ -149,7 +142,7 @@ TEST(RecordIOReaderTest, PipeFailure)
   Future<Result<string>> read3 = reader.read();
 
   // Write a record, then fail the pipe writer!
-  pipe.writer().write(encoder.encode("encoded"));
+  pipe.writer().write(::recordio::encode("ENCODED"));
   pipe.writer().fail("failure");
 
   AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1);
@@ -167,20 +160,17 @@ TEST(RecordIOTransformTest, EndOfFile)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello ");
-  data += encoder.encode("world! ");
+  data += ::recordio::encode("HELLO ");
+  data += ::recordio::encode("WORLD! ");
 
   process::http::Pipe pipeA;
   pipeA.writer().write(data);
 
   process::Owned<mesos::internal::recordio::Reader<string>> reader(
     new mesos::internal::recordio::Reader<string>(
-        ::recordio::Decoder<string>(strings::lower),
-        pipeA.reader()));
+        strings::lower, pipeA.reader()));
 
   process::http::Pipe pipeB;
 
@@ -207,20 +197,17 @@ TEST(RecordIOTransformTest, ReaderWriterEndFail)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello ");
-  data += encoder.encode("world! ");
+  data += ::recordio::encode("HELLO ");
+  data += ::recordio::encode("WORLD! ");
 
   process::http::Pipe pipeA;
   pipeA.writer().write(data);
 
   process::Owned<mesos::internal::recordio::Reader<string>> reader(
     new mesos::internal::recordio::Reader<string>(
-        ::recordio::Decoder<string>(strings::lower),
-        pipeA.reader()));
+        strings::lower, pipeA.reader()));
 
   process::http::Pipe pipeB;
 
@@ -244,20 +231,17 @@ TEST(RecordIOTransformTest, WriterReadEndFail)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello ");
-  data += encoder.encode("world! ");
+  data += ::recordio::encode("HELLO ");
+  data += ::recordio::encode("WORLD! ");
 
   process::http::Pipe pipeA;
   pipeA.writer().write(data);
 
   process::Owned<mesos::internal::recordio::Reader<string>> reader(
     new mesos::internal::recordio::Reader<string>(
-        ::recordio::Decoder<string>(strings::lower),
-        pipeA.reader()));
+        strings::lower, pipeA.reader()));
 
   process::http::Pipe pipeB;
 
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index e443145..1b347eb 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -146,27 +146,29 @@ protected:
         string stdoutReceived;
         string stderrReceived;
 
-        ::recordio::Decoder<agent::ProcessIO> decoder(lambda::bind(
-            deserialize<agent::ProcessIO>, ContentType::JSON, lambda::_1));
+        ::recordio::Decoder decoder;
 
-        Try<std::deque<Try<agent::ProcessIO>>> records = decoder.decode(data);
+        Try<std::deque<string>> records = decoder.decode(data);
 
         if (records.isError()) {
           return process::Failure(records.error());
         }
 
         while(!records->empty()) {
-          Try<agent::ProcessIO> record = records->front();
+          string record = std::move(records->front());
           records->pop_front();
 
-          if (record.isError()) {
-            return process::Failure(record.error());
+          Try<agent::ProcessIO> processIO =
+            deserialize<agent::ProcessIO>(ContentType::JSON, record);
+
+          if (processIO.isError()) {
+            return process::Failure(processIO.error());
           }
 
-          if (record->data().type() == agent::ProcessIO::Data::STDOUT) {
-            stdoutReceived += record->data().data();
-          } else if (record->data().type() == agent::ProcessIO::Data::STDERR) {
-            stderrReceived += record->data().data();
+          if (processIO->data().type() == agent::ProcessIO::Data::STDOUT) {
+            stdoutReceived += processIO->data().data();
+          } else if (processIO->data().type() == agent::ProcessIO::Data::STDERR) {
+            stderrReceived += processIO->data().data();
           }
         }
 
@@ -442,8 +444,7 @@ TEST_F(IOSwitchboardServerTest, SendHeartbeat)
   };
 
   recordio::Reader<agent::ProcessIO> responseDecoder(
-      ::recordio::Decoder<agent::ProcessIO>(deserializer),
-      reader.get());
+      deserializer, reader.get());
 
   // Wait for 5 heartbeat messages.
   Clock::pause();
@@ -555,9 +556,6 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
 
   Future<http::Response> response = connection.send(request);
 
-  ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
-        serialize, ContentType::JSON, lambda::_1));
-
   Call call;
   call.set_type(Call::ATTACH_CONTAINER_INPUT);
 
@@ -565,7 +563,7 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
   attach->set_type(Call::AttachContainerInput::CONTAINER_ID);
   attach->mutable_container_id()->set_value(id::UUID::random().toString());
 
-  writer.write(encoder.encode(call));
+  writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
 
   size_t offset = 0;
   size_t chunkSize = 4096;
@@ -584,7 +582,7 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
     message->mutable_data()->set_type(ProcessIO::Data::STDIN);
     message->mutable_data()->set_data(dataChunk);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
   }
 
   writer.close();
@@ -667,9 +665,6 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
 
   Future<http::Response> response = connection.send(request);
 
-  ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
-      serialize, ContentType::JSON, lambda::_1));
-
   Call call;
   call.set_type(Call::ATTACH_CONTAINER_INPUT);
 
@@ -677,7 +672,7 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
   attach->set_type(Call::AttachContainerInput::CONTAINER_ID);
   attach->mutable_container_id()->set_value(id::UUID::random().toString());
 
-  writer.write(encoder.encode(call));
+  writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
 
   // Send 5 heartbeat messages.
   Duration heartbeat = Milliseconds(10);
@@ -693,7 +688,7 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
     message->mutable_control()->mutable_heartbeat()
         ->mutable_interval()->set_nanoseconds(heartbeat.ns());
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
 
     Clock::advance(heartbeat);
   }
diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp
index 99bcafb..d111547 100644
--- a/src/tests/executor_http_api_tests.cpp
+++ b/src/tests/executor_http_api_tests.cpp
@@ -936,11 +936,8 @@ TEST_P(ExecutorHttpApiTest, Subscribe)
   Option<Pipe::Reader> reader = response->reader;
   ASSERT_SOME(reader);
 
-  auto deserializer =
-    lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
   Reader<Event> responseDecoder(
-      Decoder<Event>(deserializer),
+      lambda::bind(deserialize<Event>, contentType, lambda::_1),
       reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
@@ -1047,9 +1044,9 @@ TEST_P(ExecutorHttpApiTest, HeartbeatEvents)
   Option<Pipe::Reader> reader = response->reader;
   ASSERT_SOME(reader);
 
-  auto deserializer =
-    lambda::bind(deserialize<Event>, contentType, lambda::_1);
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(
+      lambda::bind(deserialize<Event>, contentType, lambda::_1),
+      reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -1158,10 +1155,8 @@ TEST_F(ExecutorHttpApiTest, HeartbeatCalls)
 
     event.mutable_subscribed()->mutable_container_id()->set_value(":P");
 
-    ::recordio::Encoder<v1::executor::Event> encoder(
-        lambda::bind(serialize, ContentType::PROTOBUF, lambda::_1));
-
-    pipe.writer().write(encoder.encode(event));
+    pipe.writer().write(
+        ::recordio::encode(serialize(ContentType::PROTOBUF, event)));
   }
 
   // Set the expectation for an executor to register with the fake agent.
diff --git a/src/tests/master/mock_master_api_subscriber.cpp b/src/tests/master/mock_master_api_subscriber.cpp
index a0808e8..893d3e3 100644
--- a/src/tests/master/mock_master_api_subscriber.cpp
+++ b/src/tests/master/mock_master_api_subscriber.cpp
@@ -98,7 +98,7 @@ private:
         deserialize<Event>, contentType, lambda::_1);
 
     std::unique_ptr<Reader<Event>> reader(new Reader<Event>(
-        ::recordio::Decoder<Event>(deserializer), response->reader.get()));
+        deserializer, response->reader.get()));
 
     auto decode = lambda::bind(
         [](std::unique_ptr<Reader<Event>>& d) { return d->read(); },
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 84ec70b..e1fedf7 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -544,8 +544,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
     ASSERT_SOME(reader);
 
     responseDecoder.reset(new recordio::Reader<Event>(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get()));
 
     Future<Result<Event>> event = responseDecoder->read();
@@ -652,8 +651,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
     ASSERT_SOME(reader);
 
     responseDecoder.reset(new recordio::Reader<Event>(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get()));
 
     Future<Result<Event>> event = responseDecoder->read();
@@ -753,8 +751,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
     ASSERT_SOME(reader);
 
     responseDecoder.reset(new recordio::Reader<Event>(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get()));
 
     Future<Result<Event>> event = responseDecoder->read();
@@ -846,8 +843,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   ASSERT_SOME(reader);
 
   recordio::Reader<Event> responseDecoder(
-      ::recordio::Decoder<Event>(
-          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      lambda::bind(deserialize<Event>, contentType, lambda::_1),
       reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
@@ -1696,8 +1692,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider)
     ASSERT_SOME(reader);
 
     recordio::Reader<Event> responseDecoder(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get());
 
     // We expect the manager to drop the subscribe call since
diff --git a/src/tests/scheduler_http_api_tests.cpp b/src/tests/scheduler_http_api_tests.cpp
index d5b5eec..66ed9ce 100644
--- a/src/tests/scheduler_http_api_tests.cpp
+++ b/src/tests/scheduler_http_api_tests.cpp
@@ -282,7 +282,7 @@ TEST_P(SchedulerHttpApiTest, Subscribe)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -350,7 +350,7 @@ TEST_P(SchedulerHttpApiTest, RejectFrameworkWithInvalidRole)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -430,7 +430,7 @@ TEST_P(SchedulerHttpApiTest, SubscribedOnRetry)
     Option<Pipe::Reader> reader = response->reader;
     ASSERT_SOME(reader);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -457,7 +457,7 @@ TEST_P(SchedulerHttpApiTest, SubscribedOnRetry)
     Option<Pipe::Reader> reader = response->reader;
     ASSERT_SOME(reader);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     // Check if we were successfully able to subscribe after the blip.
     Future<Result<Event>> event = responseDecoder.read();
@@ -544,7 +544,7 @@ TEST_P(SchedulerHttpApiTest, UpdatePidToHttpScheduler)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -608,7 +608,7 @@ TEST_P(SchedulerHttpApiTest, UpdateHttpToPidScheduler)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -705,7 +705,7 @@ TEST_P(SchedulerHttpApiTest, UpdateHttpToPidSchedulerAndBack)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   // Get SUBSCRIBED event and check framework ID.
   Future<Result<Event>> event = responseDecoder.read();
@@ -899,7 +899,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWithoutStreamId)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -974,7 +974,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWrongStreamId)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -1020,7 +1020,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWrongStreamId)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -1092,7 +1092,7 @@ TEST_P(SchedulerHttpApiTest, MalformedUUID)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);

Reply via email to