This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 9b4a3ef79 IMPALA-10214, IMPALA-10375: Ozone remote file handle caching
9b4a3ef79 is described below
commit 9b4a3ef7952c4c6aef5d33703d68bb7994644a0f
Author: Michael Smith <[email protected]>
AuthorDate: Thu Jul 28 11:18:40 2022 -0700
IMPALA-10214, IMPALA-10375: Ozone remote file handle caching
Enables support for caching remote file handles for Ozone. Local file
handles were already cached unintentionally, similar to HDFS. Updates
file handle cache enablement to be more stringent about enabling
caching.
File handle caching is enabled if max_cached_file_handles is non-zero
and any of the following are true:
- HDFS file is local
- HDFS file is remote and cache_remote_file_handles is enabled
- Ozone file is local or remote and cache_ozone_file_handles is enabled
- S3 file is remote and cache_s3_file_handles is enabled
- ABFS file is remote and cache_abfs_file_handles is enabled
Enables testing Ozone in test_hdfs_fd_caching, and adds tests that
remote caching can be disabled using individual flags.
Change-Id: I9df13208999c6d3b14f4c005a91ee2a92a05bdf9
Reviewed-on: http://gerrit.cloudera.org:8080/18853
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Csaba Ringhofer <[email protected]>
---
be/src/runtime/io/disk-io-mgr.cc | 3 ++
be/src/runtime/io/handle-cache.inline.h | 1 +
be/src/runtime/io/request-ranges.h | 3 ++
be/src/runtime/io/scan-range.cc | 41 +++++++++++++++------
tests/common/network.py | 31 ++++++++++++++++
tests/custom_cluster/test_hdfs_fd_caching.py | 54 ++++++++++++++++++++--------
6 files changed, 109 insertions(+), 24 deletions(-)
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 86bc7f719..e3e181290 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -190,6 +190,9 @@ DEFINE_bool(cache_s3_file_handles, true, "Enable the file handle cache for "
DEFINE_bool(cache_abfs_file_handles, true, "Enable the file handle cache for "
"ABFS files.");
+DEFINE_bool(cache_ozone_file_handles, false, "Enable the file handle cache for Ozone "
+    "files.");
+
DECLARE_int64(min_buffer_size);
static const char* DEVICE_NAME_METRIC_KEY_TEMPLATE =
diff --git a/be/src/runtime/io/handle-cache.inline.h
b/be/src/runtime/io/handle-cache.inline.h
index cc19630ac..0d0ec31cd 100644
--- a/be/src/runtime/io/handle-cache.inline.h
+++ b/be/src/runtime/io/handle-cache.inline.h
@@ -102,6 +102,7 @@ void FileHandleCache::Accessor::Destroy() {
FileHandleCache::Accessor::~Accessor() {
if (cache_accessor_.Get()) {
+ VLOG_FILE << "hdfsUnbufferFile() fid=" << Get()->file();
if (hdfsUnbufferFile(Get()->file()) != 0) {
VLOG_FILE << "FS does not support file handle unbuffering, closing file="
<< cache_accessor_.GetKey()->first;
diff --git a/be/src/runtime/io/request-ranges.h
b/be/src/runtime/io/request-ranges.h
index 11a6b1e34..b8bfbacd9 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -393,6 +393,9 @@ class ScanRange : public RequestRange {
/// buffer reader.
ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool
use_local_buffer);
+ /// Whether to use file handle caching for the current file.
+ bool FileHandleCacheEnabled();
+
/// Same as Cancel() except it doesn't remove the scan range from
/// reader_->active_scan_ranges_ or call WaitForInFlightRead(). This allows
for
/// custom handling of in flight reads or active scan ranges. For example,
this is
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 3512861b4..09776b0c3 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -34,6 +34,7 @@ using std::unique_lock;
DECLARE_bool(cache_remote_file_handles);
DECLARE_bool(cache_s3_file_handles);
DECLARE_bool(cache_abfs_file_handles);
+DECLARE_bool(cache_ozone_file_handles);
// Implementation of the ScanRange functionality. Each ScanRange contains a
queue
// of ready buffers. For each ScanRange, there is only a single producer and
@@ -122,6 +123,31 @@ void
ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
if (unblocked) ScheduleScanRange();
}
+bool ScanRange::FileHandleCacheEnabled() {
+ // Global flag for all file handle caching
+ if (!is_file_handle_caching_enabled()) return false;
+
+ if (expected_local_ && IsHdfsPath(file())) return true;
+ if (FLAGS_cache_remote_file_handles) {
+ if (disk_id_ == io_mgr_->RemoteDfsDiskId()) return true;
+ }
+
+ if (FLAGS_cache_ozone_file_handles) {
+ if (expected_local_ && IsOzonePath(file())) return true;
+ if (disk_id_ == io_mgr_->RemoteOzoneDiskId()) return true;
+ }
+
+ if (FLAGS_cache_s3_file_handles) {
+ if (disk_id_ == io_mgr_->RemoteS3DiskId()) return true;
+ }
+
+ if (FLAGS_cache_abfs_file_handles) {
+ if (disk_id_ == io_mgr_->RemoteAbfsDiskId()) return true;
+ }
+
+ return false;
+}
+
ReadOutcome ScanRange::DoReadInternal(
DiskQueue* queue, int disk_id, bool use_local_buff) {
int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
@@ -165,16 +191,11 @@ ReadOutcome ScanRange::DoReadInternal(
// lock across the read call.
// To use the file handle cache:
// 1. It must be enabled at the daemon level.
- // 2. The file is a local HDFS file (expected_local_) OR it is a remote HDFS file and
- //    'cache_remote_file_handles' is true
- bool use_file_handle_cache = false;
- if (is_file_handle_caching_enabled() &&
- (expected_local_ ||
- (FLAGS_cache_remote_file_handles && disk_id_ ==
io_mgr_->RemoteDfsDiskId()) ||
- (FLAGS_cache_s3_file_handles && disk_id_ == io_mgr_->RemoteS3DiskId())
||
- (FLAGS_cache_abfs_file_handles && disk_id_ ==
io_mgr_->RemoteAbfsDiskId()))) {
- use_file_handle_cache = true;
- }
+ // 2. It must be enabled for the particular filesystem.
+ bool use_file_handle_cache = FileHandleCacheEnabled();
+ VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
+     << " file handle cache for " << (expected_local_ ? "local" : "remote")
+     << " file " << file();
Status read_status = file_reader->Open(use_file_handle_cache);
bool eof = false;
if (read_status.ok()) {
diff --git a/tests/common/network.py b/tests/common/network.py
new file mode 100644
index 000000000..ea9b739c2
--- /dev/null
+++ b/tests/common/network.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tools for identifying network characteristics.
+
+import socket
+
+
+# Retrieves the host external IP rather than localhost/127.0.0.1 so we have an IP that
+# Impala will consider distinct from storage backends to force remote scheduling.
+def get_external_ip():
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.settimeout(0)
+ # This address is used to get the networking stack to identify a return IP address.
+ # Timeout=0 means it doesn't need to resolve.
+ s.connect(('10.254.254.254', 1))
+ return s.getsockname()[0]
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py
b/tests/custom_cluster/test_hdfs_fd_caching.py
index b682ce5ce..d46232d2b 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -18,15 +18,16 @@
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.network import get_external_ip
from tests.common.skip import SkipIfLocal
from tests.util.filesystem_utils import (
IS_ISILON,
IS_ADLS,
IS_GCS,
- IS_COS,
- IS_OZONE)
+ IS_COS)
from time import sleep
+
@SkipIfLocal.hdfs_fd_caching
class TestHdfsFdCaching(CustomClusterTestSuite):
"""Tests that if HDFS file handle caching is enabled, file handles are
actually cached
@@ -120,20 +121,21 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args="--max_cached_file_handles=16 " +
- " --unused_file_handle_timeout_sec=18446744073709551600",
+ impalad_args="--max_cached_file_handles=16"
+ " --unused_file_handle_timeout_sec=18446744073709551600"
+ " --cache_ozone_file_handles=true",
catalogd_args="--load_catalog_in_background=false")
def test_caching_enabled(self, vector):
"""
Test of the HDFS file handle cache with the parameter specified and a very
large file handle timeout
"""
-
cache_capacity = 16
- # Caching applies to HDFS, S3, and ABFS files. If this is HDFS, S3, or
ABFS, then
- # verify that caching works. Otherwise, verify that file handles are not
cached.
- if IS_ADLS or IS_ISILON or IS_GCS or IS_COS or IS_OZONE:
+ # Caching applies to HDFS, Ozone, S3, and ABFS files. If this is HDFS, Ozone, S3, or
+ # ABFS, then verify that caching works. Otherwise, verify that file handles are not
+ # cached.
+ if IS_ADLS or IS_ISILON or IS_GCS or IS_COS:
caching_expected = False
else:
caching_expected = True
@@ -141,7 +143,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args="--max_cached_file_handles=16
--unused_file_handle_timeout_sec=5",
+ impalad_args="--max_cached_file_handles=16
--unused_file_handle_timeout_sec=5"
+ " --cache_ozone_file_handles=true",
catalogd_args="--load_catalog_in_background=false")
def test_caching_with_eviction(self, vector):
"""Test of the HDFS file handle cache with unused file handle eviction
enabled"""
@@ -149,14 +152,14 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
handle_timeout = 5
# Only test eviction on platforms where caching is enabled.
- if IS_ADLS or IS_ISILON or IS_GCS or IS_COS or IS_OZONE:
+ if IS_ADLS or IS_ISILON or IS_GCS or IS_COS:
return
caching_expected = True
self.run_fd_caching_test(vector, caching_expected, cache_capacity,
handle_timeout)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args="--max_cached_file_handles=0",
+ impalad_args="--max_cached_file_handles=0
--cache_ozone_file_handles=true",
catalogd_args="--load_catalog_in_background=false")
def test_caching_disabled_by_param(self, vector):
"""Test that the HDFS file handle cache is disabled when the parameter is
zero"""
@@ -166,8 +169,31 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args="--max_cached_file_handles=16
--unused_file_handle_timeout_sec=5 " +
- "--always_use_data_cache=true",
+ impalad_args="--cache_remote_file_handles=false
--cache_s3_file_handles=false "
+ "--cache_abfs_file_handles=false --hostname=" +
get_external_ip(),
+ catalogd_args="--load_catalog_in_background=false")
+ def test_remote_caching_disabled_by_param(self, vector):
+ """Test that the file handle cache is disabled for remote files when
disabled"""
+ cache_capacity = 0
+ caching_expected = False
+ self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--max_cached_file_handles=0
--cache_ozone_file_handles=true "
+ "--hostname=" + get_external_ip(),
+ catalogd_args="--load_catalog_in_background=false")
+ def test_remote_caching_disabled_by_global_param(self, vector):
+ """Test that the file handle cache is disabled for remote files when all
caching is
+ disabled"""
+ cache_capacity = 0
+ caching_expected = False
+ self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--max_cached_file_handles=16
--unused_file_handle_timeout_sec=5 "
+ "--always_use_data_cache=true
--cache_ozone_file_handles=true",
start_args="--data_cache_dir=/tmp --data_cache_size=500MB",
catalogd_args="--load_catalog_in_background=false")
def test_no_fd_caching_on_cached_data(self, vector):
@@ -177,7 +203,7 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
eviction_timeout_secs = 5
# Only test eviction on platforms where caching is enabled.
- if IS_ADLS or IS_ISILON or IS_GCS or IS_COS or IS_OZONE:
+ if IS_ADLS or IS_ISILON or IS_GCS or IS_COS:
return
# Maximum number of file handles cached.