HDFS-8788. Implement unit tests for remote block reader in libhdfspp. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/172623de Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/172623de Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/172623de Branch: refs/heads/HDFS-8707 Commit: 172623de91dcc6e27d2b97d37ccdd8fb9089ab0d Parents: 928a9a1 Author: Haohui Mai <[email protected]> Authored: Wed Jul 15 16:58:42 2015 -0700 Committer: Haohui Mai <[email protected]> Committed: Wed Jul 15 16:59:05 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs-client/pom.xml | 13 ++ .../src/main/native/CMakeLists.txt | 2 + .../src/main/native/libhdfspp/CMakeLists.txt | 1 + .../main/native/libhdfspp/tests/CMakeLists.txt | 22 ++ .../native/libhdfspp/tests/mock_connection.cc | 25 +++ .../native/libhdfspp/tests/mock_connection.h | 64 ++++++ .../libhdfspp/tests/remote_block_reader_test.cc | 213 +++++++++++++++++++ 7 files changed, 340 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml index 1a03cbd..9f7070e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml @@ -137,6 +137,19 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> </target> </configuration> </execution> + <execution> + <id>native_tests</id> + <phase>test</phase> + <goals><goal>run</goal></goals> + <configuration> + <skip>${skipTests}</skip> + <target> + <exec executable="make" dir="${project.build.directory}/native" failonerror="true"> + <arg line="test"/> + </exec> + </target> + </configuration> + </execution> </executions> </plugin> </plugins> 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt index ef14183..309e99f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt @@ -18,4 +18,6 @@ cmake_minimum_required(VERSION 2.8 FATAL_ERROR) +enable_testing() + add_subdirectory(libhdfspp) http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt index cae786c..51e3122 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt @@ -51,3 +51,4 @@ include_directories( add_subdirectory(third_party/gmock-1.7.0) add_subdirectory(lib) +add_subdirectory(tests) http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt new file mode 100644 index 0000000..cd5e1b1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under 
one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +add_library(test_common OBJECT mock_connection.cc) +add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>) +target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} gmock_main) +add_test(remote_block_reader remote_block_reader_test) http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc new file mode 100644 index 0000000..e1dfdc7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+namespace hdfs {
+
+MockConnectionBase::~MockConnectionBase() {}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
new file mode 100644
index 0000000..e917e9d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+
+#include <asio/error_code.hpp>
+#include <asio/buffer.hpp>
+#include <asio/streambuf.hpp>
+#include <gmock/gmock.h>
+
+#include <algorithm>
+#include <string>
+#include <utility>
+
+namespace hdfs {
+
+/**
+ * Test double for a stream connection. Bytes handed out by async_read_some()
+ * come from the Produce() hook implemented by the derived mock; bytes passed
+ * to async_write_some() are discarded. Both operations invoke their handler
+ * before returning.
+ */
+class MockConnectionBase {
+public:
+  virtual ~MockConnectionBase();
+  /** Result of Produce(): an error code plus the bytes to serve to readers. */
+  typedef std::pair<asio::error_code, std::string> ProducerResult;
+
+  /**
+   * Copies buffered bytes into buf and reports the count to handler. When
+   * nothing is buffered, Produce() is called first to refill; if it reports
+   * an error, handler receives that error with zero bytes and nothing is
+   * copied.
+   */
+  template <class MutableBufferSequence, class Handler>
+  void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+    if (produced_.size() == 0) {
+      ProducerResult r = Produce();
+      if (r.first) {
+        handler(r.first, 0);
+        // Report the error once and stop; falling through would invoke the
+        // handler a second time with a success result.
+        return;
+      }
+      asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
+      asio::buffer_copy(data, asio::buffer(r.second));
+      produced_.commit(r.second.size());
+    }
+
+    // Serve at most what the caller's buffer can hold; the rest stays queued.
+    size_t len = std::min(asio::buffer_size(buf), produced_.size());
+    asio::buffer_copy(buf, produced_.data());
+    produced_.consume(len);
+    handler(asio::error_code(), len);
+  }
+
+  /** Pretends the whole of buf was written and reports success to handler. */
+  template <class ConstBufferSequence, class Handler>
+  void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+    handler(asio::error_code(), asio::buffer_size(buf));
+  }
+
+protected:
+  /** Supplies the next chunk of bytes, or an error, for async_read_some(). */
+  virtual ProducerResult Produce() = 0;
+
+private:
+  // Bytes obtained from Produce() that have not been handed to a reader yet.
+  asio::streambuf produced_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/172623de/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
new file mode 100644
index 0000000..92cbc8f
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+#include "datatransfer.pb.h"
+#include "common/util.h"
+#include "reader/block_reader.h"
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+
+using namespace hdfs;
+
+using ::hadoop::common::TokenProto;
+using ::hadoop::hdfs::BlockOpResponseProto;
+using ::hadoop::hdfs::ChecksumProto;
+using ::hadoop::hdfs::ExtendedBlockProto;
+using ::hadoop::hdfs::PacketHeaderProto;
+using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+
+using ::asio::buffer;
+using ::asio::error_code;
+using ::asio::mutable_buffers_1;
+using ::testing::Return;
+using std::make_pair;
+using std::string;
+
+namespace pb = ::google::protobuf;
+namespace pbio = pb::io;
+
+namespace hdfs {
+
+// Connection mock whose incoming bytes are scripted through gmock
+// expectations on Produce().
+class MockDNConnection : public MockConnectionBase {
+public:
+  MOCK_METHOD0(Produce, ProducerResult());
+};
+}
+
+// Serializes msg preceded by its byte size encoded as a varint.
+static inline string ToDelimitedString(const pb::MessageLite *msg) {
+  string res;
+  res.reserve(hdfs::DelimitedPBMessageSize(msg));
+  {
+    // Keep the streams in their own scope: CodedOutputStream returns its
+    // unused buffer space to the string only when it is destroyed, and that
+    // has to happen before res is handed back to the caller.
+    pbio::StringOutputStream os(&res);
+    pbio::CodedOutputStream out(&os);
+    out.WriteVarint32(msg->ByteSize());
+    msg->SerializeToCodedStream(&out);
+  }
+  return res;
+}
+
+// Wraps s in a successful producer result.
+static inline std::pair<error_code, string> Produce(const std::string &s) {
+  return make_pair(error_code(), s);
+}
+
+// Builds a successful producer result holding one packet: a 4-byte
+// network-order length (data size + checksum size + 4), a 2-byte
+// network-order header size, the serialized PacketHeaderProto, the checksum
+// bytes and finally the data bytes.
+static inline std::pair<error_code, string>
+ProducePacket(const std::string &data, const std::string &checksum,
+              int offset_in_block, int seqno, bool last_packet) {
+  PacketHeaderProto proto;
+  proto.set_datalen(data.size());
+  proto.set_offsetinblock(offset_in_block);
+  proto.set_seqno(seqno);
+  proto.set_lastpacketinblock(last_packet);
+
+  // Assemble the prefix with memcpy: a char array carries no alignment
+  // guarantee for wider integers, so storing through a casted pointer is
+  // undefined behaviour.
+  const uint32_t payload_len =
+      htonl(data.size() + checksum.size() + sizeof(uint32_t));
+  const uint16_t header_len = htons(proto.ByteSize());
+  char prefix[sizeof(payload_len) + sizeof(header_len)];
+  std::memcpy(prefix, &payload_len, sizeof(payload_len));
+  std::memcpy(prefix + sizeof(payload_len), &header_len, sizeof(header_len));
+
+  std::string payload(prefix, sizeof(prefix));
+  payload.reserve(payload.size() + proto.ByteSize() + checksum.size() +
+                  data.size());
+  proto.AppendToString(&payload);
+  payload += checksum;
+  payload += data;
+  return std::make_pair(error_code(), std::move(payload));
+}
+
+// Connects a RemoteBlockReader over conn and, if the connect succeeds,
+// issues one async_read_some() into buf. The final status and byte count are
+// stored through status and transferred. The reader is returned so callers
+// can issue further reads.
+static std::shared_ptr<RemoteBlockReader<MockDNConnection>>
+ReadContent(MockDNConnection *conn, TokenProto *token,
+            const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
+            const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
+  BlockReaderOptions options;
+  auto reader =
+      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, conn);
+  reader->async_connect(
+      "libhdfs++", token, &block, length, offset,
+      [buf, reader, status, transferred](const Status &stat) {
+        if (!stat.ok()) {
+          *status = stat;
+        } else {
+          reader->async_read_some(
+              buf, [status, transferred](const Status &stat, size_t t) {
+                *transferred = t;
+                *status = stat;
+              });
+        }
+      });
+  return reader;
+}
+
+// A single packet carrying the whole requested range is read in one call.
+TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  ExtendedBlockProto block;
+  std::string data(kChunkSize, 0);
+  size_t transferred = 0;
+  Status stat;
+  // &data[0] is the writable view of the string; writing through c_str()
+  // is not permitted.
+  ReadContent(&conn, nullptr, block, kChunkSize, 0,
+              buffer(&data[0], data.size()), &stat, &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kChunkSize, transferred);
+  ASSERT_EQ(kChunkData, data);
+}
+
+// Reading from an offset inside a chunk returns only the bytes from that
+// offset onwards.
+TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
+  static const size_t kChunkSize = 1024;
+  static const size_t kLength = kChunkSize / 4 * 3;
+  static const size_t kOffset = kChunkSize / 4;
+  static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
+
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+  ReadOpChecksumInfoProto *checksum_info =
+      block_op_resp.mutable_readopchecksuminfo();
+  checksum_info->set_chunkoffset(0);
+  ChecksumProto *checksum = checksum_info->mutable_checksum();
+  checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL);
+  checksum->set_bytesperchecksum(512);
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
+
+  ExtendedBlockProto block;
+  string data(kLength, 0);
+  size_t transferred = 0;
+  Status stat;
+  ReadContent(&conn, nullptr, block, data.size(), kOffset,
+              buffer(&data[0], data.size()), &stat, &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kLength, transferred);
+  ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+}
+
+// Two packets are consumed by two consecutive reads on the same reader.
+TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
+  static const size_t kChunkSize = 1024;
+  static const string kChunkData(kChunkSize, 'a');
+
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
+
+  ExtendedBlockProto block;
+  string data(kChunkSize, 0);
+  size_t transferred = 0;
+  Status stat;
+  mutable_buffers_1 buf = buffer(&data[0], data.size());
+  auto reader = ReadContent(&conn, nullptr, block, data.size(), 0, buf, &stat,
+                            &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kChunkSize, transferred);
+  ASSERT_EQ(kChunkData, data);
+
+  // Zero the buffer in place: buf still points into data's storage, so the
+  // string must not be reallocated between the two reads.
+  std::fill(data.begin(), data.end(), '\0');
+  transferred = 0;
+
+  bool second_read_done = false;
+  reader->async_read_some(
+      buf, [&data, &second_read_done](const Status &stat, size_t transferred) {
+        second_read_done = true;
+        ASSERT_TRUE(stat.ok());
+        ASSERT_EQ(kChunkSize, transferred);
+        ASSERT_EQ(kChunkData, data);
+      });
+  // Fail loudly if the completion handler never ran; otherwise the
+  // assertions inside it would be skipped silently.
+  ASSERT_TRUE(second_read_done);
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}
