IMPALA-5378: Disk IO manager needs to understand ADLS

The Disk IO Manager has customized support for S3 and remote HDFS that
allows each of them to use a separate queue with a customized number
of IO threads. ADLS did not have this support.

Based on the code in DiskIoMgr::Init and DiskIoMgr::AssignQueue, IOs
for ADLS were previously put in local disk queues. Since local disks
are considered rotational unless the /sys filesystem confirms otherwise,
THREADS_PER_ROTATIONAL_DISK=1 was being applied as the thread count for
those queues.
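
For illustration, the pre-patch assignment logic looked roughly like the
sketch below. This is a simplified paraphrase, not the exact source: the
helper name AssignQueueSketch and the final modulo fallback are illustrative,
while IsS3APath(), RemoteS3DiskId(), next_disk_id_ and num_local_disks()
match the real DiskIoMgr.

  // Simplified sketch of the pre-patch queue assignment. Remote HDFS and S3
  // paths get dedicated remote queues; ADLS paths had no check, so they fell
  // through to the local-disk fallback, whose queues default to
  // THREADS_PER_ROTATIONAL_DISK (1) thread on rotational disks.
  int AssignQueueSketch(const char* file, int disk_id, bool expected_local) {
    if (!expected_local) {
      // ... remote HDFS handling elided ...
      if (IsS3APath(file)) return RemoteS3DiskId();
      // No IsADLSPath() check existed here before this patch.
    }
    // Local-disk fallback: ADLS scan ranges ended up in one of these queues.
    if (disk_id == -1) disk_id = next_disk_id_.Add(1);  // unknown disk: pick one
    return disk_id % num_local_disks();  // paraphrased; see AssignQueue for details
  }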

This patch adds customized support for ADLS, similar to what was done
for S3. We set 16 as the default number of ADLS IO threads per node.
For smaller clusters, a higher number like 64 would work better, but we
keep the default at the lower value of 16 because ADLS enforces an
undocumented per-cluster concurrency limit of roughly 500-700
connections. With 16 threads per node, about 40 nodes can read
concurrently within that budget, whereas 64 threads per node would
exhaust it at around 10 nodes, so higher thread-level parallelism would
hurt node-level parallelism on larger clusters.

We also set the default maximum read chunk size for ADLS to 128k.
Direct reads aren't supported for ADLS, so the JNI array allocation and
the memcpy add significant overhead for larger buffers. 128k was chosen
empirically for S3 for the same reason, and since that reasoning also
holds for ADLS we keep the same value. A new flag,
FLAGS_adls_read_chunk_size, controls this value.
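
As a rough illustration of why the chunk size matters, the helper below is a
hypothetical sketch (ReadIntoBuffer is not Impala's actual read loop, though
hdfsRead() is the real libhdfs call): a large buffer is filled by repeated
reads of at most the chunk size, so each JNI array allocation and memcpy
stays small.

  // Hypothetical helper showing how a chunk-size cap bounds each hdfsRead()
  // call; without direct reads, every call allocates a JNI array and memcpys
  // the result, so smaller chunks keep that per-call overhead down.
  Status ReadIntoBuffer(hdfsFS fs, hdfsFile file, uint8_t* buffer,
      int64_t buffer_len, int64_t max_chunk_size, int64_t* bytes_read) {
    *bytes_read = 0;
    while (*bytes_read < buffer_len) {
      int64_t chunk = std::min(buffer_len - *bytes_read, max_chunk_size);
      int last = hdfsRead(fs, file, buffer + *bytes_read, chunk);
      if (last < 0) return Status("hdfsRead() failed");
      if (last == 0) break;  // EOF
      *bytes_read += last;
    }
    return Status::OK();
  }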

TODO: Empirically settle on the most optimal default read chunk size
for ADLS.

Change-Id: I067f053fec941e3631610c5cc89a384f257ba906
Reviewed-on: http://gerrit.cloudera.org:8080/7033
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b8558506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b8558506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b8558506

Branch: refs/heads/branch-2.9.0
Commit: b8558506957dbf44b8ceb29c8b7382bfd8180e05
Parents: 117fc38
Author: Sailesh Mukil <[email protected]>
Authored: Tue May 30 19:50:13 2017 +0000
Committer: Taras Bobrovytsky <[email protected]>
Committed: Thu Jun 1 17:52:18 2017 -0700

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc | 11 +++++++++++
 be/src/runtime/disk-io-mgr.cc            |  9 +++++++++
 be/src/runtime/disk-io-mgr.h             |  4 ++++
 3 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8558506/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 9ccd1da..0668eb6 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -33,6 +33,13 @@ DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRea
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
 
+// TODO: Run perf tests and empirically settle on the most optimal default value for the
+// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// due to JNI array allocation and memcpy overhead, 128k was empirically found to have the
+// least overhead.
+DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
+    "reading from ADLS.");
+
 // Implementation of the ScanRange functionality. Each ScanRange contains a queue
 // of ready buffers. For each ScanRange, there is only a single producer and
 // consumer thread, i.e. only one disk thread will push to a scan range at
@@ -389,6 +396,10 @@ int64_t DiskIoMgr::ScanRange::MaxReadChunkSize() const {
     DCHECK(IsS3APath(file()));
     return 128 * 1024;
   }
+  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
+    DCHECK(IsADLSPath(file()));
+    return FLAGS_adls_read_chunk_size;
+  }
   // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
   return numeric_limits<int>::max();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8558506/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 1f28b1a..536ed88 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -51,6 +51,11 @@ DEFINE_int32(num_remote_hdfs_io_threads, 8, "number of remote HDFS I/O threads")
 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
 // expensive (SSL and JNI buffer overheads).
 DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
+// The maximum number of ADLS I/O threads. This number is a good default to have for
+// clusters that may vary widely in size, due to an undocumented concurrency limit
+// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
+// (~10 nodes), 64 threads would be more ideal.
+DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
 // The read size is the size of the reads sent to hdfs/os.
 // There is a trade off of latency and throughout, trying to keep disks busy but
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
@@ -388,6 +393,8 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
       num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
     } else if (i == RemoteS3DiskId()) {
       num_threads_per_disk = FLAGS_num_s3_io_threads;
+    } else if (i == RemoteAdlsDiskId()) {
+      num_threads_per_disk = FLAGS_num_adls_io_threads;
     } else if (num_threads_per_disk_ != 0) {
       num_threads_per_disk = num_threads_per_disk_;
     } else if (DiskInfo::is_rotational(i)) {
@@ -1248,9 +1255,11 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
       return RemoteDfsDiskId();
     }
     if (IsS3APath(file)) return RemoteS3DiskId();
+    if (IsADLSPath(file)) return RemoteAdlsDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file)); // S3 is always remote.
+  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8558506/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b70c9c8..41d3226 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -756,6 +756,9 @@ class DiskIoMgr {
   /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
  int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
+  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
 
@@ -787,6 +790,7 @@ class DiskIoMgr {
   enum {
     REMOTE_DFS_DISK_OFFSET = 0,
     REMOTE_S3_DISK_OFFSET,
+    REMOTE_ADLS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };
 
