This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b4a3ef79 IMPALA-10214, IMPALA-10375: Ozone remote file handle caching
9b4a3ef79 is described below

commit 9b4a3ef7952c4c6aef5d33703d68bb7994644a0f
Author: Michael Smith <[email protected]>
AuthorDate: Thu Jul 28 11:18:40 2022 -0700

    IMPALA-10214, IMPALA-10375: Ozone remote file handle caching
    
    Enables support for caching remote file handles for Ozone. Local file
    handles were already cached unintentionally, similar to HDFS. Updates
    file handle cache enablement to be more stringent about enabling
    caching.
    
    File handle caching is enabled if max_cached_file_handles is non-zero
    and any of the following are true:
    - HDFS file is local
    - HDFS file is remote and cache_remote_file_handles is enabled
    - Ozone file is local or remote and cache_ozone_file_handles is enabled
    - S3 file is remote and cache_s3_file_handles is enabled
    - ABFS file is remote and cache_abfs_file_handles is enabled
    
    Enables testing Ozone in test_hdfs_fd_caching, and adds tests that
    remote caching can be disabled using individual flags.
    
    Change-Id: I9df13208999c6d3b14f4c005a91ee2a92a05bdf9
    Reviewed-on: http://gerrit.cloudera.org:8080/18853
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
---
 be/src/runtime/io/disk-io-mgr.cc             |  3 ++
 be/src/runtime/io/handle-cache.inline.h      |  1 +
 be/src/runtime/io/request-ranges.h           |  3 ++
 be/src/runtime/io/scan-range.cc              | 41 +++++++++++++++------
 tests/common/network.py                      | 31 ++++++++++++++++
 tests/custom_cluster/test_hdfs_fd_caching.py | 54 ++++++++++++++++++++--------
 6 files changed, 109 insertions(+), 24 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 86bc7f719..e3e181290 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -190,6 +190,9 @@ DEFINE_bool(cache_s3_file_handles, true, "Enable the file 
handle cache for "
 DEFINE_bool(cache_abfs_file_handles, true, "Enable the file handle cache for "
     "ABFS files.");
 
+DEFINE_bool(cache_ozone_file_handles, false, "Enable the file handle cache for 
Ozone "
+    "files.");
+
 DECLARE_int64(min_buffer_size);
 
 static const char* DEVICE_NAME_METRIC_KEY_TEMPLATE =
diff --git a/be/src/runtime/io/handle-cache.inline.h 
b/be/src/runtime/io/handle-cache.inline.h
index cc19630ac..0d0ec31cd 100644
--- a/be/src/runtime/io/handle-cache.inline.h
+++ b/be/src/runtime/io/handle-cache.inline.h
@@ -102,6 +102,7 @@ void FileHandleCache::Accessor::Destroy() {
 
 FileHandleCache::Accessor::~Accessor() {
   if (cache_accessor_.Get()) {
+    VLOG_FILE << "hdfsUnbufferFile() fid=" << Get()->file();
     if (hdfsUnbufferFile(Get()->file()) != 0) {
       VLOG_FILE << "FS does not support file handle unbuffering, closing file="
                 << cache_accessor_.GetKey()->first;
diff --git a/be/src/runtime/io/request-ranges.h 
b/be/src/runtime/io/request-ranges.h
index 11a6b1e34..b8bfbacd9 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -393,6 +393,9 @@ class ScanRange : public RequestRange {
   /// buffer reader.
   ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool 
use_local_buffer);
 
+  /// Whether to use file handle caching for the current file.
+  bool FileHandleCacheEnabled();
+
   /// Same as Cancel() except it doesn't remove the scan range from
   /// reader_->active_scan_ranges_ or call WaitForInFlightRead(). This allows 
for
   /// custom handling of in flight reads or active scan ranges. For example, 
this is
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 3512861b4..09776b0c3 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -34,6 +34,7 @@ using std::unique_lock;
 DECLARE_bool(cache_remote_file_handles);
 DECLARE_bool(cache_s3_file_handles);
 DECLARE_bool(cache_abfs_file_handles);
+DECLARE_bool(cache_ozone_file_handles);
 
 // Implementation of the ScanRange functionality. Each ScanRange contains a 
queue
 // of ready buffers. For each ScanRange, there is only a single producer and
@@ -122,6 +123,31 @@ void 
ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
   if (unblocked) ScheduleScanRange();
 }
 
+bool ScanRange::FileHandleCacheEnabled() {
+  // Global flag for all file handle caching
+  if (!is_file_handle_caching_enabled()) return false;
+
+  if (expected_local_ && IsHdfsPath(file())) return true;
+  if (FLAGS_cache_remote_file_handles) {
+    if (disk_id_ == io_mgr_->RemoteDfsDiskId()) return true;
+  }
+
+  if (FLAGS_cache_ozone_file_handles) {
+    if (expected_local_ && IsOzonePath(file())) return true;
+    if (disk_id_ == io_mgr_->RemoteOzoneDiskId()) return true;
+  }
+
+  if (FLAGS_cache_s3_file_handles) {
+    if (disk_id_ == io_mgr_->RemoteS3DiskId()) return true;
+  }
+
+  if (FLAGS_cache_abfs_file_handles) {
+    if (disk_id_ == io_mgr_->RemoteAbfsDiskId()) return true;
+  }
+
+  return false;
+}
+
 ReadOutcome ScanRange::DoReadInternal(
     DiskQueue* queue, int disk_id, bool use_local_buff) {
   int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
@@ -165,16 +191,11 @@ ReadOutcome ScanRange::DoReadInternal(
   // lock across the read call.
   // To use the file handle cache:
   // 1. It must be enabled at the daemon level.
-  // 2. The file is a local HDFS file (expected_local_) OR it is a remote HDFS 
file and
-  //    'cache_remote_file_handles' is true
-  bool use_file_handle_cache = false;
-  if (is_file_handle_caching_enabled() &&
-      (expected_local_ ||
-       (FLAGS_cache_remote_file_handles && disk_id_ == 
io_mgr_->RemoteDfsDiskId()) ||
-       (FLAGS_cache_s3_file_handles && disk_id_ == io_mgr_->RemoteS3DiskId()) 
||
-       (FLAGS_cache_abfs_file_handles && disk_id_ == 
io_mgr_->RemoteAbfsDiskId()))) {
-    use_file_handle_cache = true;
-  }
+  // 2. It must be enabled for the particular filesystem.
+  bool use_file_handle_cache = FileHandleCacheEnabled();
+  VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
+            << " file handle cache for " << (expected_local_ ? "local" : 
"remote")
+            << " file " << file();
   Status read_status = file_reader->Open(use_file_handle_cache);
   bool eof = false;
   if (read_status.ok()) {
diff --git a/tests/common/network.py b/tests/common/network.py
new file mode 100644
index 000000000..ea9b739c2
--- /dev/null
+++ b/tests/common/network.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tools for identifying network characteristics.
+
+import socket
+
+
+# Retrieves the host external IP rather than localhost/127.0.0.1 so we have an 
IP that
+# Impala will consider distinct from storage backends to force remote 
scheduling.
+def get_external_ip():
+  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+  s.settimeout(0)
+  # This address is used to get the networking stack to identify a return IP 
address.
+  # Timeout=0 means it doesn't need to resolve.
+  s.connect(('10.254.254.254', 1))
+  return s.getsockname()[0]
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py 
b/tests/custom_cluster/test_hdfs_fd_caching.py
index b682ce5ce..d46232d2b 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -18,15 +18,16 @@
 import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.network import get_external_ip
 from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import (
     IS_ISILON,
     IS_ADLS,
     IS_GCS,
-    IS_COS,
-    IS_OZONE)
+    IS_COS)
 from time import sleep
 
+
 @SkipIfLocal.hdfs_fd_caching
 class TestHdfsFdCaching(CustomClusterTestSuite):
   """Tests that if HDFS file handle caching is enabled, file handles are 
actually cached
@@ -120,20 +121,21 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=16 " +
-          " --unused_file_handle_timeout_sec=18446744073709551600",
+      impalad_args="--max_cached_file_handles=16"
+                   " --unused_file_handle_timeout_sec=18446744073709551600"
+                   " --cache_ozone_file_handles=true",
       catalogd_args="--load_catalog_in_background=false")
   def test_caching_enabled(self, vector):
     """
     Test of the HDFS file handle cache with the parameter specified and a very
     large file handle timeout
     """
-
     cache_capacity = 16
 
-    # Caching applies to HDFS, S3, and ABFS files. If this is HDFS, S3, or 
ABFS, then
-    # verify that caching works. Otherwise, verify that file handles are not 
cached.
-    if IS_ADLS or IS_ISILON or IS_GCS or IS_COS or IS_OZONE:
+    # Caching applies to HDFS, Ozone, S3, and ABFS files. If this is HDFS, 
Ozone, S3, or
+    # ABFS, then verify that caching works. Otherwise, verify that file 
handles are not
+    # cached.
+    if IS_ADLS or IS_ISILON or IS_GCS or IS_COS:
       caching_expected = False
     else:
       caching_expected = True
@@ -141,7 +143,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=16 
--unused_file_handle_timeout_sec=5",
+      impalad_args="--max_cached_file_handles=16 
--unused_file_handle_timeout_sec=5"
+                   " --cache_ozone_file_handles=true",
       catalogd_args="--load_catalog_in_background=false")
   def test_caching_with_eviction(self, vector):
     """Test of the HDFS file handle cache with unused file handle eviction 
enabled"""
@@ -149,14 +152,14 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     handle_timeout = 5
 
     # Only test eviction on platforms where caching is enabled.
-    if IS_ADLS or IS_ISILON or IS_GCS or IS_COS or IS_OZONE:
+    if IS_ADLS or IS_ISILON or IS_GCS or IS_COS:
       return
     caching_expected = True
     self.run_fd_caching_test(vector, caching_expected, cache_capacity, 
handle_timeout)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=0",
+      impalad_args="--max_cached_file_handles=0 
--cache_ozone_file_handles=true",
       catalogd_args="--load_catalog_in_background=false")
   def test_caching_disabled_by_param(self, vector):
     """Test that the HDFS file handle cache is disabled when the parameter is 
zero"""
@@ -166,8 +169,31 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=16 
--unused_file_handle_timeout_sec=5 " +
-                   "--always_use_data_cache=true",
+      impalad_args="--cache_remote_file_handles=false 
--cache_s3_file_handles=false "
+                   "--cache_abfs_file_handles=false --hostname=" + 
get_external_ip(),
+      catalogd_args="--load_catalog_in_background=false")
+  def test_remote_caching_disabled_by_param(self, vector):
+    """Test that the file handle cache is disabled for remote files when 
disabled"""
+    cache_capacity = 0
+    caching_expected = False
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--max_cached_file_handles=0 
--cache_ozone_file_handles=true "
+                   "--hostname=" + get_external_ip(),
+      catalogd_args="--load_catalog_in_background=false")
+  def test_remote_caching_disabled_by_global_param(self, vector):
+    """Test that the file handle cache is disabled for remote files when all 
caching is
+    disabled"""
+    cache_capacity = 0
+    caching_expected = False
+    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--max_cached_file_handles=16 
--unused_file_handle_timeout_sec=5 "
+                   "--always_use_data_cache=true 
--cache_ozone_file_handles=true",
       start_args="--data_cache_dir=/tmp --data_cache_size=500MB",
       catalogd_args="--load_catalog_in_background=false")
   def test_no_fd_caching_on_cached_data(self, vector):
@@ -177,7 +203,7 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     eviction_timeout_secs = 5
 
     # Only test eviction on platforms where caching is enabled.
-    if IS_ADLS or IS_ISILON or IS_GCS or IS_COS or IS_OZONE:
+    if IS_ADLS or IS_ISILON or IS_GCS or IS_COS:
       return
 
     # Maximum number of file handles cached.

Reply via email to