HDFS-9093. Initialize protobuf fields in RemoteBlockReaderTest. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5876945d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5876945d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5876945d

Branch: refs/heads/HDFS-8707
Commit: 5876945dc7cc7341627512cf14ac2411832e0291
Parents: f81439d
Author: Haohui Mai <whe...@apache.org>
Authored: Wed Sep 16 16:48:56 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Wed Oct 7 00:17:13 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/tests/mock_connection.cc | 4 +
 .../native/libhdfspp/tests/mock_connection.h | 10 +-
 .../libhdfspp/tests/remote_block_reader_test.cc | 160 +++++++++++--------
 3 files changed, 104 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5876945d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
index e1dfdc7..93a3099 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
@@ -20,6 +20,10 @@

 namespace hdfs {

+MockConnectionBase::MockConnectionBase(::asio::io_service *io_service)
+    : io_service_(io_service)
+{}
+
 MockConnectionBase::~MockConnectionBase() {}

 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5876945d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h index e917e9d..086797f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h @@ -21,12 +21,15 @@ #include <asio/error_code.hpp> #include <asio/buffer.hpp> #include <asio/streambuf.hpp> +#include <asio/io_service.hpp> + #include <gmock/gmock.h> namespace hdfs { class MockConnectionBase { public: + MockConnectionBase(::asio::io_service *io_service); virtual ~MockConnectionBase(); typedef std::pair<asio::error_code, std::string> ProducerResult; template <class MutableBufferSequence, class Handler> @@ -34,7 +37,7 @@ public: if (produced_.size() == 0) { ProducerResult r = Produce(); if (r.first) { - handler(r.first, 0); + io_service_->post(std::bind(handler, r.first, 0)); } asio::mutable_buffers_1 data = produced_.prepare(r.second.size()); asio::buffer_copy(data, asio::buffer(r.second)); @@ -44,17 +47,18 @@ public: size_t len = std::min(asio::buffer_size(buf), produced_.size()); asio::buffer_copy(buf, produced_.data()); produced_.consume(len); - handler(asio::error_code(), len); + io_service_->post(std::bind(handler, asio::error_code(), len)); } template <class ConstBufferSequence, class Handler> void async_write_some(const ConstBufferSequence &buf, Handler &&handler) { // CompletionResult res = OnWrite(buf); - handler(asio::error_code(), asio::buffer_size(buf)); + io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf))); } protected: virtual ProducerResult Produce() = 0; + ::asio::io_service *io_service_; private: asio::streambuf produced_; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5876945d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc index 5307d39..388a106 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -51,6 +51,8 @@ namespace hdfs { class MockDNConnection : public MockConnectionBase { public: + MockDNConnection(::asio::io_service &io_service) + : MockConnectionBase(&io_service) {} MOCK_METHOD0(Produce, ProducerResult()); }; } @@ -91,35 +93,30 @@ ProducePacket(const std::string &data, const std::string &checksum, return std::make_pair(error_code(), std::move(payload)); } -template<class Stream = MockDNConnection> +template <class Stream = MockDNConnection, class Handler> static std::shared_ptr<RemoteBlockReader<Stream>> -ReadContent(Stream *conn, TokenProto *token, - const ExtendedBlockProto &block, uint64_t length, uint64_t offset, - const mutable_buffers_1 &buf, Status *status, size_t *transferred) { +ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block, + uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, + const Handler &handler) { BlockReaderOptions options; - auto reader = - std::make_shared<RemoteBlockReader<Stream>>(options, conn); + auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn); Status result; - reader->async_connect( - "libhdfs++", token, &block, length, offset, - [buf, reader, status, transferred](const Status &stat) { - if (!stat.ok()) { - *status = stat; - } else { - reader->async_read_some( - buf, [status, transferred](const Status &stat, size_t t) { - *transferred = t; - *status = stat; - }); - } - }); + reader->async_connect("libhdfs++", token, &block, length, offset, 
+ [buf, reader, handler](const Status &stat) { + if (!stat.ok()) { + handler(stat, 0); + } else { + reader->async_read_some(buf, handler); + } + }); return reader; } TEST(RemoteBlockReaderTest, TestReadWholeBlock) { static const size_t kChunkSize = 512; static const string kChunkData(kChunkSize, 'a'); - MockDNConnection conn; + ::asio::io_service io_service; + MockDNConnection conn(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); @@ -128,15 +125,20 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) { .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + std::string data(kChunkSize, 0); - size_t transferred = 0; - Status stat; ReadContent(&conn, nullptr, block, kChunkSize, 0, - buffer(const_cast<char *>(data.c_str()), data.size()), &stat, - &transferred); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + io_service.stop(); + }); + io_service.run(); } TEST(RemoteBlockReaderTest, TestReadWithinChunk) { @@ -145,7 +147,8 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { static const size_t kOffset = kChunkSize / 4; static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); - MockDNConnection conn; + ::asio::io_service io_service; + MockDNConnection conn(io_service); BlockOpResponseProto block_op_resp; ReadOpChecksumInfoProto *checksum_info = block_op_resp.mutable_readopchecksuminfo(); @@ -160,22 +163,28 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + 
block.set_generationstamp(0); + string data(kLength, 0); - size_t transferred = 0; - Status stat; ReadContent(&conn, nullptr, block, data.size(), kOffset, - buffer(const_cast<char *>(data.c_str()), data.size()), &stat, - &transferred); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kLength, transferred); - ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kLength, transferred); + ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); + io_service.stop(); + }); + io_service.run(); } TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { static const size_t kChunkSize = 1024; static const string kChunkData(kChunkSize, 'a'); - MockDNConnection conn; + ::asio::io_service io_service; + MockDNConnection conn(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); @@ -185,25 +194,37 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + string data(kChunkSize, 0); - size_t transferred = 0; - Status stat; mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); - auto reader = ReadContent(&conn, nullptr, block, data.size(), 0, buf, &stat, - &transferred); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); - - data.clear(); - data.resize(kChunkSize); - transferred = 0; - - reader->async_read_some(buf, [&data](const Status &stat, size_t transferred) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); - }); + BlockReaderOptions options; + auto reader = std::make_shared<RemoteBlockReader<MockDNConnection> >(options, &conn); + Status result; + reader->async_connect( + "libhdfs++", 
nullptr, &block, data.size(), 0, + [buf, reader, &data, &io_service](const Status &stat) { + ASSERT_TRUE(stat.ok()); + reader->async_read_some( + buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + data.clear(); + data.resize(kChunkSize); + transferred = 0; + reader->async_read_some( + buf, [&data,&io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + io_service.stop(); + }); + }); + }); + io_service.run(); } TEST(RemoteBlockReaderTest, TestSaslConnection) { @@ -212,7 +233,8 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) { static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," "charset=utf-8,algorithm=md5-sess"; - MockDNConnection conn; + ::asio::io_service io_service; + MockDNConnection conn(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); @@ -233,20 +255,24 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) { DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar"); ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + std::string data(kChunkSize, 0); - size_t transferred = 0; - Status stat; - sasl_conn.Handshake([&stat](const Status &s) { - stat = s; - }); - - ASSERT_TRUE(stat.ok()); - ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, - buffer(const_cast<char *>(data.c_str()), data.size()), &stat, - &transferred); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); + sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service]( + const Status &s) { + ASSERT_TRUE(s.ok()); + ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, 
&io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + io_service.stop(); + }); + }); + io_service.run(); } int main(int argc, char *argv[]) {