This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit ed39b42034254f6c2e8da65dfea3a35e4d6ddce3 Author: Benjamin Mahler <[email protected]> AuthorDate: Mon Nov 25 17:37:36 2019 -0500 Updated mesos code to work against recordio encoder/decoder changes. The recordio encoder and decoder were updated to operate on records as bytes instead of typed T records. Review: https://reviews.apache.org/r/71825 --- src/checks/checker_process.cpp | 25 +++-- src/common/http.hpp | 13 ++- src/common/recordio.hpp | 21 +++-- src/executor/executor.cpp | 8 +- src/resource_provider/http_connection.hpp | 5 +- src/resource_provider/manager.cpp | 48 ++-------- src/scheduler/scheduler.cpp | 8 +- src/slave/containerizer/mesos/io/switchboard.cpp | 15 +-- src/slave/http.cpp | 25 +++-- src/tests/api_tests.cpp | 111 ++++++++--------------- src/tests/common/recordio_tests.cpp | 52 ++++------- src/tests/containerizer/io_switchboard_tests.cpp | 39 ++++---- src/tests/executor_http_api_tests.cpp | 17 ++-- src/tests/master/mock_master_api_subscriber.cpp | 2 +- src/tests/resource_provider_manager_tests.cpp | 15 +-- src/tests/scheduler_http_api_tests.cpp | 22 ++--- 16 files changed, 162 insertions(+), 264 deletions(-) diff --git a/src/checks/checker_process.cpp b/src/checks/checker_process.cpp index c214bd1..6a9c7fc 100644 --- a/src/checks/checker_process.cpp +++ b/src/checks/checker_process.cpp @@ -171,30 +171,29 @@ static Try<tuple<string, string>> decodeProcessIOData(const string& data) string stdoutReceived; string stderrReceived; - ::recordio::Decoder<v1::agent::ProcessIO> decoder( - lambda::bind( - deserialize<v1::agent::ProcessIO>, - ContentType::PROTOBUF, - lambda::_1)); + ::recordio::Decoder decoder; - Try<std::deque<Try<v1::agent::ProcessIO>>> records = decoder.decode(data); + Try<std::deque<string>> records = decoder.decode(data); if (records.isError()) { return Error(records.error()); } while (!records->empty()) { - Try<v1::agent::ProcessIO> record = records->front(); + string record = std::move(records->front()); records->pop_front(); - if (record.isError()) { - return Error(record.error()); + Try<v1::agent::ProcessIO> result = deserialize<v1::agent::ProcessIO>( + ContentType::PROTOBUF, record); + + if (result.isError()) { + return Error(result.error()); } - if (record->data().type() == v1::agent::ProcessIO::Data::STDOUT) { - stdoutReceived += record->data().data(); - } else if (record->data().type() == v1::agent::ProcessIO::Data::STDERR) { - stderrReceived += record->data().data(); + if (result->data().type() == v1::agent::ProcessIO::Data::STDOUT) { + stdoutReceived += result->data().data(); + } else if (result->data().type() == v1::agent::ProcessIO::Data::STDERR) { + stderrReceived += result->data().data(); } } diff --git a/src/common/http.hpp b/src/common/http.hpp index b9ab561..534fc26 100644 --- a/src/common/http.hpp +++ b/src/common/http.hpp @@ -153,15 +153,19 @@ struct StreamingHttpConnection id::UUID _streamId = id::UUID::random()) : writer(_writer), contentType(_contentType), - encoder(lambda::bind(serialize, contentType, lambda::_1)), streamId(_streamId) {} - // Converts the message to the templated `Event`, via `evolve()`, - // before sending. template <typename Message> bool send(const Message& message) { - return writer.write(encoder.encode(evolve(message))); + // TODO(bmahler): Remove this evolve(). Could we still + // somehow assert that evolve(message) produces a result + // of type Event without calling evolve()? + Event e = evolve(message); + + std::string record = serialize(contentType, e); + + return writer.write(::recordio::encode(record)); } bool close() @@ -176,7 +180,6 @@ struct StreamingHttpConnection process::http::Pipe::Writer writer; ContentType contentType; - ::recordio::Encoder<Event> encoder; id::UUID streamId; }; diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp index 8cb2e73..ee91a60 100644 --- a/src/common/recordio.hpp +++ b/src/common/recordio.hpp @@ -65,10 +65,10 @@ public: // We spawn `ReaderProcess` as a managed process to guarantee // that it does not wait on itself (this would cause a deadlock!). // See comments in `Connection::Data` for further details. - Reader(::recordio::Decoder<T>&& decoder, + Reader(std::function<Try<T>(const std::string&)> deserialize, process::http::Pipe::Reader reader) : process(process::spawn( - new internal::ReaderProcess<T>(std::move(decoder), reader), + new internal::ReaderProcess<T>(std::move(deserialize), reader), true)) {} virtual ~Reader() @@ -149,10 +149,10 @@ class ReaderProcess : public process::Process<ReaderProcess<T>> { public: ReaderProcess( - ::recordio::Decoder<T>&& _decoder, + std::function<Try<T>(const std::string&)>&& _deserialize, process::http::Pipe::Reader _reader) : process::ProcessBase(process::ID::generate("__reader__")), - decoder(_decoder), + deserialize(_deserialize), reader(_reader), done(false) {} @@ -235,26 +235,29 @@ private: return; } - Try<std::deque<Try<T>>> decode = decoder.decode(read.get()); + Try<std::deque<std::string>> decode = decoder.decode(read.get()); if (decode.isError()) { fail("Decoder failure: " + decode.error()); return; } - foreach (const Try<T>& record, decode.get()) { + foreach (const std::string& record, decode.get()) { + Result<T> t = deserialize(record); + if (!waiters.empty()) { - waiters.front()->set(Result<T>(std::move(record))); + waiters.front()->set(std::move(t)); waiters.pop(); } else { - records.push(std::move(record)); + records.push(std::move(t)); } } consume(); } - ::recordio::Decoder<T> decoder; + ::recordio::Decoder decoder; + std::function<Try<T>(const std::string&)> deserialize; process::http::Pipe::Reader reader; std::queue<process::Owned<process::Promise<Result<T>>>> waiters; diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp index b412603..dfa5820 100644 --- a/src/executor/executor.cpp +++ b/src/executor/executor.cpp @@ -634,11 +634,9 @@ protected: Pipe::Reader reader = response->reader.get(); - auto deserializer = - lambda::bind(deserialize<Event>, contentType, lambda::_1); - - Owned<Reader<Event>> decoder( - new Reader<Event>(Decoder<Event>(deserializer), reader)); + Owned<Reader<Event>> decoder(new Reader<Event>( + lambda::bind(deserialize<Event>, contentType, lambda::_1), + reader)); subscribed = SubscribedResponse {reader, decoder}; diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp index 05863aa..84a2610 100644 --- a/src/resource_provider/http_connection.hpp +++ b/src/resource_provider/http_connection.hpp @@ -368,12 +368,9 @@ protected: process::http::Pipe::Reader reader = response.reader.get(); - auto deserializer = - lambda::bind(deserialize<Event>, contentType, lambda::_1); - process::Owned<recordio::Reader<Event>> decoder( new recordio::Reader<Event>( - ::recordio::Decoder<Event>(deserializer), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader)); subscribed = SubscribedResponse(reader, std::move(decoder)); diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 427ce70..5665167 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -113,48 +113,12 @@ createRegistryResourceProvider(const ResourceProviderInfo& resourceProviderInfo) return resourceProvider; } -// Represents the streaming HTTP connection to a resource provider. -struct HttpConnection -{ - HttpConnection(const http::Pipe::Writer& _writer, - ContentType _contentType, - id::UUID _streamId) - : writer(_writer), - contentType(_contentType), - streamId(_streamId), - encoder(lambda::bind(serialize, contentType, lambda::_1)) {} - - // Converts the message to an Event before sending. - template <typename Message> - bool send(const Message& message) - { - // We need to evolve the internal 'message' into a - // 'v1::resource_provider::Event'. - return writer.write(encoder.encode(evolve(message))); - } - - bool close() - { - return writer.close(); - } - - Future<Nothing> closed() const - { - return writer.readerClosed(); - } - - http::Pipe::Writer writer; - ContentType contentType; - id::UUID streamId; - ::recordio::Encoder<v1::resource_provider::Event> encoder; -}; - struct ResourceProvider { ResourceProvider( const ResourceProviderInfo& _info, - const HttpConnection& _http) + const StreamingHttpConnection<v1::resource_provider::Event>& _http) : info(_info), http(_http) {} @@ -172,7 +136,7 @@ struct ResourceProvider } ResourceProviderInfo info; - HttpConnection http; + StreamingHttpConnection<v1::resource_provider::Event> http; hashmap<id::UUID, Owned<Promise<Nothing>>> publishes; }; @@ -203,7 +167,7 @@ public: private: void subscribe( - const HttpConnection& http, + const StreamingHttpConnection<v1::resource_provider::Event>& http, const Call::Subscribe& subscribe); void _subscribe( @@ -394,7 +358,9 @@ Future<http::Response> ResourceProviderManagerProcess::api( id::UUID streamId = id::UUID::random(); ok.headers["Mesos-Stream-Id"] = streamId.toString(); - HttpConnection http(pipe.writer(), acceptType, streamId); + StreamingHttpConnection<v1::resource_provider::Event> http( + pipe.writer(), acceptType, streamId); + this->subscribe(http, call.subscribe()); return std::move(ok); @@ -804,7 +770,7 @@ Future<Nothing> ResourceProviderManagerProcess::publishResources( void ResourceProviderManagerProcess::subscribe( - const HttpConnection& http, + const StreamingHttpConnection<v1::resource_provider::Event>& http, const Call::Subscribe& subscribe) { const ResourceProviderInfo& resourceProviderInfo = diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 674483a..48be291 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -655,11 +655,9 @@ protected: Pipe::Reader reader = response->reader.get(); - auto deserializer = - lambda::bind(deserialize<Event>, contentType, lambda::_1); - - Owned<Reader<Event>> decoder( - new Reader<Event>(Decoder<Event>(deserializer), reader)); + Owned<Reader<Event>> decoder(new Reader<Event>( + lambda::bind(deserialize<Event>, contentType, lambda::_1), + reader)); subscribed = SubscribedResponse {reader, decoder}; diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp index 8e02e51..8d78f7c 100644 --- a/src/slave/containerizer/mesos/io/switchboard.cpp +++ b/src/slave/containerizer/mesos/io/switchboard.cpp @@ -977,18 +977,21 @@ public: Future<Nothing> unblock(); private: + // TODO(bmahler): Replace this with the common StreamingHttpConnection. class HttpConnection { public: HttpConnection( const http::Pipe::Writer& _writer, - const ContentType& contentType) + const ContentType& _contentType) : writer(_writer), - encoder(lambda::bind(serialize, contentType, lambda::_1)) {} + contentType(_contentType) {} bool send(const agent::ProcessIO& message) { - return writer.write(encoder.encode(message)); + string record = serialize(contentType, message); + + return writer.write(::recordio::encode(record)); } bool close() @@ -1003,7 +1006,7 @@ private: private: http::Pipe::Writer writer; - ::recordio::Encoder<agent::ProcessIO> encoder; + ContentType contentType; }; // Sit in a heartbeat loop forever. @@ -1483,10 +1486,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler( Owned<recordio::Reader<agent::Call>> reader( new recordio::Reader<agent::Call>( - ::recordio::Decoder<agent::Call>(lambda::bind( + lambda::bind( deserialize<agent::Call>, messageContentType.get(), - lambda::_1)), + lambda::_1), request.reader.get())); return reader->read() diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 4d68ce7..04ad0d8 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -498,8 +498,8 @@ Future<Response> Http::api( CHECK_SOME(mediaTypes.messageContent); Owned<Reader<mesos::agent::Call>> reader(new Reader<mesos::agent::Call>( - Decoder<mesos::agent::Call>(lambda::bind( - deserializer, lambda::_1, mediaTypes.messageContent.get())), + lambda::bind( + deserializer, lambda::_1, mediaTypes.messageContent.get()), request.reader.get())); return reader->read() @@ -3191,10 +3191,8 @@ Future<Response> Http::_attachContainerInput( CHECK_SOME(mediaTypes.messageContent); auto encoder = [mediaTypes](const mesos::agent::Call& call) { - ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind( - serialize, mediaTypes.messageContent.get(), lambda::_1)); - - return encoder.encode(call); + string record = serialize(mediaTypes.messageContent.get(), call); + return ::recordio::encode(record); }; // Write the first record. We had extracted it from the `decoder` @@ -3728,17 +3726,18 @@ Future<Response> Http::_attachContainerOutput( CHECK_SOME(response.reader); Pipe::Reader reader = response.reader.get(); - auto deserializer = lambda::bind( - deserialize<ProcessIO>, messageContentType, lambda::_1); - Owned<Reader<ProcessIO>> decoder(new Reader<ProcessIO>( - Decoder<ProcessIO>(deserializer), reader)); + lambda::bind( + deserialize<ProcessIO>, + messageContentType, + lambda::_1), + reader)); auto encoder = [messageContentType](const ProcessIO& processIO) { - ::recordio::Encoder<v1::agent::ProcessIO> encoder (lambda::bind( - serialize, messageContentType, lambda::_1)); + v1::agent::ProcessIO evolved = evolve(processIO); + string record = serialize(messageContentType, evolved); - return encoder.encode(evolve(processIO)); + return ::recordio::encode(record); }; recordio::transform<ProcessIO>(std::move(decoder), encoder, writer) diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index 393f9a2..8755016 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -3073,8 +3073,7 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering) auto deserializer = lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1); - Reader<v1::master::Event> decoder( - Decoder<v1::master::Event>(deserializer), reader); + Reader<v1::master::Event> decoder(deserializer, reader); { Future<Result<v1::master::Event>> event = decoder.read(); @@ -3334,8 +3333,7 @@ TEST_P(MasterAPITest, EventAuthorizationDelayed) auto deserializer = lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1); - Reader<v1::master::Event> decoder( - Decoder<v1::master::Event>(deserializer), reader); + Reader<v1::master::Event> decoder(deserializer, reader); Future<Result<v1::master::Event>> event = decoder.read(); AWAIT_READY(event); @@ -3525,8 +3523,7 @@ TEST_P(MasterAPITest, FrameworksEvent) auto deserializer = lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1); - Reader<v1::master::Event> decoder( - Decoder<v1::master::Event>(deserializer), reader); + Reader<v1::master::Event> decoder(deserializer, reader); Future<Result<v1::master::Event>> event = decoder.read(); AWAIT_READY(event); @@ -3697,8 +3694,7 @@ TEST_P(MasterAPITest, Heartbeat) auto deserializer = lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1); - Reader<v1::master::Event> decoder( - Decoder<v1::master::Event>(deserializer), reader); + Reader<v1::master::Event> decoder(deserializer, reader); Future<Result<v1::master::Event>> event = decoder.read(); AWAIT_READY(event); @@ -3785,8 +3781,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers) ASSERT_SOME(response1->reader); http::Pipe::Reader reader1 = response1->reader.get(); - Reader<v1::master::Event> decoder1( - Decoder<v1::master::Event>(deserializer), reader1); + Reader<v1::master::Event> decoder1(deserializer, reader1); event = decoder1.read(); AWAIT_READY(event); @@ -3807,8 +3802,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers) ASSERT_SOME(response2->reader); http::Pipe::Reader reader2 = response2->reader.get(); - Reader<v1::master::Event> decoder2( - Decoder<v1::master::Event>(deserializer), reader2); + Reader<v1::master::Event> decoder2(deserializer, reader2); event = decoder2.read(); AWAIT_READY(event); @@ -3854,8 +3848,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers) ASSERT_SOME(response3->reader); http::Pipe::Reader reader3 = response3->reader.get(); - Reader<v1::master::Event> decoder3( - Decoder<v1::master::Event>(deserializer), reader3); + Reader<v1::master::Event> decoder3(deserializer, reader3); event = decoder3.read(); AWAIT_READY(event); @@ -3898,8 +3891,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers) ASSERT_SOME(response4->reader); http::Pipe::Reader reader4 = response4->reader.get(); - Reader<v1::master::Event> decoder4( - Decoder<v1::master::Event>(deserializer), reader4); + Reader<v1::master::Event> decoder4(deserializer, reader4); event = decoder4.read(); AWAIT_READY(event); @@ -5675,29 +5667,30 @@ static Future<tuple<string, string>> getProcessIOData( string stdoutReceived; string stderrReceived; - ::recordio::Decoder<v1::agent::ProcessIO> decoder(lambda::bind( - deserialize<v1::agent::ProcessIO>, contentType, lambda::_1)); + ::recordio::Decoder decoder; - Try<std::deque<Try<v1::agent::ProcessIO>>> records = - decoder.decode(data); + Try<std::deque<string>> records = decoder.decode(data); if (records.isError()) { return process::Failure(records.error()); } while(!records->empty()) { - Try<v1::agent::ProcessIO> record = records->front(); + string record = std::move(records->front()); records->pop_front(); - if (record.isError()) { - return process::Failure(record.error()); + Try<v1::agent::ProcessIO> processIO = + deserialize<v1::agent::ProcessIO>(contentType, record); + + if (processIO.isError()) { + return process::Failure(processIO.error()); } - if (record->data().type() == v1::agent::ProcessIO::Data::STDOUT) { - stdoutReceived += record->data().data(); - } else if (record->data().type() == + if (processIO->data().type() == v1::agent::ProcessIO::Data::STDOUT) { + stdoutReceived += processIO->data().data(); + } else if (processIO->data().type() == v1::agent::ProcessIO::Data::STDERR) { - stderrReceived += record->data().data(); + stderrReceived += processIO->data().data(); } } @@ -7968,9 +7961,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( http::Pipe::Writer writer = pipe.writer(); http::Pipe::Reader reader = pipe.reader(); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, contentType, lambda::_1)); - { v1::agent::Call call; call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); @@ -7981,7 +7971,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID); attach->mutable_container_id()->CopyFrom(containerId); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(contentType, call))); } const std::string command = "pkill sleep\n"; @@ -8000,7 +7990,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data(command); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(contentType, call))); } { @@ -8150,9 +8140,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat) http::Pipe::Writer writer = pipe.writer(); http::Pipe::Reader reader = pipe.reader(); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, contentType, lambda::_1)); - { v1::agent::Call call; call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); @@ -8163,7 +8150,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat) attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID); attach->mutable_container_id()->CopyFrom(containerId); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(contentType, call))); } size_t offset = 0; @@ -8185,7 +8172,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat) processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data(dataChunk); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(contentType, call))); } // Signal `EOF` to the 'cat' command. @@ -8203,7 +8190,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat) processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data(""); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(contentType, call))); } writer.close(); @@ -8435,9 +8422,6 @@ TEST_F(AgentAPITest, AttachContainerInputFailure) http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, messageContentType, lambda::_1)); - EXPECT_CALL(containerizer, attach(_)) .WillOnce(Return(process::Failure("Unsupported"))); @@ -8445,7 +8429,7 @@ TEST_F(AgentAPITest, AttachContainerInputFailure) slave.get()->pid, "api/v1", headers, - encoder.encode(call), + ::recordio::encode(serialize(messageContentType, call)), stringify(contentType)); AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, response); @@ -8573,9 +8557,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( ContentType contentType = ContentType::RECORDIO; ContentType messageContentType = ContentType::PROTOBUF; - ::recordio::Encoder<v1::agent::Call> encoder( - lambda::bind(serialize, messageContentType, lambda::_1)); - http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType); @@ -8583,7 +8564,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( slave.get()->pid, "api/v1", headers, - encoder.encode(call), + ::recordio::encode(serialize(messageContentType, call)), stringify(contentType)); AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); @@ -8624,14 +8605,11 @@ TEST_F(AgentAPITest, AttachContainerInputValidation) call.mutable_attach_container_input()->set_type( v1::agent::Call::AttachContainerInput::CONTAINER_ID); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, messageContentType, lambda::_1)); - Future<http::Response> response = http::post( slave.get()->pid, "api/v1", headers, - encoder.encode(call), + ::recordio::encode(serialize(messageContentType, call)), stringify(contentType)); AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response); @@ -8646,14 +8624,11 @@ TEST_F(AgentAPITest, AttachContainerInputValidation) call.mutable_attach_container_input()->set_type( v1::agent::Call::AttachContainerInput::PROCESS_IO); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, messageContentType, lambda::_1)); - Future<http::Response> response = http::post( slave.get()->pid, "api/v1", headers, - encoder.encode(call), + ::recordio::encode(serialize(messageContentType, call)), stringify(contentType)); AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response); @@ -8687,14 +8662,11 @@ TEST_F(AgentAPITest, HeaderValidation) call.mutable_attach_container_input()->set_type( v1::agent::Call::AttachContainerInput::CONTAINER_ID); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, ContentType::PROTOBUF, lambda::_1)); - Future<http::Response> response = http::post( slave.get()->pid, "api/v1", createBasicAuthHeaders(DEFAULT_CREDENTIAL), - encoder.encode(call), + ::recordio::encode(serialize(ContentType::PROTOBUF, call)), stringify(ContentType::RECORDIO)); AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response); @@ -8708,9 +8680,6 @@ TEST_F(AgentAPITest, HeaderValidation) call.mutable_attach_container_input()->set_type( v1::agent::Call::AttachContainerInput::CONTAINER_ID); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, ContentType::PROTOBUF, lambda::_1)); - http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); headers[MESSAGE_CONTENT_TYPE] = "unsupported/media-type"; @@ -8718,7 +8687,7 @@ TEST_F(AgentAPITest, HeaderValidation) slave.get()->pid, "api/v1", headers, - encoder.encode(call), + ::recordio::encode(serialize(ContentType::PROTOBUF, call)), stringify(ContentType::RECORDIO)); AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::UnsupportedMediaType().status, @@ -9382,9 +9351,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest, http::Pipe::Writer writer = pipe.writer(); http::Pipe::Reader reader = pipe.reader(); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, messageContentType, lambda::_1)); - // Prepare the data that needs to be streamed to the entrypoint // of the container. @@ -9398,7 +9364,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest, attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID); attach->mutable_container_id()->CopyFrom(containerId); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(messageContentType, call))); } size_t offset = 0; @@ -9420,7 +9386,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest, processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data(dataChunk); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(messageContentType, call))); } // Signal `EOT` to the terminal so that it sends `EOF` to `cat` command. @@ -9438,7 +9404,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest, processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data("\x04"); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(messageContentType, call))); } writer.close(); @@ -9610,9 +9576,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( http::Pipe::Writer writer = pipe.writer(); http::Pipe::Reader reader = pipe.reader(); - ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( - serialize, messageContentType, lambda::_1)); - { v1::agent::Call call; call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); @@ -9623,7 +9586,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID); attach->mutable_container_id()->CopyFrom(containerId); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(messageContentType, call))); } size_t offset = 0; @@ -9645,7 +9608,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data(dataChunk); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(messageContentType, call))); } // Signal `EOF` to the 'cat' command. @@ -9663,7 +9626,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); processIO->mutable_data()->set_data(""); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(messageContentType, call))); } writer.close(); diff --git a/src/tests/common/recordio_tests.cpp b/src/tests/common/recordio_tests.cpp index 5dd6880..a79f08b 100644 --- a/src/tests/common/recordio_tests.cpp +++ b/src/tests/common/recordio_tests.cpp @@ -69,19 +69,16 @@ TEST(RecordIOReaderTest, EndOfFile) { // Write some data to the pipe so that records // are available before any reads occur. - ::recordio::Encoder<string> encoder(strings::upper); - string data; - data += encoder.encode("hello"); - data += encoder.encode("world!"); + data += ::recordio::encode("HELLO"); + data += ::recordio::encode("WORLD!"); process::http::Pipe pipe; pipe.writer().write(data); mesos::internal::recordio::Reader<string> reader( - ::recordio::Decoder<string>(strings::lower), - pipe.reader()); + strings::lower, pipe.reader()); AWAIT_EXPECT_EQ(Result<string>::some("hello"), reader.read()); AWAIT_EXPECT_EQ(Result<string>::some("world!"), reader.read()); @@ -93,7 +90,7 @@ TEST(RecordIOReaderTest, EndOfFile) EXPECT_TRUE(read1.isPending()); EXPECT_TRUE(read2.isPending()); - pipe.writer().write(encoder.encode("goodbye")); + pipe.writer().write(::recordio::encode("goodbye")); pipe.writer().close(); AWAIT_EXPECT_EQ(Result<string>::some("goodbye"), read1); @@ -106,12 +103,10 @@ TEST(RecordIOReaderTest, EndOfFile) TEST(RecordIOReaderTest, DecodingFailure) { - ::recordio::Encoder<string> encoder(strings::upper); process::http::Pipe pipe; mesos::internal::recordio::Reader<string> reader( - ::recordio::Decoder<string>(strings::lower), - pipe.reader()); + strings::lower, pipe.reader()); // Have multiple outstanding reads before we fail the decoder. Future<Result<string>> read1 = reader.read(); @@ -119,7 +114,7 @@ TEST(RecordIOReaderTest, DecodingFailure) Future<Result<string>> read3 = reader.read(); // Write non-encoded data to the pipe so that the decoder fails. - pipe.writer().write(encoder.encode("encoded")); + pipe.writer().write(::recordio::encode("encoded")); pipe.writer().write("not encoded!\n"); AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1); @@ -128,7 +123,7 @@ TEST(RecordIOReaderTest, DecodingFailure) // The reader is now in a failed state, subsequent // writes will be dropped and all reads will fail. - pipe.writer().write(encoder.encode("encoded")); + pipe.writer().write(::recordio::encode("encoded")); AWAIT_EXPECT_FAILED(reader.read()); } @@ -136,12 +131,10 @@ TEST(RecordIOReaderTest, DecodingFailure) TEST(RecordIOReaderTest, PipeFailure) { - ::recordio::Encoder<string> encoder(strings::upper); process::http::Pipe pipe; mesos::internal::recordio::Reader<string> reader( - ::recordio::Decoder<string>(strings::lower), - pipe.reader()); + strings::lower, pipe.reader()); // Have multiple outstanding reads before we fail the writer. Future<Result<string>> read1 = reader.read(); @@ -149,7 +142,7 @@ TEST(RecordIOReaderTest, PipeFailure) Future<Result<string>> read3 = reader.read(); // Write a record, then fail the pipe writer! - pipe.writer().write(encoder.encode("encoded")); + pipe.writer().write(::recordio::encode("ENCODED")); pipe.writer().fail("failure"); AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1); @@ -167,20 +160,17 @@ TEST(RecordIOTransformTest, EndOfFile) { // Write some data to the pipe so that records // are available before any reads occur. - ::recordio::Encoder<string> encoder(strings::upper); - string data; - data += encoder.encode("hello "); - data += encoder.encode("world! "); + data += ::recordio::encode("HELLO "); + data += ::recordio::encode("WORLD! "); process::http::Pipe pipeA; pipeA.writer().write(data); process::Owned<mesos::internal::recordio::Reader<string>> reader( new mesos::internal::recordio::Reader<string>( - ::recordio::Decoder<string>(strings::lower), - pipeA.reader())); + strings::lower, pipeA.reader())); process::http::Pipe pipeB; @@ -207,20 +197,17 @@ TEST(RecordIOTransformTest, ReaderWriterEndFail) { // Write some data to the pipe so that records // are available before any reads occur. - ::recordio::Encoder<string> encoder(strings::upper); - string data; - data += encoder.encode("hello "); - data += encoder.encode("world! "); + data += ::recordio::encode("HELLO "); + data += ::recordio::encode("WORLD! "); process::http::Pipe pipeA; pipeA.writer().write(data); process::Owned<mesos::internal::recordio::Reader<string>> reader( new mesos::internal::recordio::Reader<string>( - ::recordio::Decoder<string>(strings::lower), - pipeA.reader())); + strings::lower, pipeA.reader())); process::http::Pipe pipeB; @@ -244,20 +231,17 @@ TEST(RecordIOTransformTest, WriterReadEndFail) { // Write some data to the pipe so that records // are available before any reads occur. - ::recordio::Encoder<string> encoder(strings::upper); - string data; - data += encoder.encode("hello "); - data += encoder.encode("world! "); + data += ::recordio::encode("HELLO "); + data += ::recordio::encode("WORLD! "); process::http::Pipe pipeA; pipeA.writer().write(data); process::Owned<mesos::internal::recordio::Reader<string>> reader( new mesos::internal::recordio::Reader<string>( - ::recordio::Decoder<string>(strings::lower), - pipeA.reader())); + strings::lower, pipeA.reader())); process::http::Pipe pipeB; diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp index e443145..1b347eb 100644 --- a/src/tests/containerizer/io_switchboard_tests.cpp +++ b/src/tests/containerizer/io_switchboard_tests.cpp @@ -146,27 +146,29 @@ protected: string stdoutReceived; string stderrReceived; - ::recordio::Decoder<agent::ProcessIO> decoder(lambda::bind( - deserialize<agent::ProcessIO>, ContentType::JSON, lambda::_1)); + ::recordio::Decoder decoder; - Try<std::deque<Try<agent::ProcessIO>>> records = decoder.decode(data); + Try<std::deque<string>> records = decoder.decode(data); if (records.isError()) { return process::Failure(records.error()); } while(!records->empty()) { - Try<agent::ProcessIO> record = records->front(); + string record = std::move(records->front()); records->pop_front(); - if (record.isError()) { - return process::Failure(record.error()); + Try<agent::ProcessIO> processIO = + deserialize<agent::ProcessIO>(ContentType::JSON, record); + + if (processIO.isError()) { + return process::Failure(processIO.error()); } - if (record->data().type() == agent::ProcessIO::Data::STDOUT) { - stdoutReceived += record->data().data(); - } else if (record->data().type() == agent::ProcessIO::Data::STDERR) { - stderrReceived += record->data().data(); + if (processIO->data().type() == agent::ProcessIO::Data::STDOUT) { + stdoutReceived += processIO->data().data(); + } else if (processIO->data().type() == agent::ProcessIO::Data::STDERR) { + stderrReceived += processIO->data().data(); } } @@ -442,8 +444,7 @@ TEST_F(IOSwitchboardServerTest, SendHeartbeat) }; recordio::Reader<agent::ProcessIO> responseDecoder( - ::recordio::Decoder<agent::ProcessIO>(deserializer), - reader.get()); + deserializer, reader.get()); // Wait for 5 heartbeat messages. Clock::pause(); @@ -555,9 +556,6 @@ TEST_F(IOSwitchboardServerTest, AttachInput) Future<http::Response> response = connection.send(request); - ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind( - serialize, ContentType::JSON, lambda::_1)); - Call call; call.set_type(Call::ATTACH_CONTAINER_INPUT); @@ -565,7 +563,7 @@ TEST_F(IOSwitchboardServerTest, AttachInput) attach->set_type(Call::AttachContainerInput::CONTAINER_ID); attach->mutable_container_id()->set_value(id::UUID::random().toString()); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(ContentType::JSON, call))); size_t offset = 0; size_t chunkSize = 4096; @@ -584,7 +582,7 @@ TEST_F(IOSwitchboardServerTest, AttachInput) message->mutable_data()->set_type(ProcessIO::Data::STDIN); message->mutable_data()->set_data(dataChunk); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(ContentType::JSON, call))); } writer.close(); @@ -667,9 +665,6 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat) Future<http::Response> response = connection.send(request); - ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind( - serialize, ContentType::JSON, lambda::_1)); - Call call; call.set_type(Call::ATTACH_CONTAINER_INPUT); @@ -677,7 +672,7 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat) attach->set_type(Call::AttachContainerInput::CONTAINER_ID); attach->mutable_container_id()->set_value(id::UUID::random().toString()); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(ContentType::JSON, call))); // Send 5 heartbeat messages. Duration heartbeat = Milliseconds(10); @@ -693,7 +688,7 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat) message->mutable_control()->mutable_heartbeat() ->mutable_interval()->set_nanoseconds(heartbeat.ns()); - writer.write(encoder.encode(call)); + writer.write(::recordio::encode(serialize(ContentType::JSON, call))); Clock::advance(heartbeat); } diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp index 99bcafb..d111547 100644 --- a/src/tests/executor_http_api_tests.cpp +++ b/src/tests/executor_http_api_tests.cpp @@ -936,11 +936,8 @@ TEST_P(ExecutorHttpApiTest, Subscribe) Option<Pipe::Reader> reader = response->reader; ASSERT_SOME(reader); - auto deserializer = - lambda::bind(deserialize<Event>, contentType, lambda::_1); - Reader<Event> responseDecoder( - Decoder<Event>(deserializer), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader.get()); Future<Result<Event>> event = responseDecoder.read(); @@ -1047,9 +1044,9 @@ TEST_P(ExecutorHttpApiTest, HeartbeatEvents) Option<Pipe::Reader> reader = response->reader; ASSERT_SOME(reader); - auto deserializer = - lambda::bind(deserialize<Event>, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder( + lambda::bind(deserialize<Event>, contentType, lambda::_1), + reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -1158,10 +1155,8 @@ TEST_F(ExecutorHttpApiTest, HeartbeatCalls) event.mutable_subscribed()->mutable_container_id()->set_value(":P"); - ::recordio::Encoder<v1::executor::Event> encoder( - lambda::bind(serialize, ContentType::PROTOBUF, lambda::_1)); - - pipe.writer().write(encoder.encode(event)); + pipe.writer().write( + ::recordio::encode(serialize(ContentType::PROTOBUF, event))); } // Set the expectation for an executor to register with the fake agent. diff --git a/src/tests/master/mock_master_api_subscriber.cpp b/src/tests/master/mock_master_api_subscriber.cpp index a0808e8..893d3e3 100644 --- a/src/tests/master/mock_master_api_subscriber.cpp +++ b/src/tests/master/mock_master_api_subscriber.cpp @@ -98,7 +98,7 @@ private: deserialize<Event>, contentType, lambda::_1); std::unique_ptr<Reader<Event>> reader(new Reader<Event>( - ::recordio::Decoder<Event>(deserializer), response->reader.get())); + deserializer, response->reader.get())); auto decode = lambda::bind( [](std::unique_ptr<Reader<Event>>& d) { return d->read(); }, diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index 84ec70b..e1fedf7 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -544,8 +544,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess) ASSERT_SOME(reader); responseDecoder.reset(new recordio::Reader<Event>( - ::recordio::Decoder<Event>( - lambda::bind(deserialize<Event>, contentType, lambda::_1)), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader.get())); Future<Result<Event>> event = responseDecoder->read(); @@ -652,8 +651,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure) ASSERT_SOME(reader); responseDecoder.reset(new recordio::Reader<Event>( - ::recordio::Decoder<Event>( - lambda::bind(deserialize<Event>, contentType, lambda::_1)), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader.get())); Future<Result<Event>> event = responseDecoder->read(); @@ -753,8 +751,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected) ASSERT_SOME(reader); responseDecoder.reset(new recordio::Reader<Event>( - ::recordio::Decoder<Event>( - lambda::bind(deserialize<Event>, contentType, lambda::_1)), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader.get())); Future<Result<Event>> event = responseDecoder->read(); @@ -846,8 +843,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint) ASSERT_SOME(reader); recordio::Reader<Event> responseDecoder( - ::recordio::Decoder<Event>( - lambda::bind(deserialize<Event>, contentType, lambda::_1)), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader.get()); Future<Result<Event>> event = responseDecoder.read(); @@ -1696,8 +1692,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider) ASSERT_SOME(reader); recordio::Reader<Event> responseDecoder( - ::recordio::Decoder<Event>( - lambda::bind(deserialize<Event>, contentType, lambda::_1)), + lambda::bind(deserialize<Event>, contentType, lambda::_1), reader.get()); // We expect the manager to drop the subscribe call since diff --git a/src/tests/scheduler_http_api_tests.cpp b/src/tests/scheduler_http_api_tests.cpp index d5b5eec..66ed9ce 100644 --- a/src/tests/scheduler_http_api_tests.cpp +++ b/src/tests/scheduler_http_api_tests.cpp @@ -282,7 +282,7 @@ TEST_P(SchedulerHttpApiTest, Subscribe) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -350,7 +350,7 @@ TEST_P(SchedulerHttpApiTest, RejectFrameworkWithInvalidRole) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -430,7 +430,7 @@ TEST_P(SchedulerHttpApiTest, SubscribedOnRetry) Option<Pipe::Reader> reader = response->reader; ASSERT_SOME(reader); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -457,7 +457,7 @@ TEST_P(SchedulerHttpApiTest, SubscribedOnRetry) Option<Pipe::Reader> reader = response->reader; ASSERT_SOME(reader); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); // Check if we were successfully able to subscribe after the blip. Future<Result<Event>> event = responseDecoder.read(); @@ -544,7 +544,7 @@ TEST_P(SchedulerHttpApiTest, UpdatePidToHttpScheduler) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -608,7 +608,7 @@ TEST_P(SchedulerHttpApiTest, UpdateHttpToPidScheduler) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -705,7 +705,7 @@ TEST_P(SchedulerHttpApiTest, UpdateHttpToPidSchedulerAndBack) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); // Get SUBSCRIBED event and check framework ID. Future<Result<Event>> event = responseDecoder.read(); @@ -899,7 +899,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWithoutStreamId) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -974,7 +974,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWrongStreamId) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -1020,7 +1020,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWrongStreamId) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event); @@ -1092,7 +1092,7 @@ TEST_P(SchedulerHttpApiTest, MalformedUUID) auto deserializer = lambda::bind( &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1); - Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + Reader<Event> responseDecoder(deserializer, reader.get()); Future<Result<Event>> event = responseDecoder.read(); AWAIT_READY(event);
