This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a469a9cf19a34d28a7d29d1b0fac0aeb0662ca8e
Author: Michael Smith <[email protected]>
AuthorDate: Wed Nov 16 16:20:46 2022 -0800

    IMPALA-11730: Add support for spilling to Ozone
    
    Adds support for spilling to Ozone (ofs and o3fs schemes) for parity
    with HDFS. Note that ofs paths start with <volume>/<bucket>, which have
    naming restrictions; tmp/impala-scratch is a valid name, so something
    like ofs://localhost:9862/tmp would work as a scratch directory (volume
    tmp, implicit bucket impala-scratch).
    
    Updates tests to determine the correct path from the environment. Fixes
    backend tests to work with Ozone as well. Guards test_scratch_disk.py
    behind a new flag for filesystems that support spilling. Updates metric
    verification to wait for scratch-space-bytes-used to be non-zero, as it
    seems to update slower with Ozone.
    
    Refactors TmpDir to remove extraneous variables and functions. Each
    implementation is expected to handle its own token parsing.
    
    Initializes default_fs in ExecEnv when using TestEnv. Previously it was
    uninitialized, and uses of default_fs would return an empty string.
    
    Testing:
    - Ran backend, end-to-end, and custom cluster tests with Ozone.
    - Ran test_scratch_disk.py exhaustive runs with Ozone and HDFS.
    
    Change-Id: I5837c30357363f727ca832fb94169f2474fb4f6f
    Reviewed-on: http://gerrit.cloudera.org:8080/19251
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/bufferpool/buffer-pool-test.cc |   8 +-
 be/src/runtime/io/disk-io-mgr-test.cc         |  36 +++---
 be/src/runtime/test-env.cc                    |   9 ++
 be/src/runtime/test-env.h                     |   3 +
 be/src/runtime/tmp-file-mgr-internal.h        |  62 ++++------
 be/src/runtime/tmp-file-mgr-test.cc           |  50 ++++----
 be/src/runtime/tmp-file-mgr.cc                | 166 +++++++++++---------------
 tests/common/skip.py                          |   2 +-
 tests/custom_cluster/test_scratch_disk.py     |  62 +++++-----
 9 files changed, 189 insertions(+), 209 deletions(-)

diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc 
b/be/src/runtime/bufferpool/buffer-pool-test.cc
index b9946ba60..c322d026c 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -66,8 +66,6 @@ DECLARE_string(remote_tmp_file_size);
 const string SCRATCH_SUFFIX = "/impala-scratch";
 
 /// For testing spill to remote.
-static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp";
-static const string REMOTE_URL = HDFS_LOCAL_URL;
 static const string LOCAL_BUFFER_PATH = "/tmp/buffer-pool-test-buffer";
 
 namespace impala {
@@ -87,6 +85,7 @@ class BufferPoolTest : public ::testing::Test {
     test_env_->DisableBufferPool();
     ASSERT_OK(test_env_->Init());
     RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_);
+    remote_url_ = test_env_->GetDefaultFsPath("/tmp");
   }
 
   virtual void TearDown() {
@@ -160,7 +159,7 @@ class BufferPoolTest : public ::testing::Test {
     } else {
       tmp_dirs.push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", 
local_buffer_limit));
     }
-    tmp_dirs.push_back(REMOTE_URL);
+    tmp_dirs.push_back(remote_url_);
     EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(LOCAL_BUFFER_PATH));
     for (int i = 0; i < num_local_dirs; ++i) {
       const string& create_dir = Substitute("/tmp/buffer-pool-test.$0", i);
@@ -472,6 +471,9 @@ class BufferPoolTest : public ::testing::Test {
   /// of the map are protected by query_reservations_lock_.
   unordered_map<int64_t, ReservationTracker*> query_reservations_;
   SpinLock query_reservations_lock_;
+
+  /// URL for remote spilling.
+  string remote_url_;
 };
 
 const int64_t BufferPoolTest::TEST_BUFFER_LEN;
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc 
b/be/src/runtime/io/disk-io-mgr-test.cc
index 1e5f9240b..8852762eb 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -74,8 +74,6 @@ const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 
1024L;
 const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT;
 
 /// For testing spill to remote.
-static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp";
-static const string REMOTE_URL = HDFS_LOCAL_URL;
 static const string LOCAL_BUFFER_PATH = "/tmp/tmp-file-mgr-test-buffer";
 
 namespace impala {
@@ -95,6 +93,7 @@ class DiskIoMgrTest : public testing::Test {
     ASSERT_OK(test_env_->Init());
     metrics_.reset(new MetricGroup("disk-io-mgr-test"));
     RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", &rng_);
+    remote_url_ = test_env_->GetDefaultFsPath("/tmp");
   }
 
   virtual void TearDown() {
@@ -173,7 +172,7 @@ class DiskIoMgrTest : public testing::Test {
     if (io_mgr == nullptr) io_mgr = test_env_->exec_env()->disk_io_mgr();
     vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
     RemoveAndCreateDirs(tmp_dirs);
-    tmp_dirs.push_back(REMOTE_URL);
+    tmp_dirs.push_back(remote_url_);
     Status status = tmp_file_mgr->InitCustom(tmp_dirs, true, "", false, 
metrics_.get());
     if (!status.ok()) return nullptr;
     return pool_.Add(new TmpFileGroup(tmp_file_mgr, io_mgr, NewProfile(), 
TUniqueId()));
@@ -349,6 +348,9 @@ class DiskIoMgrTest : public testing::Test {
   mutex oper_mutex_;
   ConditionVariable oper_done_;
   int num_oper_;
+
+  /// URL for remote spilling.
+  string remote_url_;
 };
 
 TEST_F(DiskIoMgrTest, TestDisk) {
@@ -1933,7 +1935,7 @@ TEST_F(DiskIoMgrTest, MetricsOfWriteIoError) {
 TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
-  string remote_file_path = REMOTE_URL + "/test";
+  string remote_file_path = remote_url_ + "/test";
   string new_file_path_local_buffer = LOCAL_BUFFER_PATH + "/test";
   int32_t file_size = 1024;
   FLAGS_remote_tmp_file_size = "1K";
@@ -1967,7 +1969,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
 
   TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*);
   *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, 
remote_file_path,
-      new_file_path_local_buffer, false, REMOTE_URL.c_str()));
+      new_file_path_local_buffer, false, remote_url_.c_str()));
 
   vector<WriteRange*> ranges;
   vector<int32_t> datas;
@@ -2113,7 +2115,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
 TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
-  string remote_file_path = REMOTE_URL + "/test";
+  string remote_file_path = remote_url_ + "/test";
   string new_file_path_local_buffer = LOCAL_BUFFER_PATH + "/test";
   FLAGS_remote_tmp_file_size = "1K";
 
@@ -2138,7 +2140,7 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
 
   TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*);
   *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, 
remote_file_path,
-      new_file_path_local_buffer, false, REMOTE_URL.c_str()));
+      new_file_path_local_buffer, false, remote_url_.c_str()));
 
   int32_t* data = tmp_pool.Add(new int32_t);
   *data = rand();
@@ -2195,7 +2197,7 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
 TEST_F(DiskIoMgrTest, WriteToRemoteUploadFailed) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_oper_ = 0;
-  string remote_file_path = REMOTE_URL + "/test";
+  string remote_file_path = remote_url_ + "/test";
   string non_existent_dir = "/non-existent-dir/test";
   FLAGS_remote_tmp_file_size = "1K";
   int64_t file_size = 1024;
@@ -2215,7 +2217,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteUploadFailed) {
 
   TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*);
   *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(
-      tmp_file_grp, 0, remote_file_path, non_existent_dir, false, 
REMOTE_URL.c_str()));
+      tmp_file_grp, 0, remote_file_path, non_existent_dir, false, 
remote_url_.c_str()));
 
   DiskFile* remote_file = (*new_tmp_file_obj)->DiskFile();
   DiskFile* local_buffer_file = (*new_tmp_file_obj)->DiskBufferFile();
@@ -2253,7 +2255,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteUploadFailed) {
 TEST_F(DiskIoMgrTest, WriteToRemoteEvictLocal) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
-  string remote_file_path = REMOTE_URL + "/test1";
+  string remote_file_path = remote_url_ + "/test1";
   string local_buffer_file_path = LOCAL_BUFFER_PATH + "/test1";
   int32_t file_size = 1024;
   FLAGS_remote_tmp_file_size = "1K";
@@ -2273,7 +2275,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteEvictLocal) {
   unique_ptr<RequestContext> io_ctx = io_mgr.RegisterContext();
 
   TmpFileRemote* new_tmp_file_obj = new TmpFileRemote(tmp_file_grp, 0, 
remote_file_path,
-      local_buffer_file_path, false, REMOTE_URL.c_str());
+      local_buffer_file_path, false, remote_url_.c_str());
   shared_ptr<TmpFileRemote> shared_tmp_file;
   shared_tmp_file.reset(move(new_tmp_file_obj));
 
@@ -2351,7 +2353,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteEvictLocal) {
 // Use an invalid block size to emulate the case when memory allocation failed.
 TEST_F(DiskIoMgrTest, WriteToRemoteFailMallocBlock) {
   num_ranges_written_ = 0;
-  string remote_file_path = REMOTE_URL + "/test1";
+  string remote_file_path = remote_url_ + "/test1";
   string local_buffer_file_path = LOCAL_BUFFER_PATH + "/test1";
   int64_t invalid_block_size = -1;
 
@@ -2366,7 +2368,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFailMallocBlock) {
 
   TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*);
   *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, 
remote_file_path,
-      local_buffer_file_path, false, REMOTE_URL.c_str()));
+      local_buffer_file_path, false, remote_url_.c_str()));
 
   RemoteOperRange::RemoteOperDoneCallback u_callback = [this](
                                                            const Status& 
upload_status) {
@@ -2399,7 +2401,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFailMallocBlock) {
 TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
-  string remote_file_path = REMOTE_URL + "/test";
+  string remote_file_path = remote_url_ + "/test";
   string new_file_path_local_buffer = LOCAL_BUFFER_PATH + "/test";
   int32_t block_size = 1024;
   FLAGS_remote_tmp_file_size = "1K";
@@ -2423,7 +2425,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) {
 
   TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*);
   *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, 
remote_file_path,
-      new_file_path_local_buffer, false, REMOTE_URL.c_str()));
+      new_file_path_local_buffer, false, remote_url_.c_str()));
 
   WriteRange::WriteDoneCallback callback = [=](const Status& status) {
     ASSERT_EQ(0, status.code());
@@ -2487,7 +2489,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) {
 TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
   num_oper_ = 0;
   num_ranges_written_ = 0;
-  string remote_file_path = REMOTE_URL + "/test";
+  string remote_file_path = remote_url_ + "/test";
   string local_buffer_path = LOCAL_BUFFER_PATH + "/test";
   FLAGS_remote_tmp_file_size = "1K";
   int64_t file_size = 1024;
@@ -2511,7 +2513,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
   unique_ptr<RequestContext> io_ctx = io_mgr.RegisterContext();
 
   TmpFileRemote tmp_file(
-      tmp_file_grp, 0, remote_file_path, local_buffer_path, false, 
REMOTE_URL.c_str());
+      tmp_file_grp, 0, remote_file_path, local_buffer_path, false, 
remote_url_.c_str());
   DiskFile* remote_file = tmp_file.DiskFile();
   DiskFile* local_buffer_file = tmp_file.DiskBufferFile();
   tmp_file.GetWriteFile()->SetActualFileSize(file_size);
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 7c26931ab..54aebafa4 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -62,6 +62,7 @@ Status TestEnv::Init() {
   exec_env_.reset(new ExecEnv);
   // Populate the ExecEnv state that the backend tests need.
   RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
+  RETURN_IF_ERROR(exec_env_->InitHadoopConfig());
   exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
   if (have_tmp_file_mgr_args_) {
     RETURN_IF_ERROR(tmp_file_mgr()->InitCustom(tmp_dirs_, 
one_tmp_dir_per_device_,
@@ -140,6 +141,14 @@ int64_t TestEnv::TotalQueryMemoryConsumption() {
   return total;
 }
 
+std::string TestEnv::GetDefaultFsPath(std::string path) {
+  const char* filesystem_prefix = getenv("FILESYSTEM_PREFIX");
+  if (filesystem_prefix != nullptr && filesystem_prefix[0] != '\0') {
+    return Substitute("$0$1", filesystem_prefix, path);
+  }
+  return Substitute("$0$1", exec_env_->default_fs(), path);
+}
+
 Status TestEnv::CreateQueryState(
     int64_t query_id, const TQueryOptions* query_options, RuntimeState** 
runtime_state) {
   TQueryCtx query_ctx;
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index b68add29e..3b05e42f7 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -82,6 +82,9 @@ class TestEnv {
     exec_env_->codegen_cache_.reset(new CodeGenCache(metrics));
   }
 
+  /// Get a full URI for the provided path based on the default filesystem.
+  std::string GetDefaultFsPath(std::string path);
+
   ExecEnv* exec_env() { return exec_env_.get(); }
   MetricGroup* metrics() { return exec_env_->metrics(); }
   TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); }
diff --git a/be/src/runtime/tmp-file-mgr-internal.h 
b/be/src/runtime/tmp-file-mgr-internal.h
index dbe7566ef..db3076776 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -400,11 +400,11 @@ class TmpFileDummy : public TmpFile {
 /// A configured temporary directory that TmpFileMgr allocates files in.
 class TmpDir {
  public:
-  TmpDir(const std::string& raw_path, const std::string& prefix, bool 
is_local);
+  TmpDir(const std::string& raw_path) : raw_path_(raw_path) {}
   virtual ~TmpDir() {}
 
   /// Parse the raw path and identify the scratch directory options.
-  virtual Status Parse();
+  Status Parse();
 
   /// Verify the scratch path and create the directory.
   virtual Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
@@ -414,7 +414,7 @@ class TmpDir {
   int priority() { return priority_; }
   const string& path() { return path_; }
   IntGauge* bytes_used_metric() const { return bytes_used_metric_; }
-  bool is_local() { return is_local_dir_; }
+  virtual bool is_local() { return false; }
 
  private:
   friend class TmpFileMgr;
@@ -428,82 +428,64 @@ class TmpDir {
   /// Parsed raw path of the temporary directory, e.g, trimmed.
   std::string parsed_raw_path_;
 
-  /// The prefix of the path.
-  std::string prefix_;
-
   /// The complete path to the temporary directory.
   std::string path_;
 
   /// Limit on bytes that should be written to this path. Set to maximum value
   /// of int64_t if there is no limit.
-  int64_t bytes_limit_;
+  int64_t bytes_limit_ = numeric_limits<int64_t>::max();
 
   /// Scratch directory priority.
-  int priority_;
+  int priority_ = numeric_limits<int>::max();
 
   /// The current bytes of scratch used for this temporary directory.
   IntGauge* bytes_used_metric_;
 
-  /// If the dir is expected in the local file system or in the remote.
-  const bool is_local_dir_;
-
-  /// Indicate if the TmpDir is parsed.
-  bool is_parsed_;
-
-  /// Return the directory path by parsing the input tokens.
-  /// "Path" is the path generated from the tokens.
-  /// "Offset" indicates the number of elements has been read in the tokens.
-  virtual Status GetPathFromToks(
-      const std::vector<string>& tokens, string* path, int* offset);
-
-  /// A helper function for ParseTokens() to parse the raw path and generate 
the complete
+  /// A helper function for Parse() to parse the raw path and generate the 
complete
   /// path of the scratch directory.
-  /// "Offset" indicates the number of elements has been read in the tokens.
-  Status ParsePath(const std::vector<string>& tokens, int* offset);
+  /// "Tokens" will contain {path, [bytes_limit, [priority]]} tokens from raw 
path.
+  virtual Status ParsePathTokens(std::vector<string>& tokens) = 0;
 
-  /// A helper function for ParseTokens() to parse the byte limit of the 
scratch
-  /// directory. "Index" indicates the position of the byte_limit in the 
tokens.
-  Status ParseByteLimit(const std::vector<string>& tokens, int index);
+  /// A helper function for Parse() to parse the byte limit of the scratch 
directory.
+  Status ParseByteLimit(const string& byte_limit);
 
-  /// A helper function for ParseTokens() to parse the priorify of the scratch 
directory.
-  /// "Index" indicates the position of the priority in the tokens.
-  Status ParsePriority(const std::vector<string>& tokens, int index);
-
-  /// A helper function for Parse() to parse raw input of the scratch 
directory.
-  Status ParseTokens();
+  /// A helper function for Parse() to parse the priority of the scratch 
directory.
+  Status ParsePriority(const string& priority);
 };
 
 class TmpDirLocal : public TmpDir {
  public:
-  TmpDirLocal(const std::string& path) : TmpDir(path, "", true /*is_local*/) {}
+  TmpDirLocal(const std::string& path) : TmpDir(path) {}
   Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+  bool is_local() override { return true; }
 
  private:
   /// A helper function for VerifyAndCreate() to create a local scratch 
directory.
   Status CreateLocalDirectory(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, int disk_id, TmpFileMgr* tmp_mgr);
+
+  Status ParsePathTokens(std::vector<string>& tokens) override;
 };
 
 class TmpDirS3 : public TmpDir {
  public:
-  TmpDirS3(const std::string& path)
-    : TmpDir(path, FILESYS_PREFIX_S3, false /*is_local*/) {}
+  TmpDirS3(const std::string& path) : TmpDir(path) {}
   Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+
+ private:
+  Status ParsePathTokens(std::vector<string>& tokens) override;
 };
 
 class TmpDirHdfs : public TmpDir {
  public:
-  TmpDirHdfs(const std::string& path)
-    : TmpDir(path, FILESYS_PREFIX_HDFS, false /*is_local*/) {}
-  Status Parse() override;
+  TmpDirHdfs(const std::string& path) : TmpDir(path) {}
   Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
       bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
 
  private:
-  virtual Status GetPathFromToks(
-      const std::vector<string>& tokens, string* path, int* offset) override;
+  Status ParsePathTokens(std::vector<string>& tokens) override;
 };
 
 /// Temporary file buffer pool allows the temporary files to return their 
buffer to the
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index 038b7e8ee..f29f12adb 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -71,8 +71,6 @@ static const int64_t GIGABYTE = 1024L * MEGABYTE;
 static const int64_t TERABYTE = 1024L * GIGABYTE;
 
 /// For testing spill to remote.
-static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp";
-static const string REMOTE_URL = HDFS_LOCAL_URL;
 static const string LOCAL_BUFFER_PATH = "/tmp/tmp-file-mgr-test-buffer";
 
 /// Read buffer sizes for TestBatchReadingSetMaxBytes().
@@ -99,6 +97,7 @@ class TmpFileMgrTest : public ::testing::Test {
     test_env_.reset(new TestEnv);
     ASSERT_OK(test_env_->Init());
     cb_counter_ = 0;
+    remote_url_ = test_env_->GetDefaultFsPath("/tmp");
   }
 
   virtual void TearDown() {
@@ -186,7 +185,7 @@ class TmpFileMgrTest : public ::testing::Test {
     vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}};
     RemoveAndCreateDirs(tmp_create_dirs);
     tmp_dirs->push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", 4096));
-    tmp_dirs->push_back(REMOTE_URL);
+    tmp_dirs->push_back(remote_url_);
   }
 
   // Helper for TestBatchReadingSetMaxBytes() to set the read buffer size and 
check
@@ -393,6 +392,9 @@ class TmpFileMgrTest : public ::testing::Test {
   mutex cb_cv_lock_;
   ConditionVariable cb_cv_;
   int64_t cb_counter_;
+
+  /// URL for remote spilling.
+  string remote_url_;
 };
 
 /// Regression test for IMPALA-2160. Verify that temporary file manager 
allocates blocks
@@ -1088,9 +1090,10 @@ TEST_F(TmpFileMgrTest, 
TestDirectoryLimitParsingRemotePath) {
   RemoveAndCreateDirs({"/tmp/local-buffer-dir", "/tmp/local-buffer-dir1",
       "/tmp/local-buffer-dir2", "/tmp/local-buffer-dir3"});
 
-  // Successful cases for HDFS paths.
+  // Successful cases for FS paths.
   // Two types of paths, one with directory, one without.
-  vector<string> hdfs_paths{"hdfs://localhost:20500", 
"hdfs://localhost:20500/tmp"};
+  vector<string> hdfs_paths{
+      test_env_->GetDefaultFsPath(""), test_env_->GetDefaultFsPath("/tmp")};
   for (string hdfs_path : hdfs_paths) {
     string full_hdfs_path = hdfs_path + "/impala-scratch";
     auto& dirs1 = GetTmpRemoteDir(CreateTmpFileMgr(hdfs_path + 
",/tmp/local-buffer-dir"));
@@ -1150,15 +1153,14 @@ TEST_F(TmpFileMgrTest, 
TestDirectoryLimitParsingRemotePath) {
     EXPECT_EQ(nullptr, dirs10);
 
     // Multiple remote paths, should support only one.
-    auto& dirs11 = GetTmpRemoteDir(CreateTmpFileMgr(hdfs_path
-        + ",hdfs://localhost:20501/tmp,"
-          "/tmp/local-buffer-dir"));
+    auto& dirs11 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute(
+        "$0,hdfs://localhost:20501/tmp,/tmp/local-buffer-dir", hdfs_path)));
     EXPECT_NE(nullptr, dirs11);
     EXPECT_EQ(full_hdfs_path, dirs11->path());
 
     // The order of the buffer and the remote dir should not affect the result.
-    auto& dirs12 = GetTmpRemoteDir(CreateTmpFileMgr(
-        "/tmp/local-buffer-dir, " + hdfs_path + 
",hdfs://localhost:20501/tmp"));
+    auto& dirs12 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute(
+        "/tmp/local-buffer-dir,$0,hdfs://localhost:20501/tmp", hdfs_path)));
     EXPECT_NE(nullptr, dirs12);
     EXPECT_EQ(full_hdfs_path, dirs12->path());
   }
@@ -1258,8 +1260,8 @@ void TmpFileMgrTest::TestCompressBufferManagement(
   TmpFileMgr tmp_file_mgr;
   if (spill_to_remote) {
     RemoveAndCreateDirs(vector<string>{LOCAL_BUFFER_PATH});
-    ASSERT_OK(tmp_file_mgr.InitCustom(vector<string>{REMOTE_URL, 
LOCAL_BUFFER_PATH}, true,
-        "lz4", true, metrics_.get()));
+    ASSERT_OK(tmp_file_mgr.InitCustom(vector<string>{remote_url_, 
LOCAL_BUFFER_PATH},
+        true, "lz4", true, metrics_.get()));
   } else {
     ASSERT_OK(tmp_file_mgr.InitCustom(
         vector<string>{"/tmp/tmp-file-mgr-test.1"}, true, "lz4", true, 
metrics_.get()));
@@ -1512,7 +1514,7 @@ TEST_F(TmpFileMgrTest, TestRemoteOneDir) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
@@ -1541,7 +1543,7 @@ TEST_F(TmpFileMgrTest, TestRemoteDirReportError) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, 
metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
@@ -1596,7 +1598,7 @@ TEST_F(TmpFileMgrTest, TestRemoteAllocateNonWritable) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
@@ -1629,7 +1631,7 @@ TEST_F(TmpFileMgrTest, TestRemoteScratchLimit) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
 
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
@@ -1671,7 +1673,7 @@ TEST_F(TmpFileMgrTest, TestRemoteWriteRange) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
 
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
@@ -1706,7 +1708,7 @@ TEST_F(TmpFileMgrTest, TestRemoteBlockVerification) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
 
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
@@ -1755,7 +1757,7 @@ TEST_F(TmpFileMgrTest, TestRemoteDirectoryLimits) {
   vector<string> tmp_dirs{{LOCAL_BUFFER_PATH}};
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   int64_t alloc_size = 1024;
   int64_t file_size = 1024;
   FLAGS_remote_tmp_file_size = "1K";
@@ -1805,7 +1807,7 @@ TEST_F(TmpFileMgrTest, TestMixDirectoryLimits) {
   vector<string> tmp_dirs{{LOCAL_BUFFER_PATH, 
"/tmp/tmp-file-mgr-test-local:1024"}};
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_create_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   int64_t alloc_size = 1024;
   int64_t file_size = 1024;
   FLAGS_remote_tmp_file_size = "1K";
@@ -1868,7 +1870,7 @@ TEST_F(TmpFileMgrTest, TestMixTmpFileLimits) {
   vector<string> tmp_dirs{{LOCAL_BUFFER_PATH}};
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_create_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   int64_t alloc_size = 1024;
   int64_t file_size = 512 * 1024 * 1024;
   int64_t offset;
@@ -1896,7 +1898,7 @@ void 
TmpFileMgrTest::TestTmpFileBufferPoolHelper(TmpFileMgr& tmp_file_mgr,
   vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}};
   vector<string> tmp_dirs{{Substitute(LOCAL_BUFFER_PATH + ":$0", 2048)}};
   RemoveAndCreateDirs(tmp_create_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
   int64_t alloc_size = 1024;
   int64_t file_size = 1024;
   FLAGS_remote_tmp_file_size = "1KB";
@@ -2035,7 +2037,7 @@ TEST_F(TmpFileMgrTest, TestRemoteRemoveBuffer) {
   vector<string> tmp_dirs({LOCAL_BUFFER_PATH});
   TmpFileMgr tmp_file_mgr;
   RemoveAndCreateDirs(tmp_dirs);
-  tmp_dirs.push_back(REMOTE_URL);
+  tmp_dirs.push_back(remote_url_);
 
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
@@ -2091,7 +2093,7 @@ TEST_F(TmpFileMgrTest, TestRemoteUploadFailed) {
   vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}};
   vector<string> tmp_dirs{{Substitute(LOCAL_BUFFER_PATH + ":$0", 
buffer_limit)}};
   RemoveAndCreateDirs(tmp_create_dirs);
-  tmp_dirs.push_back(HDFS_LOCAL_URL);
+  tmp_dirs.push_back(remote_url_);
 
   ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
   TUniqueId id;
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 8a90cf614..f8af71a30 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -290,7 +290,8 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dir_specifiers,
     string 
tmp_dir_spec_trimmed(boost::algorithm::trim_left_copy(tmp_dir_spec));
     std::unique_ptr<TmpDir> tmp_dir;
 
-    if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)) {
+    if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)
+        || IsOzonePath(tmp_dir_spec_trimmed.c_str(), false)) {
       tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed);
     } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) {
       // Initialize the S3 options for later getting S3 connection.
@@ -645,105 +646,72 @@ Status 
TmpFileMgr::TmpDirRemoteCtrl::SetUpReadBufferParams() {
   return Status::OK();
 }
 
-TmpDir::TmpDir(const string& raw_path, const string& prefix, bool is_local)
-  : raw_path_(raw_path), prefix_(prefix), is_local_dir_(is_local), 
is_parsed_(false) {}
-
-Status TmpDir::GetPathFromToks(const vector<string>& toks, string* path, int* 
offset) {
-  DCHECK(path != nullptr);
-  DCHECK(offset != nullptr);
-  string parsed_raw_path = prefix_;
-  // The ordinary format of the directory input after split by colon is
-  // ["path", "bytes_limit", "priority"].
-  parsed_raw_path.append(toks[0]);
-  *path = parsed_raw_path;
-  *offset = 1;
-  return Status::OK();
-}
-
-Status TmpDir::ParsePath(const vector<string>& toks, int* offset) {
-  string parsed_raw_path;
-  RETURN_IF_ERROR(GetPathFromToks(toks, &parsed_raw_path, offset));
-  parsed_raw_path = trim_right_copy_if(parsed_raw_path, is_any_of("/"));
-
-  // Construct the complete scratch directory path.
-  boost::filesystem::path tmp_path(parsed_raw_path);
-  if (is_local_dir_) {
-    tmp_path = absolute(tmp_path);
-    parsed_raw_path = tmp_path.string();
-  }
-  boost::filesystem::path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
-  parsed_raw_path_ = parsed_raw_path;
-  path_ = scratch_subdir_path.string();
-
-  return Status::OK();
-}
-
-Status TmpDir::ParseByteLimit(const vector<string>& toks, int index) {
-  DCHECK_GE(index, 0);
-  int64_t bytes_limit = numeric_limits<int64_t>::max();
-  if (index < toks.size()) {
-    // Parse option byte_limit.
-    bool is_percent;
-    bytes_limit = ParseUtil::ParseMemSpec(toks[index], &is_percent, 0);
-    if (bytes_limit < 0 || is_percent) {
-      return Status(Substitute(
-          "Malformed scratch directory capacity configuration '$0'", 
raw_path_));
-    } else if (bytes_limit == 0) {
-      // Interpret -1, 0 or empty string as no limit.
-      bytes_limit = numeric_limits<int64_t>::max();
-    }
+Status TmpDir::ParseByteLimit(const string& byte_limit) {
+  bool is_percent;
+  bytes_limit_ = ParseUtil::ParseMemSpec(byte_limit, &is_percent, 0);
+  if (bytes_limit_ < 0 || is_percent) {
+    return Status(Substitute(
+        "Malformed scratch directory capacity configuration '$0'", raw_path_));
+  } else if (bytes_limit_ == 0) {
+    // Interpret -1, 0 or empty string as no limit.
+    bytes_limit_ = numeric_limits<int64_t>::max();
   }
-  bytes_limit_ = bytes_limit;
   return Status::OK();
 }
 
-Status TmpDir::ParsePriority(const vector<string>& toks, int index) {
-  DCHECK_GE(index, 0);
-  int priority = numeric_limits<int>::max();
-  if (index < toks.size() && !toks[index].empty()) {
+Status TmpDir::ParsePriority(const string& priority) {
+  if (!priority.empty()) {
     StringParser::ParseResult result;
-    priority =
-        StringParser::StringToInt<int>(toks[index].data(), toks[index].size(), 
&result);
+    priority_ = StringParser::StringToInt<int>(
+        priority.c_str(), priority.size(), &result);
     if (result != StringParser::PARSE_SUCCESS) {
       return Status(Substitute(
           "Malformed scratch directory priority configuration '$0'", 
raw_path_));
     }
   }
-  priority_ = priority;
   return Status::OK();
 }
 
-Status TmpDir::ParseTokens() {
+Status TmpDir::Parse() {
+  DCHECK(parsed_raw_path_.empty() && path_.empty());
+
   vector<string> toks;
-  string path_without_prefix = raw_path_.substr(strlen(prefix_.c_str()));
-  split(toks, path_without_prefix, is_any_of(":"), token_compress_off);
-  int offset = 0;
-  RETURN_IF_ERROR(ParsePath(toks, &offset));
-  const int max_num_options = 2;
-  // The max size after the split by colon.
-  int max_num_tokens = max_num_options + offset;
+  RETURN_IF_ERROR(ParsePathTokens(toks));
+
+  constexpr int max_num_tokens = 3;
   if (toks.size() > max_num_tokens) {
     return Status(Substitute(
         "Could not parse temporary dir specifier, too many colons: '$0'", 
raw_path_));
   }
+
+  // Construct the complete scratch directory path.
+  toks[0] = trim_right_copy_if(toks[0], is_any_of("/"));
+  parsed_raw_path_ = toks[0];
+  path_ = (boost::filesystem::path(toks[0]) / TMP_SUB_DIR_NAME).string();
+
   // The scratch path may have two options "bytes limit" and "priority".
-  // The priority should be the first option.
-  RETURN_IF_ERROR(ParseByteLimit(toks, offset++));
+  // The bytes limit should be the first option.
+  if (toks.size() > 1) {
+    RETURN_IF_ERROR(ParseByteLimit(toks[1]));
+  }
   // The priority should be the second option.
-  RETURN_IF_ERROR(ParsePriority(toks, offset));
+  if (toks.size() > 2) {
+    RETURN_IF_ERROR(ParsePriority(toks[2]));
+  }
   return Status::OK();
 }
 
-Status TmpDir::Parse() {
-  DCHECK(!is_parsed_);
-  RETURN_IF_ERROR(ParseTokens());
-  is_parsed_ = true;
+Status TmpDirLocal::ParsePathTokens(vector<string>& toks) {
+  // The ordinary format of the directory input after split by colon is
+  // {path, [bytes_limit, [priority]]}.
+  split(toks, raw_path_, is_any_of(":"), token_compress_off);
+  toks[0] = absolute(toks[0]).string();
   return Status::OK();
 }
 
 Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics,
     vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* 
tmp_mgr) {
-  DCHECK(is_parsed_);
+  DCHECK(!parsed_raw_path_.empty());
   // The path must be a writable directory.
   Status status = FileSystemUtil::VerifyIsDirectory(parsed_raw_path_);
   if (!status.ok()) {
@@ -784,6 +752,7 @@ Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics,
 Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics,
     vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, int disk_id,
     TmpFileMgr* tmp_mgr) {
+  DCHECK(!path_.empty());
   // Create the directory, destroying if already present. If this succeeds, we 
will
   // have an empty writable scratch directory.
   Status status = FileSystemUtil::RemoveAndCreateDirectory(path_);
@@ -807,7 +776,8 @@ Status TmpDirLocal::CreateLocalDirectory(MetricGroup* 
metrics,
     if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true;
     LOG(INFO) << "Using scratch directory " << path_ << " on "
               << "disk " << disk_id
-              << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_);
+              << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_)
+              << ", priority: " << priority_;
     bytes_used_metric_ = metrics->AddGauge(
         SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", 
tmp_mgr->tmp_dirs_.size()));
   } else {
@@ -818,45 +788,44 @@ Status TmpDirLocal::CreateLocalDirectory(MetricGroup* 
metrics,
   return status;
 }
 
+Status TmpDirS3::ParsePathTokens(vector<string>& toks) {
+  // The ordinary format of the directory input after split by colon is
+  // {scheme, path, [bytes_limit, [priority]]}. Combine scheme and path.
+  split(toks, raw_path_, is_any_of(":"), token_compress_off);
+  // Only called on paths starting with `s3a://`, so there will always be at 
least 2.
+  DCHECK(toks.size() >= 2);
+  toks[0] = Substitute("$0:$1", toks[0], toks[1]);
+  toks.erase(toks.begin()+1);
+  return Status::OK();
+}
+
 Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
   // For the S3 path, it doesn't need to create the directory for the uploading
   // as long as the S3 address is correct.
-  DCHECK(is_parsed_);
+  DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
       path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()));
   return Status::OK();
 }
 
-Status TmpDirHdfs::GetPathFromToks(
-    const vector<string>& toks, string* parsed_path, int* offset) {
-  DCHECK(parsed_path != nullptr);
-  DCHECK(offset != nullptr);
+Status TmpDirHdfs::ParsePathTokens(vector<string>& toks) {
   // We enforce the HDFS scratch path to have the port number, and the format 
after split
-  // by colon is ["path", "port_num", "bytes_limit", "priority"], therefore, 
the offset
-  // to be returned is 2.
-  if (toks.size() < 2) {
+  // by colon is {scheme, path, port_num, [bytes_limit, [priority]]}. Coalesce 
the URI.
+  split(toks, raw_path_, is_any_of(":"), token_compress_off);
+  if (toks.size() < 3) {
     return Status(
-        Substitute("The scrach path should have the port number: '$0'", 
raw_path_));
+        Substitute("The scratch path should have the port number: '$0'", 
raw_path_));
   }
-  string parsed_raw_path = prefix_;
-  parsed_raw_path.append(toks[0]).append(":").append(toks[1]);
-  *parsed_path = parsed_raw_path;
-  *offset = 2;
-  return Status::OK();
-}
-
-Status TmpDirHdfs::Parse() {
-  DCHECK(!is_parsed_);
-  RETURN_IF_ERROR(ParseTokens());
-  is_parsed_ = true;
+  toks[0] = Substitute("$0:$1:$2", toks[0], toks[1], toks[2]);
+  toks.erase(toks.begin()+1, toks.begin()+3);
   return Status::OK();
 }
 
 Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* 
is_tmp_dir_on_disk,
     bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
-  DCHECK(is_parsed_);
+  DCHECK(!path_.empty());
   hdfsFS hdfs_conn;
   // If the HDFS path doesn't exist, it would fail while uploading, so we
   // create the HDFS path if it doesn't exist.
@@ -978,6 +947,10 @@ TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, 
TmpFileMgr::DeviceId devi
     disk_type_ = io::DiskFileType::DFS;
     disk_id_ = file_group->io_mgr_->RemoteDfsDiskId();
     disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId();
+  } else if (IsOzonePath(hdfs_url, false)) {
+    disk_type_ = io::DiskFileType::DFS;
+    disk_id_ = file_group->io_mgr_->RemoteOzoneDiskId();
+    disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId();
   } else if (IsS3APath(hdfs_url, false)) {
     disk_type_ = io::DiskFileType::S3;
     disk_id_ = file_group->io_mgr_->RemoteS3DiskId();
@@ -1382,8 +1355,9 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t 
num_bytes, TmpFile** tmp_file,
     int64_t* file_offset, vector<int>* at_capacity_dirs) {
   // Only one remote dir supported currently.
   string dir = tmp_file_mgr_->tmp_dirs_remote_->path();
-  // It is not supposed to have a remote directory other than HDFS or S3.
-  DCHECK(IsHdfsPath(dir.c_str(), false) || IsS3APath(dir.c_str(), false));
+  // It is not supposed to have a remote directory other than HDFS, Ozone, or 
S3.
+  DCHECK(IsHdfsPath(dir.c_str(), false) || IsOzonePath(dir.c_str(), false)
+      || IsS3APath(dir.c_str(), false));
 
   // Look for the space from a previous created file.
   if (!tmp_files_remote_.empty()) {
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 46fc303fa..85d2c678d 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -99,7 +99,7 @@ class SkipIf:
   not_hdfs = pytest.mark.skipif(not IS_HDFS, reason="HDFS admin needed")
   not_dfs = pytest.mark.skipif(not (IS_HDFS or IS_OZONE),
       reason="HDFS/Ozone Filesystem needed")
-  not_scratch_fs = pytest.mark.skipif(not IS_HDFS,
+  not_scratch_fs = pytest.mark.skipif(not (IS_HDFS or IS_OZONE),
       reason="Scratch dirs for temporary file spilling not supported")
   sfs_unsupported = pytest.mark.skipif(not (IS_HDFS or IS_S3 or IS_ABFS or 
IS_ADLS
       or IS_GCS), reason="Hive support for sfs+ is limited, HIVE-26757")
diff --git a/tests/custom_cluster/test_scratch_disk.py 
b/tests/custom_cluster/test_scratch_disk.py
index 66492bcf7..35adf1c40 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -27,6 +27,7 @@ import tempfile
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.verifiers.metric_verifier import MetricVerifier
 from tests.common.skip import SkipIf
+from tests.util.hdfs_util import NAMENODE
 
 class TestScratchDir(CustomClusterTestSuite):
   @classmethod
@@ -229,13 +230,9 @@ class TestScratchDir(CustomClusterTestSuite):
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
-    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
-    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
-    metrics2 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-2')
-    metrics3 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-3')
-    metrics4 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-4')
-    assert (metrics0 > 0 and metrics1 > 0 and metrics2 > 0 and metrics3 > 0
-            and metrics4 > 0)
+    for i in range(5):
+      impalad.service.wait_for_metric_value(
+          'tmp-file-mgr.scratch-space-bytes-used.dir-' + str(i), 1, 
allow_greater=True)
     results = client.fetch(self.spill_query, handle)
     assert results.success
     client.close_query(handle)
@@ -263,26 +260,30 @@ class TestScratchDir(CustomClusterTestSuite):
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    # dir1 and dir3 have the highest priority and will be used as scratch disks.
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-1', 1, allow_greater=True)
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-3', 1, allow_greater=True)
     metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
-    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
     metrics2 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-2')
-    metrics3 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-3')
     metrics4 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-4')
-    # dir1 and dir3 have highest priority and will be used as scratch disk.
-    assert (metrics1 > 0 and metrics3 > 0 and metrics0 == 0 and metrics2 == 0
-            and metrics4 == 0)
+    assert (metrics0 == 0 and metrics2 == 0 and metrics4 == 0)
     results = client.fetch(self.spill_query, handle)
     assert results.success
     client.close_query(handle)
     client.close()
 
+  def dfs_tmp_path(self):
+    return "{}/tmp".format(NAMENODE)
+
   @pytest.mark.execute_serially
   @SkipIf.not_scratch_fs
   def test_scratch_dirs_remote_spill(self, vector):
     # Test one remote directory with one its local buffer directory.
     normal_dirs = self.generate_dirs(1)
-    # Use local hdfs for testing. Could be changed to S3.
-    normal_dirs.append('hdfs://localhost:20500/tmp')
+    # Use the cluster's default DFS (derived from NAMENODE) for testing.
+    normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'],
@@ -296,9 +297,9 @@ class TestScratchDir(CustomClusterTestSuite):
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
-    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
     # Dir0 is the remote directory.
-    assert (metrics0 > 0)
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True)
     results = client.fetch(self.spill_query, handle)
     assert results.success
     client.close_query(handle)
@@ -313,7 +314,7 @@ class TestScratchDir(CustomClusterTestSuite):
     normal_dirs = self.generate_dirs(2)
     normal_dirs[0] = '{0}::{1}'.format(normal_dirs[0], 1)
     normal_dirs[1] = '{0}:2GB:{1}'.format(normal_dirs[1], 0)
-    normal_dirs.append('hdfs://localhost:20500/tmp')
+    normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'],
@@ -327,11 +328,12 @@ class TestScratchDir(CustomClusterTestSuite):
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
-    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
-    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
     # Local directory always ranks before the remote one, so dir0 is the local 
directory.
     # Only spill to dir0 because it has enough space for the spilling.
-    assert (metrics0 > 0 and metrics1 == 0)
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True)
+    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
+    assert (metrics1 == 0)
     results = client.fetch(self.spill_query, handle)
     assert results.success
     client.close_query(handle)
@@ -346,7 +348,7 @@ class TestScratchDir(CustomClusterTestSuite):
     normal_dirs = self.generate_dirs(2)
     normal_dirs[0] = '{0}:32MB:{1}'.format(normal_dirs[0], 0)
     normal_dirs[1] = '{0}:4MB:{1}'.format(normal_dirs[1], 1)
-    normal_dirs.append('hdfs://localhost:20500/tmp')
+    normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'],
@@ -360,12 +362,13 @@ class TestScratchDir(CustomClusterTestSuite):
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
-    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
-    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
     # Local directory always ranks before the remote one, so dir0 is the local 
directory.
     # The query spills to both dir0 and dir1. By default the remote file is 
16MB each,
     # so the value of metrics1 should be at least one file size.
-    assert (metrics0 == 4 * 1024 * 1024 and metrics1 % (16 * 1024 * 1024) == 0)
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0', 4 * 1024 * 1024)
+    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
+    assert (metrics1 % (16 * 1024 * 1024) == 0)
     results = client.fetch(self.spill_query, handle)
     assert results.success
     client.close_query(handle)
@@ -377,7 +380,7 @@ class TestScratchDir(CustomClusterTestSuite):
     # One local buffer directory and one remote directory.
     normal_dirs = self.generate_dirs(1)
     normal_dirs[0] = '{0}:16MB:{1}'.format(normal_dirs[0], 0)
-    normal_dirs.append('hdfs://localhost:20500/tmp')
+    normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
@@ -392,9 +395,12 @@ class TestScratchDir(CustomClusterTestSuite):
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
-    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
     # The query spills to the remote directories and creates remote files,
     # so that the size is bigger than 0, and be integer times of remote file 
size.
+    impalad.service.wait_for_metric_value(
+        'tmp-file-mgr.scratch-space-bytes-used.dir-0',
+        8 * 1024 * 1024, allow_greater=True)
+    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
     assert (metrics0 > 0 and metrics0 % (8 * 1024 * 1024) == 0)
     results = client.fetch(self.spill_query, handle)
     assert results.success
@@ -408,7 +414,7 @@ class TestScratchDir(CustomClusterTestSuite):
     directory to test if there is a deadlock issue.'''
     normal_dirs = self.generate_dirs(1)
     normal_dirs[0] = '{0}:16MB:{1}'.format(normal_dirs[0], 0)
-    normal_dirs.append('hdfs://localhost:20500/tmp')
+    normal_dirs.append(self.dfs_tmp_path())
     num = 5
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
@@ -454,7 +460,7 @@ class TestScratchDir(CustomClusterTestSuite):
     # Set the buffer directory small enough to spill to the remote one.
     normal_dirs = self.generate_dirs(1)
     normal_dirs[0] = '{0}:2MB:{1}'.format(normal_dirs[0], 1)
-    normal_dirs.append('hdfs://localhost:20500/tmp')
+    normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',

Reply via email to