This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit a159eb52f8d3efda5223dfa4f7a9eced5ce48d77
Author: Yida Wu <yida...@cloudera.com>
AuthorDate: Wed Jan 22 03:03:58 2025 -0800

    IMPALA-13677: Support remote scratch directory cleanup at Impala daemon startup
    
    This patch introduces a new feature for cleaning up remote scratch files
    during Impala daemon startup, ensuring that potential leftover files from
    abnormal shutdowns are removed.
    
    To allow efficient cleanup, this patch also refines the remote scratch
    directory hierarchy by adding a host-level directory, changing it from:
    <base_dir>/<backend_id>_<query_id>/<file_name>
    to:
    <base_dir>/<hostname>/<backend_id>_<query_id>/<file_name>
    <base_dir> is <scratch_dir_config_path>/impala-scratch.
    
    During startup, if the host-level directory exists, it is removed
    entirely. This design assumes one Impala daemon per host, and it also
    assumes that multiple Impala clusters don't share the same scratch_dir
    path on the remote filesystem. Even if they share the same prefix, each
    Impala cluster should have dedicated paths:
    --scratch_dirs=hdfs://remote_dir/scratch/impala1
    --scratch_dirs=hdfs://remote_dir/scratch/impala2
    
    Also adds a new flag, remote_scratch_cleanup_on_startup, to control
    whether the host-level directory is cleaned up during Impala daemon
    startup. The feature is enabled by default. If multiple daemons on a
    host or multiple clusters share the same remote scratch_dir path, set
    this flag to false to prevent unintended cleanup.
    
    Tests:
    Passed exhaustive tests.
    Adds test case test_scratch_dirs_remote_spill_leftover_files_removal.
    
    Change-Id: Iadd49b7384d52bac5ddab4e86cd9f39dc2c88e1b
    Reviewed-on: http://gerrit.cloudera.org:8080/22378
    Reviewed-by: Abhishek Rawat <ara...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr.cc          |  1 +
 be/src/runtime/tmp-file-mgr-test.cc       |  2 +
 be/src/runtime/tmp-file-mgr.cc            | 63 ++++++++++++++++++++++++-------
 be/src/runtime/tmp-file-mgr.h             |  9 ++++-
 tests/custom_cluster/test_scratch_disk.py | 40 ++++++++++++++++++++
 5 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 5664b7e56..6830fe1a5 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -403,6 +403,7 @@ end:
   if (status.ok()) {
     disk_file_dst_->SetStatus(io::DiskFileStatus::PERSISTED);
     disk_file_dst_->SetActualFileSize(file_size);
+    VLOG(2) << "File upload succeeded. File name: " << remote_file_path;
   } else {
     LOG(WARNING) << "File upload failed, msg:" << status.msg().msg();
   }
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 44f5463d3..73b70747d 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -60,6 +60,7 @@ DECLARE_string(remote_tmp_file_size);
 DECLARE_int32(wait_for_spill_buffer_timeout_s);
 DECLARE_bool(remote_batch_read);
 DECLARE_string(remote_read_memory_buffer_size);
+DECLARE_bool(remote_scratch_cleanup_on_startup);
 
 namespace impala {
 
@@ -1163,6 +1164,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
   // Create a fake s3 connection in order to pass the connection verification.
   HdfsFsCache::HdfsFsMap fake_hdfs_conn_map;
   hdfsFS fake_conn = reinterpret_cast<hdfsFS>(1);
+  FLAGS_remote_scratch_cleanup_on_startup = false;
   fake_hdfs_conn_map.insert(make_pair("s3a://fake_host/", fake_conn));
   // Two types of paths, one with directory, one without.
   vector<string> s3_paths{"s3a://fake_host", "s3a://fake_host/dir"};
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index e25bc2ca0..23c6b4091 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -118,6 +118,11 @@ DEFINE_bool(remote_batch_read, false,
     "Set if the system uses batch reading for the remote temporary files. Batch reading"
     "allows reading a block asynchronously when the buffer pool is trying to pin one"
     "page of that block.");
+DEFINE_bool(remote_scratch_cleanup_on_startup, true,
+    "If enabled, the Impala daemon will clean up the host-level directory within the "
+    "specified remote scratch directory during startup to remove potential leftover "
+    "files. This assumes a single Impala daemon per host. "
+    "For multiple daemons on a host, set this to false to prevent unintended cleanup.");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -441,22 +446,48 @@ void TmpFileMgr::NewFile(
   new_file->reset(new TmpFileLocal(file_group, device_id, new_file_path.string()));
 }
 
-void TmpFileMgr::RemoveRemoteDir(TmpFileGroup* file_group, DeviceId device_id) {
+static string ConstructRemoteDirPath(const string& base_dir, const string& hostname,
+    const string& backend_id = "", const string& query_id = "") {
+  stringstream dir;
+  dir << base_dir << "/" << hostname;
+  if (!backend_id.empty()) {
+    dir << "/" << backend_id;
+    DCHECK(!query_id.empty());
+    dir << "_" << query_id;
+  }
+  return dir.str();
+}
+
+void TmpFileMgr::RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn) {
+  if (!FLAGS_remote_scratch_cleanup_on_startup) return;
+  DCHECK(hdfs_conn != nullptr);
+  const string hostlevel_dir = ConstructRemoteDirPath(
+      dir, ExecEnv::GetInstance()->configured_backend_address().hostname);
+  if (hdfsExists(hdfs_conn, hostlevel_dir.c_str()) == 0) {
+    hdfsDelete(hdfs_conn, hostlevel_dir.c_str(), 1);
+    LOG(INFO) << "Called to remove the host-level remote directory " << hostlevel_dir;
+  }
+}
+
+void TmpFileMgr::RemoveRemoteDirForQuery(TmpFileGroup* file_group) {
   if (tmp_dirs_remote_ == nullptr) return;
-  string dir = tmp_dirs_remote_->path_;
-  stringstream files_dir;
-  files_dir << dir << "/" << PrintId(ExecEnv::GetInstance()->backend_id(), "_") << "_"
-            << PrintId(file_group->unique_id(), "_");
+  const string& dir = tmp_dirs_remote_->path_;
+  const string& hostname = ExecEnv::GetInstance()->configured_backend_address().hostname;
+  const string backend_id = PrintId(ExecEnv::GetInstance()->backend_id(), "_");
+  const string query_id = PrintId(file_group->unique_id(), "_");
+  const string querylevel_dir =
+      ConstructRemoteDirPath(dir, hostname, backend_id, query_id);
   hdfsFS hdfs_conn;
   Status status =
-      HdfsFsCache::instance()->GetConnection(files_dir.str(), &hdfs_conn, &hdfs_conns_);
+      HdfsFsCache::instance()->GetConnection(querylevel_dir, &hdfs_conn, &hdfs_conns_);
   if (status.ok()) {
     DCHECK(hdfs_conn != nullptr);
-    hdfsDelete(hdfs_conn, files_dir.str().c_str(), 1);
+    hdfsDelete(hdfs_conn, querylevel_dir.c_str(), 1);
+    LOG(INFO) << "Called to remove the query-level remote directory " << querylevel_dir;
   } else {
     LOG(WARNING) << "Failed to remove the remote directory because unable to create a "
                     "connection to "
-                 << files_dir.str();
+                 << querylevel_dir;
   }
 }
 
@@ -809,10 +840,12 @@ Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   // For the S3 path, it doesn't need to create the directory for the uploading
   // as long as the S3 address is correct.
+  DCHECK(tmp_mgr != nullptr);
   DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
       path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()));
+  tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn);
   return Status::OK();
 }
 
@@ -841,14 +874,17 @@ Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_di
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
-  // If the HDFS path doesn't exist, it would fail while uploading, so we
-  // create the HDFS path if it doesn't exist.
   RETURN_IF_ERROR(
       HdfsFsCache::instance()->GetConnection(path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_)));
   if (hdfsExists(hdfs_conn, path_.c_str()) != 0) {
+    // If the impala scratch path in hdfs doesn't exist, attempt to create the path to
+    // verify it's valid and writable for scratch usage.
+    // Failure may indicate a permission or configuration issue.
    if (hdfsCreateDirectory(hdfs_conn, path_.c_str()) != 0) {
       return Status(GetHdfsErrorMsg("HDFS create path failed: ", path_));
     }
+  } else {
+    tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn);
   }
   return Status::OK();
 }
 
@@ -1323,7 +1359,7 @@ void TmpFileGroup::Close() {
   }
   CloseInternal<std::unique_ptr<TmpFile>>(tmp_files_);
   CloseInternal<std::shared_ptr<TmpFile>>(tmp_files_remote_);
-  tmp_file_mgr_->RemoveRemoteDir(this, 0);
+  tmp_file_mgr_->RemoveRemoteDirForQuery(this);
   tmp_file_mgr_->scratch_bytes_used_metric_->Increment(
       -1 * scratch_space_bytes_used_counter_->value());
 }
 
@@ -1405,8 +1441,9 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
   DeviceId dev_id = tmp_file_mgr_->tmp_dirs_.size();
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
-  dir = dir + "/" + PrintId(ExecEnv::GetInstance()->backend_id(), "_") + "_"
-      + PrintId(unique_id(), "_");
+  dir = ConstructRemoteDirPath(dir,
+      ExecEnv::GetInstance()->configured_backend_address().hostname,
+      PrintId(ExecEnv::GetInstance()->backend_id(), "_"), PrintId(unique_id(), "_"));
   string new_file_path = GenerateNewPath(dir, unique_name);
 
   const string& local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path();
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index a6a3cb5a7..a345b5a8a 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -347,8 +347,13 @@ class TmpFileMgr {
   void NewFile(TmpFileGroup* file_group, DeviceId device_id,
       std::unique_ptr<TmpFile>* new_file);
 
-  /// Remove the remote directory which stores tmp files of the tmp file group.
-  void RemoveRemoteDir(TmpFileGroup* file_group, DeviceId device_id);
+  /// Remove the remote directory which stores temporary files of the tmp file group.
+  void RemoveRemoteDirForQuery(TmpFileGroup* file_group);
+
+  /// Remove the remote directory which stores temporary files of the entire host.
+  /// Called during Impala daemon startup to clean up any leftover temporary files.
+  /// Assumes there is only one Impala daemon process running per host.
+  void RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn);
 
   bool initialized_ = false;
 
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 525a0d9cb..05508e3c3 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -22,6 +22,7 @@ from builtins import range
 import os
 import pytest
 import re
+import socket
 import stat
 import subprocess
 import time
@@ -529,3 +530,42 @@ class TestScratchDir(CustomClusterTestSuite):
     assert (metrics1 > 0)
     client.close_query(handle)
     client.close()
+
+  @pytest.mark.execute_serially
+  @SkipIf.not_scratch_fs
+  def test_scratch_dirs_remote_spill_leftover_files_removal(self, vector):
+    # Test remote scratch directory cleanup after Impala daemon restart.
+    normal_dirs = self.generate_dirs('scratch_dirs_remote_spill', 1)
+    normal_dirs.append(self.dfs_tmp_path())
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'],
+      cluster_size=1,
+      expected_num_impalads=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+        expected_count=len(normal_dirs) - 1)
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    self.execute_query_async_using_client(client, self.spill_query_big_table, vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    # Dir0 is the remote directory.
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True)
+    impalad.kill()
+    client.close()
+    hostname = socket.gethostname()
+    # Verify that there are leftover files in the remote scratch dirs after the
+    # daemon is killed.
+    full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path())
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])
+    assert "Found 1 items" in files_result
+    assert hostname in files_result
+    full_dfs_tmp_path_with_hostname = "{}/{}".format(full_dfs_tmp_path, hostname)
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls",
+        full_dfs_tmp_path_with_hostname])
+    assert "Found 1 items" in files_result
+    impalad.start()
+    # Verify that the leftover files are removed after the Impala daemon restarts.
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])
+    assert files_result == ""
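
For readers following the commit message above, the new path scheme can be
illustrated with a minimal standalone C++ sketch. This is illustrative only:
make_scratch_path is a hypothetical helper that mirrors the behavior of
ConstructRemoteDirPath in be/src/runtime/tmp-file-mgr.cc, and the hostname and
ID values are placeholders.

// Illustrates the remote scratch layout introduced by this patch:
//   host level:  <base_dir>/<hostname>
//   query level: <base_dir>/<hostname>/<backend_id>_<query_id>
#include <iostream>
#include <sstream>
#include <string>

// With only a hostname, yields the host-level directory (removed at daemon
// startup when --remote_scratch_cleanup_on_startup=true, the default); with a
// backend id and query id, yields the query-level directory (removed when the
// query's TmpFileGroup closes).
static std::string make_scratch_path(const std::string& base_dir,
    const std::string& hostname, const std::string& backend_id = "",
    const std::string& query_id = "") {
  std::ostringstream path;
  path << base_dir << "/" << hostname;
  if (!backend_id.empty()) path << "/" << backend_id << "_" << query_id;
  return path.str();
}

int main() {
  // Placeholder; in Impala, <base_dir> is <scratch_dir_config_path>/impala-scratch.
  const std::string base = "hdfs://remote_dir/scratch/impala1/impala-scratch";
  std::cout << make_scratch_path(base, "host1.example.com") << "\n";
  std::cout << make_scratch_path(base, "host1.example.com", "backend_1", "query_1")
            << "\n";
  return 0;
}

This prints:
hdfs://remote_dir/scratch/impala1/impala-scratch/host1.example.com
hdfs://remote_dir/scratch/impala1/impala-scratch/host1.example.com/backend_1_query_1

Because each daemon's files sit under a single host-level directory, startup
cleanup is one recursive delete rather than a scan for this backend's leftover
<backend_id>_<query_id> directories across the shared base directory.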