This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2004a87  IMPALA-10337: Consider MAX_ROW_SIZE when computing max 
reservation
2004a87 is described below

commit 2004a87edfa3a78e89623f395e8697047fe3c984
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Nov 19 15:32:36 2020 -0800

    IMPALA-10337: Consider MAX_ROW_SIZE when computing max reservation
    
    PlanRootSink can fail silently if result spooling is enabled and
    maxMemReservationBytes is less than 2 * MAX_ROW_SIZE. This happens
    because results are spilled using a SpillableRowBatchQueue which needs 2
    buffers (read and write) with at least MAX_ROW_SIZE bytes per buffer.
    This patch fixes this by setting a lower bound of 2 * MAX_ROW_SIZE while
    computing the min reservation for the PlanRootSink.
    
    Testing:
    - Pass exhaustive tests.
    - Add e2e TestResultSpoolingMaxReservation.
    - Lower MAX_ROW_SIZE on tests where MAX_RESULT_SPOOLING_MEM is set to an
      extremely low value. Also verify that PLAN_ROOT_SINK's ReservationLimit
      remains unchanged after lowering the MAX_ROW_SIZE.
    
    Change-Id: Id7138e1e034ea5d1cd15cf8de399690e52a9d726
    Reviewed-on: http://gerrit.cloudera.org:8080/16765
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/buffered-tuple-stream.h             | 10 +--
 be/src/runtime/spillable-row-batch-queue.cc        |  7 +-
 .../org/apache/impala/planner/PlanRootSink.java    |  7 +-
 tests/custom_cluster/test_query_retries.py         |  7 ++
 tests/query_test/test_result_spooling.py           | 96 ++++++++++++++++++++++
 5 files changed, 118 insertions(+), 9 deletions(-)

diff --git a/be/src/runtime/buffered-tuple-stream.h 
b/be/src/runtime/buffered-tuple-stream.h
index 98a588b..5a22bfb 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -551,6 +551,11 @@ class BufferedTupleStream {
     }
   };
 
+  /// Returns the total additional bytes that this row will consume in 
write_page_ if
+  /// appended to the page. This includes the row's null indicators, the fixed 
length
+  /// part of the row and the data for inlined_string_slots_ and 
inlined_coll_slots_.
+  int64_t ComputeRowSize(TupleRow* row) const noexcept;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
   friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
@@ -749,11 +754,6 @@ class BufferedTupleStream {
   /// but it is invalid to read or write from in future.
   void InvalidateReadIterator();
 
-  /// Returns the total additional bytes that this row will consume in 
write_page_ if
-  /// appended to the page. This includes the row's null indicators, the fixed 
length
-  /// part of the row and the data for inlined_string_slots_ and 
inlined_coll_slots_.
-  int64_t ComputeRowSize(TupleRow* row) const noexcept;
-
   /// Pins page and updates tracking stats.
   Status PinPage(Page* page);
 
diff --git a/be/src/runtime/spillable-row-batch-queue.cc 
b/be/src/runtime/spillable-row-batch-queue.cc
index da21660..eb02900 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -20,6 +20,7 @@
 #include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "util/pretty-printer.h"
 
 #include "common/names.h"
 
@@ -94,7 +95,11 @@ Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
         RETURN_IF_ERROR(status);
         // If the row could not be added after the stream was unpinned, an 
error should
         // have been set.
-        DCHECK(false) << "Rows should be added in unpinned mode unless an 
error occurred";
+        DCHECK(false) << Substitute("Row with a size of $0 should be added 
successfully "
+                                    "in unpinned mode unless an error 
occurred. "
+                                    "batch_queue_: $1",
+            
PrettyPrinter::PrintBytes(batch_queue_->ComputeRowSize(batch_itr.Get())),
+            batch_queue_->DebugString());
       }
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java 
b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 6d68bff..61d57df 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -76,8 +76,9 @@ public class PlanRootSink extends DataSink {
    * estimated number of input rows into the sink and multiplying it by the 
estimated
    * average row size. The estimated number of input rows is derived from the 
cardinality
    * of the associated fragment's root node. If the cardinality or the average 
row size
-   * are not available, a default value is used. The minimum reservation is 
set 2x the
-   * default spillable buffer size to account for the read and write page in 
the
+   * are not available, a default value is used. The minimum reservation is 
set as 2x of
+   * the maximum between default spillable buffer size and MAX_ROW_SIZE 
(rounded up to
+   * nearest power of 2) to account for the read and write pages in the
    * BufferedTupleStream used by the backend plan-root-sink. The maximum 
reservation is
    * set to the query-level config MAX_PINNED_RESULT_SPOOLING_MEMORY.
    */
@@ -87,7 +88,7 @@ public class PlanRootSink extends DataSink {
       long bufferSize = queryOptions.getDefault_spillable_buffer_size();
       long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
           bufferSize, queryOptions.getMax_row_size());
-      long minMemReservationBytes = 2 * bufferSize;
+      long minMemReservationBytes = 2 * maxRowBufferSize;
       long maxMemReservationBytes = Math.max(
           queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);
 
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 38e17f0..8a3611a 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -586,12 +586,19 @@ class TestQueryRetries(CustomClusterTestSuite):
         'min_spillable_buffer_size': 8 * 1024,
         'default_spillable_buffer_size': 8 * 1024,
         'max_result_spooling_mem': 8 * 1024,
+        'max_row_size': 8 * 1024,
         'max_spilled_result_spooling_mem': 8 * 1024})
 
     # Wait until we can fetch some results
     results = self.client.fetch(query, handle, max_rows=1)
     assert len(results.data) == 1
 
+    # PLAN_ROOT_SINK's reservation limit should be set at
+    # 2 * DEFAULT_SPILLABLE_BUFFER_SIZE = 16 KB.
+    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 16.00 KB"
+    profile = self.client.get_runtime_profile(handle)
+    assert re.search(plan_root_sink_reservation_limit, profile)
+
     # Assert that the query is still executing
     summary = self.client.get_exec_summary(handle)
     assert summary.progress.num_completed_scan_ranges < 
summary.progress.total_scan_ranges
diff --git a/tests/query_test/test_result_spooling.py 
b/tests/query_test/test_result_spooling.py
index 6f54bed..e8c1295 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -65,6 +65,7 @@ class TestResultSpooling(ImpalaTestSuite):
     exec_options['min_spillable_buffer_size'] = 8 * 1024
     exec_options['default_spillable_buffer_size'] = 8 * 1024
     exec_options['max_result_spooling_mem'] = 32 * 1024
+    exec_options['max_row_size'] = 16 * 1024
 
     # Execute the query without result spooling and save the results for later 
validation
     base_result = self.execute_query(query, exec_options)
@@ -83,6 +84,8 @@ class TestResultSpooling(ImpalaTestSuite):
     unpinned_bytes_regex = 
"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
     # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
     spilled_exec_option_regex = "ExecOption:.*Spilled"
+    # PLAN_ROOT_SINK's reservation limit should be set at 
MAX_RESULT_SPOOLING_MEM = 32 KB.
+    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 KB"
 
     # Fetch the runtime profile every 0.5 seconds until either the timeout is 
hit, or
     # PeakUnpinnedBytes shows up in the profile.
@@ -99,6 +102,8 @@ class TestResultSpooling(ImpalaTestSuite):
       # At this point PLAN_ROOT_SINK must have spilled, so 
spilled_exec_option_regex
       # should be in the profile as well.
       assert re.search(spilled_exec_option_regex, profile)
+      # Check that PLAN_ROOT_SINK reservation limit is set accordingly.
+      assert re.search(plan_root_sink_reservation_limit, profile)
       result = self.client.fetch(query, handle)
       assert result.data == base_result.data
     finally:
@@ -408,3 +413,94 @@ class TestResultSpoolingFailpoints(ImpalaTestSuite):
     vector.get_value('exec_option')['debug_action'] = 
vector.get_value('debug_action')
     vector.get_value('exec_option')['spool_query_results'] = 'true'
     execute_query_expect_debug_action_failure(self, self._query, vector)
+
+
+class TestResultSpoolingMaxReservation(ImpalaTestSuite):
+  """These tests verify that while calculating max_reservation for spooling 
these query
+  options are taken into account: MAX_ROW_SIZE, MAX_RESULT_SPOOLING_MEM and
+  DEFAULT_SPILLABLE_BUFFER_SIZE."""
+
+  # Test with denial of reservations at varying frequency.
+  # Always test with the minimal amount of spilling and running with the 
absolute minimum
+  # memory requirement.
+  DEBUG_ACTION_VALUES = [None, '-1:OPEN:[email protected]']
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpoolingMaxReservation, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('debug_action',
+        *cls.DEBUG_ACTION_VALUES))
+
+    # Result spooling should be independent of file format, so only testing for
+    # table_format=parquet/none in order to avoid a test dimension explosion.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_high_max_row_size(self, vector):
+    """Test that when MAX_ROW_SIZE is set, PLAN_ROOT_SINK can adjust its 
max_reservation
+    even though MAX_RESULT_SPOOLING_MEM is set lower."""
+    exec_options = vector.get_value('exec_option')
+    exec_options['debug_action'] = vector.get_value('debug_action')
+    exec_options['spool_query_results'] = 'true'
+    exec_options['max_row_size'] = 10 * 1024 * 1024
+    exec_options['max_result_spooling_mem'] = 2 * 1024 * 1024
+    exec_options['default_spillable_buffer_size'] = 2 * 1024 * 1024
+
+    # Select 3 wide rows, each with size of 10MB.
+    query = "select string_col from functional.widerow " \
+        "join functional.tinyinttable where int_col < 3"
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is enabled" 
\
+        .format(query)
+
+    # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
+    spilled_exec_option_regex = "ExecOption:.*Spilled"
+    assert re.search(spilled_exec_option_regex, result.runtime_profile)
+
+    # PLAN_ROOT_SINK's reservation limit should be set at 2 * MAX_ROW_SIZE.
+    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 MB"
+    assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
+
+  def test_high_default_spillable_buffer(self, vector):
+    """Test that high DEFAULT_SPILLABLE_BUFFER_SIZE wins the calculation for
+    PLAN_ROOT_SINK's max_reservation"""
+    exec_options = vector.get_value('exec_option')
+    exec_options['debug_action'] = vector.get_value('debug_action')
+    exec_options['spool_query_results'] = 'true'
+    exec_options['max_row_size'] = 8 * 1024
+    exec_options['max_result_spooling_mem'] = 8 * 1024
+    exec_options['default_spillable_buffer_size'] = 32 * 1024
+    self.__run_small_spilling_query(exec_options, "64.00 KB")
+
+  def test_high_max_result_spooling_mem(self, vector):
+    """Test that high MAX_RESULT_SPOOLING_MEM wins the calculation for
+    PLAN_ROOT_SINK's max_reservation"""
+    exec_options = vector.get_value('exec_option')
+    exec_options['debug_action'] = vector.get_value('debug_action')
+    exec_options['spool_query_results'] = 'true'
+    exec_options['max_row_size'] = 8 * 1024
+    exec_options['max_result_spooling_mem'] = 70 * 1024
+    exec_options['default_spillable_buffer_size'] = 8 * 1024
+    self.__run_small_spilling_query(exec_options, "70.00 KB")
+
+  def __run_small_spilling_query(self, exec_options, expected_limit):
+    """Given an exec_options, test that simple query below spills and 
PLAN_ROOT_SINK's
+    ReservationLimit match with the expected_limit"""
+    query = "select * from functional.alltypes order by id limit 1500"
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is enabled" 
\
+        .format(query)
+
+    # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
+    spilled_exec_option_regex = "ExecOption:.*Spilled"
+    assert re.search(spilled_exec_option_regex, result.runtime_profile)
+
+    # Check that PLAN_ROOT_SINK's reservation limit match.
+    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
+        .format(expected_limit)
+    assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)

Reply via email to