Repository: incubator-impala
Updated Branches:
  refs/heads/master 330558e46 -> 235651668


IMPALA-5488: Fix handling of exclusive HDFS file handles

This change fixes three issues:
1. File handle caching is expected to be disabled for
remote files (using exclusive HDFS file handles),
however the file handles are still being cached.
2. The retry logic for exclusive file handles is broken,
leading the number of open files to be incorrect.
3. There is no test coverage for disabling the file
handle cache.

To fix issue #1, when a scan range is requesting an
exclusive file handle from the cache, it will always
request a newly opened file handle. It also will destroy
the file handle when the scan range is closed.

To fix issue #2, exclusive file handles will no longer
retry IOs. Since the exclusive file handle is always
a fresh file handle, it will never have a bad file
handle from the cache. This returns the logic to
its state before IMPALA-4623 in these cases. If a
file handle is borrowed from the cache, then the
code will continue to retry once with a fresh handle.

To fix issue #3, custom_cluster/test_hdfs_fd_caching.py
now does both positive and negative tests for the file
handle cache. It verifies that setting
max_cached_file_handles to zero disables caching. It
also verifies that caching is disabled on remote
files. (This change will resolve IMPALA-5390.)

Change-Id: I4c03696984285cc9ce463edd969c5149cd83a861
Reviewed-on: http://gerrit.cloudera.org:8080/7181
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/53287df0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/53287df0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/53287df0

Branch: refs/heads/master
Commit: 53287df0a11ba43ff12067267cf2e3b0ffad075c
Parents: 330558e
Author: Joe McDonnell <[email protected]>
Authored: Wed Jun 14 11:29:38 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Jun 21 09:42:34 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc     | 60 ++++++-----------
 be/src/runtime/disk-io-mgr.cc                | 18 ++++--
 be/src/runtime/disk-io-mgr.h                 | 20 ++++--
 tests/common/skip.py                         |  5 +-
 tests/custom_cluster/test_hdfs_fd_caching.py | 78 +++++++++++++++++------
 5 files changed, 106 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc 
b/be/src/runtime/disk-io-mgr-scan-range.cc
index d909b94..113f15b 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -289,29 +289,19 @@ Status DiskIoMgr::ScanRange::Open(bool 
use_file_handle_cache) {
     // so s3 and remote filesystems should obtain an exclusive file handle
     // for each scan range.
     if (use_file_handle_cache && expected_local_) return Status::OK();
+    // Get a new exclusive file handle.
     exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-        mtime(), reader_);
+        mtime(), reader_, true);
     if (exclusive_hdfs_fh_ == nullptr) {
       return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
     }
 
-    int num_retries = 0;
-    while (true) {
-      if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) == 0) break;
-      // Seek failed. If already retried once, return error. Otherwise, destroy
-      // the current file handle and retry with a new handle.
-      DCHECK_LE(num_retries, 1);
-      if (num_retries == 1) {
-        io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), 
exclusive_hdfs_fh_);
-        exclusive_hdfs_fh_ = nullptr;
-        string error_msg = GetHdfsErrorMsg("");
-        stringstream ss;
-        ss << "Error seeking to " << offset_ << " in file: " << file_ << " " 
<< error_msg;
-        return Status(ss.str());
-      }
-      ++num_retries;
-      RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
-          mtime(), &exclusive_hdfs_fh_));
+    if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
+      // Destroy the file handle and remove it from the cache.
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, 
true);
+      exclusive_hdfs_fh_ = nullptr;
+      return Status(Substitute("Error seeking to $0 in file: $1 $2", offset_, 
file_,
+          GetHdfsErrorMsg("")));
     }
   } else {
     if (local_file_ != nullptr) return Status::OK();
@@ -352,7 +342,8 @@ void DiskIoMgr::ScanRange::Close() {
         external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
       }
 
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_);
+      // Destroy the file handle and remove it from the cache.
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, 
true);
       exclusive_hdfs_fh_ = nullptr;
       closed_file = true;
     }
@@ -434,7 +425,7 @@ Status DiskIoMgr::ScanRange::Read(
       hdfs_file = exclusive_hdfs_fh_->file();
     } else {
       borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-          mtime(), reader_);
+          mtime(), reader_, false);
       if (borrowed_hdfs_fh == nullptr) {
         return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
       }
@@ -489,17 +480,17 @@ Status DiskIoMgr::ScanRange::Read(
         // Do not retry:
         // - if read was successful (current_bytes_read != -1)
         // - or if already retried once
+        // - or if this not using a borrowed file handle
         DCHECK_LE(num_retries, 1);
-        if (current_bytes_read != -1 || num_retries == 1) {
+        if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
+            num_retries == 1) {
           break;
         }
         // The error may be due to a bad file handle. Reopen the file handle 
and retry.
         ++num_retries;
-        HdfsFileHandle** fh_to_refresh =
-            (borrowed_hdfs_fh != nullptr ? &borrowed_hdfs_fh : 
&exclusive_hdfs_fh_);
         RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
-            mtime(), fh_to_refresh));
-        hdfs_file = (*fh_to_refresh)->file();
+            mtime(), &borrowed_hdfs_fh));
+        hdfs_file = borrowed_hdfs_fh->file();
       }
       if (!status.ok()) break;
       if (current_bytes_read == 0) {
@@ -514,7 +505,7 @@ Status DiskIoMgr::ScanRange::Read(
     }
 
     if (borrowed_hdfs_fh != nullptr) {
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh);
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, 
false);
     }
     if (!status.ok()) return status;
   } else {
@@ -558,19 +549,10 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* 
read_succeeded) {
 
     DCHECK(exclusive_hdfs_fh_ != nullptr);
     DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
-    int num_retries = 0;
-    while (true) {
-      cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(),
-          io_mgr_->cached_read_options_, len());
-      if (cached_buffer_ != nullptr) {
-        external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
-        break;
-      }
-      DCHECK_LE(num_retries, 1);
-      if (num_retries == 1) break;
-      ++num_retries;
-      RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(), 
mtime(),
-          &exclusive_hdfs_fh_));
+    cached_buffer_ =
+      hadoopReadZero(exclusive_hdfs_fh_->file(), 
io_mgr_->cached_read_options_, len());
+    if (cached_buffer_ != nullptr) {
+      external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
     }
   }
   // Data was not cached, caller will fall back to normal read path.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index bfb6d60..d78bad3 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -1206,12 +1206,14 @@ int DiskIoMgr::AssignQueue(const char* file, int 
disk_id, bool expected_local) {
 }
 
 HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
-    std::string* fname, int64_t mtime, DiskIoRequestContext *reader) {
+    std::string* fname, int64_t mtime, DiskIoRequestContext *reader,
+    bool require_new) {
   bool cache_hit;
-  HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, 
false,
+  HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, 
require_new,
       &cache_hit);
-  if (!fh) return nullptr;
+  if (fh == nullptr) return nullptr;
   if (cache_hit) {
+    DCHECK(!require_new);
     ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L);
     ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L);
     reader->cached_file_handles_hit_count_.Add(1L);
@@ -1223,19 +1225,21 @@ HdfsFileHandle* 
DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
   return fh;
 }
 
-void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, 
HdfsFileHandle* fid) {
-  file_handle_cache_.ReleaseFileHandle(fname, fid, false);
+void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, 
HdfsFileHandle* fid,
+    bool destroy_handle) {
+  file_handle_cache_.ReleaseFileHandle(fname, fid, destroy_handle);
 }
 
 Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* 
fname,
     int64_t mtime, HdfsFileHandle** fid) {
-  bool dummy;
+  bool cache_hit;
   file_handle_cache_.ReleaseFileHandle(fname, *fid, true);
   // The old handle has been destroyed, so *fid must be overwritten before 
returning.
   *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,
-      &dummy);
+      &cache_hit);
   if (*fid == nullptr) {
     return Status(GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
   }
+  DCHECK(!cache_hit);
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 9621c92..a7fcbad 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -452,7 +452,8 @@ class DiskIoMgr : public CacheLineAligned {
     /// If 'use_file_handle_cache' is false or this is a remote hdfs file or 
this is
     /// a local OS file, Open() will maintain a file handle on the scan range 
for
     /// exclusive use by this scan range. An exclusive hdfs file handle still 
comes
-    /// from the cache, but it is held for the entire duration of a scan 
range's lifetime.
+    /// from the cache, but it is a newly opened file handle that is held for 
the
+    /// entire duration of a scan range's lifetime and destroyed in Close().
     /// All local OS files are opened using normal OS file APIs.
     Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
 
@@ -512,8 +513,8 @@ class DiskIoMgr : public CacheLineAligned {
     /// 2. The scan range is using hdfs caching.
     /// -OR-
     /// 3. The hdfs file is expected to be remote (expected_local_ == false)
-    /// In each case, the scan range gets a file handle from the file handle 
cache
-    /// at Open() and holds it exclusively until Close() is called.
+    /// In each case, the scan range gets a new file handle from the file 
handle cache
+    /// at Open(), holds it exclusively, and destroys it in Close().
     union {
       FILE* local_file_ = nullptr;
       HdfsFileHandle* exclusive_hdfs_fh_;
@@ -773,14 +774,19 @@ class DiskIoMgr : public CacheLineAligned {
   bool Validate() const;
 
   /// Given a FS handle, name and last modified time of the file, gets an 
HdfsFileHandle
-  /// from the file handle cache. On success, records statistics about whether 
this was
-  /// a cache hit or miss in the `reader` as well as at the system level. In 
case of an
+  /// from the file handle cache. If 'require_new_handle' is true, the cache 
will open
+  /// a fresh file handle. On success, records statistics about whether this 
was
+  /// a cache hit or miss in the 'reader' as well as at the system level. In 
case of an
   /// error returns nullptr.
   HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
-      std::string* fname, int64_t mtime, DiskIoRequestContext *reader);
+      std::string* fname, int64_t mtime, DiskIoRequestContext *reader,
+      bool require_new_handle);
 
   /// Releases a file handle back to the file handle cache when it is no 
longer in use.
-  void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid);
+  /// If 'destroy_handle' is true, the file handle cache will close the file 
handle
+  /// immediately.
+  void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid,
+      bool destroy_handle);
 
   /// Reopens a file handle by destroying the file handle and getting a fresh
   /// file handle from the cache. Returns an error if the file could not be 
reopened.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 77ccc1e..a34d2bc 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -85,9 +85,6 @@ class SkipIf:
   not_hdfs = pytest.mark.skipif(not IS_HDFS, reason="HDFS Filesystem needed")
   no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
       reason="Secondary filesystem needed")
-  no_file_handle_caching = pytest.mark.skipif(IS_S3 or IS_ADLS or IS_ISILON or 
\
-      pytest.config.option.testing_remote_cluster,
-      reason="File handle caching needed")
 
 class SkipIfIsilon:
   caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented 
for Isilon")
@@ -126,6 +123,8 @@ class SkipIfLocal:
       reason="Multiple impalads are not supported when using local file 
system")
   parquet_file_size = pytest.mark.skipif(IS_LOCAL,
       reason="Parquet block size incorrectly determined")
+  hdfs_fd_caching = pytest.mark.skipif(IS_LOCAL,
+      reason="HDFS file handle caching not supported for local non-HDFS files")
 
   # These ones need test infra work to re-enable.
   hbase = pytest.mark.skipif(IS_LOCAL,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/tests/custom_cluster/test_hdfs_fd_caching.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py 
b/tests/custom_cluster/test_hdfs_fd_caching.py
index b1e8f30..9dcd3bf 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -18,9 +18,13 @@
 import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIf
+from tests.common.skip import SkipIfLocal
+from tests.util.filesystem_utils import (
+    IS_ISILON,
+    IS_S3,
+    IS_ADLS)
 
[email protected]_file_handle_caching
[email protected]_fd_caching
 class TestHdfsFdCaching(CustomClusterTestSuite):
   """Tests that if HDFS file handle caching is enabled, file handles are 
actually cached
   and the associated metrics return valid results. In addition, tests that the 
upper bound
@@ -55,24 +59,32 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     super(TestHdfsFdCaching, self).teardown_method(method)
     self.client.execute("drop database if exists cachefd cascade")
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=16",
-      catalogd_args="--load_catalog_in_background=false")
-  def test_scan_does_cache_fd(self, vector):
-    """Tests that an hdfs scan will lead to caching HDFS file descriptors."""
-
-    # Maximum number of file handles cached
-    assert self.max_cached_handles() <= 16
-    # The table has one file, so there should be one more handle cached after 
the
-    # first select.
-    num_handles_before = self.cached_handles()
+  def run_fd_caching_test(self, vector, caching_expected, cache_capacity):
+    """
+    Tests that HDFS file handles are cached as expected. This is used both
+    for the positive and negative test cases. If caching_expected is true,
+    this verifies that the cache adheres to the specified capacity. Also,
+    repeated queries across the same files reuse the file handles.
+    If caching_expected is false, it verifies that the cache does not
+    change in size while running queries.
+    """
+
+    # Maximum number of file handles cached (applies whether caching expected
+    # or not)
+    assert self.max_cached_handles() <= cache_capacity
+
+    num_handles_start = self.cached_handles()
+    # The table has one file. If caching is expected, there should be one more
+    # handle cached after the first select. If caching is not expected, the
+    # number of handles should not change from the initial number.
     self.execute_query("select * from cachefd.simple", vector=vector)
     num_handles_after = self.cached_handles()
-    assert self.max_cached_handles() <= 16
+    assert self.max_cached_handles() <= cache_capacity
 
-    # Should have one more file handle
-    assert num_handles_after == (num_handles_before + 1)
+    if caching_expected:
+      assert num_handles_after == (num_handles_start + 1)
+    else:
+      assert num_handles_after == num_handles_start
 
     # No open handles if scanning is finished
     assert self.outstanding_handles() == 0
@@ -81,19 +93,47 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     for x in range(10):
       self.execute_query("select * from cachefd.simple", vector=vector)
       assert self.cached_handles() == num_handles_after
-      assert self.max_cached_handles() <= 16
+      assert self.max_cached_handles() <= cache_capacity
       assert self.outstanding_handles() == 0
 
     # Create more files. This means there are more files than the cache size.
     # The cache size should still be enforced.
-    self.create_n_files(100)
+    self.create_n_files(cache_capacity + 100)
 
     # Read all the files of the table and make sure no FD leak
     for x in range(10):
       self.execute_query("select count(*) from cachefd.simple;", vector=vector)
       assert self.max_cached_handles() <= 16
+      if not caching_expected:
+        assert self.cached_handles() == num_handles_start
     assert self.outstanding_handles() == 0
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--max_cached_file_handles=16",
+      catalogd_args="--load_catalog_in_background=false")
+  def test_caching_enabled(self, vector):
+    """Test of the HDFS file handle cache with the parameter specified"""
+    cache_capacity = 16
+
+    # Caching only applies to local HDFS files. If this is local HDFS, then 
verify
+    # that caching works. Otherwise, verify that file handles are not cached.
+    if (IS_S3 or IS_ADLS or IS_ISILON or 
pytest.config.option.testing_remote_cluster):
+      caching_expected = False
+    else:
+      caching_expected = True
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--max_cached_file_handles=0",
+      catalogd_args="--load_catalog_in_background=false")
+  def test_caching_disabled_by_param(self, vector):
+    """Test that the HDFS file handle cache is disabled when the parameter is 
zero"""
+    cache_capacity = 0
+    caching_expected = False
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity)
+
   def cached_handles(self):
     return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles")
 

Reply via email to