Repository: incubator-impala
Updated Branches:
  refs/heads/master 9caea9bfa -> 9c4f75a6a

IMPALA-5378: Disk IO manager needs to understand ADLS

The Disk IO Manager had customized support for S3 and remote HDFS that lets
each of them use a separate queue with its own number of IO threads. ADLS did
not have this support. Based on the code in DiskIoMgr::Init and
DiskIoMgr::AssignQueue, IOs for ADLS were previously put in local disk queues.
Since local disks are considered rotational unless we can confirm otherwise by
looking at the /sys filesystem, THREADS_PER_ROTATIONAL_DISK=1 was being applied
as the thread count.

This patch adds customized support for ADLS, similar to how it was done for S3.
We set 16 as the default number of IO threads for ADLS. For smaller clusters, a
higher number such as 64 would work better. We keep the default at 16 because
ADLS has an undocumented per-cluster concurrency limit of around 500-700
connections, so on larger clusters higher thread-level parallelism would come
at the cost of node-level parallelism.

We also set the default maximum chunk size for ADLS to 128k. Direct reads
aren't supported for ADLS, so the JNI array allocation and the memcpy add
significant overhead for larger buffers. 128k was chosen empirically for S3 for
the same reason. Since this reason also holds for ADLS, we keep the same value.
A new flag, adls_read_chunk_size (FLAGS_adls_read_chunk_size in the code),
controls this value.

TODO: Empirically settle on the optimal buffer size.

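The trade-off between thread-level and node-level parallelism described above can be made concrete with a little arithmetic. The following is only a sketch: both helper names are hypothetical and not part of Impala, and it assumes each ADLS I/O thread holds at most one connection at a time, which the commit message does not state.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not Impala code): peak number of concurrent ADLS
// connections if every node keeps all of its ADLS I/O threads busy.
// Assumption: one connection per I/O thread.
constexpr int64_t PeakAdlsConnections(int64_t num_nodes, int64_t threads_per_node) {
  return num_nodes * threads_per_node;
}

// Hypothetical helper (not Impala code): largest node count whose peak
// connection count still fits under `connection_limit`.
inline int64_t MaxNodesWithinLimit(int64_t connection_limit, int64_t threads_per_node) {
  assert(threads_per_node > 0);
  return connection_limit / threads_per_node;  // Integer division rounds down.
}
```

Under these assumptions, and taking 500 as the lower end of the quoted limit, 16 threads per node leaves room for about 31 fully busy nodes, while 64 threads per node leaves room for about 7. A 10-node cluster at 64 threads peaks at 640 connections, which sits inside the quoted 500-700 range.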
Change-Id: I067f053fec941e3631610c5cc89a384f257ba906
Reviewed-on: http://gerrit.cloudera.org:8080/7033
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9c4f75a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9c4f75a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9c4f75a6

Branch: refs/heads/master
Commit: 9c4f75a6a877f2b5e56d24f1e73fcf72d74f9129
Parents: 9caea9b
Author: Sailesh Mukil <[email protected]>
Authored: Tue May 30 19:50:13 2017 +0000
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 1 22:55:09 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc | 11 +++++++++++
 be/src/runtime/disk-io-mgr.cc            |  9 +++++++++
 be/src/runtime/disk-io-mgr.h             |  4 ++++
 3 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9c4f75a6/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 3fcd59d..1528cd6 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -33,6 +33,13 @@ DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRea
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
 
+// TODO: Run perf tests and empirically settle on the most optimal default value for the
+// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// due to JNI array allocation and memcpy overhead, 128k was emperically found to have the
+// least overhead.
+DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
+    "reading from ADLS.");
+
 // Implementation of the ScanRange functionality. Each ScanRange contains a queue
 // of ready buffers. For each ScanRange, there is only a single producer and
 // consumer thread, i.e. only one disk thread will push to a scan range at
@@ -399,6 +406,10 @@ int64_t DiskIoMgr::ScanRange::MaxReadChunkSize() const {
     DCHECK(IsS3APath(file()));
     return 128 * 1024;
   }
+  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
+    DCHECK(IsADLSPath(file()));
+    return FLAGS_adls_read_chunk_size;
+  }
   // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
   return numeric_limits<int>::max();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9c4f75a6/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index a6d3c19..01dea0e 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -52,6 +52,11 @@ DEFINE_int32(num_remote_hdfs_io_threads, 8, "number of remote HDFS I/O threads")
 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
 // expensive (SSL and JNI buffer overheads).
 DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
+// The maximum number of ADLS I/O threads. This number is a good default to have for
+// clusters that may vary widely in size, due to an undocumented concurrency limit
+// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
+// (~10 nodes), 64 threads would be more ideal.
+DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
 // The read size is the size of the reads sent to hdfs/os.
 // There is a trade off of latency and throughout, trying to keep disks busy but
 // not introduce seeks. The literature seems to agree that with 8 MB reads, random
@@ -364,6 +369,8 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
       num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
     } else if (i == RemoteS3DiskId()) {
       num_threads_per_disk = FLAGS_num_s3_io_threads;
+    } else if (i == RemoteAdlsDiskId()) {
+      num_threads_per_disk = FLAGS_num_adls_io_threads;
     } else if (num_threads_per_disk_ != 0) {
       num_threads_per_disk = num_threads_per_disk_;
     } else if (DiskInfo::is_rotational(i)) {
@@ -1232,9 +1239,11 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
       return RemoteDfsDiskId();
     }
     if (IsS3APath(file)) return RemoteS3DiskId();
+    if (IsADLSPath(file)) return RemoteAdlsDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file)); // S3 is always remote.
+  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9c4f75a6/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index e2a92ec..aae19ee 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -760,6 +760,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
   int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
+  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
 
@@ -795,6 +798,7 @@ class DiskIoMgr : public CacheLineAligned {
   enum {
     REMOTE_DFS_DISK_OFFSET = 0,
     REMOTE_S3_DISK_OFFSET,
+    REMOTE_ADLS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };
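
To see how the three hunks fit together, here is a minimal standalone model of the queue layout after this patch. It is a sketch, not the Impala implementation: QueueLayout, HasPrefix, AssignQueueSketch, ThreadsForQueue and MaxReadChunkSizeSketch are hypothetical names, the "s3a://" and "adl://" prefixes are an assumption about what IsS3APath and IsADLSPath check, and the remote HDFS branch and the expected_local argument of AssignQueue are left out for brevity. The thread counts and chunk sizes are the flag defaults shown in the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>

// Mirrors the enum in disk-io-mgr.h after this patch.
enum RemoteDiskOffset {
  REMOTE_DFS_DISK_OFFSET = 0,
  REMOTE_S3_DISK_OFFSET,
  REMOTE_ADLS_DISK_OFFSET,
  REMOTE_NUM_DISKS
};

// Hypothetical stand-in for the parts of DiskIoMgr that compute queue IDs.
// Remote queues are laid out directly after the local disk queues.
struct QueueLayout {
  int num_local_disks;
  int RemoteDfsDiskId() const { return num_local_disks + REMOTE_DFS_DISK_OFFSET; }
  int RemoteS3DiskId() const { return num_local_disks + REMOTE_S3_DISK_OFFSET; }
  int RemoteAdlsDiskId() const { return num_local_disks + REMOTE_ADLS_DISK_OFFSET; }
  int TotalQueues() const { return num_local_disks + REMOTE_NUM_DISKS; }
};

// Hypothetical helper: true if `s` starts with `prefix`.
inline bool HasPrefix(const std::string& s, const std::string& prefix) {
  return s.compare(0, prefix.size(), prefix) == 0;
}

// Simplified queue assignment: S3 and ADLS paths always go to their remote
// queue, everything else goes to a local disk queue. The URI schemes are an
// assumption, and the remote HDFS case is omitted.
inline int AssignQueueSketch(const QueueLayout& layout, const std::string& file,
                             int disk_id) {
  if (HasPrefix(file, "s3a://")) return layout.RemoteS3DiskId();
  if (HasPrefix(file, "adl://")) return layout.RemoteAdlsDiskId();
  assert(layout.num_local_disks > 0);
  if (disk_id < 0) disk_id = 0;  // Unknown disk: pick one (the real code rotates).
  return disk_id % layout.num_local_disks;
}

// Thread count per queue using the flag defaults from the diff; `local_threads`
// stands in for whatever the local-disk logic would choose.
inline int ThreadsForQueue(const QueueLayout& layout, int queue_id, int local_threads) {
  if (queue_id == layout.RemoteDfsDiskId()) return 8;    // num_remote_hdfs_io_threads
  if (queue_id == layout.RemoteS3DiskId()) return 16;    // num_s3_io_threads
  if (queue_id == layout.RemoteAdlsDiskId()) return 16;  // num_adls_io_threads
  return local_threads;
}

// Maximum read chunk size per queue, following MaxReadChunkSize() in the diff.
inline int64_t MaxReadChunkSizeSketch(const QueueLayout& layout, int queue_id,
                                      int64_t adls_read_chunk_size = 128 * 1024) {
  if (queue_id == layout.RemoteS3DiskId()) return 128 * 1024;
  if (queue_id == layout.RemoteAdlsDiskId()) return adls_read_chunk_size;
  return std::numeric_limits<int>::max();  // hdfsRead() takes an int length.
}
```

With four local disks, this model places the remote HDFS, S3 and ADLS queues at indices 4, 5 and 6. An "adl://" path then lands in queue 6 with 16 threads and a 128k chunk limit, instead of a local queue that may fall back to a single thread.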
