This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d89c04bf806682d3449c566ce979632bd2ac5b29
Author: Yida Wu <[email protected]>
AuthorDate: Sat Feb 27 18:54:09 2021 -0800

    IMPALA-10529: Fix DCHECK hit in DiskIoMgr::AssignQueue in core-s3 build
    
    The "scratch_dirs" startup option always treats the local filesystem
    as the default filesystem, regardless of the DefaultFS setting (a
    remote scratch dir must be specified explicitly with the remote fs
    prefix). However, AssignQueue() assigns the queue based not only on
    the path string but also on the default filesystem setting. For
    example, if scratch_dirs is set to "/tmp", the scratch dir is supposed
    to be on the local filesystem, but AssignQueue() would treat it as
    "s3a://xxx/tmp" if an s3 path is set as the default fs. The fix adds
    a bool parameter to AssignQueue() that decides whether to consult the
    default fs setting when parsing the file path. For all scratch dirs,
    AssignQueue() does not check the default fs.
    
    Tests:
    Added a unit test case: TmpFileMgrTest::TestSpillingWithRemoteDefaultFS.
    Ran TmpFileMgrTest; all tests passed.
    
    Change-Id: Ic07945abe65d90235aa8dea92dd3c3821a4f1f53
    Reviewed-on: http://gerrit.cloudera.org:8080/17136
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/io/disk-io-mgr.cc    | 21 +++++++++++----------
 be/src/runtime/io/disk-io-mgr.h     |  6 ++++--
 be/src/runtime/io/scan-range.cc     |  4 ++--
 be/src/runtime/test-env.h           |  3 +++
 be/src/runtime/tmp-file-mgr-test.cc | 28 ++++++++++++++++++++++++++++
 be/src/runtime/tmp-file-mgr.cc      | 10 ++++++++--
 6 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index f963de4..f05afae 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -810,22 +810,23 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, 
WriteRange* write_range) {
   return Status::OK();
 }
 
-int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) 
{
+int DiskIoMgr::AssignQueue(
+    const char* file, int disk_id, bool expected_local, bool check_default_fs) 
{
   // If it's a remote range, check for an appropriate remote disk queue.
   if (!expected_local) {
-    if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) {
+    if (IsHdfsPath(file, check_default_fs) && FLAGS_num_remote_hdfs_io_threads 
> 0) {
       return RemoteDfsDiskId();
     }
-    if (IsS3APath(file)) return RemoteS3DiskId();
-    if (IsABFSPath(file)) return RemoteAbfsDiskId();
-    if (IsADLSPath(file)) return RemoteAdlsDiskId();
-    if (IsOzonePath(file)) return RemoteOzoneDiskId();
+    if (IsS3APath(file, check_default_fs)) return RemoteS3DiskId();
+    if (IsABFSPath(file, check_default_fs)) return RemoteAbfsDiskId();
+    if (IsADLSPath(file, check_default_fs)) return RemoteAdlsDiskId();
+    if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
   }
   // Assign to a local disk queue.
-  DCHECK(!IsS3APath(file)); // S3 is always remote.
-  DCHECK(!IsABFSPath(file)); // ABFS is always remote.
-  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
-  DCHECK(!IsOzonePath(file)); // Ozone is always remote.
+  DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
+  DCHECK(!IsABFSPath(file, check_default_fs)); // ABFS is always remote.
+  DCHECK(!IsADLSPath(file, check_default_fs)); // ADLS is always remote.
+  DCHECK(!IsOzonePath(file, check_default_fs)); // Ozone is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 32a50e4..400702a 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -288,8 +288,10 @@ class DiskIoMgr : public CacheLineAligned {
   /// Determine which disk queue this file should be assigned to.  Returns an 
index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that 
holds the
   /// files, or -1 if unknown.  Flag expected_local is true iff this impalad is
-  /// co-located with the datanode for this file.
-  int AssignQueue(const char* file, int disk_id, bool expected_local = false);
+  /// co-located with the datanode for this file. Flag check_default_fs is 
false iff
+  /// the file is a temporary file.
+  int AssignQueue(
+      const char* file, int disk_id, bool expected_local, bool 
check_default_fs);
 
   int64_t min_buffer_size() const { return min_buffer_size_; }
   int64_t max_buffer_size() const { return max_buffer_size_; }
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index d1d75c9..87b5273 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -506,8 +506,8 @@ ScanRange* ScanRange::AllocateScanRange(ObjectPool* 
obj_pool, hdfsFS fs, const c
   DCHECK_GE(disk_id, -1);
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
-  disk_id =
-      ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(file, disk_id, 
expected_local);
+  disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
+      file, disk_id, expected_local, /* check_default_fs */ true);
   ScanRange* range = obj_pool->Add(new ScanRange);
   range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, 
buffer_opts,
       move(sub_ranges), metadata);
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 6e50fd0..5dec325 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -52,6 +52,9 @@ class TestEnv {
   /// If not called, a process memory tracker with no limit is created.
   void SetProcessMemTrackerArgs(int64_t bytes_limit, bool use_metrics);
 
+  /// Set the Default FS of ExecEnv.
+  void SetDefaultFS(const string& fs) { exec_env_->default_fs_ = fs; }
+
   /// Initialize the TestEnv with the specified arguments.
   Status Init();
 
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index 22d0ce0e..e598b91 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -1757,4 +1757,32 @@ TEST_F(TmpFileMgrTest, 
TestTmpFileBufferPoolOneWriteDone) {
   TestTmpFileBufferPoolTearDown(tmp_file_mgr);
 }
 
+/// Test setting a remote fs for the default fs, but should not affect the 
spilling.
+TEST_F(TmpFileMgrTest, TestSpillingWithRemoteDefaultFS) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1"});
+  TmpFileMgr tmp_file_mgr;
+  RemoveAndCreateDirs(tmp_dirs);
+  string org_default_fs = test_env_->exec_env()->default_fs();
+  string fake_remote_default_fs = "s3a://fake_s3";
+  test_env_->SetDefaultFS(fake_remote_default_fs);
+
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, 
metrics_.get()));
+  TUniqueId id;
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
+  string data = "arbitrary data";
+  MemRange data_mem_range(reinterpret_cast<uint8_t*>(&data[0]), data.size());
+
+  unique_ptr<TmpWriteHandle> handle;
+  WriteRange::WriteDoneCallback callback = [this](const Status& status) {
+    EXPECT_TRUE(status.ok());
+    SignalCallback(status);
+  };
+  ASSERT_OK(file_group.Write(data_mem_range, callback, &handle));
+  WaitForWrite(handle.get());
+  WaitForCallbacks(1);
+  file_group.Close();
+  test_env_->SetDefaultFS(org_default_fs);
+  test_env_->TearDownQueries();
+}
+
 } // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 738cc72..afc2436 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -657,10 +657,14 @@ TmpFile::TmpFile(
     blacklisted_(false) {}
 
 int TmpFile::AssignDiskQueue(bool is_local_buffer) const {
+  // The file paths of TmpFiles are absolute paths, doesn't support default fs.
   if (is_local_buffer) {
-    return file_group_->io_mgr_->AssignQueue(local_buffer_path_.c_str(), -1, 
true);
+    // Assign a disk queue for a local buffer, which is associated with a 
remote file.
+    return file_group_->io_mgr_->AssignQueue(local_buffer_path_.c_str(),
+        /* disk_id */ -1, /* expected_local */ true, /* check_default_fs */ 
false);
   }
-  return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, 
expected_local_);
+  return file_group_->io_mgr_->AssignQueue(
+      path_.c_str(), disk_id_, expected_local_, /* check_default_fs */ false);
 }
 
 bool TmpFile::Blacklist(const ErrorMsg& msg) {
@@ -1518,6 +1522,8 @@ Status TmpWriteHandle::Write(RequestContext* io_ctx, 
MemRange buffer,
 
   // Set all member variables before calling AddWriteRange(): after it 
succeeds,
   // WriteComplete() may be called concurrently with the remainder of this 
function.
+  // If the TmpFile is not local, the disk queue assigned should be for the
+  // buffer.
   data_len_ = buffer.len();
   file_ = tmp_file;
   write_range_.reset(new WriteRange(tmp_file->path(), file_offset,

Reply via email to