ARROW-424: [C++] Make ReadAt, Write HDFS functions threadsafe. This also fixes the HDFS test suite to actually use libhdfs3 (by accident, it previously was not).
Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #712 from wesm/ARROW-424 and squashes the following commits: 0894719 [Wes McKinney] Make ReadAt, Write HDFS functions threadsafe Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/4e4435ec Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/4e4435ec Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/4e4435ec Branch: refs/heads/master Commit: 4e4435ecc4e4a02a9029e65c312bdfb00af7e291 Parents: 078357a Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Wed May 24 17:20:40 2017 -0400 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Wed May 31 13:45:48 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/array.h | 1 - cpp/src/arrow/io/hdfs.cc | 5 +++ cpp/src/arrow/io/io-hdfs-test.cc | 68 ++++++++++++++++++++++++++++++----- 3 files changed, 64 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/4e4435ec/cpp/src/arrow/array.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 331c6c3..2c96ce0 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -508,7 +508,6 @@ ARROW_EXTERN_TEMPLATE NumericArray<Time32Type>; ARROW_EXTERN_TEMPLATE NumericArray<Time64Type>; ARROW_EXTERN_TEMPLATE NumericArray<TimestampType>; - /// \brief Perform any validation checks to determine obvious inconsistencies /// with the array's internal data /// http://git-wip-us.apache.org/repos/asf/arrow/blob/4e4435ec/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index a27e132..ba9c2c2 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -89,6 +89,9 @@ class HdfsAnyFileImpl { LibHdfsShim* 
driver_; + // For threadsafety + std::mutex lock_; + // These are pointers in libhdfs, so OK to copy hdfsFS fs_; hdfsFile file_; @@ -116,6 +119,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position), reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes)); } else { + std::lock_guard<std::mutex> guard(lock_); RETURN_NOT_OK(Seek(position)); return Read(nbytes, bytes_read, buffer); } @@ -253,6 +257,7 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { } Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) { + std::lock_guard<std::mutex> guard(lock_); tSize ret = driver_->Write( fs_, file_, reinterpret_cast<const void*>(buffer), static_cast<tSize>(nbytes)); CHECK_FAILURE(ret, "Write"); http://git-wip-us.apache.org/repos/asf/arrow/blob/4e4435ec/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index 0fdb897..74f8042 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -19,6 +19,7 @@ #include <iostream> #include <sstream> #include <string> +#include <thread> #include "gtest/gtest.h" @@ -38,6 +39,14 @@ std::vector<uint8_t> RandomData(int64_t size) { return buffer; } +struct JNIDriver { + static HdfsDriver type; +}; + +struct PivotalDriver { + static HdfsDriver type; +}; + template <typename DRIVER> class TestHdfsClient : public ::testing::Test { public: @@ -112,6 +121,7 @@ class TestHdfsClient : public ::testing::Test { conf_.host = host == nullptr ? "localhost" : host; conf_.user = user; conf_.port = port == nullptr ? 
20500 : atoi(port); + conf_.driver = DRIVER::type; ASSERT_OK(HdfsClient::Connect(&conf_, &client_)); } @@ -133,20 +143,19 @@ class TestHdfsClient : public ::testing::Test { std::shared_ptr<HdfsClient> client_; }; +template <> +std::string TestHdfsClient<PivotalDriver>::HdfsAbsPath(const std::string& relpath) { + std::stringstream ss; + ss << relpath; + return ss.str(); +} + #define SKIP_IF_NO_DRIVER() \ if (!this->loaded_driver_) { \ std::cout << "Driver not loaded, skipping" << std::endl; \ return; \ } -struct JNIDriver { - static HdfsDriver type; -}; - -struct PivotalDriver { - static HdfsDriver type; -}; - HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; @@ -364,7 +373,6 @@ TYPED_TEST(TestHdfsClient, LargeFile) { TYPED_TEST(TestHdfsClient, RenameFile) { SKIP_IF_NO_DRIVER(); - ASSERT_OK(this->MakeScratchDir()); auto src_path = this->ScratchPath("src-file"); @@ -380,5 +388,47 @@ TYPED_TEST(TestHdfsClient, RenameFile) { ASSERT_TRUE(this->client_->Exists(dst_path)); } +TYPED_TEST(TestHdfsClient, ThreadSafety) { + SKIP_IF_NO_DRIVER(); + ASSERT_OK(this->MakeScratchDir()); + + auto src_path = this->ScratchPath("threadsafety"); + + std::string data = "foobar"; + ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()), + static_cast<int64_t>(data.size()))); + + std::shared_ptr<HdfsReadableFile> file; + ASSERT_OK(this->client_->OpenReadable(src_path, &file)); + + std::atomic<int> correct_count(0); + const int niter = 1000; + + auto ReadData = [&file, &correct_count, &data, niter]() { + for (int i = 0; i < niter; ++i) { + std::shared_ptr<Buffer> buffer; + if (i % 2 == 0) { + ASSERT_OK(file->ReadAt(3, 3, &buffer)); + if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) { correct_count += 1; } + } else { + ASSERT_OK(file->ReadAt(0, 4, &buffer)); + if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) { correct_count += 1; } + } + } + }; + + std::thread thread1(ReadData); + std::thread 
thread2(ReadData); + std::thread thread3(ReadData); + std::thread thread4(ReadData); + + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); + + ASSERT_EQ(niter * 4, correct_count); +} + } // namespace io } // namespace arrow