This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ddd4f4f8d68addce1542d57f94c637210a090150
Author: Yida Wu <[email protected]>
AuthorDate: Tue Feb 25 12:14:29 2025 -0800

    IMPALA-13798: Cleanup host-level remote scratch dir on shutdown
    
    IMPALA-13677 introduces cleanup of remote scratch files on startup,
    and also introduces a host-level directory to make that cleanup easier.
    
    An empty host-level directory in remote storage does not use
    resources, but it can stay there forever if a host goes offline
    permanently or its hostname changes. To improve this behavior, this
    patch adds support for removing the remote host-level temporary
    directory on shutdown. This helps prevent too many empty
    directories from being left in the remote scratch path, especially
    for hosts that never restart.
    
    Changed the flag remote_scratch_cleanup_on_startup to
    remote_scratch_cleanup_on_start_stop, so that this flag also
    controls cleanup on shutdown.
    
    Tests:
    Passed exhaustive tests.
    Added an e2e testcase
    test_scratch_dirs_remote_dir_removal_on_shutdown.
    
    Change-Id: Ic8f446894afdf975630aef80a9d964a9a78d3b46
    Reviewed-on: http://gerrit.cloudera.org:8080/22549
    Reviewed-by: Daniel Becker <[email protected]>
    Reviewed-by: Abhishek Rawat <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/tmp-file-mgr-internal.h    |  9 +++++++
 be/src/runtime/tmp-file-mgr-test.cc       |  4 +--
 be/src/runtime/tmp-file-mgr.cc            | 43 +++++++++++++++++++++++++------
 be/src/runtime/tmp-file-mgr.h             |  3 +++
 be/src/service/impala-server.cc           |  3 +++
 tests/custom_cluster/test_scratch_disk.py | 35 +++++++++++++++++++++++++
 6 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index e800636f4..c371cb1f0 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -410,6 +410,9 @@ class TmpDir {
   virtual Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) = 0;
 
+  /// Get a connection to the path of the dir. Only for the remote dir.
+  virtual Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) = 0;
+
   int64_t bytes_limit() { return bytes_limit_; }
   int priority() { return priority_; }
   const string& path() { return path_; }
@@ -458,6 +461,10 @@ class TmpDirLocal : public TmpDir {
   TmpDirLocal(const std::string& path) : TmpDir(path) {}
   Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+  Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) override {
+    DCHECK(false) << "GetConnection() is not supported for a local temporary 
dir";
+    return Status("GetConnection() is not supported for a local temporary 
dir");
+  }
   bool is_local() override { return true; }
 
  private:
@@ -477,6 +484,7 @@ class TmpDirS3 : public TmpDir {
   TmpDirS3(const std::string& path) : TmpDir(path) {}
   Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+  Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) override;
 
  private:
   Status ParsePathTokens(std::vector<string>& tokens) override;
@@ -487,6 +495,7 @@ class TmpDirHdfs : public TmpDir {
   TmpDirHdfs(const std::string& path) : TmpDir(path) {}
   Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+  Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) override;
 
  private:
   Status ParsePathTokens(std::vector<string>& tokens) override;
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 73b70747d..9b44d1415 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -60,7 +60,7 @@ DECLARE_string(remote_tmp_file_size);
 DECLARE_int32(wait_for_spill_buffer_timeout_s);
 DECLARE_bool(remote_batch_read);
 DECLARE_string(remote_read_memory_buffer_size);
-DECLARE_bool(remote_scratch_cleanup_on_startup);
+DECLARE_bool(remote_scratch_cleanup_on_start_stop);
 
 namespace impala {
 
@@ -1164,7 +1164,7 @@ TEST_F(TmpFileMgrTest, 
TestDirectoryLimitParsingRemotePath) {
   // Create a fake s3 connection in order to pass the connection verification.
   HdfsFsCache::HdfsFsMap fake_hdfs_conn_map;
   hdfsFS fake_conn = reinterpret_cast<hdfsFS>(1);
-  FLAGS_remote_scratch_cleanup_on_startup = false;
+  FLAGS_remote_scratch_cleanup_on_start_stop = false;
   fake_hdfs_conn_map.insert(make_pair("s3a://fake_host/", fake_conn));
   // Two types of paths, one with directory, one without.
   vector<string> s3_paths{"s3a://fake_host", "s3a://fake_host/dir"};
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 23c6b4091..393f74699 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -118,10 +118,10 @@ DEFINE_bool(remote_batch_read, false,
     "Set if the system uses batch reading for the remote temporary files. 
Batch reading"
     "allows reading a block asynchronously when the buffer pool is trying to 
pin one"
     "page of that block.");
-DEFINE_bool(remote_scratch_cleanup_on_startup, true,
+DEFINE_bool(remote_scratch_cleanup_on_start_stop, true,
     "If enabled, the Impala daemon will clean up the host-level directory 
within the "
-    "specified remote scratch directory during startup to remove potential 
leftover "
-    "files. This assumes a single Impala daemon per host. "
+    "specified remote scratch directory during both startup and shutdown to 
remove "
+    "potential leftover files. This assumes a single Impala daemon per host. "
     "For multiple daemons on a host, set this to false to prevent unintended 
cleanup.");
 
 using boost::algorithm::is_any_of;
@@ -415,6 +415,19 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
   return Status::OK();
 }
 
+void TmpFileMgr::CleanupAtShutdown() {
+  // Try to clear the host-level remote temporary directory.
+  if (tmp_dirs_remote_ == nullptr) return;
+  hdfsFS hdfs_conn;
+  Status status = tmp_dirs_remote_->GetConnection(this, &hdfs_conn);
+  if (!status.ok()) {
+    LOG(WARNING) << "Unable to get a connection to " << tmp_dirs_remote_->path_
+                 << " for clearing the directory on shutdown";
+    return;
+  }
+  RemoveRemoteDirForHost(tmp_dirs_remote_->path_, hdfs_conn);
+}
+
 Status TmpFileMgr::CreateTmpFileBufferPoolThread(MetricGroup* metrics) {
   DCHECK(metrics != nullptr);
   tmp_dirs_remote_ctrl_.tmp_file_pool_.reset(new TmpFileBufferPool(this));
@@ -459,7 +472,7 @@ static string ConstructRemoteDirPath(const string& 
base_dir, const string& hostn
 }
 
 void TmpFileMgr::RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn) {
-  if (!FLAGS_remote_scratch_cleanup_on_startup) return;
+  if (!FLAGS_remote_scratch_cleanup_on_start_stop) return;
   DCHECK(hdfs_conn != nullptr);
   const string hostlevel_dir = ConstructRemoteDirPath(
       dir, ExecEnv::GetInstance()->configured_backend_address().hostname);
@@ -836,6 +849,14 @@ Status TmpDirS3::ParsePathTokens(vector<string>& toks) {
   return Status::OK();
 }
 
+Status TmpDirS3::GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* hdfs_conn) {
+  DCHECK(tmp_mgr != nullptr);
+  DCHECK(!path_.empty());
+  DCHECK(hdfs_conn != nullptr);
+  return HdfsFsCache::instance()->GetConnection(
+      path_, hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options());
+}
+
 Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   // For the S3 path, it doesn't need to create the directory for the uploading
@@ -843,8 +864,7 @@ Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, 
vector<bool>* is_tmp_dir_
   DCHECK(tmp_mgr != nullptr);
   DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
-  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-      path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()));
+  RETURN_IF_ERROR(GetConnection(tmp_mgr, &hdfs_conn));
   tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn);
   return Status::OK();
 }
@@ -870,12 +890,19 @@ Status TmpDirHdfs::ParsePathTokens(vector<string>& toks) {
   return Status::OK();
 }
 
+Status TmpDirHdfs::GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* hdfs_conn) {
+  DCHECK(tmp_mgr != nullptr);
+  DCHECK(!path_.empty());
+  DCHECK(hdfs_conn != nullptr);
+  return HdfsFsCache::instance()->GetConnection(
+      path_, hdfs_conn, &(tmp_mgr->hdfs_conns_));
+}
+
 Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
-  RETURN_IF_ERROR(
-      HdfsFsCache::instance()->GetConnection(path_, &hdfs_conn, 
&(tmp_mgr->hdfs_conns_)));
+  RETURN_IF_ERROR(GetConnection(tmp_mgr, &hdfs_conn));
   if (hdfsExists(hdfs_conn, path_.c_str()) != 0) {
     // If the impala scratch path in hdfs doesn't exist, attempt to create the 
path to
     // verify it's valid and writable for scratch usage.
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index a345b5a8a..e9d929d81 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -205,6 +205,9 @@ class TmpFileMgr {
   // Create the TmpFile buffer pool thread for async buffer file reservation.
   Status CreateTmpFileBufferPoolThread(MetricGroup* metrics) 
WARN_UNUSED_RESULT;
 
+  // Clean up the temporary directory during shutdown if needed.
+  void CleanupAtShutdown();
+
   /// Try to reserve space for the buffer file from local buffer directory.
   /// If quick_return is true, the function won't wait if there is no 
available space.
   Status ReserveLocalBufferSpace(bool quick_return) WARN_UNUSED_RESULT;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index bd70838c4..55b5a1ebc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -3564,6 +3564,9 @@ bool ImpalaServer::CancelQueriesForGracefulShutdown() {
     }
   }
 
+  // Clean up temporary files if needed.
+  ExecEnv::GetInstance()->tmp_file_mgr()->CleanupAtShutdown();
+
   // Drain the completed queries queue to the query log table.
   if (FLAGS_enable_workload_mgmt) {
     ShutdownWorkloadManagement();
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 05508e3c3..05bf1e410 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -569,3 +569,38 @@ class TestScratchDir(CustomClusterTestSuite):
     # Verify that the leftover files being removed after the impala daemon 
restarted.
     files_result = subprocess.check_output(["hdfs", "dfs", "-ls", 
full_dfs_tmp_path])
     assert files_result == ""
+
+  @pytest.mark.execute_serially
+  @SkipIf.not_scratch_fs
+  def test_scratch_dirs_remote_dir_removal_on_shutdown(self, vector):
+    # Test remote scratch directory cleanup on Impala daemon shutdown.
+    normal_dirs = self.generate_dirs('scratch_dirs_remote_spill', 1)
+    normal_dirs.append(self.dfs_tmp_path())
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+      '--impalad_args=--shutdown_grace_period_s=5',
+      '--impalad_args=--shutdown_deadline_s=10'],
+      cluster_size=1,
+      expected_num_impalads=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(normal_dirs) - 1)
+    vector.get_value('exec_option')['buffer_pool_limit'] = 
self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    self.execute_query_async_using_client(client, self.spill_query_big_table, 
vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    # Dir0 is the remote directory.
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True)
+    # Shut down the impalad.
+    SHUTDOWN = ": shutdown()"
+    self.execute_query_expect_success(client, SHUTDOWN)
+    impalad.wait_for_exit()
+    client.close()
+    # Verify that no host-level dir in the remote scratch dirs after shutdown.
+    full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path())
+    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", 
full_dfs_tmp_path])
+    assert files_result == ""
+    impalad.start()

Reply via email to