This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 15b07ff1f IMPALA-11704: (Addendum) fix crash on open for HDFS cache
15b07ff1f is described below
commit 15b07ff1fb348be2c75e2176e88feb5ef76fde42
Author: Michael Smith <[email protected]>
AuthorDate: Thu Nov 10 10:17:34 2022 -0800
IMPALA-11704: (Addendum) fix crash on open for HDFS cache
When trying to read from HDFS cache, ReadFromCache calls
FileReader::Open(false) to force the file to open. The prior commit for
IMPALA-11704 didn't allow for that case when using a data cache, as the
data cache check in HdfsFileReader::Open always ran and returned early
without opening the file. This resulted in a crash calling CachedFile,
because exclusive_hdfs_fh_ was nullptr. Tests only catch this when
reading from HDFS cache with data cache enabled.
Replaces explicit arguments to override FileReader behavior with a flag
to communicate whether FileReader supports delayed open. Then the caller
can choose whether to call Open before read. Also simplifies calls to
ReadFromPos as it already has a pointer to ScanRange and can check
whether file handle caching is enabled directly. The Open call in
DoReadInternal casts a slightly wider net by checking only UseDataCache(),
not whether a remote data cache is configured. If the data cache is
unavailable or the lookup misses, the file will then be opened.
Adds a select from tpch.nation to data-cache.test (the queries run by
test_data_cache.py), as it is a query that triggers checking the HDFS cache.
Change-Id: I741488d6195e586917de220a39090895886a2dc5
Reviewed-on: http://gerrit.cloudera.org:8080/19228
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/io/cache-reader-test-stub.h | 5 ++---
be/src/runtime/io/file-reader.h | 12 +++++++-----
be/src/runtime/io/hdfs-file-reader.cc | 14 +++-----------
be/src/runtime/io/hdfs-file-reader.h | 6 +++---
be/src/runtime/io/local-file-reader.cc | 5 ++---
be/src/runtime/io/local-file-reader.h | 5 ++---
be/src/runtime/io/request-ranges.h | 5 ++---
be/src/runtime/io/scan-range.cc | 22 ++++++++++++----------
.../queries/QueryTest/data-cache.test | 6 ++++++
9 files changed, 39 insertions(+), 41 deletions(-)
diff --git a/be/src/runtime/io/cache-reader-test-stub.h
b/be/src/runtime/io/cache-reader-test-stub.h
index bc91974a4..00be66530 100644
--- a/be/src/runtime/io/cache-reader-test-stub.h
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -37,13 +37,12 @@ public:
~CacheReaderTestStub() {}
- virtual Status Open(bool use_file_handle_cache) override {
+ virtual Status Open() override {
return Status::OK();
}
virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t*
buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
- bool use_file_handle_cache) override {
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
DCHECK(false);
return Status("Not implemented");
}
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 742d7d8e2..3f703d914 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -42,8 +42,7 @@ public:
virtual ~FileReader() {}
/// Opens file that is associated with 'scan_range_'.
- /// 'use_file_handle_cache' currently only used by HdfsFileReader.
- virtual Status Open(bool use_file_handle_cache) = 0;
+ virtual Status Open() = 0;
/// Reads bytes from given position ('file_offset'). Tries to read
/// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of
@@ -51,12 +50,11 @@ public:
/// Metrics in 'queue' are updated with the size and latencies of the read
/// operations on the underlying file system.
virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t*
buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
- bool use_file_handle_cache) = 0;
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
/// ***Currently only for HDFS***
/// When successful, sets 'data' to a buffer that contains the contents of a
file,
- /// and 'length' is set to the length of the data.
+ /// and 'length' is set to the length of the data. Does not support delayed
open.
/// When unsuccessful, 'data' is set to nullptr.
virtual void CachedFile(uint8_t** data, int64_t* length) = 0;
@@ -64,6 +62,10 @@ public:
/// scan ranges.
virtual void Close() = 0;
+ /// Override to return true if the implementation supports ReadFromPos
without first
+ /// calling Open. This can be useful when caching data or file handles.
+ virtual bool SupportsDelayedOpen() const { return false; }
+
/// Resets internal bookkeeping
virtual void ResetState() {}
diff --git a/be/src/runtime/io/hdfs-file-reader.cc
b/be/src/runtime/io/hdfs-file-reader.cc
index 9dfef6aae..d880264c5 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -57,19 +57,11 @@ HdfsFileReader::~HdfsFileReader() {
DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released.";
}
-Status HdfsFileReader::Open(bool use_file_handle_cache) {
+Status HdfsFileReader::Open() {
unique_lock<SpinLock> hdfs_lock(lock_);
RETURN_IF_ERROR(scan_range_->cancel_status_);
if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
- // If using file handle caching, the reader does not maintain its own
- // hdfs file handle, so it can skip opening a file handle.
- if (use_file_handle_cache) return Status::OK();
- if (scan_range_->UseDataCache() &&
- scan_range_->io_mgr_->remote_data_cache() != nullptr) {
- // Use delayed open when using a remote data cache.
- return Status::OK();
- }
return DoOpen();
}
@@ -115,7 +107,7 @@ std::string HdfsFileReader::GetHostList(int64_t file_offset,
}
Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset,
uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eof, bool
use_file_handle_cache) {
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
DCHECK(scan_range_->read_in_flight());
DCHECK_GE(bytes_to_read, 0);
// Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -165,7 +157,7 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue,
int64_t file_offset, uint8_
hdfsFile hdfs_file;
if (exclusive_hdfs_fh_ != nullptr) {
hdfs_file = exclusive_hdfs_fh_->file();
- } else if (use_file_handle_cache) {
+ } else if (scan_range_->FileHandleCacheEnabled()) {
RETURN_IF_ERROR(io_mgr->GetCachedHdfsFileHandle(hdfs_fs_,
scan_range_->file_string(), scan_range_->mtime(), request_context,
&accessor));
hdfs_file = accessor.Get()->file();
diff --git a/be/src/runtime/io/hdfs-file-reader.h
b/be/src/runtime/io/hdfs-file-reader.h
index 7f3660b21..dd421cd8c 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -34,11 +34,11 @@ public:
~HdfsFileReader();
- virtual Status Open(bool use_file_handle_cache) override;
+ virtual Status Open() override;
virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t*
buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
- bool use_file_handle_cache) override;
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
virtual void Close() override;
+ virtual bool SupportsDelayedOpen() const override { return true; }
virtual void ResetState() override;
virtual std::string DebugString() const override;
diff --git a/be/src/runtime/io/local-file-reader.cc
b/be/src/runtime/io/local-file-reader.cc
index fa94c19f5..672239098 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -34,7 +34,7 @@ DECLARE_int32(stress_disk_read_delay_ms);
namespace impala {
namespace io {
-Status LocalFileReader::Open(bool use_file_handle_cache) {
+Status LocalFileReader::Open() {
unique_lock<SpinLock> fs_lock(lock_);
RETURN_IF_ERROR(scan_range_->cancel_status_);
@@ -52,8 +52,7 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
}
Status LocalFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset,
- uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
- bool use_file_handle_cache) {
+ uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
DCHECK(scan_range_->read_in_flight());
DCHECK_GE(bytes_to_read, 0);
// Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
diff --git a/be/src/runtime/io/local-file-reader.h
b/be/src/runtime/io/local-file-reader.h
index af15dfc80..fb4171cec 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -29,10 +29,9 @@ class LocalFileReader : public FileReader {
LocalFileReader(ScanRange* scan_range) : FileReader(scan_range) {}
~LocalFileReader() {}
- virtual Status Open(bool use_file_handle_cache) override;
+ virtual Status Open() override;
virtual Status ReadFromPos(DiskQueue* disk_queue, int64_t file_offset,
uint8_t* buffer,
- int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
- bool use_file_handle_cache) override;
+ int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
/// We don't cache files of the local file system.
virtual void CachedFile(uint8_t** data, int64_t* length) override;
virtual void Close() override;
diff --git a/be/src/runtime/io/request-ranges.h
b/be/src/runtime/io/request-ranges.h
index 5121d6acc..01bbe2c01 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -420,7 +420,7 @@ class ScanRange : public RequestRange {
boost::shared_lock<boost::shared_mutex>* local_file_lock = nullptr);
/// Whether to use file handle caching for the current file.
- bool FileHandleCacheEnabled();
+ bool FileHandleCacheEnabled() const;
/// Same as Cancel() except it doesn't remove the scan range from
/// reader_->active_scan_ranges_ or call WaitForInFlightRead(). This allows
for
@@ -488,8 +488,7 @@ class ScanRange : public RequestRange {
/// files. 'queue' is updated with the latencies and sizes of reads from the
underlying
/// filesystem.
Status ReadSubRanges(
- DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader*
file_reader,
- bool use_file_handle_cache);
+ DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader*
file_reader);
/// Validates the internal state of this range. lock_ must be taken
/// before calling this. Need to take a lock on 'lock_' via
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 57d67bbfa..693807358 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -123,7 +123,7 @@ void
ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
if (unblocked) ScheduleScanRange();
}
-bool ScanRange::FileHandleCacheEnabled() {
+bool ScanRange::FileHandleCacheEnabled() const {
// Global flag for all file handle caching
if (!is_file_handle_caching_enabled()) return false;
@@ -224,8 +224,12 @@ ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue,
int disk_id, bool use_lo
VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
<< " file handle cache for " << (expected_local_ ? "local" :
"remote")
<< " file " << file();
-
- read_status = file_reader->Open(use_file_handle_cache);
+ // Delay open if configured to use a file handle cache or data cache as
cache hits
+ // don't require an explicit Open.
+ if (!file_reader->SupportsDelayedOpen()
+ || !(use_file_handle_cache || UseDataCache())) {
+ read_status = file_reader->Open();
+ }
if (read_status.ok()) {
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL <<
disk_id);
@@ -235,10 +239,9 @@ ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue,
int disk_id, bool use_lo
read_status =
file_reader->ReadFromPos(queue, offset_ + bytes_read_,
buffer_desc->buffer_,
min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
- &buffer_desc->len_, &eof, use_file_handle_cache);
+ &buffer_desc->len_, &eof);
} else {
- read_status = ReadSubRanges(queue, buffer_desc.get(), &eof,
file_reader,
- use_file_handle_cache);
+ read_status = ReadSubRanges(queue, buffer_desc.get(), &eof,
file_reader);
}
COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
@@ -329,8 +332,7 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int
disk_id) {
}
Status ScanRange::ReadSubRanges(
- DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof, FileReader*
file_reader,
- bool use_file_handle_cache) {
+ DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof, FileReader*
file_reader) {
buffer_desc->len_ = 0;
while (buffer_desc->len() < buffer_desc->buffer_len()
&& sub_range_pos_.index < sub_ranges_.size()) {
@@ -346,7 +348,7 @@ Status ScanRange::ReadSubRanges(
int64_t current_bytes_read;
Status read_status = file_reader->ReadFromPos(queue, offset,
buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read,
&current_bytes_read,
- eof, use_file_handle_cache);
+ eof);
if (!read_status.ok()) return read_status;
if (current_bytes_read != bytes_to_read) {
DCHECK(*eof);
@@ -610,7 +612,7 @@ Status ScanRange::ReadFromCache(
DCHECK(UseHdfsCache());
DCHECK_EQ(bytes_read_, 0);
*read_succeeded = false;
- Status status = file_reader_->Open(false);
+ Status status = file_reader_->Open();
if (!status.ok()) return status;
// Check cancel status.
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
b/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
index 3f0564ead..2c5b0782d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
@@ -49,3 +49,9 @@ row_regex: .*DataCacheHitCount: 0 \(0\).*
row_regex: .*DataCacheMissCount: 2 \(2\).*
row_regex: .*DataCachePartialHitCount: 0 \(0\).*
====
+---- QUERY
+# Exercise HDFS cache
+select count(*) from tpch.nation;
+---- RESULTS
+25
+====