This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5e3d4391906b09adb97ba6d036811aaf81f5a403
Author: Yida Wu <[email protected]>
AuthorDate: Fri Jul 23 05:51:31 2021 -0700

    IMPALA-10862: Optimize the code structure of TmpDir
    
    Currently, the initialization logic of TmpFileMgr is somewhat tedious.
    
    This patch simplifies TmpFileMgr::InitCustom() by refactoring parsing
    and validation logic.
    
    The patch adds TmpDirLocal, TmpDirHdfs and TmpDirS3, which inherit from
    TmpDir and each implement their own parsing and validation logic. This
    makes it easier to add custom logic for future filesystems.
    
    All changes only affect the interfaces within the TmpFileMgr module,
    and the main logic of the scratch directory parsing and verification
    doesn't change.
    
    Tests:
    Ran the Core tests and exhaustive e2e tests.
    
    Because the existing TmpFileMgrTest test cases already cover TmpDir
    parsing and verification, no new test cases are needed for this
    restructuring. Added some test cases for S3 directory parsing failures.
    
    Change-Id: I52971238d5841a1cdfee06b38750f9dc99a6a2be
    Reviewed-on: http://gerrit.cloudera.org:8080/17778
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/tmp-file-mgr-internal.h | 114 +++++++-
 be/src/runtime/tmp-file-mgr-test.cc    | 130 ++++----
 be/src/runtime/tmp-file-mgr.cc         | 521 ++++++++++++++++++---------------
 be/src/runtime/tmp-file-mgr.h          |  50 +---
 4 files changed, 487 insertions(+), 328 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-internal.h 
b/be/src/runtime/tmp-file-mgr-internal.h
index 1827e4f..cbdebec 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -27,6 +27,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/file-writer.h"
 #include "runtime/tmp-file-mgr.h"
+#include "util/hdfs-util.h"
 
 namespace impala {
 
@@ -94,7 +95,7 @@ class TmpFile {
   std::string DebugString();
 
   /// Helper to get the TmpDir that this file is associated with.
-  TmpFileMgr::TmpDir* GetDir();
+  TmpDir* GetDir();
 
   /// Helper to get the TmpFileGroup that this file is associated with.
   TmpFileGroup* FileGroup() const { return file_group_; }
@@ -213,7 +214,7 @@ class TmpFileRemote : public TmpFile {
 
   bool AllocateSpace(int64_t num_bytes, int64_t* offset);
   io::DiskFile* GetWriteFile();
-  TmpFileMgr::TmpDir* GetLocalBufferDir() const;
+  TmpDir* GetLocalBufferDir() const;
   Status Remove();
 
   /// Returns the size of the file.
@@ -308,6 +309,115 @@ class TmpFileDummy : public TmpFile {
   Status Remove() { return Status::OK(); }
 };
 
+/// A configured temporary directory that TmpFileMgr allocates files in.
+class TmpDir {
+ public:
+  TmpDir(const std::string& raw_path, const std::string& prefix, bool 
is_local);
+  virtual ~TmpDir() {}
+
+  /// Parse the raw path and identify the scratch directory options.
+  virtual Status Parse();
+
+  /// Verify the scratch path and create the directory.
+  virtual Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) = 0;
+
+  int64_t bytes_limit() { return bytes_limit_; }
+  int priority() { return priority_; }
+  const string& path() { return path_; }
+  IntGauge* bytes_used_metric() const { return bytes_used_metric_; }
+  bool is_local() { return is_local_dir_; }
+
+ private:
+  friend class TmpFileMgr;
+  friend class TmpDirHdfs;
+  friend class TmpDirS3;
+  friend class TmpDirLocal;
+
+  /// Raw path of the temporary directory.
+  const std::string raw_path_;
+
+  /// Parsed raw path of the temporary directory, e.g, trimmed.
+  std::string parsed_raw_path_;
+
+  /// The prefix of the path.
+  std::string prefix_;
+
+  /// The complete path to the temporary directory.
+  std::string path_;
+
+  /// Limit on bytes that should be written to this path. Set to maximum value
+  /// of int64_t if there is no limit.
+  int64_t bytes_limit_;
+
+  /// Scratch directory priority.
+  int priority_;
+
+  /// The current bytes of scratch used for this temporary directory.
+  IntGauge* bytes_used_metric_;
+
+  /// If the dir is expected in the local file system or in the remote.
+  const bool is_local_dir_;
+
+  /// Indicate if the TmpDir is parsed.
+  bool is_parsed_;
+
+  /// Return the directory path by parsing the input tokens.
+  /// "Path" is the path generated from the tokens.
+  /// "Offset" indicates the number of elements has been read in the tokens.
+  virtual Status GetPathFromToks(
+      const std::vector<string>& tokens, string* path, int* offset);
+
+  /// A helper function for ParseTokens() to parse the raw path and generate 
the complete
+  /// path of the scratch directory.
+  /// "Offset" indicates the number of elements has been read in the tokens.
+  Status ParsePath(const std::vector<string>& tokens, int* offset);
+
+  /// A helper function for ParseTokens() to parse the byte limit of the 
scratch
+  /// directory. "Index" indicates the position of the byte_limit in the 
tokens.
+  Status ParseByteLimit(const std::vector<string>& tokens, int index);
+
+  /// A helper function for ParseTokens() to parse the priorify of the scratch 
directory.
+  /// "Index" indicates the position of the priority in the tokens.
+  Status ParsePriority(const std::vector<string>& tokens, int index);
+
+  /// A helper function for Parse() to parse raw input of the scratch 
directory.
+  Status ParseTokens();
+};
+
+class TmpDirLocal : public TmpDir {
+ public:
+  TmpDirLocal(const std::string& path) : TmpDir(path, "", true /*is_local*/) {}
+  Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+
+ private:
+  /// A helper function for VerifyAndCreate() to create a local scratch 
directory.
+  Status CreateLocalDirectory(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, int disk_id, TmpFileMgr* tmp_mgr);
+};
+
+class TmpDirS3 : public TmpDir {
+ public:
+  TmpDirS3(const std::string& path)
+    : TmpDir(path, FILESYS_PREFIX_S3, false /*is_local*/) {}
+  Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+};
+
+class TmpDirHdfs : public TmpDir {
+ public:
+  TmpDirHdfs(const std::string& path)
+    : TmpDir(path, FILESYS_PREFIX_HDFS, false /*is_local*/) {}
+  Status Parse() override;
+  Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+
+ private:
+  virtual Status GetPathFromToks(
+      const std::vector<string>& tokens, string* path, int* offset) override;
+};
+
 /// Temporary file buffer pool allows the temporary files to return their 
buffer to the
 /// pool and can be evicted to make room for other files. The pool also 
provides an async
 /// way for the write ranges to wait until there is an available space to 
reserve before
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index fcc4ea8..57cd9a7 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -171,13 +171,13 @@ class TmpFileMgrTest : public ::testing::Test {
   }
 
   /// Helper to get the private tmp_dirs_ member.
-  static const vector<TmpFileMgr::TmpDir>& GetTmpDirs(TmpFileMgr* mgr) {
+  static const vector<unique_ptr<TmpDir>>& GetTmpDirs(TmpFileMgr* mgr) {
     return mgr->tmp_dirs_;
   }
 
   /// Helper to get the private tmp_remote_dirs_ pointer.
-  static const TmpFileMgr::TmpDir* GetTmpRemoteDir(TmpFileMgr* mgr) {
-    return mgr->tmp_dirs_remote_.get();
+  static const unique_ptr<TmpDir>& GetTmpRemoteDir(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_remote_;
   }
 
   /// Helper to call the private TmpFileMgr::NewFile() method.
@@ -211,7 +211,7 @@ class TmpFileMgrTest : public ::testing::Test {
   /// Helper to set an invalid remote path to create an error.
   static void SetInvalidRemotePath(
       TmpFileMgr* tmp_file_mgr, TmpFileGroup* group, int tmp_file_idx) {
-    string dir = tmp_file_mgr->tmp_dirs_remote_->path;
+    const string& dir = tmp_file_mgr->tmp_dirs_remote_->path();
     int dev_id = 0;
     string invalid_path = "";
     auto tmp_file_shared_ptr = group->tmp_files_remote_[tmp_file_idx];
@@ -968,12 +968,12 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
                        "/tmp/tmp-file-mgr-test5:200tb:5,"
                        "/tmp/tmp-file-mgr-test6:100MB:6"));
   EXPECT_EQ(6, dirs.size());
-  EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
-  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
-  EXPECT_EQ(1234, dirs[2].bytes_limit);
-  EXPECT_EQ(99999999, dirs[3].bytes_limit);
-  EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
-  EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
+  EXPECT_EQ(5 * GIGABYTE, dirs[0]->bytes_limit());
+  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1]->bytes_limit());
+  EXPECT_EQ(1234, dirs[2]->bytes_limit());
+  EXPECT_EQ(99999999, dirs[3]->bytes_limit());
+  EXPECT_EQ(200 * TERABYTE, dirs[4]->bytes_limit());
+  EXPECT_EQ(100 * MEGABYTE, dirs[5]->bytes_limit());
 
   // Various invalid limit formats result in the directory getting skipped.
   // Include a valid dir on the end to ensure that we don't short-circuit all
@@ -984,8 +984,8 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
                        
"/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%,"
                        "/tmp/tmp-file-mgr-test1:100"));
   EXPECT_EQ(1, dirs2.size());
-  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
-  EXPECT_EQ(100, dirs2[0].bytes_limit);
+  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0]->path());
+  EXPECT_EQ(100, dirs2[0]->bytes_limit());
 
   // Various valid ways of specifying "unlimited".
   auto& dirs3 =
@@ -993,7 +993,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
                                   
"/tmp/tmp-file-mgr-test3,/tmp/tmp-file-mgr-test4:0"));
   EXPECT_EQ(4, dirs3.size());
   for (const auto& dir : dirs3) {
-    EXPECT_EQ(numeric_limits<int64_t>::max(), dir.bytes_limit);
+    EXPECT_EQ(numeric_limits<int64_t>::max(), dir->bytes_limit());
   }
 
   // Extra colons
@@ -1015,36 +1015,36 @@ TEST_F(TmpFileMgrTest, 
TestDirectoryLimitParsingRemotePath) {
       "/tmp/local-buffer-dir2", "/tmp/local-buffer-dir3"});
 
   // Successful cases for HDFS paths.
-  auto dirs1 = GetTmpRemoteDir(
+  auto& dirs1 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:20500/tmp,/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs1);
 
-  auto dirs2 = GetTmpRemoteDir(
+  auto& dirs2 = GetTmpRemoteDir(
       
CreateTmpFileMgr("hdfs://localhost:20500/tmp:100,/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs2);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs2->path);
-  EXPECT_EQ(100, dirs2->bytes_limit);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs2->path());
+  EXPECT_EQ(100, dirs2->bytes_limit());
 
-  auto dirs3 = GetTmpRemoteDir(
+  auto& dirs3 = GetTmpRemoteDir(
       
CreateTmpFileMgr("hdfs://localhost:20500/tmp:1KB:1,/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs3);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs3->path);
-  EXPECT_EQ(1024, dirs3->bytes_limit);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs3->path());
+  EXPECT_EQ(1024, dirs3->bytes_limit());
 
   // Multiple local paths with one remote path.
   auto tmp_mgr_4 = 
CreateTmpFileMgr("hdfs://localhost:20500/tmp,/tmp/local-buffer-dir1,"
                                     
"/tmp/local-buffer-dir2,/tmp/local-buffer-dir3");
-  auto dirs4_local = GetTmpDirs(tmp_mgr_4);
-  auto dirs4_remote = GetTmpRemoteDir(tmp_mgr_4);
+  auto& dirs4_local = GetTmpDirs(tmp_mgr_4);
+  auto& dirs4_remote = GetTmpRemoteDir(tmp_mgr_4);
   EXPECT_NE(nullptr, dirs4_remote);
   EXPECT_EQ(2, dirs4_local.size());
-  EXPECT_EQ("/tmp/local-buffer-dir2/impala-scratch", dirs4_local[0].path);
-  EXPECT_EQ("/tmp/local-buffer-dir3/impala-scratch", dirs4_local[1].path);
+  EXPECT_EQ("/tmp/local-buffer-dir2/impala-scratch", dirs4_local[0]->path());
+  EXPECT_EQ("/tmp/local-buffer-dir3/impala-scratch", dirs4_local[1]->path());
 
   // Fails the parsing due to no port number for the HDFS path.
   auto tmp_mgr_5 = 
CreateTmpFileMgr("hdfs://localhost/tmp,/tmp/local-buffer-dir");
-  auto dirs5_local = GetTmpDirs(tmp_mgr_5);
-  auto dirs5_remote = GetTmpRemoteDir(tmp_mgr_5);
+  auto& dirs5_local = GetTmpDirs(tmp_mgr_5);
+  auto& dirs5_remote = GetTmpRemoteDir(tmp_mgr_5);
   EXPECT_EQ(1, dirs5_local.size());
   EXPECT_EQ(nullptr, dirs5_remote);
 
@@ -1054,60 +1054,76 @@ TEST_F(TmpFileMgrTest, 
TestDirectoryLimitParsingRemotePath) {
 
   // Parse successfully, but the parsed HDFS path is unable to connect.
   // These cases would fail the initialization of TmpFileMgr.
-  auto dirs7 = GetTmpRemoteDir(
+  auto& dirs7 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:1/tmp::1,/tmp/local-buffer-dir", 
false));
   EXPECT_EQ(nullptr, dirs7);
 
-  auto dirs8 = GetTmpRemoteDir(
+  auto& dirs8 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:/tmp::,/tmp/local-buffer-dir", 
false));
   EXPECT_EQ(nullptr, dirs8);
 
-  auto dirs9 = GetTmpRemoteDir(
+  auto& dirs9 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost/tmp::1,/tmp/local-buffer-dir", 
false));
   EXPECT_EQ(nullptr, dirs9);
 
-  auto dirs10 = GetTmpRemoteDir(
+  auto& dirs10 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost/tmp:1,/tmp/local-buffer-dir", false));
   EXPECT_EQ(nullptr, dirs10);
 
   // Multiple remote paths, should support only one.
-  auto dirs11 = GetTmpRemoteDir(
+  auto& dirs11 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:20500/tmp,hdfs://localhost:20501/tmp,"
                        "/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs11);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs11->path);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs11->path());
 
   // The order of the buffer and the remote dir should not affect the result.
-  auto dirs12 = GetTmpRemoteDir(
+  auto& dirs12 = GetTmpRemoteDir(
       CreateTmpFileMgr("/tmp/local-buffer-dir, hdfs://localhost:20500/tmp,"
                        "hdfs://localhost:20501/tmp"));
   EXPECT_NE(nullptr, dirs12);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs12->path);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs12->path());
 
   // Successful cases for parsing S3 paths.
   // Create a fake s3 connection in order to pass the connection verification.
   HdfsFsCache::HdfsFsMap fake_hdfs_conn_map;
   hdfsFS fake_conn = reinterpret_cast<hdfsFS>(1);
   fake_hdfs_conn_map.insert(make_pair("s3a://fake_host/", fake_conn));
-  auto dirs13 = GetTmpRemoteDir(
+  auto& dirs13 = GetTmpRemoteDir(
       CreateTmpFileMgr("/tmp/local-buffer-dir, 
s3a://fake_host/for-parsing-test-only",
           true, &fake_hdfs_conn_map));
   EXPECT_NE(nullptr, dirs13);
-  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", 
dirs13->path);
+  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", 
dirs13->path());
 
-  auto dirs14 = GetTmpRemoteDir(
+  auto& dirs14 = GetTmpRemoteDir(
       CreateTmpFileMgr("/tmp/local-buffer-dir, 
s3a://fake_host/for-parsing-test-only:100",
           true, &fake_hdfs_conn_map));
   EXPECT_NE(nullptr, dirs14);
-  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", 
dirs14->path);
-  EXPECT_EQ(100, dirs14->bytes_limit);
+  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", 
dirs14->path());
+  EXPECT_EQ(100, dirs14->bytes_limit());
 
-  auto dirs15 = GetTmpRemoteDir(CreateTmpFileMgr(
+  auto& dirs15 = GetTmpRemoteDir(CreateTmpFileMgr(
       "/tmp/local-buffer-dir, s3a://fake_host/for-parsing-test-only:1KB:1", 
true,
       &fake_hdfs_conn_map));
   EXPECT_NE(nullptr, dirs15);
-  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", 
dirs15->path);
-  EXPECT_EQ(1024, dirs15->bytes_limit);
+  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", 
dirs15->path());
+  EXPECT_EQ(1024, dirs15->bytes_limit());
+
+  // Failure cases for parsing S3 paths.
+  auto& dirs16 = GetTmpRemoteDir(CreateTmpFileMgr(
+      "/tmp/local-buffer-dir, 
s3a://fake_host:1234/for-parsing-test-only:1KB:1", true,
+      &fake_hdfs_conn_map));
+  EXPECT_EQ(nullptr, dirs16);
+
+  auto& dirs17 = GetTmpRemoteDir(CreateTmpFileMgr(
+      "/tmp/local-buffer-dir, s3a://fake_host:1234/for-parsing-test-only:1KB", 
true,
+      &fake_hdfs_conn_map));
+  EXPECT_EQ(nullptr, dirs17);
+
+  auto& dirs18 = GetTmpRemoteDir(CreateTmpFileMgr(
+      "/tmp/local-buffer-dir, s3a://fake_host:1234/for-parsing-test-only", 
true,
+      &fake_hdfs_conn_map));
+  EXPECT_EQ(nullptr, dirs18);
 }
 
 // Test compression buffer memory management for reads and writes.
@@ -1268,18 +1284,18 @@ TEST_F(TmpFileMgrTest, TestDirectoryPriorityParsing) {
                        
"/tmp/tmp-file-mgr-test2::2,/tmp/tmp-file-mgr-test4:99999999:4,"
                        
"/tmp/tmp-file-mgr-test5:200tb:5,/tmp/tmp-file-mgr-test1:5g:1"));
   EXPECT_EQ(6, dirs.size());
-  EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
-  EXPECT_EQ(1, dirs[0].priority);
-  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
-  EXPECT_EQ(2, dirs[1].priority);
-  EXPECT_EQ(1234, dirs[2].bytes_limit);
-  EXPECT_EQ(3, dirs[2].priority);
-  EXPECT_EQ(99999999, dirs[3].bytes_limit);
-  EXPECT_EQ(4, dirs[3].priority);
-  EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
-  EXPECT_EQ(5, dirs[4].priority);
-  EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
-  EXPECT_EQ(numeric_limits<int>::max(), dirs[5].priority);
+  EXPECT_EQ(5 * GIGABYTE, dirs[0]->bytes_limit());
+  EXPECT_EQ(1, dirs[0]->priority());
+  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1]->bytes_limit());
+  EXPECT_EQ(2, dirs[1]->priority());
+  EXPECT_EQ(1234, dirs[2]->bytes_limit());
+  EXPECT_EQ(3, dirs[2]->priority());
+  EXPECT_EQ(99999999, dirs[3]->bytes_limit());
+  EXPECT_EQ(4, dirs[3]->priority());
+  EXPECT_EQ(200 * TERABYTE, dirs[4]->bytes_limit());
+  EXPECT_EQ(5, dirs[4]->priority());
+  EXPECT_EQ(100 * MEGABYTE, dirs[5]->bytes_limit());
+  EXPECT_EQ(numeric_limits<int>::max(), dirs[5]->priority());
 
   // Various invalid limit formats result in the directory getting skipped.
   // Include a valid dir on the end to ensure that we don't short-circuit all
@@ -1290,9 +1306,9 @@ TEST_F(TmpFileMgrTest, TestDirectoryPriorityParsing) {
                        
"/tmp/tmp-file-mgr-test5::p0,/tmp/tmp-file-mgr-test6::10%,"
                        "/tmp/tmp-file-mgr-test1:100:-1"));
   EXPECT_EQ(1, dirs2.size());
-  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
-  EXPECT_EQ(100, dirs2[0].bytes_limit);
-  EXPECT_EQ(-1, dirs2[0].priority);
+  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0]->path());
+  EXPECT_EQ(100, dirs2[0]->bytes_limit());
+  EXPECT_EQ(-1, dirs2[0]->priority());
 }
 
 // Tests that when TmpFileGroup is constructed, the priority based index 
ranges are
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index c18463f..d494284 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -154,7 +154,6 @@ static const Status& TMP_FILE_BUFFER_POOL_CONTEXT_CANCELLED 
=
     Status::CancelledInternal("TmpFileBufferPool");
 
 using DeviceId = TmpFileMgr::DeviceId;
-using TmpDir = TmpFileMgr::TmpDir;
 using WriteDoneCallback = TmpFileMgr::WriteDoneCallback;
 
 TmpFileMgr::TmpFileMgr() {}
@@ -187,6 +186,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
     MetricGroup* metrics) {
   DCHECK(!initialized_);
   punch_holes_ = punch_holes;
+  one_dir_per_device_ = one_dir_per_device;
   if (tmp_dir_specifiers.empty()) {
     LOG(WARNING) << "Running without spill to disk: no scratch directories 
provided.";
   }
@@ -249,41 +249,37 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
   // warning - we don't want to abort process startup because of misconfigured scratch,
   // since queries will generally still be runnable.
   for (const string& tmp_dir_spec : tmp_dir_specifiers) {
-    string tmp_dirs_without_prefix, prefix;
     string tmp_dir_spec_trimmed(boost::algorithm::trim_left_copy(tmp_dir_spec));
-    bool is_hdfs = false;
-    bool is_remote = false;
+    std::unique_ptr<TmpDir> tmp_dir;
 
     if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)) {
-      prefix = FILESYS_PREFIX_HDFS;
-      tmp_dirs_without_prefix = tmp_dir_spec_trimmed.substr(strlen(FILESYS_PREFIX_HDFS));
-      is_hdfs = true;
-      is_remote = true;
+      tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed);
     } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) {
-      prefix = FILESYS_PREFIX_S3;
-      tmp_dirs_without_prefix = tmp_dir_spec_trimmed.substr(strlen(FILESYS_PREFIX_S3));
-      is_remote = true;
       // Initialize the S3 options for later getting S3 connection.
       s3a_options_ = {make_pair("fs.s3a.fast.upload", "true"),
           make_pair("fs.s3a.fast.upload.buffer", "disk")};
-    } else {
+      tmp_dir = std::make_unique<TmpDirS3>(tmp_dir_spec_trimmed);
+    } else if (IsGcsPath(tmp_dir_spec_trimmed.c_str(), false)) {
       // TODO(IMPALA-10561): Add support for spilling to GCS
-      prefix = "";
-      tmp_dirs_without_prefix = tmp_dir_spec_trimmed.substr(0);
+      // There is no TmpDir implementation for GCS yet, so skip the directory here.
+      // Leaving 'tmp_dir' null would fail the DCHECK below in debug builds and
+      // dereference a null pointer in tmp_dir->Parse() in release builds.
+      LOG(WARNING) << "Directory " << tmp_dir_spec.c_str() << " is not used because "
+                   << "spilling to GCS is not supported yet.";
+      continue;
+    } else {
+      tmp_dir = std::make_unique<TmpDirLocal>(tmp_dir_spec_trimmed);
     }
 
-    string parsed_path;
-    int64_t bytes_limit = numeric_limits<int64_t>::max();
-    int priority = numeric_limits<int>::max();
-    Status parse_status = ParseScratchPathToks(tmp_dir_spec, tmp_dirs_without_prefix,
-        is_hdfs, &parsed_path, &bytes_limit, &priority);
+    DCHECK(tmp_dir != nullptr);
+    Status parse_status = tmp_dir->Parse();
     if (!parse_status.ok()) {
       LOG(WARNING) << "Directory " << tmp_dir_spec.c_str() << " is not used because "
                    << parse_status.msg().msg();
       continue;
     }
 
-    if (is_remote) {
+    if (!tmp_dir->is_local()) {
       // Set the flag to reserve a local dir for buffer.
       // If the flag has been set, meaning that there is already one remote dir
       // registered, since we only support one remote dir, this remote dir will be
@@ -296,78 +286,38 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
         need_local_buffer_dir = true;
       }
     }
-
-    tmp_dirs.emplace_back(
-        new TmpDir(prefix.append(parsed_path), bytes_limit, priority, 
nullptr));
+    tmp_dirs.emplace_back(move(tmp_dir));
   }
 
   vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
   // For each tmp directory, find the disk it is on,
   // so additional tmp directories on the same disk can be skipped.
   for (int i = 0; i < tmp_dirs.size(); ++i) {
-    Status status;
-    path tmp_path(trim_right_copy_if(tmp_dirs[i]->path, is_any_of("/")));
-    bool is_hdfs_path = IsHdfsPath(tmp_path.c_str(), false);
-    bool is_s3a_path = IsS3APath(tmp_path.c_str(), false);
-    if (is_hdfs_path || is_s3a_path) {
-      // Should be only one remote dir.
-      DCHECK(!HasRemoteDir());
-      path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
-      string scratch_subdir_path_str = scratch_subdir_path.string();
-      hdfsFS hdfs_conn;
-      // If the HDFS path doesn't exist, it would fail while uploading, so we
-      // create the HDFS path if it doesn't exist.
-      // For the S3 path, it doesn't need to create the directory for the 
uploading
-      // as long as the S3 address is correct.
-      if (is_hdfs_path) {
-        RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-            tmp_path.string(), &hdfs_conn, &hdfs_conns_));
-        if (hdfsExists(hdfs_conn, scratch_subdir_path_str.c_str()) != 0) {
-          if (hdfsCreateDirectory(hdfs_conn, scratch_subdir_path_str.c_str()) 
!= 0) {
-            return Status(
-                GetHdfsErrorMsg("HDFS create path failed: ", 
scratch_subdir_path_str));
-          }
-        }
+    Status status = tmp_dirs[i]->VerifyAndCreate(
+        metrics, &is_tmp_dir_on_disk, need_local_buffer_dir, this);
+    if (!status.ok()) {
+      // If the remote directory fails to verify or create, return the error.
+      if (!tmp_dirs[i]->is_local()) return status;
+      // If it is the local directory, continue to try next directory.
+      continue;
+    }
+    if (tmp_dirs[i]->is_local()) {
+      if (need_local_buffer_dir) {
+        local_buff_dir_ = move(tmp_dirs[i]);
+        need_local_buffer_dir = false;
       } else {
-        DCHECK(is_s3a_path);
-        RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-            tmp_path.string(), &hdfs_conn, &hdfs_conns_, s3a_options()));
+        tmp_dirs_.emplace_back(move(tmp_dirs[i]));
       }
-      tmp_dirs_remote_ =
-          std::make_unique<TmpDir>(scratch_subdir_path_str, 
tmp_dirs[i]->bytes_limit,
-              tmp_dirs[i]->priority, tmp_dirs[i]->bytes_used_metric, false);
     } else {
-      tmp_path = absolute(tmp_path);
-      path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
-      // tmp_path must be a writable directory.
-      status = FileSystemUtil::VerifyIsDirectory(tmp_path.string());
-      if (!status.ok()) {
-        LOG(WARNING) << "Cannot use directory " << tmp_path.string()
-                     << " for scratch: " << status.msg().msg();
-        continue;
-      }
-
-      // Find the disk id of tmp_path. Add the scratch directory if there 
isn't another
-      // directory on the same disk (or if we don't know which disk it is on).
-      int disk_id = DiskInfo::disk_id(tmp_path.c_str());
-      if (!one_dir_per_device || disk_id < 0 || !is_tmp_dir_on_disk[disk_id]) {
-        uint64_t available_space;
-        RETURN_IF_ERROR(
-            FileSystemUtil::GetSpaceAvailable(tmp_path.string(), 
&available_space));
-        if (available_space < AVAILABLE_SPACE_THRESHOLD_MB * 1024 * 1024) {
-          LOG(WARNING) << "Filesystem containing scratch directory " << 
tmp_path
-                       << " has less than " << AVAILABLE_SPACE_THRESHOLD_MB
-                       << "MB available.";
-        }
-        RETURN_IF_ERROR(CreateDirectory(scratch_subdir_path.string(), 
tmp_path.string(),
-            tmp_dirs[i], metrics, &is_tmp_dir_on_disk, &need_local_buffer_dir, 
disk_id));
-      }
+      tmp_dirs_remote_ = move(tmp_dirs[i]);
     }
   }
 
   // Sort the tmp directories by priority.
   std::sort(tmp_dirs_.begin(), tmp_dirs_.end(),
-      [](const TmpDir& a, const TmpDir& b) { return a.priority < b.priority; 
});
+      [](const std::unique_ptr<TmpDir>& a, const std::unique_ptr<TmpDir>& b) {
+        return a->priority_ < b->priority_;
+      });
 
   if (HasRemoteDir()) {
     if (local_buff_dir_ == nullptr) {
@@ -375,13 +325,13 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
       // s3 fast upload directly without a buffer.
       return Status(
           Substitute("No local directory configured for remote scratch space:  
$0",
-              tmp_dirs_remote_->path));
+              tmp_dirs_remote_->path_));
     } else {
-      LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path
-                << " limit: " << 
PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit);
+      LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path_ << " 
limit: "
+                << PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit_);
       IntGauge* bytes_used_metric = metrics->AddGauge(
           SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", 
tmp_dirs_.size()));
-      tmp_dirs_remote_->bytes_used_metric = bytes_used_metric;
+      tmp_dirs_remote_->bytes_used_metric_ = bytes_used_metric;
     }
   }
 
@@ -396,10 +346,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
     num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
   }
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_[i].path);
+    active_scratch_dirs_metric_->Add(tmp_dirs_[i]->path_);
   }
   if (HasRemoteDir()) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path);
+    active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path_);
     RETURN_IF_ERROR(CreateTmpFileBufferPoolThread(metrics));
   }
 
@@ -417,116 +367,6 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
   return Status::OK();
 }
 
-Status TmpFileMgr::ParseScratchPathToks(const string& tmp_dir_spec,
-    const string& tmp_dirs_without_prefix, bool is_hdfs, string* path,
-    int64_t* bytes_limit, int* priority) {
-  vector<string> toks;
-  split(toks, tmp_dirs_without_prefix, is_any_of(":"), token_compress_off);
-  // The scratch path may have two options "bytes limit" and "priority".
-  // toks_option_st_idx indicates, after the split by colon, from which index 
on,
-  // the content should be the options.
-  int toks_option_st_idx = 1;
-  // The max size after the split by colon.
-  int max_num_tokens = 3;
-  if (is_hdfs) {
-    // We force the HDFS path to contain the port number, so the
-    // first ":" should be a part of the hdfs path.
-    if (toks.size() < 2) {
-      return Status(
-          Substitute("Hdfs path should have the port number: '$0'", 
tmp_dir_spec));
-    }
-    *path = toks[0].append(":").append(toks[1]);
-    toks_option_st_idx++;
-    max_num_tokens++;
-  } else {
-    *path = toks[0];
-  }
-
-  if (toks.size() > max_num_tokens) {
-    return Status(Substitute(
-        "Could not parse temporary dir specifier, too many colons: '$0'", 
tmp_dir_spec));
-  }
-
-  for (int i = toks_option_st_idx; i < toks.size(); i++) {
-    if (i == toks_option_st_idx) {
-      // Parse option byte_limit.
-      bool is_percent;
-      int64_t tmp_bytes_limit = ParseUtil::ParseMemSpec(toks[i], &is_percent, 
0);
-      if (tmp_bytes_limit < 0 || is_percent) {
-        return Status(Substitute(
-            "Malformed scratch directory capacity configuration '$0'", 
tmp_dir_spec));
-      } else if (tmp_bytes_limit == 0) {
-        // Interpret -1, 0 or empty string as no limit.
-        tmp_bytes_limit = numeric_limits<int64_t>::max();
-      }
-      *bytes_limit = tmp_bytes_limit;
-    } else if (i == (toks_option_st_idx + 1)) {
-      // Parse option priority.
-      if (toks[i].empty()) continue;
-      StringParser::ParseResult result;
-      int tmp_priority =
-          StringParser::StringToInt<int>(toks[i].data(), toks[i].size(), 
&result);
-      if (result != StringParser::PARSE_SUCCESS) {
-        return Status(Substitute(
-            "Malformed scratch directory priority configuration '$0'", 
tmp_dir_spec));
-      }
-      *priority = tmp_priority;
-    } else {
-      DCHECK(false) << "Invalid temporary dir specifier: " << tmp_dir_spec;
-    }
-  }
-
-  return Status::OK();
-}
-
-Status TmpFileMgr::CreateDirectory(const string& scratch_subdir_path,
-    const string& tmp_path, const std::unique_ptr<TmpDir>& tmp_dir, 
MetricGroup* metrics,
-    vector<bool>* is_tmp_dir_on_disk, bool* need_local_buffer_dir, int 
disk_id) {
-  // Create the directory, destroying if already present. If this succeeds, we 
will
-  // have an empty writable scratch directory.
-  Status status = 
FileSystemUtil::RemoveAndCreateDirectory(scratch_subdir_path);
-  if (status.ok()) {
-    if (*need_local_buffer_dir) {
-      *need_local_buffer_dir = false;
-      // Add the first local dir as local buffer, the dir is only served as 
the buffer for
-      // Spill to Remote FS. At least we need the dir has two default file 
size space.
-      if (tmp_dir->bytes_limit < tmp_dirs_remote_ctrl_.remote_tmp_file_size_ * 
2) {
-        return Status(Substitute(
-            "Local buffer directory $0 configured for remote scratch "
-            "space has a size limit of $1 bytes, should be at least twice as 
the "
-            "temporary file size "
-            "$2 bytes",
-            tmp_dir->path, tmp_dir->bytes_limit,
-            tmp_dirs_remote_ctrl_.remote_tmp_file_size_));
-      }
-      IntGauge* local_buff_bytes_used_metric =
-          metrics->AddGauge(LOCAL_BUFF_BYTES_USED_FORMAT, 0, Substitute("$0", 
0));
-      local_buff_dir_ = std::make_unique<TmpDir>(
-          scratch_subdir_path, tmp_dir->bytes_limit, 0, 
local_buff_bytes_used_metric);
-      return Status::OK();
-    }
-    if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true;
-    LOG(INFO) << "Using scratch directory " << scratch_subdir_path << " on "
-              << "disk " << disk_id
-              << " limit: " << PrettyPrinter::PrintBytes(tmp_dir->bytes_limit);
-    IntGauge* bytes_used_metric = metrics->AddGauge(
-        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs_.size()));
-    tmp_dirs_.emplace_back(
-        scratch_subdir_path, tmp_dir->bytes_limit, tmp_dir->priority, 
bytes_used_metric);
-  } else {
-    LOG(WARNING) << "Could not remove and recreate directory " << 
scratch_subdir_path
-                 << ": cannot use it for scratch. "
-                 << "Error was: " << status.msg().msg();
-  }
-  if (punch_holes_) {
-    // Make sure hole punching is supported for the directory.
-    // IMPALA-9798: this file should *not* be created inside impala-scratch
-    // subdirectory to avoid races with multiple impalads starting up.
-    RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(tmp_path));
-  }
-  return Status::OK();
-}
-
 Status TmpFileMgr::CreateTmpFileBufferPoolThread(MetricGroup* metrics) {
   DCHECK(metrics != nullptr);
   tmp_dirs_remote_ctrl_.tmp_file_pool_.reset(new TmpFileBufferPool(this));
@@ -552,7 +392,7 @@ void TmpFileMgr::NewFile(
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
   file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
-  path new_file_path(tmp_dirs_[device_id].path);
+  path new_file_path(tmp_dirs_[device_id]->path_);
   new_file_path /= file_name.str();
 
   new_file->reset(new TmpFileLocal(file_group, device_id, 
new_file_path.string()));
@@ -560,7 +400,7 @@ void TmpFileMgr::NewFile(
 
 void TmpFileMgr::RemoveRemoteDir(TmpFileGroup* file_group, DeviceId device_id) 
{
   if (tmp_dirs_remote_ == nullptr) return;
-  string dir = tmp_dirs_remote_->path;
+  string dir = tmp_dirs_remote_->path_;
   stringstream files_dir;
   files_dir << dir << "/" << PrintId(ExecEnv::GetInstance()->backend_id(), 
"_") << "_"
             << PrintId(file_group->unique_id(), "_");
@@ -636,10 +476,10 @@ Status TmpFileMgr::ReserveLocalBufferSpace(bool 
quick_return) {
   // the pool now.
   TmpDir* dir = local_buff_dir_.get();
   if 
(tmp_dirs_remote_ctrl_.local_buff_dir_bytes_high_water_mark_.Add(file_size)
-      > dir->bytes_limit) {
+      > dir->bytes_limit_) {
     
tmp_dirs_remote_ctrl_.local_buff_dir_bytes_high_water_mark_.Add(-file_size);
   } else {
-    GetLocalBufferDir()->bytes_used_metric->Increment(file_size);
+    GetLocalBufferDir()->bytes_used_metric_->Increment(file_size);
     return Status::OK();
   }
 
@@ -694,10 +534,235 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) 
const {
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size() + ((tmp_dirs_remote_ == nullptr) ? 0 : 
1));
   if (device_id < tmp_dirs_.size()) {
-    return tmp_dirs_[device_id].path;
+    return tmp_dirs_[device_id]->path_;
+  } else {
+    return tmp_dirs_remote_->path_;
+  }
+}
+
+/// Stores the unparsed scratch specifier 'raw_path' together with the filesystem
+/// 'prefix' that precedes it. No parsing happens here: Parse() must succeed before
+/// VerifyAndCreate(), which DCHECKs 'is_parsed_'.
+TmpDir::TmpDir(const string& raw_path, const string& prefix, bool is_local)
+  : raw_path_(raw_path),
+    prefix_(prefix),
+    is_local_dir_(is_local),
+    is_parsed_(false) {}
+
+/// Extracts the directory path from the colon-split specifier tokens.
+/// For the default layout ["path", "bytes_limit", "priority"] only the first token
+/// is the path. On return, '*path' is that token with 'prefix_' re-attached, and
+/// '*offset' is the index of the first option token (always 1 here).
+Status TmpDir::GetPathFromToks(const vector<string>& toks, string* path, int* offset) {
+  DCHECK(path != nullptr);
+  DCHECK(offset != nullptr);
+  *path = prefix_ + toks[0];
+  *offset = 1;
+  return Status::OK();
+}
+
+/// Derives 'parsed_raw_path_' (the configured directory) and 'path_' (its
+/// TMP_SUB_DIR_NAME sub-directory, where scratch files go) from the specifier
+/// tokens. '*offset' receives the index of the first option token.
+Status TmpDir::ParsePath(const vector<string>& toks, int* offset) {
+  string raw_dir;
+  RETURN_IF_ERROR(GetPathFromToks(toks, &raw_dir, offset));
+  // Strip any trailing '/' characters.
+  raw_dir = trim_right_copy_if(raw_dir, is_any_of("/"));
+
+  boost::filesystem::path base_dir(raw_dir);
+  if (is_local_dir_) {
+    // Local directories are stored in absolute form.
+    base_dir = absolute(base_dir);
+    raw_dir = base_dir.string();
+  }
+  parsed_raw_path_ = raw_dir;
+  path_ = (base_dir / TMP_SUB_DIR_NAME).string();
+  return Status::OK();
+}
+
+/// Parses the optional capacity option at 'toks[index]' into 'bytes_limit_'.
+/// A missing option, or one that parses to 0, means "no limit"
+/// (numeric_limits<int64_t>::max()). A negative result or a percentage is
+/// rejected; in that case 'bytes_limit_' is left unchanged.
+Status TmpDir::ParseByteLimit(const vector<string>& toks, int index) {
+  DCHECK_GE(index, 0);
+  if (index >= toks.size()) {
+    bytes_limit_ = numeric_limits<int64_t>::max();
+    return Status::OK();
+  }
+  bool is_percent;
+  const int64_t limit = ParseUtil::ParseMemSpec(toks[index], &is_percent, 0);
+  if (limit < 0 || is_percent) {
+    return Status(Substitute(
+        "Malformed scratch directory capacity configuration '$0'", raw_path_));
+  }
+  // Per the existing convention, -1, 0 or an empty string mean no limit; they
+  // arrive here as 0.
+  bytes_limit_ = (limit == 0) ? numeric_limits<int64_t>::max() : limit;
+  return Status::OK();
+}
+
+/// Parses the optional priority option at 'toks[index]' into 'priority_'.
+/// A missing or empty option yields numeric_limits<int>::max(). A value that is
+/// not a valid int is rejected; in that case 'priority_' is left unchanged.
+Status TmpDir::ParsePriority(const vector<string>& toks, int index) {
+  DCHECK_GE(index, 0);
+  const bool has_priority = index < toks.size() && !toks[index].empty();
+  if (!has_priority) {
+    priority_ = numeric_limits<int>::max();
+    return Status::OK();
+  }
+  const string& tok = toks[index];
+  StringParser::ParseResult result;
+  const int parsed = StringParser::StringToInt<int>(tok.data(), tok.size(), &result);
+  if (result != StringParser::PARSE_SUCCESS) {
+    return Status(Substitute(
+        "Malformed scratch directory priority configuration '$0'", raw_path_));
+  }
+  priority_ = parsed;
+  return Status::OK();
+}
+
+/// Splits the specifier (minus 'prefix_') on ':' and parses, in order, the path,
+/// the optional bytes limit and the optional priority. Fails if more than two
+/// option tokens follow the path.
+Status TmpDir::ParseTokens() {
+  vector<string> toks;
+  const string path_without_prefix = raw_path_.substr(prefix_.size());
+  // token_compress_off keeps empty tokens, so an empty option (e.g. "path::2")
+  // still occupies its positional slot.
+  split(toks, path_without_prefix, is_any_of(":"), token_compress_off);
+  // The path may span more than one token (e.g. an HDFS host:port). ParsePath()
+  // reports via 'offset' the index of the first option token.
+  int offset = 0;
+  RETURN_IF_ERROR(ParsePath(toks, &offset));
+  // At most two options may follow the path: "bytes limit" and "priority".
+  const int max_num_options = 2;
+  const size_t max_num_tokens = max_num_options + offset;
+  if (toks.size() > max_num_tokens) {
+    return Status(Substitute(
+        "Could not parse temporary dir specifier, too many colons: '$0'", raw_path_));
+  }
+  // The bytes limit is the first option.
+  RETURN_IF_ERROR(ParseByteLimit(toks, offset));
+  // The priority is the second option.
+  RETURN_IF_ERROR(ParsePriority(toks, offset + 1));
+  return Status::OK();
+}
+
+/// Parses 'raw_path_' into the path and options. May be called only once per
+/// TmpDir; 'is_parsed_' is set only when parsing succeeds.
+Status TmpDir::Parse() {
+  DCHECK(!is_parsed_);
+  Status status = ParseTokens();
+  if (!status.ok()) return status;
+  is_parsed_ = true;
+  return Status::OK();
+}
+
+/// Checks that the parsed local path can serve as scratch space and, if so, creates
+/// its scratch sub-directory via CreateLocalDirectory().
+/// Returns an error if:
+/// - the path is not a directory;
+/// - its free space cannot be queried;
+/// - directory creation fails;
+/// - the hole-punching check fails (only when punch_holes_ is set);
+/// - one_dir_per_device_ is set and another scratch directory is already
+///   registered on the same known disk.
+/// NOTE(review): the last error message also mentions "an unknown disk", but a
+/// negative disk_id always takes the accepting branch and never reaches it.
+Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics,
+    vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* 
tmp_mgr) {
+  DCHECK(is_parsed_);
+  // The path must be a writable directory.
+  Status status = FileSystemUtil::VerifyIsDirectory(parsed_raw_path_);
+  if (!status.ok()) {
+    LOG(WARNING) << "Cannot use directory " << parsed_raw_path_
+                 << " for scratch: " << status.msg().msg();
+    return status;
+  }
+
+  // Find the disk id of the path. When one_dir_per_device_ is false, every directory
+  // is accepted. Otherwise a directory is accepted if its disk is unknown
+  // (disk_id < 0) or has no scratch directory yet.
+  int disk_id = DiskInfo::disk_id(parsed_raw_path_.c_str());
+  if (!tmp_mgr->one_dir_per_device_ || disk_id < 0 || 
!(*is_tmp_dir_on_disk)[disk_id]) {
+    uint64_t available_space;
+    RETURN_IF_ERROR(
+        FileSystemUtil::GetSpaceAvailable(parsed_raw_path_, &available_space));
+    // Low free space is only reported; it does not reject the directory.
+    if (available_space < AVAILABLE_SPACE_THRESHOLD_MB * 1024 * 1024) {
+      LOG(WARNING) << "Filesystem containing scratch directory " << 
parsed_raw_path_
+                   << " has less than " << AVAILABLE_SPACE_THRESHOLD_MB
+                   << "MB available.";
+    }
+    RETURN_IF_ERROR(CreateLocalDirectory(
+        metrics, is_tmp_dir_on_disk, need_local_buffer_dir, disk_id, tmp_mgr));
+    if (tmp_mgr->punch_holes_) {
+      // Make sure hole punching is supported for the directory.
+      // IMPALA-9798: this file should *not* be created inside impala-scratch
+      // subdirectory to avoid races with multiple impalads starting up.
+      RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(parsed_raw_path_));
+    }
   } else {
+    // one_dir_per_device_ is set and this known disk already has a scratch dir.
-    return tmp_dirs_remote_->path;
+    return Status(Substitute(
+        "The scratch directory $0 is on the same disk with another directory 
or on "
+        "an unknown disk.",
+        parsed_raw_path_));
   }
+  return Status::OK();
+}
+
+/// Recreates 'path_' as an empty directory and registers its usage gauge.
+/// There are two cases:
+/// - 'need_local_buffer_dir' is false: the directory is an ordinary local scratch
+///   directory. Its disk is marked in '*is_tmp_dir_on_disk' (when disk_id >= 0).
+/// - 'need_local_buffer_dir' is true: the directory only buffers files spilled to
+///   a remote filesystem. It must allow at least two remote temporary files.
+/// Returns the failure status if the directory cannot be recreated.
+Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics,
+    vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, int disk_id,
+    TmpFileMgr* tmp_mgr) {
+  // Any existing directory at 'path_' is destroyed, leaving an empty writable one.
+  Status status = FileSystemUtil::RemoveAndCreateDirectory(path_);
+  if (!status.ok()) {
+    LOG(WARNING) << "Could not remove and recreate directory " << path_
+                 << ": cannot use it for scratch. "
+                 << "Error was: " << status.msg().msg();
+    return status;
+  }
+
+  if (!need_local_buffer_dir) {
+    if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true;
+    LOG(INFO) << "Using scratch directory " << path_ << " on "
+              << "disk " << disk_id
+              << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_);
+    // The gauge is labelled with the current number of local scratch dirs.
+    bytes_used_metric_ = metrics->AddGauge(
+        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_mgr->tmp_dirs_.size()));
+    return status;
+  }
+
+  if (bytes_limit_ < tmp_mgr->tmp_dirs_remote_ctrl_.remote_tmp_file_size_ * 2) {
+    return Status(Substitute(
+        "Local buffer directory $0 configured for remote scratch "
+        "space has a size limit of $1 bytes, should be at least twice as the "
+        "temporary file size "
+        "$2 bytes",
+        path_, bytes_limit_, tmp_mgr->tmp_dirs_remote_ctrl_.remote_tmp_file_size_));
+  }
+  bytes_used_metric_ =
+      metrics->AddGauge(LOCAL_BUFF_BYTES_USED_FORMAT, 0, Substitute("$0", 0));
+  return Status::OK();
+}
+
+/// Validates an S3 scratch location by obtaining a filesystem connection for
+/// 'parsed_raw_path_' with the manager's S3A options. No directory is created up
+/// front; a correct S3 address is all that is needed for uploads.
+Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+    bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
+  DCHECK(is_parsed_);
+  // The connection handle itself is not used here.
+  hdfsFS unused_conn;
+  return HdfsFsCache::instance()->GetConnection(
+      parsed_raw_path_, &unused_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options());
+}
+
+/// Extracts the HDFS path, which must include a port number, from the colon-split
+/// specifier tokens. After splitting on ':', the tokens are
+/// ["path", "port_num", "bytes_limit", "priority"]. '*parsed_path' therefore receives
+/// 'prefix_' + path + ":" + port, and '*offset' is 2 (the first option index).
+/// Returns an error if the port token is missing.
+Status TmpDirHdfs::GetPathFromToks(
+    const vector<string>& toks, string* parsed_path, int* offset) {
+  DCHECK(parsed_path != nullptr);
+  DCHECK(offset != nullptr);
+  if (toks.size() < 2) {
+    return Status(
+        Substitute("The scratch path should have the port number: '$0'", raw_path_));
+  }
+  *parsed_path = prefix_ + toks[0] + ":" + toks[1];
+  *offset = 2;
+  return Status::OK();
+}
+
+/// Parses the HDFS specifier. The sequence is identical to the base class, and
+/// the HDFS-specific host:port handling lives in the overridden GetPathFromToks().
+/// This override therefore delegates rather than duplicating the base logic.
+Status TmpDirHdfs::Parse() {
+  return TmpDir::Parse();
+}
+
+/// Connects to the HDFS filesystem of 'parsed_raw_path_' and makes sure the scratch
+/// sub-directory 'path_' exists, creating it when absent. A missing directory would
+/// otherwise make later uploads fail. Returns an error if connecting or creating
+/// the directory fails.
+Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+    bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
+  DCHECK(is_parsed_);
+  hdfsFS conn;
+  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+      parsed_raw_path_, &conn, &(tmp_mgr->hdfs_conns_)));
+  const char* const dir = path_.c_str();
+  const bool exists = hdfsExists(conn, dir) == 0;
+  if (!exists && hdfsCreateDirectory(conn, dir) != 0) {
+    return Status(GetHdfsErrorMsg("HDFS create path failed: ", path_));
+  }
+  return Status::OK();
 }
 
 TmpFile::TmpFile(
@@ -730,14 +795,14 @@ bool TmpFile::Blacklist(const ErrorMsg& msg) {
   }
 }
 
+/// Returns the scratch directory this file belongs to. A 'device_id_' below
+/// tmp_dirs_.size() indexes the local directories; the single id equal to
+/// tmp_dirs_.size() refers to the remote directory.
-TmpFileMgr::TmpDir* TmpFile::GetDir() {
+TmpDir* TmpFile::GetDir() {
   auto tmp_file_mgr = file_group_->tmp_file_mgr_;
   if (device_id_ >= tmp_file_mgr->tmp_dirs_.size()) {
     // Only one remote directory supported.
     DCHECK(device_id_ - tmp_file_mgr->tmp_dirs_.size() == 0);
     return tmp_file_mgr->tmp_dirs_remote_.get();
   }
-  return &tmp_file_mgr->tmp_dirs_[device_id_];
+  return tmp_file_mgr->tmp_dirs_[device_id_].get();
 }
 
 Status TmpFile::PunchHole(int64_t offset, int64_t len) {
@@ -751,7 +816,7 @@ Status TmpFile::PunchHole(int64_t offset, int64_t len) {
   KUDU_RETURN_IF_ERROR(
       file->PunchHole(offset, len), "Failed to punch hole in scratch file");
   bytes_reclaimed_.Add(len);
-  GetDir()->bytes_used_metric->Increment(-len);
+  GetDir()->bytes_used_metric()->Increment(-len);
   VLOG(3) << "Punched hole in " << path_ << " " << offset << " " << len;
   return Status::OK();
 }
@@ -774,8 +839,8 @@ bool TmpFileLocal::AllocateSpace(int64_t num_bytes, 
int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
   TmpDir* dir = GetDir();
   // Increment optimistically and roll back if the limit is exceeded.
-  if (dir->bytes_used_metric->Increment(num_bytes) > dir->bytes_limit) {
-    dir->bytes_used_metric->Increment(-num_bytes);
+  if (dir->bytes_used_metric()->Increment(num_bytes) > dir->bytes_limit()) {
+    dir->bytes_used_metric()->Increment(-num_bytes);
     return false;
   }
   *offset = allocation_offset_;
@@ -793,7 +858,7 @@ Status TmpFileLocal::Remove() {
   int64_t bytes_in_use = file_group_->tmp_file_mgr_->punch_holes() ?
       allocation_offset_ - bytes_reclaimed_.Load() :
       allocation_offset_;
-  GetDir()->bytes_used_metric->Increment(-bytes_in_use);
+  GetDir()->bytes_used_metric()->Increment(-bytes_in_use);
   return status;
 }
 
@@ -845,7 +910,7 @@ io::DiskFile* TmpFileRemote::GetWriteFile() {
   return disk_buffer_file_.get();
 }
 
+/// Returns the local directory that buffers remote scratch files. This forwards
+/// to TmpFileMgr::GetLocalBufferDir() of the owning manager.
-TmpFileMgr::TmpDir* TmpFileRemote::GetLocalBufferDir() const {
+TmpDir* TmpFileRemote::GetLocalBufferDir() const {
   return file_group_->tmp_file_mgr_->GetLocalBufferDir();
 }
 
@@ -883,7 +948,7 @@ Status TmpFileRemote::Remove() {
   disk_file_->SetStatus(io::DiskFileStatus::DELETED);
 
   // Update the metrics.
-  GetDir()->bytes_used_metric->Increment(-file_size_);
+  GetDir()->bytes_used_metric()->Increment(-file_size_);
 
   // Return the file to the pool if it hasn't been enqueued.
   if (to_return_the_buffer) {
@@ -923,13 +988,13 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, 
DiskIoMgr* io_mgr,
   DCHECK(tmp_file_mgr != nullptr);
   io_ctx_ = io_mgr_->RegisterContext();
   // Populate the priority based index ranges.
-  const std::vector<TmpDir>& tmp_dirs = tmp_file_mgr_->tmp_dirs_;
+  const std::vector<std::unique_ptr<TmpDir>>& tmp_dirs = 
tmp_file_mgr_->tmp_dirs_;
   if (tmp_dirs.size() > 0) {
     int start_index = 0;
-    int priority = tmp_dirs[0].priority;
+    int priority = tmp_dirs[0]->priority();
     for (int i = 0; i < tmp_dirs.size() - 1; ++i) {
-      priority = tmp_dirs[i].priority;
-      const int next_priority = tmp_dirs[i+1].priority;
+      priority = tmp_dirs[i]->priority();
+      const int next_priority = tmp_dirs[i + 1]->priority();
       if (next_priority != priority) {
         tmp_files_index_range_.emplace(priority, 
TmpFileIndexRange(start_index, i));
         start_index = i + 1;
@@ -1025,7 +1090,7 @@ void TmpFileGroup::UpdateScratchSpaceMetrics(int64_t 
num_bytes, bool is_remote)
   if (is_remote) current_bytes_allocated_remote_.Add(num_bytes);
 }
 
-string TmpFileGroup::GenerateNewPath(string& dir, string& unique_name) {
+string TmpFileGroup::GenerateNewPath(const string& dir, const string& 
unique_name) {
   stringstream file_name;
   file_name << TMP_SUB_DIR_NAME << "-" << unique_name;
   path new_file_path(dir);
@@ -1045,7 +1110,7 @@ std::shared_ptr<TmpFile>& 
TmpFileGroup::FindTmpFileSharedPtr(TmpFile* tmp_file)
 Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
     int64_t* file_offset, vector<int>* at_capacity_dirs) {
   // Only one remote dir supported currently.
-  string dir = tmp_file_mgr_->tmp_dirs_remote_->path;
+  string dir = tmp_file_mgr_->tmp_dirs_remote_->path();
   // It is not supposed to have a remote directory other than HDFS or S3.
   DCHECK(IsHdfsPath(dir.c_str(), false) || IsS3APath(dir.c_str(), false));
 
@@ -1069,7 +1134,7 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t 
num_bytes, TmpFile** tmp_file,
     return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_, 
GetBackendString());
   }
 
-  int64_t remote_dir_bytes_limit = 
tmp_file_mgr_->tmp_dirs_remote_->bytes_limit;
+  int64_t remote_dir_bytes_limit = 
tmp_file_mgr_->tmp_dirs_remote_->bytes_limit();
   if (remote_dir_bytes_limit != -1 && new_bytes > remote_dir_bytes_limit) {
     return Status(
         TErrorCode::SCRATCH_LIMIT_EXCEEDED, remote_dir_bytes_limit, 
GetBackendString());
@@ -1085,7 +1150,7 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t 
num_bytes, TmpFile** tmp_file,
       + PrintId(unique_id(), "_");
 
   string new_file_path = GenerateNewPath(dir, unique_name);
-  string local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path;
+  const string& local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path();
   string new_file_path_local = GenerateNewPath(local_buffer_dir, unique_name);
 
   TmpFileRemote* tmp_file_r = new TmpFileRemote(
@@ -1098,14 +1163,14 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t 
num_bytes, TmpFile** tmp_file,
   }
   shared_ptr<TmpFile> tmp_file_remote(move(tmp_file_r));
   int64_t file_size = tmp_file_mgr_->GetRemoteTmpFileSize();
-  TmpFileMgr::TmpDir* tmp_dir_remote = tmp_file_remote->GetDir();
-  if (tmp_dir_remote->bytes_limit != -1
-      && tmp_dir_remote->bytes_used_metric->Increment(file_size)
-          > tmp_dir_remote->bytes_limit) {
-    tmp_dir_remote->bytes_used_metric->Increment(-file_size);
+  TmpDir* tmp_dir_remote = tmp_file_remote->GetDir();
+  if (tmp_dir_remote->bytes_limit() != -1
+      && tmp_dir_remote->bytes_used_metric()->Increment(file_size)
+          > tmp_dir_remote->bytes_limit()) {
+    tmp_dir_remote->bytes_used_metric()->Increment(-file_size);
     at_capacity_dirs->push_back(dev_id);
     return Status(Substitute("Reach the size limit $0 of dir: $1",
-        tmp_dir_remote->bytes_limit, tmp_dir_remote->path));
+        tmp_dir_remote->bytes_limit(), tmp_dir_remote->path()));
   }
   UpdateScratchSpaceMetrics(file_size, true);
   tmp_files_remote_.emplace_back(move(tmp_file_remote));
@@ -1473,15 +1538,15 @@ Status TmpFileGroup::RecoverWriteError(
 Status TmpFileGroup::ScratchAllocationFailedStatus(
     const vector<int>& at_capacity_dirs) {
   vector<string> tmp_dir_paths;
-  for (TmpDir& tmp_dir : tmp_file_mgr_->tmp_dirs_) {
-    tmp_dir_paths.push_back(tmp_dir.path);
+  for (std::unique_ptr<TmpDir>& tmp_dir : tmp_file_mgr_->tmp_dirs_) {
+    tmp_dir_paths.push_back(tmp_dir->path());
   }
   vector<string> at_capacity_dir_paths;
   for (int dir_idx : at_capacity_dirs) {
     if (dir_idx >= tmp_file_mgr_->tmp_dirs_.size()) {
-      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_remote_->path);
+      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_remote_->path());
     } else {
-      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx].path);
+      
at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx]->path());
     }
   }
   Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, 
","),
@@ -1994,7 +2059,7 @@ void 
TmpFileBufferPool::EnqueueTmpFilesPool(shared_ptr<TmpFile>& tmp_file, bool
     } else {
       tmp_files_avail_pool_.push_back(tmp_file);
     }
-    tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric->Increment(
+    tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric()->Increment(
         -1 * tmp_file_mgr_->GetRemoteTmpFileSize());
   }
   tmp_files_available_cv_.NotifyOne();
@@ -2030,7 +2095,7 @@ Status TmpFileBufferPool::DequeueTmpFilesPool(
     DCHECK(tmp_file_remote->is_enqueued());
     tmp_file_remote->SetEnqueued(false);
   }
-  tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric->Increment(
+  tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric()->Increment(
       tmp_file_mgr_->GetRemoteTmpFileSize());
   return Status::OK();
 }
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index bb459d9..9f3691c 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -54,6 +54,7 @@ namespace io {
 }
 struct BufferPoolClientCounters;
 class MemTracker;
+class TmpDir;
 class TmpFile;
 class TmpFileRemote;
 class TmpFileBufferPool;
@@ -132,33 +133,6 @@ class TmpFileMgr {
   /// Same typedef as io::WriteRange::WriteDoneCallback.
   typedef std::function<void(const Status&)> WriteDoneCallback;
 
-  /// A configured temporary directory that TmpFileMgr allocates files in.
-  struct TmpDir {
-    TmpDir(const std::string& path, int64_t bytes_limit, int priority,
-        IntGauge* bytes_used_metric, bool is_local_dir = true)
-      : path(path),
-        bytes_limit(bytes_limit),
-        priority(priority),
-        bytes_used_metric(bytes_used_metric),
-        is_local_dir(is_local_dir) {}
-
-    /// Path to the temporary directory.
-    std::string path;
-
-    /// Limit on bytes that should be written to this path. Set to maximum 
value
-    /// of int64_t if there is no limit.
-    int64_t bytes_limit;
-
-    /// Scratch directory priority.
-    int priority;
-
-    /// The current bytes of scratch used for this temporary directory.
-    IntGauge* bytes_used_metric;
-
-    /// If the dir is expected in the local file system or in the remote.
-    bool is_local_dir;
-  };
-
   /// A configuration for the control parameters of remote temporary 
directories.
   /// The struct is used by TmpFileMgr and has the same lifecycle as 
TmpFileMgr.
   struct TmpDirRemoteCtrl {
@@ -208,18 +182,6 @@ class TmpFileMgr {
   Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
       bool one_dir_per_device, const std::string& compression_codec, bool 
punch_holes,
       MetricGroup* metrics) WARN_UNUSED_RESULT;
-
-  /// A helper function for InitCustom() to parse the options of the scratch 
directory.
-  Status ParseScratchPathToks(const string& tmp_dir_spec,
-      const string& tmp_dirs_without_prefix, bool is_hdfs, string* path,
-      int64_t* bytes_limit, int* priority) WARN_UNUSED_RESULT;
-
-  /// A helper function for InitCustom() to create a scratch directory.
-  Status CreateDirectory(const string& scratch_subdir_path, const string& 
tmp_path,
-      const std::unique_ptr<TmpDir>& tmp_dir, MetricGroup* metrics,
-      vector<bool>* is_tmp_dir_on_disk, bool* has_remote_dir,
-      int disk_id) WARN_UNUSED_RESULT;
-
   // Create the TmpFile buffer pool thread for async buffer file reservation.
   Status CreateTmpFileBufferPoolThread(MetricGroup* metrics) 
WARN_UNUSED_RESULT;
 
@@ -328,6 +290,9 @@ class TmpFileMgr {
   friend class TmpFileRemote;
   friend class TmpFileGroup;
   friend class TmpFileMgrTest;
+  friend class TmpDirLocal;
+  friend class TmpDirHdfs;
+  friend class TmpDirS3;
 
   /// Return a new TmpFile handle with a path based on file_group->unique_id. 
The file is
   /// associated with the 'file_group' and the file path is within the 
(single) scratch
@@ -353,9 +318,12 @@ class TmpFileMgr {
   /// Whether hole punching is enabled.
   bool punch_holes_ = false;
 
+  /// Whether one local scratch directory per device.
+  bool one_dir_per_device_ = false;
+
   /// The paths of the created tmp directories, which are used for spilling to 
local
   /// filesystem.
-  std::vector<TmpDir> tmp_dirs_;
+  std::vector<std::unique_ptr<TmpDir>> tmp_dirs_;
 
   /// The paths of remote directories, which are used for spilling to remote 
filesystem.
   std::unique_ptr<TmpDir> tmp_dirs_remote_;
@@ -469,7 +437,7 @@ class TmpFileGroup {
   void UpdateScratchSpaceMetrics(int64_t num_bytes, bool is_remote = false);
 
   /// Assemble and return a new path.
-  std::string GenerateNewPath(string& dir, string& unique_name);
+  std::string GenerateNewPath(const string& dir, const string& unique_name);
 
   std::string DebugString();
 

Reply via email to