This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 017175558 IMPALA-12123: Fix crash triggered by incomplete HDFS cache
reads
017175558 is described below
commit 017175558341204bc32dd7998b245a12995234d7
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed May 10 15:26:44 2023 -0700
IMPALA-12123: Fix crash triggered by incomplete HDFS cache reads
When using HDFS caching, the HDFS cache may not have the full
buffer in memory, and it can return a buffer that is incomplete.
In this case, the code falls back to the ordinary read path.
However, the ScanRange cache_ structure is still set up, and
the code in ScanRange::ReadSubRanges() tries to use it. This
can crash, because the buffer is too short (and may have been
freed).
This changes the code to null out the cache_ data structure
when there is an incomplete read from the HDFS cache.
Testing:
- Reproduced the crash stack manually by putting a Parquet
file with a page index in HDFS cache and manually forcing
it down the incomplete read codepath.
- Modified the disk-io-mgr-test and CacheReaderTestStub to
simulate the incomplete read case. The test will hit a
DCHECK or crash without this fixup.
Change-Id: I51d8be6c03716badee81675447ed94ae6249b21b
Reviewed-on: http://gerrit.cloudera.org:8080/19869
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
be/src/runtime/io/cache-reader-test-stub.h | 74 ++++++++++++++++++++++--------
be/src/runtime/io/disk-io-mgr-test.cc | 55 +++++++++++-----------
be/src/runtime/io/scan-range.cc | 5 ++
3 files changed, 87 insertions(+), 47 deletions(-)
diff --git a/be/src/runtime/io/cache-reader-test-stub.h
b/be/src/runtime/io/cache-reader-test-stub.h
index 00be66530..e195c8a2c 100644
--- a/be/src/runtime/io/cache-reader-test-stub.h
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -17,45 +17,79 @@
#pragma once
-#include "runtime/io/file-reader.h"
+#include "runtime/io/local-file-reader.h"
#include "runtime/io/request-ranges.h"
namespace impala {
namespace io {
-/// Only for testing the code path when reading from the cache is successful.
-/// Takes a pointer to a buffer in its constructor, also the length of this
buffer.
-/// CachedFile() simply returns the pointer and length.
-/// Invoking ReadFromPos() on it results in an error.
-class CacheReaderTestStub : public FileReader {
+// Test scenarios:
+// VALID_BUFFER - Successful HDFS cache read returns the whole buffer / length
+// FALLBACK_NULL_BUFFER - Failed HDFS cache read returns null / length = 0
+// FALLBACK_INCOMPLETE_BUFFER - Failed HDFS cache read returns invalid buffer
+// with a short length
+enum HdfsCachingScenario {
+ VALID_BUFFER,
+ FALLBACK_NULL_BUFFER,
+ FALLBACK_INCOMPLETE_BUFFER
+};
+
+/// This simulates reads from HDFS caching for the successful path
(VALID_BUFFER)
+/// and some unsuccessful paths (FALLBACK*) that should fall back to the normal
+/// file read path.
+/// Takes a pointer to a buffer, the length of the buffer, and an indicator of
+/// what scenario to test. The scenario determines what CachedFile() returns.
+/// See the description of HdfsCachingScenario above.
+/// All other methods fall through to the LocalFileReader, so fallback reads
+/// can succeed for applicable scenarios. The passthrough of Open() and Close()
+/// is harmless for the successful cache scenario.
+class CacheReaderTestStub : public LocalFileReader {
public:
- CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length) :
- FileReader(scan_range),
+ CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length,
+ HdfsCachingScenario scenario) :
+ LocalFileReader(scan_range),
cache_(cache),
- length_(length) {
+ length_(length),
+ scenario_(scenario){
}
~CacheReaderTestStub() {}
- virtual Status Open() override {
- return Status::OK();
- }
-
- virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t*
buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
- DCHECK(false);
- return Status("Not implemented");
+ virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset,
+ uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof)
override {
+ // This should not be reached for the VALID_BUFFER scenario, because
+ // the reads will come from the cached buffer.
+ DCHECK_NE(scenario_, VALID_BUFFER);
+ return LocalFileReader::ReadFromPos(queue, file_offset, buffer,
bytes_to_read,
+ bytes_read, eof);
}
virtual void CachedFile(uint8_t** data, int64_t* length) override {
- *length = length_;
- *data = cache_;
+ switch (scenario_) {
+ case VALID_BUFFER:
+ *length = length_;
+ *data = cache_;
+ break;
+ case FALLBACK_NULL_BUFFER:
+ *length = 0;
+ *data = nullptr;
+ break;
+ case FALLBACK_INCOMPLETE_BUFFER:
+ // Use a fake too-short length and fake non-null buffer
+ // The buffer should not be dereferenced in this scenario, so
+ // it is useful for it to be invalid.
+ *length = 1;
+ *data = (uint8_t *) 0x1;
+ break;
+ default:
+ DCHECK(false) << "Invalid HdfsCachingScenario value";
+ }
}
- virtual void Close() override {}
private:
uint8_t* cache_ = nullptr;
int64_t length_ = 0;
+ HdfsCachingScenario scenario_;
};
}
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc
b/be/src/runtime/io/disk-io-mgr-test.cc
index f50fb7efa..7eb8984b1 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -323,7 +323,7 @@ class DiskIoMgrTest : public testing::Test {
vector<ScanRange::SubRange> sub_ranges = {});
void CachedReadsTestBody(const char* data, const char* expected,
- bool fake_cache, vector<ScanRange::SubRange> sub_ranges = {});
+ HdfsCachingScenario scenario, vector<ScanRange::SubRange> sub_ranges =
{});
/// Convenience function to get a reference to the buffer pool.
BufferPool* buffer_pool() const { return
ExecEnv::GetInstance()->buffer_pool(); }
@@ -1050,7 +1050,7 @@ TEST_F(DiskIoMgrTest, MemScarcity) {
}
void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
- bool fake_cache, vector<ScanRange::SubRange> sub_ranges) {
+ HdfsCachingScenario scenario, vector<ScanRange::SubRange> sub_ranges) {
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
int len = strlen(data);
@@ -1073,10 +1073,8 @@ void DiskIoMgrTest::CachedReadsTestBody(const char*
data, const char* expected,
ScanRange* complete_range =
InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime,
nullptr, true,
sub_ranges);
- if (fake_cache) {
- SetReaderStub(complete_range, make_unique<CacheReaderTestStub>(
- complete_range, cached_data, len));
- }
+ SetReaderStub(complete_range,
make_unique<CacheReaderTestStub>(complete_range,
+ cached_data, len, scenario));
// Issue some reads before the async ones are issued
ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range,
expected);
@@ -1088,9 +1086,8 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data,
const char* expected,
ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id,
stat_val.st_mtime,
nullptr, true, sub_ranges);
ranges.push_back(range);
- if (fake_cache) {
- SetReaderStub(range, make_unique<CacheReaderTestStub>(range,
cached_data, len));
- }
+ SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data,
+ len, scenario));
}
ASSERT_OK(reader->AddScanRanges(ranges, EnqueueLocation::TAIL));
@@ -1124,10 +1121,11 @@ void DiskIoMgrTest::CachedReadsTestBody(const char*
data, const char* expected,
TEST_F(DiskIoMgrTest, CachedReads) {
InitRootReservation(LARGE_RESERVATION_LIMIT);
const char* data = "abcdefghijklm";
- // Don't fake the cache, i.e. test the fallback mechanism
- CachedReadsTestBody(data, data, false);
- // Fake the test with a file reader stub.
- CachedReadsTestBody(data, data, true);
+ // Test the fallback mechanism for the cache
+ CachedReadsTestBody(data, data, FALLBACK_NULL_BUFFER);
+ CachedReadsTestBody(data, data, FALLBACK_INCOMPLETE_BUFFER);
+ // Test the success path for the cache
+ CachedReadsTestBody(data, data, VALID_BUFFER);
}
// Test when some scan ranges are marked as being cached and there
@@ -1139,10 +1137,11 @@ TEST_F(DiskIoMgrTest, CachedReadsSubRanges) {
// first iteration tests the fallback mechanism with sub-ranges
// second iteration fakes a cache
- for (bool fake_cache : {false, true}) {
- CachedReadsTestBody(data, data, fake_cache, {{0, data_len}});
- CachedReadsTestBody(data, "bc", fake_cache, {{1, 2}});
- CachedReadsTestBody(data, "abchilm", fake_cache, {{0, 3}, {7, 2}, {11,
2}});
+ for (HdfsCachingScenario scenario :
+ {VALID_BUFFER, FALLBACK_NULL_BUFFER, FALLBACK_INCOMPLETE_BUFFER}) {
+ CachedReadsTestBody(data, data, scenario, {{0, data_len}});
+ CachedReadsTestBody(data, "bc", scenario, {{1, 2}});
+ CachedReadsTestBody(data, "abchilm", scenario, {{0, 3}, {7, 2}, {11, 2}});
}
}
@@ -1635,18 +1634,19 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
// Reader doesn't need to provide client if it's providing buffers.
unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
- auto test_case = [&](bool fake_cache, const char* expected_result,
+ auto test_case = [&](HdfsCachingScenario scenario, const char*
expected_result,
vector<ScanRange::SubRange> sub_ranges) {
int result_len = strlen(expected_result);
vector<uint8_t> client_buffer(result_len);
ScanRange* range = pool_.Add(new ScanRange);
- int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE :
BufferOpts::NO_CACHING;
+ // Note: Even though this specifies HDFS caching, some scenarios will fall
back
+ // to doing regular reads.
+ int cache_options = BufferOpts::USE_HDFS_CACHE;
range->Reset(ScanRange::FileInfo{tmp_file, nullptr, stat_val.st_mtime},
data_len, 0,
0, true, BufferOpts::ReadInto(cache_options, client_buffer.data(),
result_len),
move(sub_ranges));
- if (fake_cache) {
- SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache,
data_len));
- }
+ SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache,
data_len,
+ scenario));
bool needs_buffers;
ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
ASSERT_FALSE(needs_buffers);
@@ -1663,11 +1663,12 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
range->ReturnBuffer(move(io_buffer));
};
- for (bool fake_cache : {false, true}) {
- test_case(fake_cache, data, {{0, data_len}});
- test_case(fake_cache, data, {{0, 15}, {15, data_len - 15}});
- test_case(fake_cache, "quick fox", {{4, 5}, {15, 4}});
- test_case(fake_cache, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4,
4}});
+ for (HdfsCachingScenario scenario :
+ {VALID_BUFFER, FALLBACK_NULL_BUFFER, FALLBACK_INCOMPLETE_BUFFER}) {
+ test_case(scenario, data, {{0, data_len}});
+ test_case(scenario, data, {{0, 15}, {15, data_len - 15}});
+ test_case(scenario, "quick fox", {{4, 5}, {15, 4}});
+ test_case(scenario, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 4}});
}
io_mgr->UnregisterContext(reader.get());
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 5dd5eae8f..ab44c83ba 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -342,6 +342,7 @@ Status ScanRange::ReadSubRanges(
buffer_desc->buffer_len() - buffer_desc->len());
if (cache_.data != nullptr) {
+ DCHECK_LE(offset + bytes_to_read, cache_.len);
memcpy(buffer_desc->buffer_ + buffer_desc->len(),
cache_.data + offset, bytes_to_read);
} else {
@@ -637,6 +638,10 @@ Status ScanRange::ReadFromCache(
if (cache_.len < len()) {
VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ".
Expected "
<< len() << " bytes, but read " << cache_.len << ". Switching to disk
read path.";
+ // Null out the cache buffer to avoid any interactions when this falls
+ // back to the regular read path.
+ cache_.len = 0;
+ cache_.data = nullptr;
// Close the scan range. 'read_succeeded' is still false, so the caller
will fall back
// to non-cached read of this scan range.
file_reader_->Close();