IMPALA-5352: Age out unused file handles from the cache

Currently, a file handle in the file handle cache will
only be evicted if the cache reaches its capacity. This
means that file handles can be retained for an indefinite
amount of time. This is true even for files that have
been deleted, replaced, or modified. Since a file handle
maintains a file descriptor for local files, this can
prevent the disk space from being freed. Additionally,
unused file handles are wasted memory.

This adds code to evict file handles that have been
unused for longer than a specified threshold. A thread
periodically checks the file handle cache to see if
any file handle should be evicted. The threshold is
specified by 'unused_file_handle_timeout_sec'; it
defaults to 6 hours.

This adds a test to custom_cluster/test_hdfs_fd_caching.py
to verify the eviction behavior.

Change-Id: Iefe04b3e2e22123ecb8b3e494934c93dfb29682e
Reviewed-on: http://gerrit.cloudera.org:8080/7640
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/57dae5ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/57dae5ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/57dae5ec

Branch: refs/heads/master
Commit: 57dae5ec7e927a1c836f6bf0a1cbe5a81541327e
Parents: b6c0297
Author: Joe McDonnell <[email protected]>
Authored: Thu Jun 29 13:08:58 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Aug 23 04:44:21 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-handle-cache.h       | 54 +++++++++---
 .../runtime/disk-io-mgr-handle-cache.inline.h   | 87 +++++++++++++++++---
 be/src/runtime/disk-io-mgr.cc                   | 20 ++++-
 tests/custom_cluster/test_hdfs_fd_caching.py    | 42 ++++++++--
 4 files changed, 172 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h 
b/be/src/runtime/disk-io-mgr-handle-cache.h
index ddfb934..96add9f 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.h
@@ -29,6 +29,7 @@
 #include "util/aligned-new.h"
 #include "util/impalad-metrics.h"
 #include "util/spinlock.h"
+#include "util/thread.h"
 
 namespace impala {
 
@@ -73,19 +74,29 @@ class HdfsFileHandle {
 /// of concurrent connections, and file handles in the cache would be counted 
towards
 /// that limit.
 ///
-/// TODO: If there is a file handle in the cache and the underlying file is 
deleted,
+/// If there is a file handle in the cache and the underlying file is deleted,
 /// the file handle might keep the file from being deleted at the OS level. 
This can
-/// take up disk space and impact correctness. The cache should check 
periodically to
-/// evict file handles older than some configurable threshold. The cache 
should also
-/// evict file handles more aggressively if the file handle's mtime is older 
than the
-/// file's current mtime.
+/// take up disk space and impact correctness. To avoid this, the cache will 
evict any
+/// file handle that has been unused for longer than the threshold specified by
+/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0.
+///
+/// TODO: The cache should also evict file handles more aggressively if the 
file handle's
+/// mtime is older than the file's current mtime.
 template <size_t NUM_PARTITIONS>
 class FileHandleCache {
  public:
   /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
   /// partitions. If the capacity does not split evenly, then the capacity is 
rounded
-  /// up.
-  FileHandleCache(size_t capacity);
+  /// up. The cache will age out any file handle that is unused for
+  /// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set 
to zero.
+  FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
+
+  /// Destructor is only called for backend tests
+  ~FileHandleCache();
+
+  /// Starts up a thread that monitors the age of file handles and evicts any 
that
+  /// exceed the limit.
+  void Init();
 
   /// Get a file handle from the cache for the specified filename (fname) and
   /// last modification time (mtime). This will hash the filename to determine
@@ -112,18 +123,26 @@ class FileHandleCache {
  private:
   struct FileHandleEntry;
   typedef std::multimap<std::string, FileHandleEntry> MapType;
-  typedef std::list<typename MapType::iterator> LruListType;
+
+  struct LruListEntry {
+    LruListEntry(typename MapType::iterator map_entry_in);
+    typename MapType::iterator map_entry;
+    uint64_t timestamp_seconds;
+  };
+  typedef std::list<LruListEntry> LruListType;
 
   struct FileHandleEntry {
-    FileHandleEntry(HdfsFileHandle *fh_in) : fh(fh_in) {}
+    FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
+    : fh(fh_in), lru_entry(lru_list.end()) {}
     std::unique_ptr<HdfsFileHandle> fh;
 
     /// in_use is true for a file handle checked out via GetFileHandle() that 
has not
     /// been returned via ReleaseFileHandle().
     bool in_use = false;
 
-    /// Iterator to this element's location in the LRU list. This only has a 
valid value
-    /// if in_use is false.
+    /// Iterator to this element's location in the LRU list. This only points 
to a
+    /// valid location when in_use is false. For error-checking, this is set to
+    /// lru_list.end() when in_use is true.
     typename LruListType::iterator lru_entry;
   };
 
@@ -151,11 +170,24 @@ class FileHandleCache {
     size_t size;
   };
 
+  /// Periodic check to evict unused file handles. Only executed by 
eviction_thread_.
+  void EvictHandlesLoop();
+  static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
+
   /// If the partition is above its capacity, evict the oldest unused file 
handles to
   /// enforce the capacity.
   void EvictHandles(FileHandleCachePartition& p);
 
   std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
+
+  /// Maximum time before an unused file handle is aged out of the cache.
+  /// Aging out is disabled if this is set to 0.
+  uint64_t unused_handle_timeout_secs_;
+
+  /// Thread to check for unused file handles to evict. This thread will exit 
when
+  /// the shut_down_promise_ is set.
+  std::unique_ptr<Thread> eviction_thread_;
+  Promise<bool> shut_down_promise_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h 
b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
index ed8ed63..76bef95 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <tuple>
+
 #include "runtime/disk-io-mgr-handle-cache.h"
 #include "util/hash-util.h"
+#include "util/time.h"
 
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
@@ -41,7 +44,9 @@ HdfsFileHandle::~HdfsFileHandle() {
 }
 
 template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity) {
+  FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity,
+      uint64_t unused_handle_timeout_secs)
+  : unused_handle_timeout_secs_(unused_handle_timeout_secs) {
   DCHECK_GT(NUM_PARTITIONS, 0);
   size_t remainder = capacity % NUM_PARTITIONS;
   size_t base_capacity = capacity / NUM_PARTITIONS;
@@ -53,6 +58,23 @@ FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t 
capacity) {
 }
 
 template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry(
+    typename MapType::iterator map_entry_in)
+     : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
+
+template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
+  shut_down_promise_.Set(true);
+  if (eviction_thread_ != nullptr) eviction_thread_->Join();
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::Init() {
+  eviction_thread_.reset(new Thread("disk-io-mgr-handle-cache", "File Handle 
Timeout",
+      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this));
+}
+
+template <size_t NUM_PARTITIONS>
 HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
     const hdfsFS& fs, std::string* fname, int64_t mtime, bool 
require_new_handle,
     bool* cache_hit) {
@@ -70,8 +92,13 @@ HdfsFileHandle* 
FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
     while (range.first != range.second) {
       FileHandleEntry* elem = &range.first->second;
       if (!elem->in_use && elem->fh->mtime() == mtime) {
-        // remove from lru
+        // This element is currently in the lru_list, which means that 
lru_entry must
+        // be an iterator pointing into the lru_list.
+        DCHECK(elem->lru_entry != p.lru_list.end());
+        // Remove the element from the lru_list and designate that it is not on
+        // the lru_list by resetting its iterator to point to the end of the 
list.
         p.lru_list.erase(elem->lru_entry);
+        elem->lru_entry = p.lru_list.end();
         ret_elem = elem;
         *cache_hit = true;
         break;
@@ -83,14 +110,15 @@ HdfsFileHandle* 
FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
   // There was no entry that was free or caller asked for a new handle
   if (!ret_elem) {
     *cache_hit = false;
-    // Create a new entry and put it in the map
+    // Create a new entry and move it into the map
     HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
     if (!new_fh->ok()) {
       delete new_fh;
       return nullptr;
     }
-    typename MapType::iterator new_it = p.cache.emplace_hint(range.second, 
*fname,
-        new_fh);
+    FileHandleEntry entry(new_fh, p.lru_list);
+    typename MapType::iterator new_it = p.cache.emplace_hint(range.second,
+        *fname, std::move(entry));
     ret_elem = &new_it->second;
     ++p.size;
     if (p.size > p.capacity) EvictHandles(p);
@@ -139,7 +167,15 @@ void 
FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
   // If unbuffering is not supported, then hdfsUnbufferFile() will return a 
non-zero
   // return code, and we close the file handle and remove it from the cache.
   if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
-    release_elem->lru_entry = p.lru_list.insert(p.lru_list.end(), release_it);
+    // This FileHandleEntry must not be in the lru list already, because it was
+    // in use. Verify this by checking that the lru_entry is pointing to the 
end,
+    // which cannot be true for any element in the lru list.
+    DCHECK(release_elem->lru_entry == p.lru_list.end());
+    // Add this to the lru list, establishing links in both directions.
+    // The FileHandleEntry has an iterator to the LruListEntry and the
+    // LruListEntry has an iterator to the location of the FileHandleEntry in
+    // the cache.
+    release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it);
     if (p.size > p.capacity) EvictHandles(p);
   } else {
     VLOG_FILE << "FS does not support file handle unbuffering, closing file="
@@ -150,13 +186,42 @@ void 
FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
 }
 
 template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
+  while (true) {
+    for (FileHandleCachePartition& p : cache_partitions_) {
+      boost::lock_guard<SpinLock> g(p.lock);
+      EvictHandles(p);
+    }
+    // This Get() will time out until shutdown, when the promise is set.
+    bool timed_out;
+    shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out);
+    if (!timed_out) break;
+  }
+  // The promise must be set to true.
+  DCHECK(shut_down_promise_.IsSet());
+  DCHECK(shut_down_promise_.Get());
+}
+
+template <size_t NUM_PARTITIONS>
 void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
     FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
-  while (p.size > p.capacity) {
-    if (p.lru_list.size() == 0) break;
-    typename MapType::iterator evict_it = p.lru_list.front();
-    DCHECK(!evict_it->second.in_use);
-    p.cache.erase(evict_it);
+  uint64_t now = MonotonicSeconds();
+  uint64_t oldest_allowed_timestamp =
+      now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 
0;
+  while (p.lru_list.size() > 0) {
+    // Peek at the oldest element
+    LruListEntry oldest_entry = p.lru_list.front();
+    typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry;
+    uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds;
+    // If the oldest element does not need to be aged out and the cache is not 
over
+    // capacity, then we are done and there is nothing to evict.
+    if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 ||
+        oldest_entry_timestamp >= oldest_allowed_timestamp)) {
+      return;
+    }
+    // Evict the oldest element
+    DCHECK(!oldest_entry_map_it->second.in_use);
+    p.cache.erase(oldest_entry_map_it);
     p.lru_list.pop_front();
     --p.size;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 0fdfb77..dff6ec5 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -100,6 +100,19 @@ DEFINE_int32(max_free_io_buffers, 128,
 DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file 
handles "
     "that will be cached. Disabled if set to 0.");
 
+// The unused file handle timeout specifies how long a file handle will remain 
in the
+// cache if it is not being used. Aging out unused handles ensures that the 
cache is not
+// wasting memory on handles that aren't useful. This allows users to specify 
a larger
+// cache size, as the system will only use the memory on useful file handles.
+// Additionally, cached file handles keep an open file descriptor for local 
files.
+// If a file is deleted through HDFS, this open file descriptor can keep the 
disk space
+// from being freed. When the metadata sees that a file has been deleted, the 
file handle
+// will no longer be used by future queries. Aging out this file handle allows 
the
+// disk space to be freed in an appropriate period of time.
+DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in 
seconds, that an "
+    "unused HDFS file handle will remain in the file handle cache. Disabled if 
set "
+    "to 0.");
+
 // The IoMgr is able to run with a wide range of memory usage. If a query has 
memory
 // remaining less than this value, the IoMgr will stop all buffering 
regardless of the
 // current queue size.
@@ -289,7 +302,8 @@ DiskIoMgr::DiskIoMgr() :
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
-        FileSystemUtil::MaxNumFileHandles())) {
+        FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_unused_file_handle_timeout_sec) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, 
min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
@@ -314,7 +328,8 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int 
threads_per_rotational_disk,
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
-        FileSystemUtil::MaxNumFileHandles())) {
+        FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_unused_file_handle_timeout_sec) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, 
min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
@@ -396,6 +411,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
     }
   }
   request_context_cache_.reset(new RequestContextCache(this));
+  file_handle_cache_.Init();
 
   cached_read_options_ = hadoopRzOptionsAlloc();
   DCHECK(cached_read_options_ != nullptr);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57dae5ec/tests/custom_cluster/test_hdfs_fd_caching.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py 
b/tests/custom_cluster/test_hdfs_fd_caching.py
index 9dcd3bf..ad80cef 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -23,6 +23,7 @@ from tests.util.filesystem_utils import (
     IS_ISILON,
     IS_S3,
     IS_ADLS)
+from time import sleep
 
 @SkipIfLocal.hdfs_fd_caching
 class TestHdfsFdCaching(CustomClusterTestSuite):
@@ -59,7 +60,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     super(TestHdfsFdCaching, self).teardown_method(method)
     self.client.execute("drop database if exists cachefd cascade")
 
-  def run_fd_caching_test(self, vector, caching_expected, cache_capacity):
+  def run_fd_caching_test(self, vector, caching_expected, cache_capacity,
+      eviction_timeout_secs):
     """
     Tests that HDFS file handles are cached as expected. This is used both
     for the positive and negative test cases. If caching_expected is true,
@@ -103,26 +105,52 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     # Read all the files of the table and make sure no FD leak
     for x in range(10):
       self.execute_query("select count(*) from cachefd.simple;", vector=vector)
-      assert self.max_cached_handles() <= 16
+      assert self.max_cached_handles() <= cache_capacity
       if not caching_expected:
         assert self.cached_handles() == num_handles_start
     assert self.outstanding_handles() == 0
 
+    if caching_expected and eviction_timeout_secs is not None:
+      # To test unused file handle eviction, sleep for longer than the timeout.
+      # All the cached handles should be evicted.
+      sleep(eviction_timeout_secs + 5)
+      assert self.cached_handles() == 0
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=16",
+      impalad_args="--max_cached_file_handles=16 " +
+          " --unused_file_handle_timeout_sec=18446744073709551600",
       catalogd_args="--load_catalog_in_background=false")
   def test_caching_enabled(self, vector):
-    """Test of the HDFS file handle cache with the parameter specified"""
+    """
+    Test of the HDFS file handle cache with the parameter specified and a very
+    large file handle timeout
+    """
+
     cache_capacity = 16
 
     # Caching only applies to local HDFS files. If this is local HDFS, then 
verify
     # that caching works. Otherwise, verify that file handles are not cached.
-    if (IS_S3 or IS_ADLS or IS_ISILON or 
pytest.config.option.testing_remote_cluster):
+    if IS_S3 or IS_ADLS or IS_ISILON or 
pytest.config.option.testing_remote_cluster:
       caching_expected = False
     else:
       caching_expected = True
-    self.run_fd_caching_test(vector, caching_expected, cache_capacity)
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--max_cached_file_handles=16 
--unused_file_handle_timeout_sec=5",
+      catalogd_args="--load_catalog_in_background=false")
+  def test_caching_with_eviction(self, vector):
+    """Test of the HDFS file handle cache with unused file handle eviction 
enabled"""
+    cache_capacity = 16
+    handle_timeout = 5
+
+    # Only test eviction on platforms where caching is enabled.
+    if IS_S3 or IS_ADLS or IS_ISILON or 
pytest.config.option.testing_remote_cluster:
+      return
+    caching_expected = True
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, 
handle_timeout)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -132,7 +160,7 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     """Test that the HDFS file handle cache is disabled when the parameter is 
zero"""
     cache_capacity = 0
     caching_expected = False
-    self.run_fd_caching_test(vector, caching_expected, cache_capacity)
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
 
   def cached_handles(self):
     return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles")

Reply via email to