This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a159eb52f8d3efda5223dfa4f7a9eced5ce48d77
Author: Yida Wu <yida...@cloudera.com>
AuthorDate: Wed Jan 22 03:03:58 2025 -0800

    IMPALA-13677: Support remote scratch directory cleanup at Impala daemon startup
    
    This patch introduces a new feature for cleaning up remote scratch
    files during Impala daemon startup, ensuring that potential leftover
    files from abnormal shutdowns are removed.
    
    To allow efficient cleanup, this patch also refines the remote
    scratch directory hierarchy by adding a host-level directory,
    changing it from:
    <base_dir>/<backend_id>_<query_id>/<file_name>
    to:
    <base_dir>/<hostname>/<backend_id>_<query_id>/<file_name>
    where <base_dir> is <scratch_dir_config_path>/impala-scratch.
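    
    For example, with a hypothetical --scratch_dirs=s3a://bucket/scratch,
    a spilled file would now be written under:
    s3a://bucket/scratch/impala-scratch/<hostname>/<backend_id>_<query_id>/<file_name>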
    
    During startup, if the host-level directory exists, it will be
    removed entirely.
    
    This design assumes one Impala daemon per host, and it also
    assumes that multiple Impala clusters don't share the same
    scratch_dir path on the remote filesystem. Even if they share
    the same prefix, each Impala cluster should have dedicated paths:
    --scratch_dirs=hdfs://remote_dir/scratch/impala1
    --scratch_dirs=hdfs://remote_dir/scratch/impala2
    
    Also added a flag, remote_scratch_cleanup_on_startup, to control
    whether the host-level directory is cleaned during Impala daemon
    startup. This feature is enabled by default. If multiple daemons
    on a host or multiple clusters share the same remote scratch_dir
    path, set this to false to prevent unintended cleanup.
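    
    For example, in such a shared setup the cleanup can be disabled with:
    --remote_scratch_cleanup_on_startup=false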
    
    Tests:
    Passed exhaustive tests.
    Added test case test_scratch_dirs_remote_spill_leftover_files_removal.
    
    Change-Id: Iadd49b7384d52bac5ddab4e86cd9f39dc2c88e1b
    Reviewed-on: http://gerrit.cloudera.org:8080/22378
    Reviewed-by: Abhishek Rawat <ara...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr.cc          |  1 +
 be/src/runtime/tmp-file-mgr-test.cc       |  2 +
 be/src/runtime/tmp-file-mgr.cc            | 63 ++++++++++++++++++++++++-------
 be/src/runtime/tmp-file-mgr.h             |  9 ++++-
 tests/custom_cluster/test_scratch_disk.py | 40 ++++++++++++++++++++
 5 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 5664b7e56..6830fe1a5 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -403,6 +403,7 @@ end:
   if (status.ok()) {
     disk_file_dst_->SetStatus(io::DiskFileStatus::PERSISTED);
     disk_file_dst_->SetActualFileSize(file_size);
+    VLOG(2) << "File upload succeeded. File name: " << remote_file_path;
   } else {
     LOG(WARNING) << "File upload failed, msg:" << status.msg().msg();
   }
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 44f5463d3..73b70747d 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -60,6 +60,7 @@ DECLARE_string(remote_tmp_file_size);
 DECLARE_int32(wait_for_spill_buffer_timeout_s);
 DECLARE_bool(remote_batch_read);
 DECLARE_string(remote_read_memory_buffer_size);
+DECLARE_bool(remote_scratch_cleanup_on_startup);
 
 namespace impala {
 
@@ -1163,6 +1164,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
   // Create a fake s3 connection in order to pass the connection verification.
   HdfsFsCache::HdfsFsMap fake_hdfs_conn_map;
   hdfsFS fake_conn = reinterpret_cast<hdfsFS>(1);
+  FLAGS_remote_scratch_cleanup_on_startup = false;
   fake_hdfs_conn_map.insert(make_pair("s3a://fake_host/", fake_conn));
   // Two types of paths, one with directory, one without.
   vector<string> s3_paths{"s3a://fake_host", "s3a://fake_host/dir"};
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index e25bc2ca0..23c6b4091 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -118,6 +118,11 @@ DEFINE_bool(remote_batch_read, false,
     "Set if the system uses batch reading for the remote temporary files. 
Batch reading"
     "allows reading a block asynchronously when the buffer pool is trying to 
pin one"
     "page of that block.");
+DEFINE_bool(remote_scratch_cleanup_on_startup, true,
+    "If enabled, the Impala daemon will clean up the host-level directory 
within the "
+    "specified remote scratch directory during startup to remove potential 
leftover "
+    "files. This assumes a single Impala daemon per host. "
+    "For multiple daemons on a host, set this to false to prevent unintended 
cleanup.");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -441,22 +446,48 @@ void TmpFileMgr::NewFile(
   new_file->reset(new TmpFileLocal(file_group, device_id, new_file_path.string()));
 }
 
-void TmpFileMgr::RemoveRemoteDir(TmpFileGroup* file_group, DeviceId device_id) {
+static string ConstructRemoteDirPath(const string& base_dir, const string& hostname,
+    const string& backend_id = "", const string& query_id = "") {
+  stringstream dir;
+  dir << base_dir << "/" << hostname;
+  if (!backend_id.empty()) {
+    dir << "/" << backend_id;
+    DCHECK(!query_id.empty());
+    dir << "_" << query_id;
+  }
+  return dir.str();
+}
+
+void TmpFileMgr::RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn) {
+  if (!FLAGS_remote_scratch_cleanup_on_startup) return;
+  DCHECK(hdfs_conn != nullptr);
+  const string hostlevel_dir = ConstructRemoteDirPath(
+      dir, ExecEnv::GetInstance()->configured_backend_address().hostname);
+  if (hdfsExists(hdfs_conn, hostlevel_dir.c_str()) == 0) {
+    hdfsDelete(hdfs_conn, hostlevel_dir.c_str(), 1);
+    LOG(INFO) << "Called to remove the host-level remote directory " << 
hostlevel_dir;
+  }
+}
+
+void TmpFileMgr::RemoveRemoteDirForQuery(TmpFileGroup* file_group) {
   if (tmp_dirs_remote_ == nullptr) return;
-  string dir = tmp_dirs_remote_->path_;
-  stringstream files_dir;
-  files_dir << dir << "/" << PrintId(ExecEnv::GetInstance()->backend_id(), "_") << "_"
-            << PrintId(file_group->unique_id(), "_");
+  const string& dir = tmp_dirs_remote_->path_;
+  const string& hostname = ExecEnv::GetInstance()->configured_backend_address().hostname;
+  const string backend_id = PrintId(ExecEnv::GetInstance()->backend_id(), "_");
+  const string query_id = PrintId(file_group->unique_id(), "_");
+  const string querylevel_dir =
+      ConstructRemoteDirPath(dir, hostname, backend_id, query_id);
   hdfsFS hdfs_conn;
   Status status =
-      HdfsFsCache::instance()->GetConnection(files_dir.str(), &hdfs_conn, &hdfs_conns_);
+      HdfsFsCache::instance()->GetConnection(querylevel_dir, &hdfs_conn, &hdfs_conns_);
   if (status.ok()) {
     DCHECK(hdfs_conn != nullptr);
-    hdfsDelete(hdfs_conn, files_dir.str().c_str(), 1);
+    hdfsDelete(hdfs_conn, querylevel_dir.c_str(), 1);
+    LOG(INFO) << "Called to remove the query-level remote directory " << 
querylevel_dir;
   } else {
     LOG(WARNING) << "Failed to remove the remote directory because unable to 
create a "
                     "connection to "
-                 << files_dir.str();
+                 << querylevel_dir;
   }
 }
 
@@ -809,10 +840,12 @@ Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   // For the S3 path, it doesn't need to create the directory for the uploading
   // as long as the S3 address is correct.
+  DCHECK(tmp_mgr != nullptr);
   DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
       path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()));
+  tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn);
   return Status::OK();
 }
 
@@ -841,14 +874,17 @@ Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_di
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
-  // If the HDFS path doesn't exist, it would fail while uploading, so we
-  // create the HDFS path if it doesn't exist.
   RETURN_IF_ERROR(
       HdfsFsCache::instance()->GetConnection(path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_)));
   if (hdfsExists(hdfs_conn, path_.c_str()) != 0) {
+    // If the impala scratch path in hdfs doesn't exist, attempt to create the path to
+    // verify it's valid and writable for scratch usage.
+    // Failure may indicate a permission or configuration issue.
     if (hdfsCreateDirectory(hdfs_conn, path_.c_str()) != 0) {
       return Status(GetHdfsErrorMsg("HDFS create path failed: ", path_));
     }
+  } else {
+    tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn);
   }
   return Status::OK();
 }
@@ -1323,7 +1359,7 @@ void TmpFileGroup::Close() {
   }
   CloseInternal<std::unique_ptr<TmpFile>>(tmp_files_);
   CloseInternal<std::shared_ptr<TmpFile>>(tmp_files_remote_);
-  tmp_file_mgr_->RemoveRemoteDir(this, 0);
+  tmp_file_mgr_->RemoveRemoteDirForQuery(this);
   tmp_file_mgr_->scratch_bytes_used_metric_->Increment(
       -1 * scratch_space_bytes_used_counter_->value());
 }
@@ -1405,8 +1441,9 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
   DeviceId dev_id = tmp_file_mgr_->tmp_dirs_.size();
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
-  dir = dir + "/" + PrintId(ExecEnv::GetInstance()->backend_id(), "_") + "_"
-      + PrintId(unique_id(), "_");
+  dir = ConstructRemoteDirPath(dir,
+      ExecEnv::GetInstance()->configured_backend_address().hostname,
+      PrintId(ExecEnv::GetInstance()->backend_id(), "_"), PrintId(unique_id(), "_"));
 
   string new_file_path = GenerateNewPath(dir, unique_name);
   const string& local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path();
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index a6a3cb5a7..a345b5a8a 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -347,8 +347,13 @@ class TmpFileMgr {
   void NewFile(TmpFileGroup* file_group, DeviceId device_id,
     std::unique_ptr<TmpFile>* new_file);
 
-  /// Remove the remote directory which stores tmp files of the tmp file group.
-  void RemoveRemoteDir(TmpFileGroup* file_group, DeviceId device_id);
+  /// Remove the remote directory which stores temporary files of the tmp file group.
+  void RemoveRemoteDirForQuery(TmpFileGroup* file_group);
+
+  /// Remove the remote directory which stores temporary files of the entire host.
+  /// Called during Impala daemon startup to clean up any leftover temporary files.
+  /// Assumes there is only one Impala daemon process running per host.
+  void RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn);
 
   bool initialized_ = false;
 
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 525a0d9cb..05508e3c3 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -22,6 +22,7 @@ from builtins import range
 import os
 import pytest
 import re
+import socket
 import stat
 import subprocess
 import time
@@ -529,3 +530,42 @@ class TestScratchDir(CustomClusterTestSuite):
     assert (metrics1 > 0)
     client.close_query(handle)
     client.close()
+
+  @pytest.mark.execute_serially
+  @SkipIf.not_scratch_fs
+  def test_scratch_dirs_remote_spill_leftover_files_removal(self, vector):
+    # Test remote scratch directory cleanup after Impala daemon restart.
+    normal_dirs = self.generate_dirs('scratch_dirs_remote_spill', 1)
+    normal_dirs.append(self.dfs_tmp_path())
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'],
+      cluster_size=1,
+      expected_num_impalads=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(normal_dirs) - 1)
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    self.execute_query_async_using_client(client, self.spill_query_big_table, vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    # Dir0 is the remote directory.
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True)
+    impalad.kill()
+    client.close()
+    hostname = socket.gethostname()
+    # Verify there are leftover files in the remote scratch dir after impalad is killed.
+    full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path())
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])
+    assert "Found 1 items" in files_result
+    assert hostname in files_result
+    full_dfs_tmp_path_with_hostname = "{}/{}".format(full_dfs_tmp_path, hostname)
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls",
+        full_dfs_tmp_path_with_hostname])
+    assert "Found 1 items" in files_result
+    impalad.start()
+    # Verify that the leftover files are removed after the impala daemon restarts.
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])
+    assert files_result == ""
