Repository: incubator-impala Updated Branches: refs/heads/master 330558e46 -> 235651668
IMPALA-5488: Fix handling of exclusive HDFS file handles This change fixes three issues: 1. File handle caching is expected to be disabled for remote files (using exclusive HDFS file handles), however the file handles are still being cached. 2. The retry logic for exclusive file handles is broken, leading the number of open files to be incorrect. 3. There is no test coverage for disabling the file handle cache. To fix issue #1, when a scan range is requesting an exclusive file handle from the cache, it will always request a newly opened file handle. It also will destroy the file handle when the scan range is closed. To fix issue #2, exclusive file handles will no longer retry IOs. Since the exclusive file handle is always a fresh file handle, it will never have a bad file handle from the cache. This returns the logic to its state before IMPALA-4623 in these cases. If a file handle is borrowed from the cache, then the code will continue to retry once with a fresh handle. To fix issue #3, custom_cluster/test_hdfs_fd_caching.py now does both positive and negative tests for the file handle cache. It verifies that setting max_cached_file_handles to zero disables caching. It also verifies that caching is disabled on remote files. (This change will resolve IMPALA-5390.) 
Change-Id: I4c03696984285cc9ce463edd969c5149cd83a861 Reviewed-on: http://gerrit.cloudera.org:8080/7181 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/53287df0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/53287df0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/53287df0 Branch: refs/heads/master Commit: 53287df0a11ba43ff12067267cf2e3b0ffad075c Parents: 330558e Author: Joe McDonnell <[email protected]> Authored: Wed Jun 14 11:29:38 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Jun 21 09:42:34 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/disk-io-mgr-scan-range.cc | 60 ++++++----------- be/src/runtime/disk-io-mgr.cc | 18 ++++-- be/src/runtime/disk-io-mgr.h | 20 ++++-- tests/common/skip.py | 5 +- tests/custom_cluster/test_hdfs_fd_caching.py | 78 +++++++++++++++++------ 5 files changed, 106 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/be/src/runtime/disk-io-mgr-scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc index d909b94..113f15b 100644 --- a/be/src/runtime/disk-io-mgr-scan-range.cc +++ b/be/src/runtime/disk-io-mgr-scan-range.cc @@ -289,29 +289,19 @@ Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) { // so s3 and remote filesystems should obtain an exclusive file handle // for each scan range. if (use_file_handle_cache && expected_local_) return Status::OK(); + // Get a new exclusive file handle. 
exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(), - mtime(), reader_); + mtime(), reader_, true); if (exclusive_hdfs_fh_ == nullptr) { return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_)); } - int num_retries = 0; - while (true) { - if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) == 0) break; - // Seek failed. If already retried once, return error. Otherwise, destroy - // the current file handle and retry with a new handle. - DCHECK_LE(num_retries, 1); - if (num_retries == 1) { - io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_); - exclusive_hdfs_fh_ = nullptr; - string error_msg = GetHdfsErrorMsg(""); - stringstream ss; - ss << "Error seeking to " << offset_ << " in file: " << file_ << " " << error_msg; - return Status(ss.str()); - } - ++num_retries; - RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(), - mtime(), &exclusive_hdfs_fh_)); + if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) { + // Destroy the file handle and remove it from the cache. + io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true); + exclusive_hdfs_fh_ = nullptr; + return Status(Substitute("Error seeking to $0 in file: $1 $2", offset_, file_, + GetHdfsErrorMsg(""))); } } else { if (local_file_ != nullptr) return Status::OK(); @@ -352,7 +342,8 @@ void DiskIoMgr::ScanRange::Close() { external_buffer_tag_ = ExternalBufferTag::NO_BUFFER; } - io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_); + // Destroy the file handle and remove it from the cache. 
+ io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true); exclusive_hdfs_fh_ = nullptr; closed_file = true; } @@ -434,7 +425,7 @@ Status DiskIoMgr::ScanRange::Read( hdfs_file = exclusive_hdfs_fh_->file(); } else { borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(), - mtime(), reader_); + mtime(), reader_, false); if (borrowed_hdfs_fh == nullptr) { return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_)); } @@ -489,17 +480,17 @@ Status DiskIoMgr::ScanRange::Read( // Do not retry: // - if read was successful (current_bytes_read != -1) // - or if already retried once + // - or if this is not using a borrowed file handle DCHECK_LE(num_retries, 1); - if (current_bytes_read != -1 || num_retries == 1) { + if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr || + num_retries == 1) { break; } // The error may be due to a bad file handle. Reopen the file handle and retry. ++num_retries; - HdfsFileHandle** fh_to_refresh = - (borrowed_hdfs_fh != nullptr ? 
&borrowed_hdfs_fh : &exclusive_hdfs_fh_); RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(), - mtime(), fh_to_refresh)); - hdfs_file = (*fh_to_refresh)->file(); + mtime(), &borrowed_hdfs_fh)); + hdfs_file = borrowed_hdfs_fh->file(); } if (!status.ok()) break; if (current_bytes_read == 0) { @@ -514,7 +505,7 @@ Status DiskIoMgr::ScanRange::Read( } if (borrowed_hdfs_fh != nullptr) { - io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh); + io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, false); } if (!status.ok()) return status; } else { @@ -558,19 +549,10 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) { DCHECK(exclusive_hdfs_fh_ != nullptr); DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER); - int num_retries = 0; - while (true) { - cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(), - io_mgr_->cached_read_options_, len()); - if (cached_buffer_ != nullptr) { - external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER; - break; - } - DCHECK_LE(num_retries, 1); - if (num_retries == 1) break; - ++num_retries; - RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(), mtime(), - &exclusive_hdfs_fh_)); + cached_buffer_ = + hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options_, len()); + if (cached_buffer_ != nullptr) { + external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER; } } // Data was not cached, caller will fall back to normal read path. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index bfb6d60..d78bad3 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -1206,12 +1206,14 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) { } HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs, - std::string* fname, int64_t mtime, DiskIoRequestContext *reader) { + std::string* fname, int64_t mtime, DiskIoRequestContext *reader, + bool require_new) { bool cache_hit; - HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, false, + HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, require_new, &cache_hit); - if (!fh) return nullptr; + if (fh == nullptr) return nullptr; if (cache_hit) { + DCHECK(!require_new); ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L); ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L); reader->cached_file_handles_hit_count_.Add(1L); @@ -1223,19 +1225,21 @@ HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs, return fh; } -void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid) { - file_handle_cache_.ReleaseFileHandle(fname, fid, false); +void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid, + bool destroy_handle) { + file_handle_cache_.ReleaseFileHandle(fname, fid, destroy_handle); } Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, HdfsFileHandle** fid) { - bool dummy; + bool cache_hit; file_handle_cache_.ReleaseFileHandle(fname, *fid, true); // The old handle has been destroyed, so *fid must be overwritten before returning. 
*fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true, - &dummy); + &cache_hit); if (*fid == nullptr) { return Status(GetHdfsErrorMsg("Failed to open HDFS file ", fname->data())); } + DCHECK(!cache_hit); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index 9621c92..a7fcbad 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -452,7 +452,8 @@ class DiskIoMgr : public CacheLineAligned { /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is /// a local OS file, Open() will maintain a file handle on the scan range for /// exclusive use by this scan range. An exclusive hdfs file handle still comes - /// from the cache, but it is held for the entire duration of a scan range's lifetime. + /// from the cache, but it is a newly opened file handle that is held for the + /// entire duration of a scan range's lifetime and destroyed in Close(). /// All local OS files are opened using normal OS file APIs. Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT; @@ -512,8 +513,8 @@ class DiskIoMgr : public CacheLineAligned { /// 2. The scan range is using hdfs caching. /// -OR- /// 3. The hdfs file is expected to be remote (expected_local_ == false) - /// In each case, the scan range gets a file handle from the file handle cache - /// at Open() and holds it exclusively until Close() is called. + /// In each case, the scan range gets a new file handle from the file handle cache + /// at Open(), holds it exclusively, and destroys it in Close(). 
union { FILE* local_file_ = nullptr; HdfsFileHandle* exclusive_hdfs_fh_; @@ -773,14 +774,19 @@ class DiskIoMgr : public CacheLineAligned { bool Validate() const; /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle - /// from the file handle cache. On success, records statistics about whether this was - /// a cache hit or miss in the `reader` as well as at the system level. In case of an + /// from the file handle cache. If 'require_new_handle' is true, the cache will open + /// a fresh file handle. On success, records statistics about whether this was + /// a cache hit or miss in the 'reader' as well as at the system level. In case of an /// error returns nullptr. HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs, - std::string* fname, int64_t mtime, DiskIoRequestContext *reader); + std::string* fname, int64_t mtime, DiskIoRequestContext *reader, + bool require_new_handle); /// Releases a file handle back to the file handle cache when it is no longer in use. - void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid); + /// If 'destroy_handle' is true, the file handle cache will close the file handle + /// immediately. + void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid, + bool destroy_handle); /// Reopens a file handle by destroying the file handle and getting a fresh /// file handle from the cache. Returns an error if the file could not be reopened. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/tests/common/skip.py ---------------------------------------------------------------------- diff --git a/tests/common/skip.py b/tests/common/skip.py index 77ccc1e..a34d2bc 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -85,9 +85,6 @@ class SkipIf: not_hdfs = pytest.mark.skipif(not IS_HDFS, reason="HDFS Filesystem needed") no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM, reason="Secondary filesystem needed") - no_file_handle_caching = pytest.mark.skipif(IS_S3 or IS_ADLS or IS_ISILON or \ - pytest.config.option.testing_remote_cluster, - reason="File handle caching needed") class SkipIfIsilon: caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented for Isilon") @@ -126,6 +123,8 @@ class SkipIfLocal: reason="Multiple impalads are not supported when using local file system") parquet_file_size = pytest.mark.skipif(IS_LOCAL, reason="Parquet block size incorrectly determined") + hdfs_fd_caching = pytest.mark.skipif(IS_LOCAL, + reason="HDFS file handle caching not supported for local non-HDFS files") # These ones need test infra work to re-enable. 
hbase = pytest.mark.skipif(IS_LOCAL, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/53287df0/tests/custom_cluster/test_hdfs_fd_caching.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py index b1e8f30..9dcd3bf 100644 --- a/tests/custom_cluster/test_hdfs_fd_caching.py +++ b/tests/custom_cluster/test_hdfs_fd_caching.py @@ -18,9 +18,13 @@ import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIf +from tests.common.skip import SkipIfLocal +from tests.util.filesystem_utils import ( + IS_ISILON, + IS_S3, + IS_ADLS) [email protected]_file_handle_caching [email protected]_fd_caching class TestHdfsFdCaching(CustomClusterTestSuite): """Tests that if HDFS file handle caching is enabled, file handles are actually cached and the associated metrics return valid results. In addition, tests that the upper bound @@ -55,24 +59,32 @@ class TestHdfsFdCaching(CustomClusterTestSuite): super(TestHdfsFdCaching, self).teardown_method(method) self.client.execute("drop database if exists cachefd cascade") - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args="--max_cached_file_handles=16", - catalogd_args="--load_catalog_in_background=false") - def test_scan_does_cache_fd(self, vector): - """Tests that an hdfs scan will lead to caching HDFS file descriptors.""" - - # Maximum number of file handles cached - assert self.max_cached_handles() <= 16 - # The table has one file, so there should be one more handle cached after the - # first select. - num_handles_before = self.cached_handles() + def run_fd_caching_test(self, vector, caching_expected, cache_capacity): + """ + Tests that HDFS file handles are cached as expected. This is used both + for the positive and negative test cases. 
If caching_expected is true, + this verifies that the cache adheres to the specified capacity. Also, + repeated queries across the same files reuse the file handles. + If caching_expected is false, it verifies that the cache does not + change in size while running queries. + """ + + # Maximum number of file handles cached (applies whether caching expected + # or not) + assert self.max_cached_handles() <= cache_capacity + + num_handles_start = self.cached_handles() + # The table has one file. If caching is expected, there should be one more + # handle cached after the first select. If caching is not expected, the + # number of handles should not change from the initial number. self.execute_query("select * from cachefd.simple", vector=vector) num_handles_after = self.cached_handles() - assert self.max_cached_handles() <= 16 + assert self.max_cached_handles() <= cache_capacity - # Should have one more file handle - assert num_handles_after == (num_handles_before + 1) + if caching_expected: + assert num_handles_after == (num_handles_start + 1) + else: + assert num_handles_after == num_handles_start # No open handles if scanning is finished assert self.outstanding_handles() == 0 @@ -81,19 +93,47 @@ class TestHdfsFdCaching(CustomClusterTestSuite): for x in range(10): self.execute_query("select * from cachefd.simple", vector=vector) assert self.cached_handles() == num_handles_after - assert self.max_cached_handles() <= 16 + assert self.max_cached_handles() <= cache_capacity assert self.outstanding_handles() == 0 # Create more files. This means there are more files than the cache size. # The cache size should still be enforced. 
- self.create_n_files(100) + self.create_n_files(cache_capacity + 100) # Read all the files of the table and make sure no FD leak for x in range(10): self.execute_query("select count(*) from cachefd.simple;", vector=vector) assert self.max_cached_handles() <= 16 + if not caching_expected: + assert self.cached_handles() == num_handles_start assert self.outstanding_handles() == 0 + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--max_cached_file_handles=16", + catalogd_args="--load_catalog_in_background=false") + def test_caching_enabled(self, vector): + """Test of the HDFS file handle cache with the parameter specified""" + cache_capacity = 16 + + # Caching only applies to local HDFS files. If this is local HDFS, then verify + # that caching works. Otherwise, verify that file handles are not cached. + if (IS_S3 or IS_ADLS or IS_ISILON or pytest.config.option.testing_remote_cluster): + caching_expected = False + else: + caching_expected = True + self.run_fd_caching_test(vector, caching_expected, cache_capacity) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--max_cached_file_handles=0", + catalogd_args="--load_catalog_in_background=false") + def test_caching_disabled_by_param(self, vector): + """Test that the HDFS file handle cache is disabled when the parameter is zero""" + cache_capacity = 0 + caching_expected = False + self.run_fd_caching_test(vector, caching_expected, cache_capacity) + def cached_handles(self): return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles")
