IMPALA-7543: Enhance scan ranges to support sub-ranges

This commit enhances the ScanRange class so that it can read only some smaller parts of the whole ScanRange. This functionality is needed by IMPALA-5843.
A sub-range is an offset and a length that lie within the scan range. Sub-ranges can be added to a scan range when calling ScanRange::Reset(). In that case, the ScanRange class will only read the parts defined by the sub-ranges. If a cache read has sub-ranges, the ScanRange won't enqueue the whole cache buffer (which contains the whole ScanRange); instead, it will memcpy() the sub-ranges into IO/client buffers.

Some smaller refactorings were needed:
 * remove scan_range_offset_ from BufferDescriptor
 * the number of bytes read is tracked by ScanRange again

Testing:
 * introduced CacheReaderTestStub to fake cache reads during testing
 * extended disk-io-mgr-test.cc with sub-ranges

Change-Id: Iea26ba386713990f7671aab5a372cf449b8d51e4 Reviewed-on: http://gerrit.cloudera.org:8080/11520 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/48fb4902 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/48fb4902 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/48fb4902 Branch: refs/heads/master Commit: 48fb4902d4f28c9e4d327b80b00b33962c118c22 Parents: 2fb8eba Author: Zoltan Borok-Nagy <[email protected]> Authored: Wed Sep 26 13:58:36 2018 +0200 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Oct 18 20:33:36 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/io/CMakeLists.txt | 1 - be/src/runtime/io/cache-reader-test-stub.h | 62 +++++++++ be/src/runtime/io/disk-io-mgr-stress.cc | 3 +- be/src/runtime/io/disk-io-mgr-test.cc | 177 +++++++++++++++++++---- be/src/runtime/io/disk-io-mgr.cc | 2 +- be/src/runtime/io/file-reader.cc | 31 ----- be/src/runtime/io/file-reader.h | 23 ++- be/src/runtime/io/hdfs-file-reader.cc | 32 ++--- be/src/runtime/io/hdfs-file-reader.h | 4 +- be/src/runtime/io/local-file-reader.cc | 15 +- 
be/src/runtime/io/local-file-reader.h | 5 +- be/src/runtime/io/request-context.cc | 41 +++++- be/src/runtime/io/request-context.h | 8 ++ be/src/runtime/io/request-ranges.h | 87 ++++++++++-- be/src/runtime/io/scan-range.cc | 178 +++++++++++++++++++----- 15 files changed, 511 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt index bb26ef0..675bd6e 100644 --- a/be/src/runtime/io/CMakeLists.txt +++ b/be/src/runtime/io/CMakeLists.txt @@ -29,7 +29,6 @@ add_library(Io error-converter.cc request-context.cc scan-range.cc - file-reader.cc hdfs-file-reader.cc local-file-reader.cc ) http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/cache-reader-test-stub.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h new file mode 100644 index 0000000..b53b30b --- /dev/null +++ b/be/src/runtime/io/cache-reader-test-stub.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/io/file-reader.h" +#include "runtime/io/request-ranges.h" + +namespace impala { +namespace io { + +/// Only for testing the code path when reading from the cache is successful. +/// Takes a pointer to a buffer in its constructor, also the length of this buffer. +/// CachedFile() simply returns the pointer and length. +/// Invoking ReadFromPos() on it results in an error. +class CacheReaderTestStub : public FileReader { +public: + CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length) : + FileReader(scan_range), + cache_(cache), + length_(length) { + } + + ~CacheReaderTestStub() {} + + virtual Status Open(bool use_file_handle_cache) override { + return Status::OK(); + } + + virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer, + int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override { + DCHECK(false); + return Status("Not implemented"); + } + + virtual void CachedFile(uint8_t** data, int64_t* length) override { + *length = length_; + *data = cache_; + } + + virtual void Close() override {} +private: + uint8_t* cache_ = nullptr; + int64_t length_ = 0; +}; + +} +} http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc index 16cfbf2..4335aa4 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.cc +++ b/be/src/runtime/io/disk-io-mgr-stress.cc @@ -119,6 +119,7 @@ void DiskIoMgrStress::ClientThread(int client_id) { while (!eos) { ScanRange* range; + int64_t scan_range_offset = 0; bool needs_buffers; Status status = client->reader->GetNextUnstartedRange(&range, &needs_buffers); CHECK(status.ok() || status.IsCancelled()); @@ -135,7 +136,6 @@ void DiskIoMgrStress::ClientThread(int 
client_id) { CHECK(status.ok() || status.IsCancelled()); if (buffer == NULL) break; - int64_t scan_range_offset = buffer->scan_range_offset(); int len = buffer->len(); CHECK_GE(scan_range_offset, 0); CHECK_LT(scan_range_offset, expected.size()); @@ -154,6 +154,7 @@ void DiskIoMgrStress::ClientThread(int client_id) { memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len()); range->ReturnBuffer(move(buffer)); bytes_read += len; + scan_range_offset += len; CHECK_GE(bytes_read, 0); CHECK_LE(bytes_read, expected.size()); http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index a6689a8..7a9bc23 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -24,6 +24,7 @@ #include "common/init.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/io/cache-reader-test-stub.h" #include "runtime/io/local-file-system-with-fault-injection.h" #include "runtime/io/disk-io-mgr-stress.h" #include "runtime/io/disk-io-mgr.h" @@ -179,7 +180,7 @@ class DiskIoMgrTest : public testing::Test { } ASSERT_OK(range->GetNext(&buffer)); ASSERT_TRUE(buffer != nullptr); - EXPECT_EQ(buffer->len(), range->len()); + EXPECT_EQ(buffer->len(), range->bytes_to_read()); if (expected_len < 0) expected_len = strlen(expected); int cmp = memcmp(buffer->buffer(), expected, expected_len); EXPECT_TRUE(cmp == 0); @@ -190,6 +191,7 @@ class DiskIoMgrTest : public testing::Test { const char* expected, int expected_len, const Status& expected_status) { char result[expected_len + 1]; memset(result, 0, expected_len + 1); + int64_t scan_range_offset = 0; while (true) { unique_ptr<BufferDescriptor> buffer; @@ -200,8 +202,9 @@ class DiskIoMgrTest : public testing::Test { break; } ASSERT_LE(buffer->len(), 
expected_len); - memcpy(result + range->offset() + buffer->scan_range_offset(), + memcpy(result + range->offset() + scan_range_offset, buffer->buffer(), buffer->len()); + scan_range_offset += buffer->len(); range->ReturnBuffer(move(buffer)); } ValidateEmptyOrCorrect(expected, result, expected_len); @@ -229,11 +232,16 @@ class DiskIoMgrTest : public testing::Test { } } + static void SetReaderStub(ScanRange* scan_range, unique_ptr<FileReader> reader_stub) { + scan_range->SetFileReader(move(reader_stub)); + } + ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len, - int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) { + int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false, + std::vector<ScanRange::SubRange> sub_ranges = {}) { ScanRange* range = pool->Add(new ScanRange); range->Reset(nullptr, file_path, len, offset, disk_id, true, - BufferOpts(is_cached, mtime), meta_data); + BufferOpts(is_cached, mtime), move(sub_ranges), meta_data); EXPECT_EQ(mtime, range->mtime()); return range; } @@ -252,6 +260,12 @@ class DiskIoMgrTest : public testing::Test { const string& tmp_file, int offset, RequestContext* writer, const string& expected_output); + void SingleReaderTestBody(const char* data, const char* expected_result, + vector<ScanRange::SubRange> sub_ranges = {}); + + void CachedReadsTestBody(const char* data, const char* expected, + bool fake_cache, vector<ScanRange::SubRange> sub_ranges = {}); + /// Convenience function to get a reference to the buffer pool. BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); } @@ -522,13 +536,11 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { buffer_pool()->DeregisterClient(&read_client); } -// Basic test with a single reader, testing multiple threads, disks and a different -// number of buffers. 
-TEST_F(DiskIoMgrTest, SingleReader) { - InitRootReservation(LARGE_RESERVATION_LIMIT); +void DiskIoMgrTest::SingleReaderTestBody(const char* data, const char* expected_result, + vector<ScanRange::SubRange> sub_ranges) { const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; - const char* data = "abcdefghijklm"; - int len = strlen(data); + int data_len = strlen(data); + int expected_result_len = strlen(expected_result); CreateTempFile(tmp_file, data); // Get mtime for file @@ -554,9 +566,10 @@ TEST_F(DiskIoMgrTest, SingleReader) { unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); vector<ScanRange*> ranges; - for (int i = 0; i < len; ++i) { + for (int i = 0; i < data_len; ++i) { int disk_id = i % num_disks; - ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime)); + ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, data_len, disk_id, + stat_val.st_mtime, nullptr, false, sub_ranges)); } ASSERT_OK(reader->AddScanRanges(ranges)); @@ -564,7 +577,8 @@ TEST_F(DiskIoMgrTest, SingleReader) { thread_group threads; for (int i = 0; i < num_read_threads; ++i) { threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), - &read_client, data, len, Status::OK(), 0, &num_ranges_processed)); + &read_client, expected_result, expected_result_len, Status::OK(), 0, + &num_ranges_processed)); } threads.join_all(); @@ -578,6 +592,23 @@ TEST_F(DiskIoMgrTest, SingleReader) { EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } +// Basic test with a single reader, testing multiple threads, disks and a different +// number of buffers. 
+TEST_F(DiskIoMgrTest, SingleReader) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* data = "abcdefghijklm"; + SingleReaderTestBody(data, data); +} + +TEST_F(DiskIoMgrTest, SingleReaderSubRanges) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* data = "abcdefghijklm"; + int64_t data_len = strlen(data); + SingleReaderTestBody(data, data, {{0, data_len}}); + SingleReaderTestBody(data, "abdef", {{0, 2}, {3, 3}}); + SingleReaderTestBody(data, "bceflm", {{1, 2}, {4, 2}, {11, 2}}); +} + // This test issues adding additional scan ranges while there are some still in flight. TEST_F(DiskIoMgrTest, AddScanRangeTest) { InitRootReservation(LARGE_RESERVATION_LIMIT); @@ -872,14 +903,10 @@ TEST_F(DiskIoMgrTest, MemScarcity) { } } -// Test when some scan ranges are marked as being cached. -// Since these files are not in HDFS, the cached path always fails so this -// only tests the fallback mechanism. -// TODO: we can fake the cached read path without HDFS -TEST_F(DiskIoMgrTest, CachedReads) { - InitRootReservation(LARGE_RESERVATION_LIMIT); +void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected, + bool fake_cache, vector<ScanRange::SubRange> sub_ranges) { const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; - const char* data = "abcdefghijklm"; + uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data)); int len = strlen(data); CreateTempFile(tmp_file, data); @@ -898,17 +925,26 @@ TEST_F(DiskIoMgrTest, CachedReads) { unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); ScanRange* complete_range = - InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true); + InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true, + sub_ranges); + if (fake_cache) { + SetReaderStub(complete_range, make_unique<CacheReaderTestStub>( + complete_range, cached_data, len)); + } // Issue some reads before the async ones are issued - ValidateSyncRead(&io_mgr, reader.get(), 
&read_client, complete_range, data); - ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected); vector<ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back( - InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true)); + ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, + nullptr, true, sub_ranges); + ranges.push_back(range); + if (fake_cache) { + SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data, len)); + } } ASSERT_OK(reader->AddScanRanges(ranges)); @@ -916,19 +952,19 @@ TEST_F(DiskIoMgrTest, CachedReads) { thread_group threads; for (int i = 0; i < 5; ++i) { threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client, - data, strlen(data), Status::OK(), 0, &num_ranges_processed)); + expected, strlen(expected), Status::OK(), 0, &num_ranges_processed)); } // Issue some more sync ranges for (int i = 0; i < 5; ++i) { sched_yield(); - ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected); } threads.join_all(); - ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); - ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, expected); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); io_mgr.UnregisterContext(reader.get()); @@ -938,6 +974,32 @@ TEST_F(DiskIoMgrTest, CachedReads) { EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } +// Test when some scan ranges are marked as being cached. 
+TEST_F(DiskIoMgrTest, CachedReads) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* data = "abcdefghijklm"; + // Don't fake the cache, i.e. test the fallback mechanism + CachedReadsTestBody(data, data, false); + // Fake the test with a file reader stub. + CachedReadsTestBody(data, data, true); +} + +// Test when some scan ranges are marked as being cached and there +// are sub-ranges as well. +TEST_F(DiskIoMgrTest, CachedReadsSubRanges) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* data = "abcdefghijklm"; + int64_t data_len = strlen(data); + + // first iteration tests the fallback mechanism with sub-ranges + // second iteration fakes a cache + for (bool fake_cache : {false, true}) { + CachedReadsTestBody(data, data, fake_cache, {{0, data_len}}); + CachedReadsTestBody(data, "bc", fake_cache, {{1, 2}}); + CachedReadsTestBody(data, "abchilm", fake_cache, {{0, 3}, {7, 2}, {11, 2}}); + } +} + TEST_F(DiskIoMgrTest, MultipleReaderWriter) { InitRootReservation(LARGE_RESERVATION_LIMIT); const int ITERATIONS = 1; @@ -1401,6 +1463,63 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } +// Test reading into a client-allocated buffer using sub-ranges. +TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; + const char* data = "the quick brown fox jumped over the lazy dog"; + uint8_t* cache = reinterpret_cast<uint8_t*>(const_cast<char*>(data)); + int data_len = strlen(data); + int read_len = 4; // Make buffer size smaller than client-provided buffer. + CreateTempFile(tmp_file, data); + + // Get mtime for file + struct stat stat_val; + stat(tmp_file, &stat_val); + + scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len)); + ASSERT_OK(io_mgr->Init()); + // Reader doesn't need to provide client if it's providing buffers. 
+ unique_ptr<RequestContext> reader = io_mgr->RegisterContext(); + + auto test_case = [&](bool fake_cache, const char* expected_result, + vector<ScanRange::SubRange> sub_ranges) { + int result_len = strlen(expected_result); + vector<uint8_t> client_buffer(result_len); + ScanRange* range = pool_.Add(new ScanRange); + range->Reset(nullptr, tmp_file, data_len, 0, 0, true, + BufferOpts::ReadInto(fake_cache, stat_val.st_mtime, client_buffer.data(), + result_len), move(sub_ranges)); + if (fake_cache) { + SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len)); + } + bool needs_buffers; + ASSERT_OK(reader->StartScanRange(range, &needs_buffers)); + ASSERT_FALSE(needs_buffers); + + unique_ptr<BufferDescriptor> io_buffer; + ASSERT_OK(range->GetNext(&io_buffer)); + ASSERT_TRUE(io_buffer->eosr()); + ASSERT_EQ(result_len, io_buffer->len()); + ASSERT_EQ(client_buffer.data(), io_buffer->buffer()); + ASSERT_EQ(memcmp(io_buffer->buffer(), expected_result, result_len), 0); + + // DiskIoMgr should not have allocated memory. + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); + range->ReturnBuffer(move(io_buffer)); + }; + + for (bool fake_cache : {false, true}) { + test_case(fake_cache, data, {{0, data_len}}); + test_case(fake_cache, data, {{0, 15}, {15, data_len - 15}}); + test_case(fake_cache, "quick fox", {{4, 5}, {15, 4}}); + test_case(fake_cache, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 4}}); + } + + io_mgr->UnregisterContext(reader.get()); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); +} + // Test reading into a client-allocated buffer where the read fails. 
TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { InitRootReservation(LARGE_RESERVATION_LIMIT); http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc index 75136ec..fa871d1 100644 --- a/be/src/runtime/io/disk-io-mgr.cc +++ b/be/src/runtime/io/disk-io-mgr.cc @@ -302,7 +302,7 @@ Status DiskIoMgr::AllocateBuffersForRange( BufferPool* bp = ExecEnv::GetInstance()->buffer_pool(); Status status; vector<unique_ptr<BufferDescriptor>> buffers; - for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) { + for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) { BufferPool::BufferHandle handle; status = bp->AllocateBuffer(bp_client, buffer_size, &handle); if (!status.ok()) goto error; http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/file-reader.cc b/be/src/runtime/io/file-reader.cc deleted file mode 100644 index 81ef090..0000000 --- a/be/src/runtime/io/file-reader.cc +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "gutil/strings/substitute.h" -#include "runtime/io/file-reader.h" - -#include "common/names.h" - -namespace impala { -namespace io { - -string FileReader::DebugString() const { - return Substitute("bytes_read=$0", bytes_read_); -} - -} -} http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/file-reader.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h index 40f2a5c..9dbcc31 100644 --- a/be/src/runtime/io/file-reader.h +++ b/be/src/runtime/io/file-reader.h @@ -40,33 +40,31 @@ public: FileReader(ScanRange* scan_range) : scan_range_(scan_range) {} virtual ~FileReader() {} - /// Returns number of bytes read by this file reader. - int bytes_read() const { return bytes_read_; } - /// Opens file that is associated with 'scan_range_'. /// 'use_file_handle_cache' currently only used by HdfsFileReader. virtual Status Open(bool use_file_handle_cache) = 0; /// Reads bytes from given position ('file_offset'). Tries to read /// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of - /// bytes actually read. 'eosr' is set to true when end of file has reached, - /// or the file reader has read all the bytes needed by 'scan_range_'. + /// bytes actually read. 'eof' is set to true when end of file has reached. virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer, - int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) = 0; + int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0; /// ***Currently only for HDFS*** - /// Returns a pointer to a cached buffer that contains the contents of the file. - virtual void* CachedFile() = 0; + /// When successful, sets 'data' to a buffer that contains the contents of a file, + /// and 'length' is set to the length of the data. 
+ /// When unsuccessful, 'data' is set to nullptr. + virtual void CachedFile(uint8_t** data, int64_t* length) = 0; /// Closes the file associated with 'scan_range_'. It doesn't have effect on other /// scan ranges. virtual void Close() = 0; - /// Reset internal bookkeeping, e.g. how many bytes have been read. - virtual void ResetState() { bytes_read_ = 0; } + /// Resets internal bookkeeping + virtual void ResetState() {} // Debug string of this file reader. - virtual std::string DebugString() const; + virtual std::string DebugString() const { return ""; } SpinLock& lock() { return lock_; } protected: @@ -81,9 +79,6 @@ protected: /// The scan range this file reader serves. ScanRange* const scan_range_; - - /// Number of bytes read by this reader. - int bytes_read_ = 0; }; } http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc index ea61b6a..c6a6cec 100644 --- a/be/src/runtime/io/hdfs-file-reader.cc +++ b/be/src/runtime/io/hdfs-file-reader.cc @@ -40,6 +40,7 @@ namespace io { HdfsFileReader::~HdfsFileReader() { DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed."; + DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released."; } Status HdfsFileReader::Open(bool use_file_handle_cache) { @@ -73,7 +74,7 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) { } Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, - int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) { + int64_t bytes_to_read, int64_t* bytes_read, bool* eof) { DCHECK(scan_range_->read_in_flight()); DCHECK_GE(bytes_to_read, 0); // Delay before acquiring the lock, to allow triggering IMPALA-6587 race. 
@@ -87,7 +88,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, auto io_mgr = scan_range_->io_mgr_; auto request_context = scan_range_->reader_; - *eosr = false; + *eof = false; *bytes_read = 0; CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr; @@ -151,7 +152,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, DCHECK_GT(current_bytes_read, -1); if (current_bytes_read == 0) { // No more bytes in the file. The scan range went past the end. - *eosr = true; + *eof = true; break; } *bytes_read += current_bytes_read; @@ -164,12 +165,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, if (borrowed_hdfs_fh != nullptr) { io_mgr->ReleaseCachedHdfsFileHandle(scan_range_->file_string(), borrowed_hdfs_fh); } - if (!status.ok()) - return status; - bytes_read_ += *bytes_read; - DCHECK_LE(bytes_read_, scan_range_->len()); - if (bytes_read_ == scan_range_->len()) *eosr = true; - return Status::OK(); + return status; } Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file, @@ -203,16 +199,22 @@ Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_ return Status::OK(); } -void* HdfsFileReader::CachedFile() { +void HdfsFileReader::CachedFile(uint8_t** data, int64_t* length) { { unique_lock<SpinLock> hdfs_lock(lock_); + DCHECK(cached_buffer_ == nullptr); DCHECK(exclusive_hdfs_fh_ != nullptr); cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(), scan_range_->io_mgr_->cached_read_options(), scan_range_->len()); } - if (cached_buffer_ == nullptr) return nullptr; - bytes_read_ = hadoopRzBufferLength(cached_buffer_); - return const_cast<void*>(hadoopRzBufferGet(cached_buffer_)); + if (cached_buffer_ == nullptr) { + *data = nullptr; + *length = 0; + return; + } + *data = reinterpret_cast<uint8_t*>( + const_cast<void*>(hadoopRzBufferGet(cached_buffer_))); + *length = hadoopRzBufferLength(cached_buffer_); } void HdfsFileReader::Close() { @@ -220,11 
+222,9 @@ void HdfsFileReader::Close() { if (exclusive_hdfs_fh_ != nullptr) { GetHdfsStatistics(exclusive_hdfs_fh_->file()); - if (scan_range_->external_buffer_tag_ == - ScanRange::ExternalBufferTag::CACHED_BUFFER) { + if (cached_buffer_ != nullptr) { hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_); cached_buffer_ = nullptr; - scan_range_->external_buffer_tag_ = ScanRange::ExternalBufferTag::NO_BUFFER; } // Destroy the file handle. http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/hdfs-file-reader.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h index 3b3642b..ba95095 100644 --- a/be/src/runtime/io/hdfs-file-reader.h +++ b/be/src/runtime/io/hdfs-file-reader.h @@ -34,11 +34,11 @@ public: virtual Status Open(bool use_file_handle_cache) override; virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer, - int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override; + int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override; /// Reads from the DN cache. On success, sets cached_buffer_ to the DN /// buffer and returns a pointer to the underlying raw buffer. /// Returns nullptr if the data is not cached. 
- virtual void* CachedFile() override; + virtual void CachedFile(uint8_t** data, int64_t* length) override; virtual void Close() override; virtual void ResetState() override; virtual std::string DebugString() const override; http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc index 47c025b..3f88106 100644 --- a/be/src/runtime/io/local-file-reader.cc +++ b/be/src/runtime/io/local-file-reader.cc @@ -49,7 +49,7 @@ Status LocalFileReader::Open(bool use_file_handle_cache) { } Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, - int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) { + int64_t bytes_to_read, int64_t* bytes_read, bool* eof) { DCHECK(scan_range_->read_in_flight()); DCHECK_GE(bytes_to_read, 0); // Delay before acquiring the lock, to allow triggering IMPALA-6587 race. @@ -61,7 +61,7 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, unique_lock<SpinLock> fs_lock(lock_); RETURN_IF_ERROR(scan_range_->cancel_status_); - *eosr = false; + *eof = false; *bytes_read = 0; DCHECK(file_ != nullptr); @@ -85,17 +85,14 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer, } // On Linux, we should only get partial reads from block devices on error or eof. 
DCHECK(feof(file_) != 0); - *eosr = true; + *eof = true; } - bytes_read_ += *bytes_read; - DCHECK_LE(bytes_read_, scan_range_->len()); - if (bytes_read_ == scan_range_->len()) *eosr = true; return Status::OK(); } -void* LocalFileReader::CachedFile() { - DCHECK(false); - return nullptr; +void LocalFileReader::CachedFile(uint8_t** data, int64_t* length) { + *data = nullptr; + *length = 0; } void LocalFileReader::Close() { http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/local-file-reader.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h index d8eedd1..14cff3c 100644 --- a/be/src/runtime/io/local-file-reader.h +++ b/be/src/runtime/io/local-file-reader.h @@ -17,7 +17,6 @@ #pragma once -#include "common/hdfs.h" #include "runtime/io/file-reader.h" namespace impala { @@ -32,9 +31,9 @@ public: virtual Status Open(bool use_file_handle_cache) override; virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer, - int64_t bytes_to_read, int64_t* bytes_read, bool* eosr) override; + int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override; /// We don't cache files of the local file system. - virtual void* CachedFile() override; + virtual void CachedFile(uint8_t** data, int64_t* length) override; virtual void Close() override; private: /// Points to a C FILE object between calls to Open() and Close(), otherwise nullptr. 
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc index 56f6704..24213f9 100644 --- a/be/src/runtime/io/request-context.cc +++ b/be/src/runtime/io/request-context.cc @@ -425,7 +425,8 @@ Status RequestContext::GetNextUnstartedRange(ScanRange** range, bool* needs_buff *range = cached_ranges_.Dequeue(); DCHECK((*range)->try_cache()); bool cached_read_succeeded; - RETURN_IF_ERROR((*range)->ReadFromCache(lock, &cached_read_succeeded)); + RETURN_IF_ERROR(TryReadFromCache(lock, *range, &cached_read_succeeded, + needs_buffers)); if (cached_read_succeeded) return Status::OK(); // This range ended up not being cached. Loop again and pick up a new range. @@ -472,12 +473,9 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) { DCHECK_NE(range->len(), 0); if (range->try_cache()) { bool cached_read_succeeded; - RETURN_IF_ERROR(range->ReadFromCache(lock, &cached_read_succeeded)); - if (cached_read_succeeded) { - DCHECK(Validate()) << endl << DebugString(); - *needs_buffers = false; - return Status::OK(); - } + RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded, + needs_buffers)); + if (cached_read_succeeded) return Status::OK(); // Cached read failed, fall back to normal read path. } // If we don't have a buffer yet, the caller must allocate buffers for the range. 
@@ -491,6 +489,35 @@ Status RequestContext::StartScanRange(ScanRange* range, bool* needs_buffers) { return Status::OK(); } +Status RequestContext::TryReadFromCache(const unique_lock<mutex>& lock, + ScanRange* range, bool* read_succeeded, bool* needs_buffers) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); + RETURN_IF_ERROR(range->ReadFromCache(lock, read_succeeded)); + if (!*read_succeeded) return Status::OK(); + + DCHECK(Validate()) << endl << DebugString(); + ScanRange::ExternalBufferTag buffer_tag = range->external_buffer_tag(); + // The following cases are possible at this point: + // * The scan range doesn't have sub-ranges: + // ** buffer_tag is CACHED_BUFFER and the buffer is already available to the reader. + // (there is nothing to do) + // + // * The scan range has sub-ranges, and buffer_tag is: + // ** NO_BUFFER: the client needs to add buffers to the scan range + // ** CLIENT_BUFFER: the client already provided a buffer to copy data into it + *needs_buffers = buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER; + if (*needs_buffers) { + DCHECK(range->HasSubRanges()); + range->SetBlockedOnBuffer(); + // The range will be scheduled when buffers are added to it. 
+ AddRangeToDisk(lock, range, ScheduleMode::BY_CALLER); + } else if (buffer_tag == ScanRange::ExternalBufferTag::CLIENT_BUFFER) { + DCHECK(range->HasSubRanges()); + AddRangeToDisk(lock, range, ScheduleMode::IMMEDIATELY); + } + return Status::OK(); +} + Status RequestContext::AddWriteRange(WriteRange* write_range) { unique_lock<mutex> lock(lock_); if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED; http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-context.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h index 0fcbca3..24b1bb5 100644 --- a/be/src/runtime/io/request-context.h +++ b/be/src/runtime/io/request-context.h @@ -281,6 +281,14 @@ class RequestContext { void RemoveActiveScanRangeLocked( const boost::unique_lock<boost::mutex>& lock, ScanRange* range); + /// Try to read the scan range from the cache. '*read_succeeded' is set to true if the + /// scan range can be found in the cache, otherwise false. + /// If '*needs_buffers' is returned as true, the caller must call + /// AllocateBuffersForRange() to add buffers for the data to be read into before the + /// range can be scheduled. + Status TryReadFromCache(const boost::unique_lock<boost::mutex>& lock, ScanRange* range, + bool* read_succeeded, bool* needs_buffers); + // Counters are updated by other classes - expose to other io:: classes for convenience. 
/// Total bytes read for this reader http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index 3bf55d6..1dc451a 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -60,9 +60,6 @@ class BufferDescriptor { int64_t len() { return len_; } bool eosr() { return eosr_; } - /// Returns the offset within the scan range that this buffer starts at - int64_t scan_range_offset() const { return scan_range_offset_; } - private: DISALLOW_COPY_AND_ASSIGN(BufferDescriptor); /// This class is tightly coupled with ScanRange. Making them friends is easiest. @@ -101,8 +98,6 @@ class BufferDescriptor { /// true if the current scan range is complete bool eosr_ = false; - int64_t scan_range_offset_ = 0; - // Handle to an allocated buffer and the client used to allocate it buffer. Only used // for non-external buffers. BufferPool::ClientHandle* bp_client_ = nullptr; @@ -191,6 +186,15 @@ struct BufferOpts { return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len); } + /// Use only when you don't want to to read the entire scan range, but only sub-ranges + /// in it. In this case you can copy the relevant parts from the HDFS cache into the + /// client buffer. The length of the buffer, 'client_buffer_len' must fit the + /// concatenation of all the sub-ranges. + static BufferOpts ReadInto(bool try_cache, int64_t mtime, uint8_t* client_buffer, + int64_t client_buffer_len) { + return BufferOpts(try_cache, mtime, client_buffer, client_buffer_len); + } + private: friend class ScanRange; friend class HdfsFileReader; @@ -227,6 +231,12 @@ class ScanRange : public RequestRange { virtual ~ScanRange(); + /// Defines an internal range within this ScanRange. 
+ struct SubRange { + int64_t offset; + int64_t length; + }; + /// Resets this scan range object with the scan range description. The scan range /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the /// local filesystem). The scan range must be non-empty and fall within the file bounds @@ -238,10 +248,17 @@ class ScanRange : public RequestRange { void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr); + /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges + /// in advance, as this method will do the merge. + void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, + bool expected_local, const BufferOpts& buffer_opts, + std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr); + void* meta_data() const { return meta_data_; } bool try_cache() const { return try_cache_; } bool read_in_flight() const { return read_in_flight_; } bool expected_local() const { return expected_local_; } + int64_t bytes_to_read() const { return bytes_to_read_; } /// Returns the next buffer for this scan range. buffer is an output parameter. /// This function blocks until a buffer is ready or an error occurred. If this is @@ -269,7 +286,7 @@ class ScanRange : public RequestRange { int64_t mtime() const { return mtime_; } - int BytesRead() const; + bool HasSubRanges() const { return !sub_ranges_.empty(); } private: DISALLOW_COPY_AND_ASSIGN(ScanRange); @@ -279,6 +296,7 @@ class ScanRange : public RequestRange { friend class BufferDescriptor; friend class DiskQueue; friend class DiskIoMgr; + friend class DiskIoMgrTest; friend class RequestContext; friend class HdfsFileReader; friend class LocalFileReader; @@ -344,6 +362,9 @@ class ScanRange : public RequestRange { /// while any thread is inside a critical section. 
Status cancel_status_; + /// Only for testing + void SetFileReader(std::unique_ptr<FileReader> file_reader); + /// END: private members that are accessed by other io:: classes ///////////////////////////////////////// @@ -383,10 +404,28 @@ class ScanRange : public RequestRange { return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty()); } + /// Adds sub-ranges to this ScanRange. If sub_ranges is not empty, then ScanRange won't + /// read everything from its range, but will only read these sub-ranges. + /// Sub-ranges need to be ordered by 'offset' and cannot overlap with each other. + /// Doesn't need to merge continuous sub-ranges in advance, this method will do. + void InitSubRanges(std::vector<SubRange>&& sub_ranges); + + /// Read the sub-ranges into buffer and track the current position in 'sub_range_pos_'. + /// If cached data is available, then memcpy() from it instead of actually reading the + /// files. + Status ReadSubRanges(BufferDescriptor* buffer, bool* eof); + /// Validates the internal state of this range. lock_ must be taken /// before calling this. bool Validate(); + /// Validates the sub-ranges. All sub-range must be inside of this ScanRange. + /// They need to be ordered by offset and cannot overlap. + bool ValidateSubRanges(); + + /// Merges adjacent and continuous sub-ranges. + void MergeSubRanges(); + /// Pointer to caller specified metadata. This is untouched by the io manager /// and the caller can put whatever auxiliary data in here. void* meta_data_ = nullptr; @@ -418,18 +457,26 @@ class ScanRange : public RequestRange { /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER. struct { /// Client-provided buffer to read the whole scan range into. - uint8_t* data; + uint8_t* data = nullptr; /// Length of the client-provided buffer. - int64_t len; + int64_t len = 0; } client_buffer_; + /// Valid if reading file contents from cache was successful. + struct { + /// Pointer to the contents of the file. 
+ uint8_t* data = nullptr; + /// Length of the contents. + int64_t len = 0; + } cache_; + /// The number of buffers that have been returned to a client via GetNext() that have /// not yet been returned with ReturnBuffer(). AtomicInt32 num_buffers_in_reader_{0}; /// Lock protecting fields below. - /// This lock should not be taken during Open()/Read()/Close(). + /// This lock should not be taken during FileReader::Open()/Read()/Close(). /// If RequestContext::lock_ and this lock need to be held simultaneously, /// RequestContext::lock_ must be taken first. boost::mutex lock_; @@ -475,8 +522,30 @@ class ScanRange : public RequestRange { /// cancelled. ConditionVariable buffer_ready_cv_; + /// Number of bytes read by this scan range. + int64_t bytes_read_ = 0; + /// Polymorphic object that is responsible for doing file operations. std::unique_ptr<FileReader> file_reader_; + + /// If not empty, the ScanRange will only read these parts from the file. + std::vector<SubRange> sub_ranges_; + + // Read position in the sub-ranges. + struct SubRangePosition { + /// Index of SubRange in 'ScanRange::sub_ranges_' to read next + int64_t index = 0; + /// Bytes already read from 'ScanRange::sub_ranges_[sub_range_index]' + int64_t bytes_read = 0; + }; + + /// Current read position in the sub-ranges. + SubRangePosition sub_range_pos_; + + /// Number of bytes need to be read by this ScanRange. If there are no sub-ranges it + /// equals to 'len_'. If there are sub-ranges then it equals to the sum of the lengths + /// of the sub-ranges (which is less than or equal to 'len_'). + int64_t bytes_to_read_ = 0; }; /// Used to specify data to be written to a file and offset. 
http://git-wip-us.apache.org/repos/asf/impala/blob/48fb4902/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index aaef6b4..660710e 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -163,13 +163,8 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer( return result; } -int ScanRange::BytesRead() const { - DCHECK(file_reader_ != nullptr); - return file_reader_->bytes_read(); -} - ReadOutcome ScanRange::DoRead(int disk_id) { - int64_t bytes_remaining = len_ - BytesRead(); + int64_t bytes_remaining = bytes_to_read_ - bytes_read_; DCHECK_GT(bytes_remaining, 0); unique_ptr<BufferDescriptor> buffer_desc; @@ -183,8 +178,8 @@ ReadOutcome ScanRange::DoRead(int disk_id) { this, client_buffer_.data, client_buffer_.len)); } else { DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER) - << "This code path does not handle other buffer types, i.e. HDFS cache" - << static_cast<int>(external_buffer_tag_); + << "This code path does not handle other buffer types, i.e. HDFS cache. " + << "external_buffer_tag_=" << static_cast<int>(external_buffer_tag_); buffer_desc = GetUnusedBuffer(lock); if (buffer_desc == nullptr) { // No buffer available - the range will be rescheduled when a buffer is added. @@ -199,21 +194,27 @@ ReadOutcome ScanRange::DoRead(int disk_id) { // No locks in this section. Only working on local vars. We don't want to hold a // lock across the read call. 
Status read_status = file_reader_->Open(is_file_handle_caching_enabled()); + bool eof = false; if (read_status.ok()) { COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L); COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id); - read_status = file_reader_->ReadFromPos(offset_ + BytesRead(), buffer_desc->buffer_, - min(len() - BytesRead(), buffer_desc->buffer_len_), &buffer_desc->len_, - &buffer_desc->eosr_); - buffer_desc->scan_range_offset_ = BytesRead() - buffer_desc->len_; + if (sub_ranges_.empty()) { + DCHECK(cache_.data == nullptr); + read_status = file_reader_->ReadFromPos(offset_ + bytes_read_, buffer_desc->buffer_, + min(len() - bytes_read_, buffer_desc->buffer_len_), + &buffer_desc->len_, &eof); + } else { + read_status = ReadSubRanges(buffer_desc.get(), &eof); + } COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_); COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L); } DCHECK(buffer_desc->buffer_ != nullptr); - DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path."; + DCHECK(!buffer_desc->is_cached()) << + "Pure HDFS cache reads don't go through this code path."; if (!read_status.ok()) { // Free buffer to release resources before we cancel the range so that all buffers // are freed at cancellation. @@ -228,9 +229,15 @@ ReadOutcome ScanRange::DoRead(int disk_id) { return ReadOutcome::CANCELLED; } + bytes_read_ += buffer_desc->len(); + DCHECK_LE(bytes_read_, bytes_to_read_); + + // It is end of stream if it is end of file, or read all the bytes. + buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_; + // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'. // Store the state we need before calling EnqueueReadyBuffer(). - bool eosr = buffer_desc->eosr_; + bool eosr = buffer_desc->eosr(); // Read successful - enqueue the buffer and return the appropriate outcome. 
if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED; if (eosr) { @@ -241,6 +248,42 @@ ReadOutcome ScanRange::DoRead(int disk_id) { return ReadOutcome::SUCCESS_NO_EOSR; } +Status ScanRange::ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof) { + buffer_desc->len_ = 0; + while (buffer_desc->len() < buffer_desc->buffer_len() && + sub_range_pos_.index < sub_ranges_.size()) { + SubRange& sub_range = sub_ranges_[sub_range_pos_.index]; + int64_t offset = sub_range.offset + sub_range_pos_.bytes_read; + int64_t bytes_to_read = min(sub_range.length - sub_range_pos_.bytes_read, + buffer_desc->buffer_len() - buffer_desc->len()); + + if (cache_.data != nullptr) { + memcpy(buffer_desc->buffer_ + buffer_desc->len(), + cache_.data + offset, bytes_to_read); + } else { + int64_t current_bytes_read; + Status read_status = file_reader_->ReadFromPos(offset, + buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read, + eof); + if (!read_status.ok()) return read_status; + if (current_bytes_read != bytes_to_read) { + DCHECK(*eof); + DCHECK_LT(current_bytes_read, bytes_to_read); + return Status(TErrorCode::SCANNER_INCOMPLETE_READ, bytes_to_read, + current_bytes_read, file(), offset); + } + } + + buffer_desc->len_ += bytes_to_read; + sub_range_pos_.bytes_read += bytes_to_read; + if (sub_range_pos_.bytes_read == sub_range.length) { + sub_range_pos_.index += 1; + sub_range_pos_.bytes_read = 0; + } + } + return Status::OK(); +} + void ScanRange::SetBlockedOnBuffer() { unique_lock<mutex> lock(lock_); blocked_on_buffer_ = true; @@ -339,9 +382,10 @@ string ScanRange::DebugString() const { } bool ScanRange::Validate() { - if (BytesRead() > len_) { - LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range." - << " bytes_read_=" << BytesRead() << " len_=" << len_; + if (bytes_read_ > bytes_to_read_) { + LOG(ERROR) << "Bytes read tracking is wrong. Too many bytes have been read."
+ << " bytes_read_=" << bytes_read_ + << " bytes_to_read_=" << bytes_to_read_; return false; } if (!cancel_status_.ok() && !ready_buffers_.empty()) { @@ -377,8 +421,6 @@ ScanRange::ScanRange() external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {} ScanRange::~ScanRange() { - DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) - << "Cached buffer was not released."; DCHECK(!read_in_flight_); DCHECK_EQ(0, ready_buffers_.size()); DCHECK_EQ(0, num_buffers_in_reader_.Load()); @@ -386,6 +428,12 @@ ScanRange::~ScanRange() { void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) { + Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, {}, meta_data); +} + +void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, + int disk_id, bool expected_local, const BufferOpts& buffer_opts, + vector<SubRange>&& sub_ranges, void* meta_data) { DCHECK(ready_buffers_.empty()); DCHECK(!read_in_flight_); DCHECK(file != nullptr); @@ -401,6 +449,7 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, } file_ = file; len_ = len; + bytes_to_read_ = len; offset_ = offset; disk_id_ = disk_id; try_cache_ = buffer_opts.try_cache_; @@ -416,6 +465,59 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, expected_local_ = expected_local; io_mgr_ = nullptr; reader_ = nullptr; + sub_ranges_.clear(); + sub_range_pos_ = {}; + InitSubRanges(move(sub_ranges)); +} + +void ScanRange::InitSubRanges(vector<SubRange>&& sub_ranges) { + sub_ranges_ = std::move(sub_ranges); + DCHECK(ValidateSubRanges()); + MergeSubRanges(); + DCHECK(ValidateSubRanges()); + sub_range_pos_ = {}; + + if (sub_ranges_.empty()) return; + + int length_sum = 0; + for (auto& sub_range : sub_ranges_) { + length_sum += sub_range.length; + } + bytes_to_read_ = length_sum; +} + +bool ScanRange::ValidateSubRanges() { + for (int i = 0; i 
< sub_ranges_.size(); ++i) { + SubRange& sub_range = sub_ranges_[i]; + if (sub_range.length <= 0) return false; + if (sub_range.offset < offset_) return false; + if (sub_range.offset + sub_range.length > offset_ + len_) return false; + + if (i == sub_ranges_.size() - 1) break; + + SubRange& next_sub_range = sub_ranges_[i+1]; + if (sub_range.offset + sub_range.length > next_sub_range.offset) return false; + } + return true; +} + +void ScanRange::MergeSubRanges() { + if (sub_ranges_.empty()) return; + for (int i = 0; i < sub_ranges_.size() - 1; ++i) { + SubRange& current = sub_ranges_[i]; + int j = i + 1; + for (; j < sub_ranges_.size(); ++j) { + SubRange& sr_j = sub_ranges_[j]; + if (sr_j.offset == current.offset + current.length) { + current.length += sr_j.length; + } else { + break; + } + } + if (j > i + 1) { + sub_ranges_.erase(sub_ranges_.begin() + i + 1, sub_ranges_.begin() + j); + } + } } void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { @@ -427,10 +529,16 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { cancel_status_ = Status::OK(); eosr_queued_ = false; blocked_on_buffer_ = false; + bytes_read_ = 0; + sub_range_pos_ = {}; file_reader_->ResetState(); DCHECK(Validate()) << DebugString(); } +void ScanRange::SetFileReader(unique_ptr<FileReader> file_reader) { + file_reader_ = move(file_reader); +} + int64_t ScanRange::MaxReadChunkSize() const { // S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read() // interface). So, hdfsRead() needs to allocate a Java byte[] and copy the data out. 
@@ -454,54 +562,54 @@ Status ScanRange::ReadFromCache( const unique_lock<mutex>& reader_lock, bool* read_succeeded) { DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); DCHECK(try_cache_); - DCHECK_EQ(BytesRead(), 0); + DCHECK_EQ(bytes_read_, 0); *read_succeeded = false; Status status = file_reader_->Open(false); if (!status.ok()) return status; - // Cached reads not supported on local filesystem. - if (fs_ == nullptr) return Status::OK(); - // Check cancel status. { unique_lock<mutex> lock(lock_); RETURN_IF_ERROR(cancel_status_); } - DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER); - void* buffer = file_reader_->CachedFile(); - if (buffer != nullptr) external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER; + file_reader_->CachedFile(&cache_.data, &cache_.len); // Data was not cached, caller will fall back to normal read path. - if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) { + if (cache_.data == nullptr) { VLOG_QUERY << "Cache read failed for scan range: " << DebugString() << ". Switching to disk read path."; // Clean up the scan range state before re-issuing it. file_reader_->Close(); return Status::OK(); } - int bytes_read = BytesRead(); // A partial read can happen when files are truncated. // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish // between errors and partially cached blocks here. - if (bytes_read < len()) { + if (cache_.len < len()) { VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected " - << len() << " bytes, but read " << bytes_read << ". Switching to disk read path."; + << len() << " bytes, but read " << cache_.len << ". Switching to disk read path."; // Close the scan range. 'read_succeeded' is still false, so the caller will fall back // to non-cached read of this scan range. file_reader_->Close(); return Status::OK(); } + *read_succeeded = true; + // If there are sub-ranges, then we need to memcpy() them from the cached buffer. 
+ if (HasSubRanges()) return Status::OK(); + + DCHECK(external_buffer_tag_ != ExternalBufferTag::CLIENT_BUFFER); + external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER; + bytes_read_ = cache_.len; + // Create a single buffer desc for the entire scan range and enqueue that. // The memory is owned by the HDFS java client, not the Impala backend. unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor( - this, reinterpret_cast<uint8_t*>(buffer), 0)); - desc->len_ = bytes_read; - desc->scan_range_offset_ = 0; + this, cache_.data, 0)); + desc->len_ = cache_.len; desc->eosr_ = true; EnqueueReadyBuffer(move(desc)); - COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read); - *read_succeeded = true; + COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, cache_.len); return Status::OK(); }
