This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit cf1d6d0f4ed4e6eecaa951ca3561a7f441ef5099 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Mon Aug 29 14:53:18 2022 +0200 IMPALA-11539: Mitigate intra-node skew of file scans with MT_DOP Before IMPALA-9655 scan ranges were statically assigned to intra-node fragment instances based on Longest-Processing Time algorithm: https://github.com/apache/impala/blame/a7866a94578be6289bbac31686de4d9032ad9261/be/src/scheduling/scheduler.cc#L499-L501 From IMPALA-9655 we use dynamic intra-node load balancing for file scans. It means fragment instances have a shared queue of scan ranges and the fragment instances grab the next scan range to be read from this queue. IMPALA-9655 got rid of the LPT-algorithm which means the scan ranges are in a random order in the queue. This can lead to a skew if there are large scan ranges at the end. This patch mixes the above two approaches by using a priority queue for the scan ranges, so each fragment instance would grab the largest scan range in the queue. This could further mitigate intra-node skewing. Ranges that are marked to use the hdfs cache are still handled with higher priority. The patch introduces a new class called ScanRangeQueueMt which implements the above. Testing: * added e2e test in which MIN(bytes_read) / MAX(bytes_read) is greater than 0.6 with this patch. Earlier we could see it less than 0.4. Performance: No significant perf change. Ran TPCH (scale 30) with mt_dop set to 10 on a 3 node minicluster on my desktop. 
+----------+------------------------+---------+------------+------------+----------------+ | Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) | +----------+------------------------+---------+------------+------------+----------------+ | TPCH(30) | parquet / snap / block | 3.16 | -0.37% | 2.29 | +0.40% | +----------+------------------------+---------+------------+------------+----------------+ +----------+----------+------------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+ | Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval | +----------+----------+------------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+ | TPCH(30) | TPCH-Q20 | parquet / snap / block | 1.58 | 1.55 | +2.04% | 2.26% | 3.15% | 20 | +3.14% | 2.53 | 2.34 | | TPCH(30) | TPCH-Q11 | parquet / snap / block | 0.68 | 0.66 | +2.94% | 3.67% | 4.18% | 20 | +0.90% | 1.97 | 2.33 | | TPCH(30) | TPCH-Q5 | parquet / snap / block | 2.02 | 1.99 | +1.55% | 1.48% | 1.38% | 20 | +2.27% | 2.53 | 3.40 | | TPCH(30) | TPCH-Q1 | parquet / snap / block | 2.59 | 2.56 | +1.22% | 3.25% | 3.12% | 20 | +1.75% | 1.33 | 1.21 | | TPCH(30) | TPCH-Q15 | parquet / snap / block | 2.15 | 2.13 | +1.20% | 1.53% | 1.39% | 20 | +0.91% | 2.38 | 2.58 | | TPCH(30) | TPCH-Q4 | parquet / snap / block | 1.26 | 1.24 | +1.62% | 2.33% | 2.77% | 20 | +0.36% | 2.23 | 1.98 | | TPCH(30) | TPCH-Q6 | parquet / snap / block | 0.74 | 0.73 | +1.31% | 3.23% | 3.24% | 20 | +0.54% | 1.24 | 1.27 | | TPCH(30) | TPCH-Q7 | parquet / snap / block | 2.13 | 2.11 | +1.03% | 1.84% | 1.71% | 20 | +0.52% | 1.85 | 1.83 | | TPCH(30) | TPCH-Q19 | parquet / snap / block | 2.02 | 2.00 | +0.91% | 1.26% | 1.32% | 20 | +0.34% | 1.80 | 2.22 | | TPCH(30) | TPCH-Q2 | parquet / snap / block | 1.42 | 1.41 | +0.95% | 2.54% | 
1.58% | 20 | +0.07% | 0.42 | 1.41 | | TPCH(30) | TPCH-Q16 | parquet / snap / block | 0.93 | 0.92 | +0.59% | 4.06% | 4.53% | 20 | +0.27% | 0.66 | 0.43 | | TPCH(30) | TPCH-Q12 | parquet / snap / block | 1.57 | 1.56 | +0.68% | 1.98% | 2.34% | 20 | +0.14% | 1.04 | 0.98 | | TPCH(30) | TPCH-Q21 | parquet / snap / block | 10.69 | 10.66 | +0.28% | 0.54% | 0.59% | 20 | +0.36% | 1.56 | 1.59 | | TPCH(30) | TPCH-Q8 | parquet / snap / block | 2.30 | 2.29 | +0.50% | 1.38% | 1.57% | 20 | +0.09% | 1.12 | 1.06 | | TPCH(30) | TPCH-Q22 | parquet / snap / block | 1.01 | 1.00 | +0.51% | 3.40% | 4.25% | 20 | +0.08% | 0.60 | 0.42 | | TPCH(30) | TPCH-Q13 | parquet / snap / block | 4.66 | 4.66 | +0.08% | 1.00% | 0.78% | 20 | -0.02% | -0.25 | 0.29 | | TPCH(30) | TPCH-Q18 | parquet / snap / block | 6.26 | 6.25 | +0.13% | 2.23% | 1.49% | 20 | -0.07% | -0.16 | 0.21 | | TPCH(30) | TPCH-Q14 | parquet / snap / block | 1.68 | 1.69 | -0.51% | 2.51% | 3.81% | 20 | -0.05% | -0.31 | -0.51 | | TPCH(30) | TPCH-Q3 | parquet / snap / block | 1.86 | 1.88 | -0.72% | 1.83% | 2.10% | 20 | -0.07% | -0.95 | -1.15 | | TPCH(30) | TPCH-Q9 | parquet / snap / block | 8.84 | 9.02 | -1.93% | 1.78% | 2.55% | 20 | -2.27% | -2.50 | -2.79 | | TPCH(30) | TPCH-Q17 | parquet / snap / block | 8.27 | 8.45 | -2.09% | 1.16% | 1.32% | 20 | -2.33% | -4.01 | -5.35 | | TPCH(30) | TPCH-Q10 | parquet / snap / block | 4.95 | 5.13 | -3.60% | 6.86% | 9.75% | 20 | -2.53% | -2.70 | -1.37 | +----------+----------+------------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+ Change-Id: Ib7dc1f1665565da6c0e155c1e585f7089b18a180 Reviewed-on: http://gerrit.cloudera.org:8080/18929 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-scan-node-base.cc | 19 ++----- be/src/exec/hdfs-scan-node-base.h | 3 +- be/src/exec/scan-range-queue-mt.h | 103 +++++++++++++++++++++++++++++++++++++ 
tests/query_test/test_scanners.py | 62 ++++++++++++++++++++++ 4 files changed, 171 insertions(+), 16 deletions(-) diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 4c12799bf..75d2aa5d9 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -337,6 +337,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat shared_state_.progress().Init( Substitute("Splits complete (node=$0)", tnode_->node_id), total_splits); shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node; + shared_state_.scan_range_queue_.Reserve(total_splits); // Distribute the work evenly for issuing initial scan ranges. DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1) @@ -1279,19 +1280,7 @@ void ScanRangeSharedState::UpdateRemainingScanRangeSubmissions(int32_t delta) { void ScanRangeSharedState::EnqueueScanRange( const vector<ScanRange*>& ranges, bool at_front) { DCHECK(use_mt_scan_node_) << "Should only be called by MT scan nodes"; - if (!at_front) { - for (ScanRange* scan_range : ranges) { - if(scan_range->UseHdfsCache()){ - scan_range_queue_.PushFront(scan_range); - continue; - } - scan_range_queue_.Enqueue(scan_range); - } - } else { - for (ScanRange* scan_range : ranges) { - scan_range_queue_.PushFront(scan_range); - } - } + scan_range_queue_.EnqueueRanges(ranges, at_front); } Status ScanRangeSharedState::GetNextScanRange( @@ -1302,13 +1291,13 @@ Status ScanRangeSharedState::GetNextScanRange( if (*scan_range != nullptr) return Status::OK(); { unique_lock<mutex> l(scan_range_submission_lock_); - while (scan_range_queue_.empty() && remaining_scan_range_submissions_.Load() > 0 + while (scan_range_queue_.Empty() && remaining_scan_range_submissions_.Load() > 0 && !state->is_cancelled()) { range_submission_cv_.Wait(l); } } // No more work to do. 
- if (scan_range_queue_.empty() && remaining_scan_range_submissions_.Load() == 0) { + if (scan_range_queue_.Empty() && remaining_scan_range_submissions_.Load() == 0) { break; } if (state->is_cancelled()) return Status::CANCELLED; diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 13e4c99c4..21d18f25c 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -34,6 +34,7 @@ #include "exec/file-metadata-utils.h" #include "exec/filter-context.h" #include "exec/scan-node.h" +#include "exec/scan-range-queue-mt.h" #include "runtime/descriptors.h" #include "runtime/io/request-context.h" #include "runtime/io/request-ranges.h" @@ -254,7 +255,7 @@ class ScanRangeSharedState { /// Queue of all scan ranges that need to be read. Shared by all instances of this /// fragment. Only used for MT scans. - InternalQueue<io::ScanRange> scan_range_queue_; + ScanRangeQueueMt scan_range_queue_; /// END: Members that are used only by MT scan nodes(use_mt_scan_node_ is true). ///////////////////////////////////////////////////////////////////// diff --git a/be/src/exec/scan-range-queue-mt.h b/be/src/exec/scan-range-queue-mt.h new file mode 100644 index 000000000..8a80b25b1 --- /dev/null +++ b/be/src/exec/scan-range-queue-mt.h @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <mutex> +#include <queue> +#include <vector> + +#include "runtime/io/request-ranges.h" +#include "util/priority-queue.h" + +namespace impala { + +/// Queue of all scan ranges that need to be read. Shared by all instances of a +/// fragment. Only used for MT scans where the scan ranges are dynamically assigned +/// to the fragment instances using this queue. The scan ranges in this queue are +/// ordered from largest to smallest, i.e. implementing a Longest-Processing Time +/// scheduling. Exceptions are scan ranges that use HDFS caching, they are prioritized +/// earlier. +/// It's also possible to add high prio scan ranges, they are scheduled before +/// the non-high prio scan ranges. +class ScanRangeQueueMt { + public: + ScanRangeQueueMt() = default; + + /// Adds all scan ranges to the queue. If 'high_prio' is true we add it to a separate + /// list of prioritized items. + void EnqueueRanges(const std::vector<io::ScanRange*>& ranges, bool high_prio) { + std::lock_guard<std::mutex> lock(scan_range_queue_lock_); + if (LIKELY(!high_prio)) { + for (io::ScanRange* scan_range : ranges) { + scan_range_queue_.Push(scan_range); + } + } else { + for (io::ScanRange* scan_range : ranges) { + high_prio_scan_ranges_.push(scan_range); + } + } + } + + /// Returns the next scan range from the queue. Returns nullptr if the queue is empty. 
+ io::ScanRange* Dequeue() { + io::ScanRange* ret = nullptr; + std::lock_guard<std::mutex> lock(scan_range_queue_lock_); + if (UNLIKELY(!high_prio_scan_ranges_.empty())) { + ret = high_prio_scan_ranges_.front(); + high_prio_scan_ranges_.pop(); + } else if (!scan_range_queue_.Empty()) { + ret = scan_range_queue_.Pop(); + } + return ret; + } + + /// Returns true if the scan range queue is empty. + bool Empty() { + std::lock_guard<std::mutex> lock(scan_range_queue_lock_); + return high_prio_scan_ranges_.empty() && scan_range_queue_.Empty(); + } + + /// Reserves capacity for the queue. This doesn't affect the queue of the high prio + /// items. + void Reserve(int64_t capacity) { + std::lock_guard<std::mutex> lock(scan_range_queue_lock_); + scan_range_queue_.Reserve(capacity); + } + + private: + DISALLOW_COPY_AND_ASSIGN(ScanRangeQueueMt); + + struct ScanRangeComparator { + public: + bool Less(const io::ScanRange* lhs, const io::ScanRange* rhs) const { + DCHECK(lhs != nullptr); + DCHECK(rhs != nullptr); + if (!lhs->UseHdfsCache() && rhs->UseHdfsCache()) return true; + if (lhs->UseHdfsCache() && !rhs->UseHdfsCache()) return false; + return lhs->bytes_to_read() < rhs->bytes_to_read(); + } + }; + + std::mutex scan_range_queue_lock_; + std::queue<io::ScanRange*> high_prio_scan_ranges_; + ScanRangeComparator scan_range_compare_; + PriorityQueue<io::ScanRange*, ScanRangeComparator> scan_range_queue_{ + scan_range_compare_}; +}; + +} diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 62c0ab3b8..45865bee6 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -366,6 +366,68 @@ class TestWideTable(ImpalaTestSuite): assert expected == actual +class TestHdfsScannerSkew(ImpalaTestSuite): + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestHdfsScannerSkew, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint(lambda v: + 
v.get_value('table_format').file_format in ('text')) + + @SkipIfLocal.multiple_impalad + def test_mt_dop_skew_lpt(self, vector, unique_database): + """IMPALA-11539: Sanity check for MT scan nodes to make sure that the intra-node + skew is mitigated. For intra-node scan range assignment we are using dynamic + load balancing with a shared queue between the instances. With IMPALA-11539 + the items in the queue are ordered by scan sizes from largest to smallest, i.e. + we are doing Longest-Processing Time (LPT) scheduling.""" + def bytes_read_statistics(profile): + lines = [line.strip() for line in profile.splitlines() if "- BytesRead: " in line] + assert len(lines) == 7 # Averaged fragment + 6 fragment + min = None + max = None + for i in range(1, len(lines)): + # A line looks like: + # - BytesRead: 202.77 MB (212617555) + # we only need '212617555' from it + bytes_read_str = re.findall(r'\((\d+)\)', lines[i])[0] + bytes_read = int(bytes_read_str) + if min is None and max is None: + min = max = bytes_read + continue + if bytes_read < min: min = bytes_read + if bytes_read > max: max = bytes_read + return [min, max] + + tbl_name = unique_database + ".lineitem_skew" + with self.create_impala_client() as imp_client: + imp_client.set_configuration_option('mt_dop', '2') + imp_client.execute("""create table {} like tpch.lineitem""".format(tbl_name)) + # Create a couple of small data files + for i in range(1, 5): + imp_client.execute("""insert into {} select * from tpch.lineitem + where l_orderkey % 5 = 0""".format(tbl_name)) + # Create a couple of large files + imp_client.execute("insert into {} select * from tpch.lineitem".format(tbl_name)) + + # Let's execute the test multiple time to avoid flakiness + cnt_fail = 0 + for i in range(0, 5): + results = imp_client.execute( + """select min(l_orderkey),min(l_partkey),min(l_suppkey),min(l_linenumber), + min(l_quantity),min(l_extendedprice),min(l_discount),min(l_tax), + 
min(l_returnflag),min(l_linestatus),min(l_shipdate),min(l_commitdate), + min(l_receiptdate),min(l_shipinstruct),min(l_shipmode),min(l_comment) + from {}""".format(tbl_name)) + profile = results.runtime_profile + [min, max] = bytes_read_statistics(profile) + if float(min) / float(max) < 0.5: cnt_fail += 1 + assert cnt_fail < 3 + + class TestHudiParquet(ImpalaTestSuite): @classmethod def get_workload(cls):
