This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit a469a9cf19a34d28a7d29d1b0fac0aeb0662ca8e
Author: Michael Smith <[email protected]>
AuthorDate: Wed Nov 16 16:20:46 2022 -0800

    IMPALA-11730: Add support for spilling to Ozone

    Adds support for spilling to Ozone (ofs and o3fs schemes) for parity
    with HDFS. Note that ofs paths start with <volume>/<bucket>, which have
    naming restrictions; tmp/impala-scratch is a valid name, so something
    like ofs://localhost:9862/tmp would work as a scratch directory (volume
    tmp, implicit bucket impala-scratch).

    Updates tests to determine the correct path from the environment.
    Fixes backend tests to work with Ozone as well. Guards
    test_scratch_disk.py behind a new flag for filesystems that support
    spilling. Updates metric verification to wait for
    scratch-space-bytes-used to be non-zero, as it appears to update more
    slowly with Ozone.

    Refactors TmpDir to remove extraneous variables and functions. Each
    TmpDir implementation is now expected to handle its own path-token
    parsing.

    Initializes default_fs in ExecEnv when using TestEnv. Previously it was
    uninitialized, so uses of default_fs would return an empty string.

    Testing:
    - Ran backend, end-to-end, and custom cluster tests with Ozone.
    - Ran test_scratch_disk.py exhaustive runs with Ozone and HDFS.
Change-Id: I5837c30357363f727ca832fb94169f2474fb4f6f Reviewed-on: http://gerrit.cloudera.org:8080/19251 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/bufferpool/buffer-pool-test.cc | 8 +- be/src/runtime/io/disk-io-mgr-test.cc | 36 +++--- be/src/runtime/test-env.cc | 9 ++ be/src/runtime/test-env.h | 3 + be/src/runtime/tmp-file-mgr-internal.h | 62 ++++------ be/src/runtime/tmp-file-mgr-test.cc | 50 ++++---- be/src/runtime/tmp-file-mgr.cc | 166 +++++++++++--------------- tests/common/skip.py | 2 +- tests/custom_cluster/test_scratch_disk.py | 62 +++++----- 9 files changed, 189 insertions(+), 209 deletions(-) diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index b9946ba60..c322d026c 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -66,8 +66,6 @@ DECLARE_string(remote_tmp_file_size); const string SCRATCH_SUFFIX = "/impala-scratch"; /// For testing spill to remote. 
-static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp"; -static const string REMOTE_URL = HDFS_LOCAL_URL; static const string LOCAL_BUFFER_PATH = "/tmp/buffer-pool-test-buffer"; namespace impala { @@ -87,6 +85,7 @@ class BufferPoolTest : public ::testing::Test { test_env_->DisableBufferPool(); ASSERT_OK(test_env_->Init()); RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_); + remote_url_ = test_env_->GetDefaultFsPath("/tmp"); } virtual void TearDown() { @@ -160,7 +159,7 @@ class BufferPoolTest : public ::testing::Test { } else { tmp_dirs.push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", local_buffer_limit)); } - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(LOCAL_BUFFER_PATH)); for (int i = 0; i < num_local_dirs; ++i) { const string& create_dir = Substitute("/tmp/buffer-pool-test.$0", i); @@ -472,6 +471,9 @@ class BufferPoolTest : public ::testing::Test { /// of the map are protected by query_reservations_lock_. unordered_map<int64_t, ReservationTracker*> query_reservations_; SpinLock query_reservations_lock_; + + /// URL for remote spilling. + string remote_url_; }; const int64_t BufferPoolTest::TEST_BUFFER_LEN; diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 1e5f9240b..8852762eb 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -74,8 +74,6 @@ const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L; const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT; /// For testing spill to remote. 
-static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp"; -static const string REMOTE_URL = HDFS_LOCAL_URL; static const string LOCAL_BUFFER_PATH = "/tmp/tmp-file-mgr-test-buffer"; namespace impala { @@ -95,6 +93,7 @@ class DiskIoMgrTest : public testing::Test { ASSERT_OK(test_env_->Init()); metrics_.reset(new MetricGroup("disk-io-mgr-test")); RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", &rng_); + remote_url_ = test_env_->GetDefaultFsPath("/tmp"); } virtual void TearDown() { @@ -173,7 +172,7 @@ class DiskIoMgrTest : public testing::Test { if (io_mgr == nullptr) io_mgr = test_env_->exec_env()->disk_io_mgr(); vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); Status status = tmp_file_mgr->InitCustom(tmp_dirs, true, "", false, metrics_.get()); if (!status.ok()) return nullptr; return pool_.Add(new TmpFileGroup(tmp_file_mgr, io_mgr, NewProfile(), TUniqueId())); @@ -349,6 +348,9 @@ class DiskIoMgrTest : public testing::Test { mutex oper_mutex_; ConditionVariable oper_done_; int num_oper_; + + /// URL for remote spilling. 
+ string remote_url_; }; TEST_F(DiskIoMgrTest, TestDisk) { @@ -1933,7 +1935,7 @@ TEST_F(DiskIoMgrTest, MetricsOfWriteIoError) { TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) { InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; - string remote_file_path = REMOTE_URL + "/test"; + string remote_file_path = remote_url_ + "/test"; string new_file_path_local_buffer = LOCAL_BUFFER_PATH + "/test"; int32_t file_size = 1024; FLAGS_remote_tmp_file_size = "1K"; @@ -1967,7 +1969,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) { TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*); *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, remote_file_path, - new_file_path_local_buffer, false, REMOTE_URL.c_str())); + new_file_path_local_buffer, false, remote_url_.c_str())); vector<WriteRange*> ranges; vector<int32_t> datas; @@ -2113,7 +2115,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) { TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) { InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; - string remote_file_path = REMOTE_URL + "/test"; + string remote_file_path = remote_url_ + "/test"; string new_file_path_local_buffer = LOCAL_BUFFER_PATH + "/test"; FLAGS_remote_tmp_file_size = "1K"; @@ -2138,7 +2140,7 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) { TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*); *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, remote_file_path, - new_file_path_local_buffer, false, REMOTE_URL.c_str())); + new_file_path_local_buffer, false, remote_url_.c_str())); int32_t* data = tmp_pool.Add(new int32_t); *data = rand(); @@ -2195,7 +2197,7 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) { TEST_F(DiskIoMgrTest, WriteToRemoteUploadFailed) { InitRootReservation(LARGE_RESERVATION_LIMIT); num_oper_ = 0; - string remote_file_path = REMOTE_URL + "/test"; + string remote_file_path = remote_url_ + "/test"; string non_existent_dir = 
"/non-existent-dir/test"; FLAGS_remote_tmp_file_size = "1K"; int64_t file_size = 1024; @@ -2215,7 +2217,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteUploadFailed) { TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*); *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote( - tmp_file_grp, 0, remote_file_path, non_existent_dir, false, REMOTE_URL.c_str())); + tmp_file_grp, 0, remote_file_path, non_existent_dir, false, remote_url_.c_str())); DiskFile* remote_file = (*new_tmp_file_obj)->DiskFile(); DiskFile* local_buffer_file = (*new_tmp_file_obj)->DiskBufferFile(); @@ -2253,7 +2255,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteUploadFailed) { TEST_F(DiskIoMgrTest, WriteToRemoteEvictLocal) { InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; - string remote_file_path = REMOTE_URL + "/test1"; + string remote_file_path = remote_url_ + "/test1"; string local_buffer_file_path = LOCAL_BUFFER_PATH + "/test1"; int32_t file_size = 1024; FLAGS_remote_tmp_file_size = "1K"; @@ -2273,7 +2275,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteEvictLocal) { unique_ptr<RequestContext> io_ctx = io_mgr.RegisterContext(); TmpFileRemote* new_tmp_file_obj = new TmpFileRemote(tmp_file_grp, 0, remote_file_path, - local_buffer_file_path, false, REMOTE_URL.c_str()); + local_buffer_file_path, false, remote_url_.c_str()); shared_ptr<TmpFileRemote> shared_tmp_file; shared_tmp_file.reset(move(new_tmp_file_obj)); @@ -2351,7 +2353,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteEvictLocal) { // Use an invalid block size to emulate the case when memory allocation failed. 
TEST_F(DiskIoMgrTest, WriteToRemoteFailMallocBlock) { num_ranges_written_ = 0; - string remote_file_path = REMOTE_URL + "/test1"; + string remote_file_path = remote_url_ + "/test1"; string local_buffer_file_path = LOCAL_BUFFER_PATH + "/test1"; int64_t invalid_block_size = -1; @@ -2366,7 +2368,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFailMallocBlock) { TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*); *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, remote_file_path, - local_buffer_file_path, false, REMOTE_URL.c_str())); + local_buffer_file_path, false, remote_url_.c_str())); RemoteOperRange::RemoteOperDoneCallback u_callback = [this]( const Status& upload_status) { @@ -2399,7 +2401,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFailMallocBlock) { TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) { InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; - string remote_file_path = REMOTE_URL + "/test"; + string remote_file_path = remote_url_ + "/test"; string new_file_path_local_buffer = LOCAL_BUFFER_PATH + "/test"; int32_t block_size = 1024; FLAGS_remote_tmp_file_size = "1K"; @@ -2423,7 +2425,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) { TmpFileRemote** new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote*); *new_tmp_file_obj = tmp_pool.Add(new TmpFileRemote(tmp_file_grp, 0, remote_file_path, - new_file_path_local_buffer, false, REMOTE_URL.c_str())); + new_file_path_local_buffer, false, remote_url_.c_str())); WriteRange::WriteDoneCallback callback = [=](const Status& status) { ASSERT_EQ(0, status.code()); @@ -2487,7 +2489,7 @@ TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) { TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) { num_oper_ = 0; num_ranges_written_ = 0; - string remote_file_path = REMOTE_URL + "/test"; + string remote_file_path = remote_url_ + "/test"; string local_buffer_path = LOCAL_BUFFER_PATH + "/test"; FLAGS_remote_tmp_file_size = "1K"; int64_t file_size = 1024; @@ -2511,7 +2513,7 @@ 
TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) { unique_ptr<RequestContext> io_ctx = io_mgr.RegisterContext(); TmpFileRemote tmp_file( - tmp_file_grp, 0, remote_file_path, local_buffer_path, false, REMOTE_URL.c_str()); + tmp_file_grp, 0, remote_file_path, local_buffer_path, false, remote_url_.c_str()); DiskFile* remote_file = tmp_file.DiskFile(); DiskFile* local_buffer_file = tmp_file.DiskBufferFile(); tmp_file.GetWriteFile()->SetActualFileSize(file_size); diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 7c26931ab..54aebafa4 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -62,6 +62,7 @@ Status TestEnv::Init() { exec_env_.reset(new ExecEnv); // Populate the ExecEnv state that the backend tests need. RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init()); + RETURN_IF_ERROR(exec_env_->InitHadoopConfig()); exec_env_->tmp_file_mgr_.reset(new TmpFileMgr); if (have_tmp_file_mgr_args_) { RETURN_IF_ERROR(tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_, @@ -140,6 +141,14 @@ int64_t TestEnv::TotalQueryMemoryConsumption() { return total; } +std::string TestEnv::GetDefaultFsPath(std::string path) { + const char* filesystem_prefix = getenv("FILESYSTEM_PREFIX"); + if (filesystem_prefix != nullptr && filesystem_prefix[0] != '\0') { + return Substitute("$0$1", filesystem_prefix, path); + } + return Substitute("$0$1", exec_env_->default_fs(), path); +} + Status TestEnv::CreateQueryState( int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state) { TQueryCtx query_ctx; diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h index b68add29e..3b05e42f7 100644 --- a/be/src/runtime/test-env.h +++ b/be/src/runtime/test-env.h @@ -82,6 +82,9 @@ class TestEnv { exec_env_->codegen_cache_.reset(new CodeGenCache(metrics)); } + /// Get a full URI for the provided path based on the default filesystem. 
+ std::string GetDefaultFsPath(std::string path); + ExecEnv* exec_env() { return exec_env_.get(); } MetricGroup* metrics() { return exec_env_->metrics(); } TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); } diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h index dbe7566ef..db3076776 100644 --- a/be/src/runtime/tmp-file-mgr-internal.h +++ b/be/src/runtime/tmp-file-mgr-internal.h @@ -400,11 +400,11 @@ class TmpFileDummy : public TmpFile { /// A configured temporary directory that TmpFileMgr allocates files in. class TmpDir { public: - TmpDir(const std::string& raw_path, const std::string& prefix, bool is_local); + TmpDir(const std::string& raw_path) : raw_path_(raw_path) {} virtual ~TmpDir() {} /// Parse the raw path and identify the scratch directory options. - virtual Status Parse(); + Status Parse(); /// Verify the scratch path and create the directory. virtual Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, @@ -414,7 +414,7 @@ class TmpDir { int priority() { return priority_; } const string& path() { return path_; } IntGauge* bytes_used_metric() const { return bytes_used_metric_; } - bool is_local() { return is_local_dir_; } + virtual bool is_local() { return false; } private: friend class TmpFileMgr; @@ -428,82 +428,64 @@ class TmpDir { /// Parsed raw path of the temporary directory, e.g, trimmed. std::string parsed_raw_path_; - /// The prefix of the path. - std::string prefix_; - /// The complete path to the temporary directory. std::string path_; /// Limit on bytes that should be written to this path. Set to maximum value /// of int64_t if there is no limit. - int64_t bytes_limit_; + int64_t bytes_limit_ = numeric_limits<int64_t>::max(); /// Scratch directory priority. - int priority_; + int priority_ = numeric_limits<int>::max(); /// The current bytes of scratch used for this temporary directory. 
IntGauge* bytes_used_metric_; - /// If the dir is expected in the local file system or in the remote. - const bool is_local_dir_; - - /// Indicate if the TmpDir is parsed. - bool is_parsed_; - - /// Return the directory path by parsing the input tokens. - /// "Path" is the path generated from the tokens. - /// "Offset" indicates the number of elements has been read in the tokens. - virtual Status GetPathFromToks( - const std::vector<string>& tokens, string* path, int* offset); - - /// A helper function for ParseTokens() to parse the raw path and generate the complete + /// A helper function for Parse() to parse the raw path and generate the complete /// path of the scratch directory. - /// "Offset" indicates the number of elements has been read in the tokens. - Status ParsePath(const std::vector<string>& tokens, int* offset); + /// "Tokens" will contain {path, [bytes_limit, [priority]]} tokens from raw path. + virtual Status ParsePathTokens(std::vector<string>& tokens) = 0; - /// A helper function for ParseTokens() to parse the byte limit of the scratch - /// directory. "Index" indicates the position of the byte_limit in the tokens. - Status ParseByteLimit(const std::vector<string>& tokens, int index); + /// A helper function for Parse() to parse the byte limit of the scratch directory. + Status ParseByteLimit(const string& byte_limit); - /// A helper function for ParseTokens() to parse the priorify of the scratch directory. - /// "Index" indicates the position of the priority in the tokens. - Status ParsePriority(const std::vector<string>& tokens, int index); - - /// A helper function for Parse() to parse raw input of the scratch directory. - Status ParseTokens(); + /// A helper function for Parse() to parse the priorify of the scratch directory. 
+ Status ParsePriority(const string& priority); }; class TmpDirLocal : public TmpDir { public: - TmpDirLocal(const std::string& path) : TmpDir(path, "", true /*is_local*/) {} + TmpDirLocal(const std::string& path) : TmpDir(path) {} Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override; + bool is_local() override { return true; } private: /// A helper function for VerifyAndCreate() to create a local scratch directory. Status CreateLocalDirectory(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, int disk_id, TmpFileMgr* tmp_mgr); + + Status ParsePathTokens(std::vector<string>& tokens) override; }; class TmpDirS3 : public TmpDir { public: - TmpDirS3(const std::string& path) - : TmpDir(path, FILESYS_PREFIX_S3, false /*is_local*/) {} + TmpDirS3(const std::string& path) : TmpDir(path) {} Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override; + + private: + Status ParsePathTokens(std::vector<string>& tokens) override; }; class TmpDirHdfs : public TmpDir { public: - TmpDirHdfs(const std::string& path) - : TmpDir(path, FILESYS_PREFIX_HDFS, false /*is_local*/) {} - Status Parse() override; + TmpDirHdfs(const std::string& path) : TmpDir(path) {} Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override; private: - virtual Status GetPathFromToks( - const std::vector<string>& tokens, string* path, int* offset) override; + Status ParsePathTokens(std::vector<string>& tokens) override; }; /// Temporary file buffer pool allows the temporary files to return their buffer to the diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 038b7e8ee..f29f12adb 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -71,8 +71,6 @@ static const 
int64_t GIGABYTE = 1024L * MEGABYTE; static const int64_t TERABYTE = 1024L * GIGABYTE; /// For testing spill to remote. -static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp"; -static const string REMOTE_URL = HDFS_LOCAL_URL; static const string LOCAL_BUFFER_PATH = "/tmp/tmp-file-mgr-test-buffer"; /// Read buffer sizes for TestBatchReadingSetMaxBytes(). @@ -99,6 +97,7 @@ class TmpFileMgrTest : public ::testing::Test { test_env_.reset(new TestEnv); ASSERT_OK(test_env_->Init()); cb_counter_ = 0; + remote_url_ = test_env_->GetDefaultFsPath("/tmp"); } virtual void TearDown() { @@ -186,7 +185,7 @@ class TmpFileMgrTest : public ::testing::Test { vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}}; RemoveAndCreateDirs(tmp_create_dirs); tmp_dirs->push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", 4096)); - tmp_dirs->push_back(REMOTE_URL); + tmp_dirs->push_back(remote_url_); } // Helper for TestBatchReadingSetMaxBytes() to set the read buffer size and check @@ -393,6 +392,9 @@ class TmpFileMgrTest : public ::testing::Test { mutex cb_cv_lock_; ConditionVariable cb_cv_; int64_t cb_counter_; + + /// URL for remote spilling. + string remote_url_; }; /// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks @@ -1088,9 +1090,10 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) { RemoveAndCreateDirs({"/tmp/local-buffer-dir", "/tmp/local-buffer-dir1", "/tmp/local-buffer-dir2", "/tmp/local-buffer-dir3"}); - // Successful cases for HDFS paths. + // Successful cases for FS paths. // Two types of paths, one with directory, one without. 
- vector<string> hdfs_paths{"hdfs://localhost:20500", "hdfs://localhost:20500/tmp"}; + vector<string> hdfs_paths{ + test_env_->GetDefaultFsPath(""), test_env_->GetDefaultFsPath("/tmp")}; for (string hdfs_path : hdfs_paths) { string full_hdfs_path = hdfs_path + "/impala-scratch"; auto& dirs1 = GetTmpRemoteDir(CreateTmpFileMgr(hdfs_path + ",/tmp/local-buffer-dir")); @@ -1150,15 +1153,14 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) { EXPECT_EQ(nullptr, dirs10); // Multiple remote paths, should support only one. - auto& dirs11 = GetTmpRemoteDir(CreateTmpFileMgr(hdfs_path - + ",hdfs://localhost:20501/tmp," - "/tmp/local-buffer-dir")); + auto& dirs11 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute( + "$0,hdfs://localhost:20501/tmp,/tmp/local-buffer-dir", hdfs_path))); EXPECT_NE(nullptr, dirs11); EXPECT_EQ(full_hdfs_path, dirs11->path()); // The order of the buffer and the remote dir should not affect the result. - auto& dirs12 = GetTmpRemoteDir(CreateTmpFileMgr( - "/tmp/local-buffer-dir, " + hdfs_path + ",hdfs://localhost:20501/tmp")); + auto& dirs12 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute( + "/tmp/local-buffer-dir,$0,hdfs://localhost:20501/tmp", hdfs_path))); EXPECT_NE(nullptr, dirs12); EXPECT_EQ(full_hdfs_path, dirs12->path()); } @@ -1258,8 +1260,8 @@ void TmpFileMgrTest::TestCompressBufferManagement( TmpFileMgr tmp_file_mgr; if (spill_to_remote) { RemoveAndCreateDirs(vector<string>{LOCAL_BUFFER_PATH}); - ASSERT_OK(tmp_file_mgr.InitCustom(vector<string>{REMOTE_URL, LOCAL_BUFFER_PATH}, true, - "lz4", true, metrics_.get())); + ASSERT_OK(tmp_file_mgr.InitCustom(vector<string>{remote_url_, LOCAL_BUFFER_PATH}, + true, "lz4", true, metrics_.get())); } else { ASSERT_OK(tmp_file_mgr.InitCustom( vector<string>{"/tmp/tmp-file-mgr-test.1"}, true, "lz4", true, metrics_.get())); @@ -1512,7 +1514,7 @@ TEST_F(TmpFileMgrTest, TestRemoteOneDir) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - 
tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get())); TUniqueId id; TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); @@ -1541,7 +1543,7 @@ TEST_F(TmpFileMgrTest, TestRemoteDirReportError) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get())); TUniqueId id; TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); @@ -1596,7 +1598,7 @@ TEST_F(TmpFileMgrTest, TestRemoteAllocateNonWritable) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get())); TUniqueId id; TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); @@ -1629,7 +1631,7 @@ TEST_F(TmpFileMgrTest, TestRemoteScratchLimit) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get())); TUniqueId id; @@ -1671,7 +1673,7 @@ TEST_F(TmpFileMgrTest, TestRemoteWriteRange) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get())); TUniqueId id; @@ -1706,7 +1708,7 @@ TEST_F(TmpFileMgrTest, TestRemoteBlockVerification) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, 
"", false, metrics_.get())); TUniqueId id; @@ -1755,7 +1757,7 @@ TEST_F(TmpFileMgrTest, TestRemoteDirectoryLimits) { vector<string> tmp_dirs{{LOCAL_BUFFER_PATH}}; TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); int64_t alloc_size = 1024; int64_t file_size = 1024; FLAGS_remote_tmp_file_size = "1K"; @@ -1805,7 +1807,7 @@ TEST_F(TmpFileMgrTest, TestMixDirectoryLimits) { vector<string> tmp_dirs{{LOCAL_BUFFER_PATH, "/tmp/tmp-file-mgr-test-local:1024"}}; TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_create_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); int64_t alloc_size = 1024; int64_t file_size = 1024; FLAGS_remote_tmp_file_size = "1K"; @@ -1868,7 +1870,7 @@ TEST_F(TmpFileMgrTest, TestMixTmpFileLimits) { vector<string> tmp_dirs{{LOCAL_BUFFER_PATH}}; TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_create_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); int64_t alloc_size = 1024; int64_t file_size = 512 * 1024 * 1024; int64_t offset; @@ -1896,7 +1898,7 @@ void TmpFileMgrTest::TestTmpFileBufferPoolHelper(TmpFileMgr& tmp_file_mgr, vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}}; vector<string> tmp_dirs{{Substitute(LOCAL_BUFFER_PATH + ":$0", 2048)}}; RemoveAndCreateDirs(tmp_create_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); int64_t alloc_size = 1024; int64_t file_size = 1024; FLAGS_remote_tmp_file_size = "1KB"; @@ -2035,7 +2037,7 @@ TEST_F(TmpFileMgrTest, TestRemoteRemoveBuffer) { vector<string> tmp_dirs({LOCAL_BUFFER_PATH}); TmpFileMgr tmp_file_mgr; RemoveAndCreateDirs(tmp_dirs); - tmp_dirs.push_back(REMOTE_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get())); TUniqueId id; @@ -2091,7 +2093,7 @@ TEST_F(TmpFileMgrTest, TestRemoteUploadFailed) { vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}}; vector<string> 
tmp_dirs{{Substitute(LOCAL_BUFFER_PATH + ":$0", buffer_limit)}}; RemoveAndCreateDirs(tmp_create_dirs); - tmp_dirs.push_back(HDFS_LOCAL_URL); + tmp_dirs.push_back(remote_url_); ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get())); TUniqueId id; diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 8a90cf614..f8af71a30 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -290,7 +290,8 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers, string tmp_dir_spec_trimmed(boost::algorithm::trim_left_copy(tmp_dir_spec)); std::unique_ptr<TmpDir> tmp_dir; - if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)) { + if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false) + || IsOzonePath(tmp_dir_spec_trimmed.c_str(), false)) { tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed); } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) { // Initialize the S3 options for later getting S3 connection. @@ -645,105 +646,72 @@ Status TmpFileMgr::TmpDirRemoteCtrl::SetUpReadBufferParams() { return Status::OK(); } -TmpDir::TmpDir(const string& raw_path, const string& prefix, bool is_local) - : raw_path_(raw_path), prefix_(prefix), is_local_dir_(is_local), is_parsed_(false) {} - -Status TmpDir::GetPathFromToks(const vector<string>& toks, string* path, int* offset) { - DCHECK(path != nullptr); - DCHECK(offset != nullptr); - string parsed_raw_path = prefix_; - // The ordinary format of the directory input after split by colon is - // ["path", "bytes_limit", "priority"]. - parsed_raw_path.append(toks[0]); - *path = parsed_raw_path; - *offset = 1; - return Status::OK(); -} - -Status TmpDir::ParsePath(const vector<string>& toks, int* offset) { - string parsed_raw_path; - RETURN_IF_ERROR(GetPathFromToks(toks, &parsed_raw_path, offset)); - parsed_raw_path = trim_right_copy_if(parsed_raw_path, is_any_of("/")); - - // Construct the complete scratch directory path. 
- boost::filesystem::path tmp_path(parsed_raw_path); - if (is_local_dir_) { - tmp_path = absolute(tmp_path); - parsed_raw_path = tmp_path.string(); - } - boost::filesystem::path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME); - parsed_raw_path_ = parsed_raw_path; - path_ = scratch_subdir_path.string(); - - return Status::OK(); -} - -Status TmpDir::ParseByteLimit(const vector<string>& toks, int index) { - DCHECK_GE(index, 0); - int64_t bytes_limit = numeric_limits<int64_t>::max(); - if (index < toks.size()) { - // Parse option byte_limit. - bool is_percent; - bytes_limit = ParseUtil::ParseMemSpec(toks[index], &is_percent, 0); - if (bytes_limit < 0 || is_percent) { - return Status(Substitute( - "Malformed scratch directory capacity configuration '$0'", raw_path_)); - } else if (bytes_limit == 0) { - // Interpret -1, 0 or empty string as no limit. - bytes_limit = numeric_limits<int64_t>::max(); - } +Status TmpDir::ParseByteLimit(const string& byte_limit) { + bool is_percent; + bytes_limit_ = ParseUtil::ParseMemSpec(byte_limit, &is_percent, 0); + if (bytes_limit_ < 0 || is_percent) { + return Status(Substitute( + "Malformed scratch directory capacity configuration '$0'", raw_path_)); + } else if (bytes_limit_ == 0) { + // Interpret -1, 0 or empty string as no limit. 
+ bytes_limit_ = numeric_limits<int64_t>::max(); } - bytes_limit_ = bytes_limit; return Status::OK(); } -Status TmpDir::ParsePriority(const vector<string>& toks, int index) { - DCHECK_GE(index, 0); - int priority = numeric_limits<int>::max(); - if (index < toks.size() && !toks[index].empty()) { +Status TmpDir::ParsePriority(const string& priority) { + if (!priority.empty()) { StringParser::ParseResult result; - priority = - StringParser::StringToInt<int>(toks[index].data(), toks[index].size(), &result); + priority_ = StringParser::StringToInt<int>( + priority.c_str(), priority.size(), &result); if (result != StringParser::PARSE_SUCCESS) { return Status(Substitute( "Malformed scratch directory priority configuration '$0'", raw_path_)); } } - priority_ = priority; return Status::OK(); } -Status TmpDir::ParseTokens() { +Status TmpDir::Parse() { + DCHECK(parsed_raw_path_.empty() && path_.empty()); + vector<string> toks; - string path_without_prefix = raw_path_.substr(strlen(prefix_.c_str())); - split(toks, path_without_prefix, is_any_of(":"), token_compress_off); - int offset = 0; - RETURN_IF_ERROR(ParsePath(toks, &offset)); - const int max_num_options = 2; - // The max size after the split by colon. - int max_num_tokens = max_num_options + offset; + RETURN_IF_ERROR(ParsePathTokens(toks)); + + constexpr int max_num_tokens = 3; if (toks.size() > max_num_tokens) { return Status(Substitute( "Could not parse temporary dir specifier, too many colons: '$0'", raw_path_)); } + + // Construct the complete scratch directory path. + toks[0] = trim_right_copy_if(toks[0], is_any_of("/")); + parsed_raw_path_ = toks[0]; + path_ = (boost::filesystem::path(toks[0]) / TMP_SUB_DIR_NAME).string(); + // The scratch path may have two options "bytes limit" and "priority". - // The priority should be the first option. - RETURN_IF_ERROR(ParseByteLimit(toks, offset++)); + // The bytes limit should be the first option. 
+ if (toks.size() > 1) { + RETURN_IF_ERROR(ParseByteLimit(toks[1])); + } // The priority should be the second option. - RETURN_IF_ERROR(ParsePriority(toks, offset)); + if (toks.size() > 2) { + RETURN_IF_ERROR(ParsePriority(toks[2])); + } return Status::OK(); } -Status TmpDir::Parse() { - DCHECK(!is_parsed_); - RETURN_IF_ERROR(ParseTokens()); - is_parsed_ = true; +Status TmpDirLocal::ParsePathTokens(vector<string>& toks) { + // The ordinary format of the directory input after split by colon is + // {path, [bytes_limit, [priority]]}. + split(toks, raw_path_, is_any_of(":"), token_compress_off); + toks[0] = absolute(toks[0]).string(); return Status::OK(); } Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { - DCHECK(is_parsed_); + DCHECK(!parsed_raw_path_.empty()); // The path must be a writable directory. Status status = FileSystemUtil::VerifyIsDirectory(parsed_raw_path_); if (!status.ok()) { @@ -784,6 +752,7 @@ Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics, Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, int disk_id, TmpFileMgr* tmp_mgr) { + DCHECK(!path_.empty()); // Create the directory, destroying if already present. If this succeeds, we will // have an empty writable scratch directory. 
Status status = FileSystemUtil::RemoveAndCreateDirectory(path_); @@ -807,7 +776,8 @@ Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics, if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true; LOG(INFO) << "Using scratch directory " << path_ << " on " << "disk " << disk_id - << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_); + << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_) + << ", priority: " << priority_; bytes_used_metric_ = metrics->AddGauge( SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_mgr->tmp_dirs_.size())); } else { @@ -818,45 +788,44 @@ Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics, return status; } +Status TmpDirS3::ParsePathTokens(vector<string>& toks) { + // The ordinary format of the directory input after split by colon is + // {scheme, path, [bytes_limit, [priority]]}. Combine scheme and path. + split(toks, raw_path_, is_any_of(":"), token_compress_off); + // Only called on paths starting with `s3a://`, so there will always be at least 2. + DCHECK(toks.size() >= 2); + toks[0] = Substitute("$0:$1", toks[0], toks[1]); + toks.erase(toks.begin()+1); + return Status::OK(); +} + Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { // For the S3 path, it doesn't need to create the directory for the uploading // as long as the S3 address is correct. 
- DCHECK(is_parsed_); + DCHECK(!path_.empty()); hdfsFS hdfs_conn; RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options())); return Status::OK(); } -Status TmpDirHdfs::GetPathFromToks( - const vector<string>& toks, string* parsed_path, int* offset) { - DCHECK(parsed_path != nullptr); - DCHECK(offset != nullptr); +Status TmpDirHdfs::ParsePathTokens(vector<string>& toks) { // We enforce the HDFS scratch path to have the port number, and the format after split - // by colon is ["path", "port_num", "bytes_limit", "priority"], therefore, the offset - // to be returned is 2. - if (toks.size() < 2) { + // by colon is {scheme, path, port_num, [bytes_limit, [priority]]}. Coalesce the URI. + split(toks, raw_path_, is_any_of(":"), token_compress_off); + if (toks.size() < 3) { return Status( - Substitute("The scrach path should have the port number: '$0'", raw_path_)); + Substitute("The scratch path should have the port number: '$0'", raw_path_)); } - string parsed_raw_path = prefix_; - parsed_raw_path.append(toks[0]).append(":").append(toks[1]); - *parsed_path = parsed_raw_path; - *offset = 2; - return Status::OK(); -} - -Status TmpDirHdfs::Parse() { - DCHECK(!is_parsed_); - RETURN_IF_ERROR(ParseTokens()); - is_parsed_ = true; + toks[0] = Substitute("$0:$1:$2", toks[0], toks[1], toks[2]); + toks.erase(toks.begin()+1, toks.begin()+3); return Status::OK(); } Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { - DCHECK(is_parsed_); + DCHECK(!path_.empty()); hdfsFS hdfs_conn; // If the HDFS path doesn't exist, it would fail while uploading, so we // create the HDFS path if it doesn't exist. 
@@ -978,6 +947,10 @@ TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId devi disk_type_ = io::DiskFileType::DFS; disk_id_ = file_group->io_mgr_->RemoteDfsDiskId(); disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId(); + } else if (IsOzonePath(hdfs_url, false)) { + disk_type_ = io::DiskFileType::DFS; + disk_id_ = file_group->io_mgr_->RemoteOzoneDiskId(); + disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId(); } else if (IsS3APath(hdfs_url, false)) { disk_type_ = io::DiskFileType::S3; disk_id_ = file_group->io_mgr_->RemoteS3DiskId(); @@ -1382,8 +1355,9 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file, int64_t* file_offset, vector<int>* at_capacity_dirs) { // Only one remote dir supported currently. string dir = tmp_file_mgr_->tmp_dirs_remote_->path(); - // It is not supposed to have a remote directory other than HDFS or S3. - DCHECK(IsHdfsPath(dir.c_str(), false) || IsS3APath(dir.c_str(), false)); + // It is not supposed to have a remote directory other than HDFS, Ozone, or S3. + DCHECK(IsHdfsPath(dir.c_str(), false) || IsOzonePath(dir.c_str(), false) + || IsS3APath(dir.c_str(), false)); // Look for the space from a previous created file. 
if (!tmp_files_remote_.empty()) { diff --git a/tests/common/skip.py b/tests/common/skip.py index 46fc303fa..85d2c678d 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -99,7 +99,7 @@ class SkipIf: not_hdfs = pytest.mark.skipif(not IS_HDFS, reason="HDFS admin needed") not_dfs = pytest.mark.skipif(not (IS_HDFS or IS_OZONE), reason="HDFS/Ozone Filesystem needed") - not_scratch_fs = pytest.mark.skipif(not IS_HDFS, + not_scratch_fs = pytest.mark.skipif(not (IS_HDFS or IS_OZONE), reason="Scratch dirs for temporary file spilling not supported") sfs_unsupported = pytest.mark.skipif(not (IS_HDFS or IS_S3 or IS_ABFS or IS_ADLS or IS_GCS), reason="Hive support for sfs+ is limited, HIVE-26757") diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py index 66492bcf7..35adf1c40 100644 --- a/tests/custom_cluster/test_scratch_disk.py +++ b/tests/custom_cluster/test_scratch_disk.py @@ -27,6 +27,7 @@ import tempfile from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.verifiers.metric_verifier import MetricVerifier from tests.common.skip import SkipIf +from tests.util.hdfs_util import NAMENODE class TestScratchDir(CustomClusterTestSuite): @classmethod @@ -229,13 +230,9 @@ class TestScratchDir(CustomClusterTestSuite): handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) - metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') - metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1') - metrics2 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-2') - metrics3 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-3') - metrics4 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-4') - assert (metrics0 > 0 and metrics1 > 0 and metrics2 > 0 and metrics3 > 0 - and metrics4 > 0) + for i in range(5): + 
impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-' + str(i), 1, allow_greater=True) results = client.fetch(self.spill_query, handle) assert results.success client.close_query(handle) @@ -263,26 +260,30 @@ class TestScratchDir(CustomClusterTestSuite): handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) + # dir1 and dir3 have highest priority and will be used as scratch disk. + impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-1', 1, allow_greater=True) + impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-3', 1, allow_greater=True) metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') - metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1') metrics2 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-2') - metrics3 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-3') metrics4 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-4') - # dir1 and dir3 have highest priority and will be used as scratch disk. - assert (metrics1 > 0 and metrics3 > 0 and metrics0 == 0 and metrics2 == 0 - and metrics4 == 0) + assert (metrics0 == 0 and metrics2 == 0 and metrics4 == 0) results = client.fetch(self.spill_query, handle) assert results.success client.close_query(handle) client.close() + def dfs_tmp_path(self): + return "{}/tmp".format(NAMENODE) + @pytest.mark.execute_serially @SkipIf.not_scratch_fs def test_scratch_dirs_remote_spill(self, vector): # Test one remote directory with one its local buffer directory. normal_dirs = self.generate_dirs(1) - # Use local hdfs for testing. Could be changed to S3. - normal_dirs.append('hdfs://localhost:20500/tmp') + # Use dfs for testing. 
+ normal_dirs.append(self.dfs_tmp_path()) self._start_impala_cluster([ '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], @@ -296,9 +297,9 @@ class TestScratchDir(CustomClusterTestSuite): handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) - metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') # Dir0 is the remote directory. - assert (metrics0 > 0) + impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True) results = client.fetch(self.spill_query, handle) assert results.success client.close_query(handle) @@ -313,7 +314,7 @@ class TestScratchDir(CustomClusterTestSuite): normal_dirs = self.generate_dirs(2) normal_dirs[0] = '{0}::{1}'.format(normal_dirs[0], 1) normal_dirs[1] = '{0}:2GB:{1}'.format(normal_dirs[1], 0) - normal_dirs.append('hdfs://localhost:20500/tmp') + normal_dirs.append(self.dfs_tmp_path()) self._start_impala_cluster([ '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], @@ -327,11 +328,12 @@ class TestScratchDir(CustomClusterTestSuite): handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) - metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') - metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1') # Local directory always ranks before the remote one, so dir0 is the local directory. # Only spill to dir0 because it has enough space for the spilling. 
- assert (metrics0 > 0 and metrics1 == 0) + impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True) + metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1') + assert (metrics1 == 0) results = client.fetch(self.spill_query, handle) assert results.success client.close_query(handle) @@ -346,7 +348,7 @@ class TestScratchDir(CustomClusterTestSuite): normal_dirs = self.generate_dirs(2) normal_dirs[0] = '{0}:32MB:{1}'.format(normal_dirs[0], 0) normal_dirs[1] = '{0}:4MB:{1}'.format(normal_dirs[1], 1) - normal_dirs.append('hdfs://localhost:20500/tmp') + normal_dirs.append(self.dfs_tmp_path()) self._start_impala_cluster([ '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], @@ -360,12 +362,13 @@ class TestScratchDir(CustomClusterTestSuite): handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) - metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') - metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1') # Local directory always ranks before the remote one, so dir0 is the local directory. # The query spills to both dir0 and dir1. By default the remote file is 16MB each, # so the value of metrics1 should be at least one file size. 
- assert (metrics0 == 4 * 1024 * 1024 and metrics1 % (16 * 1024 * 1024) == 0) + impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-0', 4 * 1024 * 1024) + metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1') + assert (metrics1 % (16 * 1024 * 1024) == 0) results = client.fetch(self.spill_query, handle) assert results.success client.close_query(handle) @@ -377,7 +380,7 @@ class TestScratchDir(CustomClusterTestSuite): # One local buffer directory and one remote directory. normal_dirs = self.generate_dirs(1) normal_dirs[0] = '{0}:16MB:{1}'.format(normal_dirs[0], 0) - normal_dirs.append('hdfs://localhost:20500/tmp') + normal_dirs.append(self.dfs_tmp_path()) self._start_impala_cluster([ '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), '--impalad_args=--allow_multiple_scratch_dirs_per_device=true', @@ -392,9 +395,12 @@ class TestScratchDir(CustomClusterTestSuite): handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) - metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') # The query spills to the remote directories and creates remote files, # so that the size is bigger than 0, and be integer times of remote file size. 
+ impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-0', + 8 * 1024 * 1024, allow_greater=True) + metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0') assert (metrics0 > 0 and metrics0 % (8 * 1024 * 1024) == 0) results = client.fetch(self.spill_query, handle) assert results.success @@ -408,7 +414,7 @@ class TestScratchDir(CustomClusterTestSuite): directory to test if there is a deadlock issue.''' normal_dirs = self.generate_dirs(1) normal_dirs[0] = '{0}:16MB:{1}'.format(normal_dirs[0], 0) - normal_dirs.append('hdfs://localhost:20500/tmp') + normal_dirs.append(self.dfs_tmp_path()) num = 5 self._start_impala_cluster([ '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), @@ -454,7 +460,7 @@ class TestScratchDir(CustomClusterTestSuite): # Set the buffer directory small enough to spill to the remote one. normal_dirs = self.generate_dirs(1) normal_dirs[0] = '{0}:2MB:{1}'.format(normal_dirs[0], 1) - normal_dirs.append('hdfs://localhost:20500/tmp') + normal_dirs.append(self.dfs_tmp_path()) self._start_impala_cluster([ '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
