This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 10b130facb8c0f06d9a0f5e051728c9b825eab14 Author: Joe McDonnell <[email protected]> AuthorDate: Thu May 11 15:31:33 2023 -0700 IMPALA-12123 (part 2): Fix offsets for HDFS caching with subranges When using HDFS caching, subrange reads are copying memory from the HDFS cache buffer into the output buffer. When the HDFS cache buffer starts at an offset into the file, the subrange offsets need to be adjusted to point to the relative place within the cache buffer. This fixes the calculation to use the relative locations. Testing: - Modified the caching tests in disk-io-mgr-test to also test with reads at an offset - In a local minicluster, created a table based on alltypes_tiny_pages.parquet, added HDFS caching for the table, ran the SQLs in parquet-page-index-alltypes-tiny-pages.test, and checked the HDFS caching reads in the profile. Change-Id: Icd0ccc677b090ef3a75047defb4d7bda4279dac6 Reviewed-on: http://gerrit.cloudera.org:8080/19877 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Joe McDonnell <[email protected]> --- be/src/runtime/io/disk-io-mgr-test.cc | 66 ++++++++++++++++++++++++----------- be/src/runtime/io/scan-range.cc | 14 ++++++-- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 7eb8984b1..39aafed0e 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -247,8 +247,10 @@ class DiskIoMgrTest : public testing::Test { static void ValidateScanRange(DiskIoMgr* io_mgr, ScanRange* range, const char* expected, int expected_len, const Status& expected_status) { - char result[expected_len + 1]; - memset(result, 0, expected_len + 1); + // TODO: Disentangle the offsets/expected results properly so this doesn't need to + // overallocate. 
+ char result[range->offset() + expected_len + 1]; + memset(result, 0, range->offset() + expected_len + 1); int64_t scan_range_offset = 0; while (true) { @@ -323,7 +325,8 @@ class DiskIoMgrTest : public testing::Test { vector<ScanRange::SubRange> sub_ranges = {}); void CachedReadsTestBody(const char* data, const char* expected, - HdfsCachingScenario scenario, vector<ScanRange::SubRange> sub_ranges = {}); + HdfsCachingScenario scenario, int offset, + vector<ScanRange::SubRange> sub_ranges = {}); /// Convenience function to get a reference to the buffer pool. BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); } @@ -1050,11 +1053,15 @@ TEST_F(DiskIoMgrTest, MemScarcity) { } void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected, - HdfsCachingScenario scenario, vector<ScanRange::SubRange> sub_ranges) { + HdfsCachingScenario scenario, int offset, vector<ScanRange::SubRange> sub_ranges) { + // The data passed in is the full data with any extra initial offset. This + // is necessary to make the file on disk able to service the data at the + // appropriate offset. const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; - uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data)); - int len = strlen(data); CreateTempFile(tmp_file, data); + // The actual scan range data is the original test data at the appropriate offset. 
+ uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data)) + offset; + int len = strlen(data) - offset; // Get mtime for file struct stat stat_val; @@ -1071,7 +1078,7 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected, unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); ScanRange* complete_range = - InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true, + InitRange(&pool_, tmp_file, offset, len, 0, stat_val.st_mtime, nullptr, true, sub_ranges); SetReaderStub(complete_range, make_unique<CacheReaderTestStub>(complete_range, cached_data, len, scenario)); @@ -1083,8 +1090,8 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected, vector<ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, - nullptr, true, sub_ranges); + ScanRange* range = InitRange(&pool_, tmp_file, offset, len, disk_id, + stat_val.st_mtime, nullptr, true, sub_ranges); ranges.push_back(range); SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data, len, scenario)); @@ -1120,28 +1127,47 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected, // Test when some scan ranges are marked as being cached. TEST_F(DiskIoMgrTest, CachedReads) { InitRootReservation(LARGE_RESERVATION_LIMIT); - const char* data = "abcdefghijklm"; - // Test the fallback mechanism for the cache - CachedReadsTestBody(data, data, FALLBACK_NULL_BUFFER); - CachedReadsTestBody(data, data, FALLBACK_INCOMPLETE_BUFFER); - // Test the success path for the cache - CachedReadsTestBody(data, data, VALID_BUFFER); + std::string data_str = "abcdefghijklm"; + const char* data = data_str.data(); + + // Test at different offsets to validate offset handling + for (int offset: {0, 1000}) { + // The full data is 'offset' copies of 0 followed by the test string above. 
+ std::string full_data_str(offset, '0'); + full_data_str += data_str; + const char* full_data = full_data_str.data(); + // Test the fallback mechanism for the cache + CachedReadsTestBody(full_data, data, FALLBACK_NULL_BUFFER, offset); + CachedReadsTestBody(full_data, data, FALLBACK_INCOMPLETE_BUFFER, offset); + // Test the success path for the cache + CachedReadsTestBody(full_data, data, VALID_BUFFER, offset); + } } // Test when some scan ranges are marked as being cached and there // are sub-ranges as well. TEST_F(DiskIoMgrTest, CachedReadsSubRanges) { InitRootReservation(LARGE_RESERVATION_LIMIT); - const char* data = "abcdefghijklm"; - int64_t data_len = strlen(data); + std::string data_str = "abcdefghijklm"; + const char* data = data_str.data(); + int64_t data_len = data_str.length(); // first iteration tests the fallback mechanism with sub-ranges // second iteration fakes a cache for (HdfsCachingScenario scenario : {VALID_BUFFER, FALLBACK_NULL_BUFFER, FALLBACK_INCOMPLETE_BUFFER}) { - CachedReadsTestBody(data, data, scenario, {{0, data_len}}); - CachedReadsTestBody(data, "bc", scenario, {{1, 2}}); - CachedReadsTestBody(data, "abchilm", scenario, {{0, 3}, {7, 2}, {11, 2}}); + // Test at different offsets to validate offset handling + for (int offset : {0, 1000}) { + // The full data is 'offset' copies of 0 followed by the test string above. 
+ std::string full_data_str(offset, '0'); + full_data_str += data_str; + const char* full_data = full_data_str.data(); + CachedReadsTestBody(full_data, data, scenario, offset, + {{0 + offset, data_len}}); + CachedReadsTestBody(full_data, "bc", scenario, offset, {{1 + offset, 2}}); + CachedReadsTestBody(full_data, "abchilm", scenario, offset, + {{0 + offset, 3}, {7 + offset, 2}, {11 + offset, 2}}); + } } } diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index ab44c83ba..463aa9c5b 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -342,9 +342,19 @@ Status ScanRange::ReadSubRanges( buffer_desc->buffer_len() - buffer_desc->len()); if (cache_.data != nullptr) { - DCHECK_LE(offset + bytes_to_read, cache_.len); + // The cache_.data buffer starts at offset_, so adjust the starting + // offset for the copies. + int64_t buffer_offset = offset - offset_; + DCHECK_LE(buffer_offset + bytes_to_read, cache_.len); + // DCHECKs are only effective with test coverage, so also return an error + // if this would read past the edge of the cache_.data buffer. We wanted + // bytes_to_read, but only cache_.len - buffer_offset bytes were available. + if (buffer_offset + bytes_to_read > cache_.len) { + return Status(TErrorCode::SCANNER_INCOMPLETE_READ, bytes_to_read, + cache_.len - buffer_offset, file(), offset); + } memcpy(buffer_desc->buffer_ + buffer_desc->len(), - cache_.data + offset, bytes_to_read); + cache_.data + buffer_offset, bytes_to_read); } else { int64_t current_bytes_read; Status read_status = file_reader->ReadFromPos(queue, offset,
