This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 15b07ff1f IMPALA-11704: (Addendum) fix crash on open for HDFS cache
15b07ff1f is described below

commit 15b07ff1fb348be2c75e2176e88feb5ef76fde42
Author: Michael Smith <[email protected]>
AuthorDate: Thu Nov 10 10:17:34 2022 -0800

    IMPALA-11704: (Addendum) fix crash on open for HDFS cache
    
    When trying to read from HDFS cache, ReadFromCache calls
    FileReader::Open(false) to force the file to open. The prior commit for
    IMPALA-11704 didn't allow for that case when using a data cache, as the
    data cache check would always happen. This resulted in a crash calling
    CachedFile as exclusive_hdfs_fh_ was nullptr. Tests only catch this when
    reading from HDFS cache with data cache enabled.
    
    Replaces explicit arguments to override FileReader behavior with a flag
    to communicate whether FileReader supports delayed open. Then the caller
    can choose whether to call Open before read. Also simplifies calls to
    ReadFromPos as it already has a pointer to ScanRange and can check
    whether file handle caching is enabled directly. The Open call in
    DoReadInternal casts a slightly wider net by only checking UseDataCache.
    If the data cache is unavailable or the lookup misses, the file is then
    opened.
    
    Adds a select from tpch.nation to the query for test_data_cache.py as
    something that triggers checking the HDFS cache.
    
    Change-Id: I741488d6195e586917de220a39090895886a2dc5
    Reviewed-on: http://gerrit.cloudera.org:8080/19228
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/io/cache-reader-test-stub.h         |  5 ++---
 be/src/runtime/io/file-reader.h                    | 12 +++++++-----
 be/src/runtime/io/hdfs-file-reader.cc              | 14 +++-----------
 be/src/runtime/io/hdfs-file-reader.h               |  6 +++---
 be/src/runtime/io/local-file-reader.cc             |  5 ++---
 be/src/runtime/io/local-file-reader.h              |  5 ++---
 be/src/runtime/io/request-ranges.h                 |  5 ++---
 be/src/runtime/io/scan-range.cc                    | 22 ++++++++++++----------
 .../queries/QueryTest/data-cache.test              |  6 ++++++
 9 files changed, 39 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
index bc91974a4..00be66530 100644
--- a/be/src/runtime/io/cache-reader-test-stub.h
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -37,13 +37,12 @@ public:
 
   ~CacheReaderTestStub() {}
 
-  virtual Status Open(bool use_file_handle_cache) override {
+  virtual Status Open() override {
     return Status::OK();
   }
 
   virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
-      bool use_file_handle_cache) override {
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
     DCHECK(false);
     return Status("Not implemented");
   }
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 742d7d8e2..3f703d914 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -42,8 +42,7 @@ public:
   virtual ~FileReader() {}
 
   /// Opens file that is associated with 'scan_range_'.
-  /// 'use_file_handle_cache' currently only used by HdfsFileReader.
-  virtual Status Open(bool use_file_handle_cache) = 0;
+  virtual Status Open() = 0;
 
   /// Reads bytes from given position ('file_offset'). Tries to read
   /// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of
@@ -51,12 +50,11 @@ public:
   /// Metrics in 'queue' are updated with the size and latencies of the read
   /// operations on the underlying file system.
   virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
-      bool use_file_handle_cache) = 0;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
 
   /// ***Currently only for HDFS***
   /// When successful, sets 'data' to a buffer that contains the contents of a file,
-  /// and 'length' is set to the length of the data.
+  /// and 'length' is set to the length of the data. Does not support delayed open.
   /// When unsuccessful, 'data' is set to nullptr.
   virtual void CachedFile(uint8_t** data, int64_t* length) = 0;
 
@@ -64,6 +62,10 @@ public:
   /// scan ranges.
   virtual void Close() = 0;
 
+  /// Override to return true if the implementation supports ReadFromPos without first
+  /// calling Open. This can be useful when caching data or file handles.
+  virtual bool SupportsDelayedOpen() const { return false; }
+
   /// Resets internal bookkeeping
   virtual void ResetState() {}
 
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 9dfef6aae..d880264c5 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -57,19 +57,11 @@ HdfsFileReader::~HdfsFileReader() {
   DCHECK(cached_buffer_ == nullptr) << "Cached buffer was not released.";
 }
 
-Status HdfsFileReader::Open(bool use_file_handle_cache) {
+Status HdfsFileReader::Open() {
   unique_lock<SpinLock> hdfs_lock(lock_);
   RETURN_IF_ERROR(scan_range_->cancel_status_);
 
   if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
-  // If using file handle caching, the reader does not maintain its own
-  // hdfs file handle, so it can skip opening a file handle.
-  if (use_file_handle_cache) return Status::OK();
-  if (scan_range_->UseDataCache() &&
-      scan_range_->io_mgr_->remote_data_cache() != nullptr) {
-    // Use delayed open when using a remote data cache.
-    return Status::OK();
-  }
   return DoOpen();
 }
 
@@ -115,7 +107,7 @@ std::string HdfsFileReader::GetHostList(int64_t file_offset,
 }
 
 Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eof, bool use_file_handle_cache) {
+    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -165,7 +157,7 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_
     hdfsFile hdfs_file;
     if (exclusive_hdfs_fh_ != nullptr) {
       hdfs_file = exclusive_hdfs_fh_->file();
-    } else if (use_file_handle_cache) {
+    } else if (scan_range_->FileHandleCacheEnabled()) {
       RETURN_IF_ERROR(io_mgr->GetCachedHdfsFileHandle(hdfs_fs_,
           scan_range_->file_string(), scan_range_->mtime(), request_context, &accessor));
       hdfs_file = accessor.Get()->file();
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 7f3660b21..dd421cd8c 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -34,11 +34,11 @@ public:
 
   ~HdfsFileReader();
 
-  virtual Status Open(bool use_file_handle_cache) override;
+  virtual Status Open() override;
   virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
-      bool use_file_handle_cache) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   virtual void Close() override;
+  virtual bool SupportsDelayedOpen() const override { return true; }
   virtual void ResetState() override;
   virtual std::string DebugString() const override;
 
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index fa94c19f5..672239098 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -34,7 +34,7 @@ DECLARE_int32(stress_disk_read_delay_ms);
 namespace impala {
 namespace io {
 
-Status LocalFileReader::Open(bool use_file_handle_cache) {
+Status LocalFileReader::Open() {
   unique_lock<SpinLock> fs_lock(lock_);
   RETURN_IF_ERROR(scan_range_->cancel_status_);
 
@@ -52,8 +52,7 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
 }
 
 Status LocalFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset,
-    uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
-    bool use_file_handle_cache) {
+    uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h
index af15dfc80..fb4171cec 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -29,10 +29,9 @@ class LocalFileReader : public FileReader {
   LocalFileReader(ScanRange* scan_range) : FileReader(scan_range) {}
   ~LocalFileReader() {}
 
-  virtual Status Open(bool use_file_handle_cache) override;
+  virtual Status Open() override;
   virtual Status ReadFromPos(DiskQueue* disk_queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
-      bool use_file_handle_cache) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   /// We don't cache files of the local file system.
   virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 5121d6acc..01bbe2c01 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -420,7 +420,7 @@ class ScanRange : public RequestRange {
       boost::shared_lock<boost::shared_mutex>* local_file_lock = nullptr);
 
   /// Whether to use file handle caching for the current file.
-  bool FileHandleCacheEnabled();
+  bool FileHandleCacheEnabled() const;
 
   /// Same as Cancel() except it doesn't remove the scan range from
   /// reader_->active_scan_ranges_ or call WaitForInFlightRead(). This allows for
@@ -488,8 +488,7 @@ class ScanRange : public RequestRange {
   /// files. 'queue' is updated with the latencies and sizes of reads from the underlying
   /// filesystem.
   Status ReadSubRanges(
-      DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader* file_reader,
-      bool use_file_handle_cache);
+      DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader* file_reader);
 
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this. Need to take a lock on 'lock_' via
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 57d67bbfa..693807358 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -123,7 +123,7 @@ void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
   if (unblocked) ScheduleScanRange();
 }
 
-bool ScanRange::FileHandleCacheEnabled() {
+bool ScanRange::FileHandleCacheEnabled() const {
   // Global flag for all file handle caching
   if (!is_file_handle_caching_enabled()) return false;
 
@@ -224,8 +224,12 @@ ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue, int disk_id, bool use_lo
     VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
               << " file handle cache for " << (expected_local_ ? "local" : "remote")
               << " file " << file();
-
-    read_status = file_reader->Open(use_file_handle_cache);
+    // Delay open if configured to use a file handle cache or data cache as cache hits
+    // don't require an explicit Open.
+    if (!file_reader->SupportsDelayedOpen()
+        || !(use_file_handle_cache || UseDataCache())) {
+      read_status = file_reader->Open();
+    }
     if (read_status.ok()) {
       COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
       COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
@@ -235,10 +239,9 @@ ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue, int disk_id, bool use_lo
         read_status =
             file_reader->ReadFromPos(queue, offset_ + bytes_read_, buffer_desc->buffer_,
                 min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
-                &buffer_desc->len_, &eof, use_file_handle_cache);
+                &buffer_desc->len_, &eof);
       } else {
-        read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader,
-            use_file_handle_cache);
+        read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader);
       }
 
       COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
@@ -329,8 +332,7 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
 }
 
 Status ScanRange::ReadSubRanges(
-    DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof, FileReader* file_reader,
-    bool use_file_handle_cache) {
+    DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof, FileReader* file_reader) {
   buffer_desc->len_ = 0;
   while (buffer_desc->len() < buffer_desc->buffer_len()
       && sub_range_pos_.index < sub_ranges_.size()) {
@@ -346,7 +348,7 @@ Status ScanRange::ReadSubRanges(
       int64_t current_bytes_read;
       Status read_status = file_reader->ReadFromPos(queue, offset,
           buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
-          eof, use_file_handle_cache);
+          eof);
       if (!read_status.ok()) return read_status;
       if (current_bytes_read != bytes_to_read) {
         DCHECK(*eof);
@@ -610,7 +612,7 @@ Status ScanRange::ReadFromCache(
   DCHECK(UseHdfsCache());
   DCHECK_EQ(bytes_read_, 0);
   *read_succeeded = false;
-  Status status = file_reader_->Open(false);
+  Status status = file_reader_->Open();
   if (!status.ok()) return status;
 
   // Check cancel status.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/data-cache.test b/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
index 3f0564ead..2c5b0782d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/data-cache.test
@@ -49,3 +49,9 @@ row_regex: .*DataCacheHitCount: 0 \(0\).*
 row_regex: .*DataCacheMissCount: 2 \(2\).*
 row_regex: .*DataCachePartialHitCount: 0 \(0\).*
 ====
+---- QUERY
+# Exercise HDFS cache
+select count(*) from tpch.nation;
+---- RESULTS
+25
+====

Reply via email to