This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cf1d6d0f4ed4e6eecaa951ca3561a7f441ef5099
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Mon Aug 29 14:53:18 2022 +0200

    IMPALA-11539: Mitigate intra-node skew of file scans with MT_DOP
    
    Before IMPALA-9655 scan ranges were statically assigned to intra-node
    fragment instances based on Longest-Processing Time algorithm:
    
https://github.com/apache/impala/blame/a7866a94578be6289bbac31686de4d9032ad9261/be/src/scheduling/scheduler.cc#L499-L501
    
    From IMPALA-9655 we use dynamic intra-node load balancing for file
    scans. It means fragment instances have a shared queue of scan ranges
    and the fragment instances grab the next scan range to be read from
    this queue.
    
    IMPALA-9655 got rid of the LPT-algorithm which means the scan ranges
    are in a random order in the queue. This can lead to a skew if there
    are large scan ranges at the end.
    
    This patch mixes the above two approaches by using a priority queue
    for the scan ranges, so each fragment instance would grab the largest
    scan range in the queue. This could further mitigate intra-node skewing.
    Ranges that are marked to use the hdfs cache are still handled with
    higher priority.
    
    The patch introduces a new class called ScanRangeQueueMt which implements
    the above.
    
    Testing:
     * added e2e test in which MIN(bytes_read) / MAX(bytes_read) is greater
       than 0.6 with this patch. Earlier we could see it less than 0.4.
    
    Performance:
    No significant perf change. Ran TPCH (scale 30) with mt_dop set to 10 on
    a 3 node minicluster on my desktop.
    
    
+----------+------------------------+---------+------------+------------+----------------+
    | Workload | File Format            | Avg (s) | Delta(Avg) | GeoMean(s) | 
Delta(GeoMean) |
    
+----------+------------------------+---------+------------+------------+----------------+
    | TPCH(30) | parquet / snap / block | 3.16    | -0.37%     | 2.29       | 
+0.40%         |
    
+----------+------------------------+---------+------------+------------+----------------+
    
    
+----------+----------+------------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+
    | Workload | Query    | File Format            | Avg(s) | Base Avg(s) | 
Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | 
Tval  |
    
+----------+----------+------------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+
    | TPCH(30) | TPCH-Q20 | parquet / snap / block | 1.58   | 1.55        |   
+2.04%   |   2.26%   |   3.15%        | 20    |   +3.14%       | 2.53    | 2.34 
 |
    | TPCH(30) | TPCH-Q11 | parquet / snap / block | 0.68   | 0.66        |   
+2.94%   |   3.67%   |   4.18%        | 20    |   +0.90%       | 1.97    | 2.33 
 |
    | TPCH(30) | TPCH-Q5  | parquet / snap / block | 2.02   | 1.99        |   
+1.55%   |   1.48%   |   1.38%        | 20    |   +2.27%       | 2.53    | 3.40 
 |
    | TPCH(30) | TPCH-Q1  | parquet / snap / block | 2.59   | 2.56        |   
+1.22%   |   3.25%   |   3.12%        | 20    |   +1.75%       | 1.33    | 1.21 
 |
    | TPCH(30) | TPCH-Q15 | parquet / snap / block | 2.15   | 2.13        |   
+1.20%   |   1.53%   |   1.39%        | 20    |   +0.91%       | 2.38    | 2.58 
 |
    | TPCH(30) | TPCH-Q4  | parquet / snap / block | 1.26   | 1.24        |   
+1.62%   |   2.33%   |   2.77%        | 20    |   +0.36%       | 2.23    | 1.98 
 |
    | TPCH(30) | TPCH-Q6  | parquet / snap / block | 0.74   | 0.73        |   
+1.31%   |   3.23%   |   3.24%        | 20    |   +0.54%       | 1.24    | 1.27 
 |
    | TPCH(30) | TPCH-Q7  | parquet / snap / block | 2.13   | 2.11        |   
+1.03%   |   1.84%   |   1.71%        | 20    |   +0.52%       | 1.85    | 1.83 
 |
    | TPCH(30) | TPCH-Q19 | parquet / snap / block | 2.02   | 2.00        |   
+0.91%   |   1.26%   |   1.32%        | 20    |   +0.34%       | 1.80    | 2.22 
 |
    | TPCH(30) | TPCH-Q2  | parquet / snap / block | 1.42   | 1.41        |   
+0.95%   |   2.54%   |   1.58%        | 20    |   +0.07%       | 0.42    | 1.41 
 |
    | TPCH(30) | TPCH-Q16 | parquet / snap / block | 0.93   | 0.92        |   
+0.59%   |   4.06%   |   4.53%        | 20    |   +0.27%       | 0.66    | 0.43 
 |
    | TPCH(30) | TPCH-Q12 | parquet / snap / block | 1.57   | 1.56        |   
+0.68%   |   1.98%   |   2.34%        | 20    |   +0.14%       | 1.04    | 0.98 
 |
    | TPCH(30) | TPCH-Q21 | parquet / snap / block | 10.69  | 10.66       |   
+0.28%   |   0.54%   |   0.59%        | 20    |   +0.36%       | 1.56    | 1.59 
 |
    | TPCH(30) | TPCH-Q8  | parquet / snap / block | 2.30   | 2.29        |   
+0.50%   |   1.38%   |   1.57%        | 20    |   +0.09%       | 1.12    | 1.06 
 |
    | TPCH(30) | TPCH-Q22 | parquet / snap / block | 1.01   | 1.00        |   
+0.51%   |   3.40%   |   4.25%        | 20    |   +0.08%       | 0.60    | 0.42 
 |
    | TPCH(30) | TPCH-Q13 | parquet / snap / block | 4.66   | 4.66        |   
+0.08%   |   1.00%   |   0.78%        | 20    |   -0.02%       | -0.25   | 0.29 
 |
    | TPCH(30) | TPCH-Q18 | parquet / snap / block | 6.26   | 6.25        |   
+0.13%   |   2.23%   |   1.49%        | 20    |   -0.07%       | -0.16   | 0.21 
 |
    | TPCH(30) | TPCH-Q14 | parquet / snap / block | 1.68   | 1.69        |   
-0.51%   |   2.51%   |   3.81%        | 20    |   -0.05%       | -0.31   | 
-0.51 |
    | TPCH(30) | TPCH-Q3  | parquet / snap / block | 1.86   | 1.88        |   
-0.72%   |   1.83%   |   2.10%        | 20    |   -0.07%       | -0.95   | 
-1.15 |
    | TPCH(30) | TPCH-Q9  | parquet / snap / block | 8.84   | 9.02        |   
-1.93%   |   1.78%   |   2.55%        | 20    |   -2.27%       | -2.50   | 
-2.79 |
    | TPCH(30) | TPCH-Q17 | parquet / snap / block | 8.27   | 8.45        |   
-2.09%   |   1.16%   |   1.32%        | 20    |   -2.33%       | -4.01   | 
-5.35 |
    | TPCH(30) | TPCH-Q10 | parquet / snap / block | 4.95   | 5.13        |   
-3.60%   |   6.86%   |   9.75%        | 20    |   -2.53%       | -2.70   | 
-1.37 |
    
+----------+----------+------------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+
    
    Change-Id: Ib7dc1f1665565da6c0e155c1e585f7089b18a180
    Reviewed-on: http://gerrit.cloudera.org:8080/18929
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-scan-node-base.cc |  19 ++-----
 be/src/exec/hdfs-scan-node-base.h  |   3 +-
 be/src/exec/scan-range-queue-mt.h  | 103 +++++++++++++++++++++++++++++++++++++
 tests/query_test/test_scanners.py  |  62 ++++++++++++++++++++++
 4 files changed, 171 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index 4c12799bf..75d2aa5d9 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -337,6 +337,7 @@ Status 
HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
   shared_state_.progress().Init(
       Substitute("Splits complete (node=$0)", tnode_->node_id), total_splits);
   shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node;
+  shared_state_.scan_range_queue_.Reserve(total_splits);
 
   // Distribute the work evenly for issuing initial scan ranges.
   DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1)
@@ -1279,19 +1280,7 @@ void 
ScanRangeSharedState::UpdateRemainingScanRangeSubmissions(int32_t delta) {
 void ScanRangeSharedState::EnqueueScanRange(
     const vector<ScanRange*>& ranges, bool at_front) {
   DCHECK(use_mt_scan_node_) << "Should only be called by MT scan nodes";
-  if (!at_front) {
-    for (ScanRange* scan_range : ranges) {
-      if(scan_range->UseHdfsCache()){
-        scan_range_queue_.PushFront(scan_range);
-        continue;
-      }
-      scan_range_queue_.Enqueue(scan_range);
-    }
-  } else {
-    for (ScanRange* scan_range : ranges) {
-      scan_range_queue_.PushFront(scan_range);
-    }
-  }
+  scan_range_queue_.EnqueueRanges(ranges, at_front);
 }
 
 Status ScanRangeSharedState::GetNextScanRange(
@@ -1302,13 +1291,13 @@ Status ScanRangeSharedState::GetNextScanRange(
     if (*scan_range != nullptr) return Status::OK();
     {
       unique_lock<mutex> l(scan_range_submission_lock_);
-      while (scan_range_queue_.empty() && 
remaining_scan_range_submissions_.Load() > 0
+      while (scan_range_queue_.Empty() && 
remaining_scan_range_submissions_.Load() > 0
           && !state->is_cancelled()) {
         range_submission_cv_.Wait(l);
       }
     }
     // No more work to do.
-    if (scan_range_queue_.empty() && remaining_scan_range_submissions_.Load() 
== 0) {
+    if (scan_range_queue_.Empty() && remaining_scan_range_submissions_.Load() 
== 0) {
       break;
     }
     if (state->is_cancelled()) return Status::CANCELLED;
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index 13e4c99c4..21d18f25c 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -34,6 +34,7 @@
 #include "exec/file-metadata-utils.h"
 #include "exec/filter-context.h"
 #include "exec/scan-node.h"
+#include "exec/scan-range-queue-mt.h"
 #include "runtime/descriptors.h"
 #include "runtime/io/request-context.h"
 #include "runtime/io/request-ranges.h"
@@ -254,7 +255,7 @@ class ScanRangeSharedState {
 
   /// Queue of all scan ranges that need to be read. Shared by all instances 
of this
   /// fragment. Only used for MT scans.
-  InternalQueue<io::ScanRange> scan_range_queue_;
+  ScanRangeQueueMt scan_range_queue_;
 
   /// END: Members that are used only by MT scan nodes(use_mt_scan_node_ is 
true).
   /////////////////////////////////////////////////////////////////////
diff --git a/be/src/exec/scan-range-queue-mt.h 
b/be/src/exec/scan-range-queue-mt.h
new file mode 100644
index 000000000..8a80b25b1
--- /dev/null
+++ b/be/src/exec/scan-range-queue-mt.h
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+#include <queue>
+#include <vector>
+
+#include "runtime/io/request-ranges.h"
+#include "util/priority-queue.h"
+
+namespace impala {
+
+/// Queue of all scan ranges that need to be read. Shared by all instances of a
+/// fragment. Only used for MT scans where the scan ranges are dynamically 
assigned
+/// to the fragment instances using this queue. The scan ranges in this queue 
are
+/// ordered from largest to smallest, i.e. implementing a Longest-Processing 
Time
+/// scheduling. Exceptions are scan ranges that use HDFS caching, they are 
prioritized
+/// earlier.
+/// It's also possible to add high prio scan ranges, they are scheduled before
+/// the non-high prio scan ranges.
+class ScanRangeQueueMt {
+ public:
+  ScanRangeQueueMt() = default;
+
+  /// Adds all scan ranges to the queue. If 'high_prio' is true we add it to a 
separate
+  /// list of prioritized items.
+  void EnqueueRanges(const std::vector<io::ScanRange*>& ranges, bool 
high_prio) {
+    std::lock_guard<std::mutex> lock(scan_range_queue_lock_);
+    if (LIKELY(!high_prio)) {
+      for (io::ScanRange* scan_range : ranges) {
+        scan_range_queue_.Push(scan_range);
+      }
+    } else {
+      for (io::ScanRange* scan_range : ranges) {
+        high_prio_scan_ranges_.push(scan_range);
+      }
+    }
+  }
+
+  /// Returns the next scan range from the queue. Returns nullptr if the queue 
is empty.
+  io::ScanRange* Dequeue() {
+    io::ScanRange* ret = nullptr;
+    std::lock_guard<std::mutex> lock(scan_range_queue_lock_);
+    if (UNLIKELY(!high_prio_scan_ranges_.empty())) {
+      ret = high_prio_scan_ranges_.front();
+      high_prio_scan_ranges_.pop();
+    } else if (!scan_range_queue_.Empty()) {
+      ret = scan_range_queue_.Pop();
+    }
+    return ret;
+  }
+
+  /// Returns true if the scan range queue is empty.
+  bool Empty() {
+    std::lock_guard<std::mutex> lock(scan_range_queue_lock_);
+    return high_prio_scan_ranges_.empty() && scan_range_queue_.Empty();
+  }
+
+  /// Reserves capacity for the queue. This doesn't affect the queue of the 
high prio
+  /// items.
+  void Reserve(int64_t capacity) {
+    std::lock_guard<std::mutex> lock(scan_range_queue_lock_);
+    scan_range_queue_.Reserve(capacity);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ScanRangeQueueMt);
+
+  struct ScanRangeComparator {
+  public:
+    bool Less(const io::ScanRange* lhs, const io::ScanRange* rhs) const {
+      DCHECK(lhs != nullptr);
+      DCHECK(rhs != nullptr);
+      if (!lhs->UseHdfsCache() && rhs->UseHdfsCache()) return true;
+      if (lhs->UseHdfsCache() && !rhs->UseHdfsCache()) return false;
+      return lhs->bytes_to_read() < rhs->bytes_to_read();
+    }
+  };
+
+  std::mutex scan_range_queue_lock_;
+  std::queue<io::ScanRange*> high_prio_scan_ranges_;
+  ScanRangeComparator scan_range_compare_;
+  PriorityQueue<io::ScanRange*, ScanRangeComparator> scan_range_queue_{
+      scan_range_compare_};
+};
+
+}
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 62c0ab3b8..45865bee6 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -366,6 +366,68 @@ class TestWideTable(ImpalaTestSuite):
     assert expected == actual
 
 
+class TestHdfsScannerSkew(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsScannerSkew, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ('text'))
+
+  @SkipIfLocal.multiple_impalad
+  def test_mt_dop_skew_lpt(self, vector, unique_database):
+    """IMPALA-11539: Sanity check for MT scan nodes to make sure that the 
intra-node
+       skew is mitigated. For intra-node scan range assignment we are using 
dynamic
+       load balancing with a shared queue between the instances. With 
IMPALA-11539
+       the items in the queue are ordered by scan sizes from largest to 
smallest, i.e.
+       we are doing Longest-Processing Time (LPT) scheduling."""
+    def bytes_read_statistics(profile):
+      lines = [line.strip() for line in profile.splitlines() if "- BytesRead: 
" in line]
+      assert len(lines) == 7  # Averaged fragment + 6 fragment
+      min = None
+      max = None
+      for i in range(1, len(lines)):
+        # A line looks like:
+        # - BytesRead: 202.77 MB (212617555)
+        # we only need '212617555' from it
+        bytes_read_str = re.findall(r'\((\d+)\)', lines[i])[0]
+        bytes_read = int(bytes_read_str)
+        if min is None and max is None:
+          min = max = bytes_read
+          continue
+        if bytes_read < min: min = bytes_read
+        if bytes_read > max: max = bytes_read
+      return [min, max]
+
+    tbl_name = unique_database + ".lineitem_skew"
+    with self.create_impala_client() as imp_client:
+      imp_client.set_configuration_option('mt_dop', '2')
+      imp_client.execute("""create table {} like 
tpch.lineitem""".format(tbl_name))
+      # Create a couple of small data files
+      for i in range(1, 5):
+        imp_client.execute("""insert into {} select * from tpch.lineitem
+                              where l_orderkey % 5 = 0""".format(tbl_name))
+      # Create a couple of large files
+      imp_client.execute("insert into {} select * from 
tpch.lineitem".format(tbl_name))
+
+      # Let's execute the test multiple time to avoid flakiness
+      cnt_fail = 0
+      for i in range(0, 5):
+        results = imp_client.execute(
+            """select 
min(l_orderkey),min(l_partkey),min(l_suppkey),min(l_linenumber),
+                      
min(l_quantity),min(l_extendedprice),min(l_discount),min(l_tax),
+                      
min(l_returnflag),min(l_linestatus),min(l_shipdate),min(l_commitdate),
+                      
min(l_receiptdate),min(l_shipinstruct),min(l_shipmode),min(l_comment)
+               from {}""".format(tbl_name))
+        profile = results.runtime_profile
+        [min, max] = bytes_read_statistics(profile)
+        if float(min) / float(max) < 0.5: cnt_fail += 1
+      assert cnt_fail < 3
+
+
 class TestHudiParquet(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):

Reply via email to