This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 2004a87 IMPALA-10337: Consider MAX_ROW_SIZE when computing max
reservation
2004a87 is described below
commit 2004a87edfa3a78e89623f395e8697047fe3c984
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Nov 19 15:32:36 2020 -0800
IMPALA-10337: Consider MAX_ROW_SIZE when computing max reservation
PlanRootSink can fail silently if result spooling is enabled and
maxMemReservationBytes is less than 2 * MAX_ROW_SIZE. This happens
because results are spilled using a SpillableRowBatchQueue, which needs 2
buffers (read and write) with at least MAX_ROW_SIZE bytes per buffer.
This patch fixes this by setting a lower bound of 2 * MAX_ROW_SIZE while
computing the min reservation for the PlanRootSink.
Testing:
- Pass exhaustive tests.
- Add e2e TestResultSpoolingMaxReservation.
- Lower MAX_ROW_SIZE on tests where MAX_RESULT_SPOOLING_MEM is set to
extremely low value. Also verify that PLAN_ROOT_SINK's ReservationLimit
remains unchanged after lowering the MAX_ROW_SIZE.
Change-Id: Id7138e1e034ea5d1cd15cf8de399690e52a9d726
Reviewed-on: http://gerrit.cloudera.org:8080/16765
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/buffered-tuple-stream.h | 10 +--
be/src/runtime/spillable-row-batch-queue.cc | 7 +-
.../org/apache/impala/planner/PlanRootSink.java | 7 +-
tests/custom_cluster/test_query_retries.py | 7 ++
tests/query_test/test_result_spooling.py | 96 ++++++++++++++++++++++
5 files changed, 118 insertions(+), 9 deletions(-)
diff --git a/be/src/runtime/buffered-tuple-stream.h
b/be/src/runtime/buffered-tuple-stream.h
index 98a588b..5a22bfb 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -551,6 +551,11 @@ class BufferedTupleStream {
}
};
+ /// Returns the total additional bytes that this row will consume in
write_page_ if
+ /// appended to the page. This includes the row's null indicators, the fixed
length
+ /// part of the row and the data for inlined_string_slots_ and
inlined_coll_slots_.
+ int64_t ComputeRowSize(TupleRow* row) const noexcept;
+
private:
DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
@@ -749,11 +754,6 @@ class BufferedTupleStream {
/// but it is invalid to read or write from in future.
void InvalidateReadIterator();
- /// Returns the total additional bytes that this row will consume in
write_page_ if
- /// appended to the page. This includes the row's null indicators, the fixed
length
- /// part of the row and the data for inlined_string_slots_ and
inlined_coll_slots_.
- int64_t ComputeRowSize(TupleRow* row) const noexcept;
-
/// Pins page and updates tracking stats.
Status PinPage(Page* page);
diff --git a/be/src/runtime/spillable-row-batch-queue.cc
b/be/src/runtime/spillable-row-batch-queue.cc
index da21660..eb02900 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -20,6 +20,7 @@
#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
+#include "util/pretty-printer.h"
#include "common/names.h"
@@ -94,7 +95,11 @@ Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
RETURN_IF_ERROR(status);
// If the row could not be added after the stream was unpinned, an
error should
// have been set.
- DCHECK(false) << "Rows should be added in unpinned mode unless an
error occurred";
+ DCHECK(false) << Substitute("Row with a size of $0 should be added
successfully "
+ "in unpinned mode unless an error
occurred. "
+ "batch_queue_: $1",
+
PrettyPrinter::PrintBytes(batch_queue_->ComputeRowSize(batch_itr.Get())),
+ batch_queue_->DebugString());
}
}
}
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 6d68bff..61d57df 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -76,8 +76,9 @@ public class PlanRootSink extends DataSink {
* estimated number of input rows into the sink and multiplying it by the
estimated
* average row size. The estimated number of input rows is derived from the
cardinality
* of the associated fragment's root node. If the cardinality or the average
row size
- * are not available, a default value is used. The minimum reservation is
set 2x the
- * default spillable buffer size to account for the read and write page in
the
+ * are not available, a default value is used. The minimum reservation is
set as 2x of
+ * the maximum between default spillable buffer size and MAX_ROW_SIZE
(rounded up to
+ * nearest power of 2) to account for the read and write pages in the
* BufferedTupleStream used by the backend plan-root-sink. The maximum
reservation is
* set to the query-level config MAX_PINNED_RESULT_SPOOLING_MEMORY.
*/
@@ -87,7 +88,7 @@ public class PlanRootSink extends DataSink {
long bufferSize = queryOptions.getDefault_spillable_buffer_size();
long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
bufferSize, queryOptions.getMax_row_size());
- long minMemReservationBytes = 2 * bufferSize;
+ long minMemReservationBytes = 2 * maxRowBufferSize;
long maxMemReservationBytes = Math.max(
queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);
diff --git a/tests/custom_cluster/test_query_retries.py
b/tests/custom_cluster/test_query_retries.py
index 38e17f0..8a3611a 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -586,12 +586,19 @@ class TestQueryRetries(CustomClusterTestSuite):
'min_spillable_buffer_size': 8 * 1024,
'default_spillable_buffer_size': 8 * 1024,
'max_result_spooling_mem': 8 * 1024,
+ 'max_row_size': 8 * 1024,
'max_spilled_result_spooling_mem': 8 * 1024})
# Wait until we can fetch some results
results = self.client.fetch(query, handle, max_rows=1)
assert len(results.data) == 1
+ # PLAN_ROOT_SINK's reservation limit should be set at
+ # 2 * DEFAULT_SPILLABLE_BUFFER_SIZE = 16 KB.
+ plan_root_sink_reservation_limit =
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 16.00 KB"
+ profile = self.client.get_runtime_profile(handle)
+ assert re.search(plan_root_sink_reservation_limit, profile)
+
# Assert that the query is still executing
summary = self.client.get_exec_summary(handle)
assert summary.progress.num_completed_scan_ranges <
summary.progress.total_scan_ranges
diff --git a/tests/query_test/test_result_spooling.py
b/tests/query_test/test_result_spooling.py
index 6f54bed..e8c1295 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -65,6 +65,7 @@ class TestResultSpooling(ImpalaTestSuite):
exec_options['min_spillable_buffer_size'] = 8 * 1024
exec_options['default_spillable_buffer_size'] = 8 * 1024
exec_options['max_result_spooling_mem'] = 32 * 1024
+ exec_options['max_row_size'] = 16 * 1024
# Execute the query without result spooling and save the results for later
validation
base_result = self.execute_query(query, exec_options)
@@ -83,6 +84,8 @@ class TestResultSpooling(ImpalaTestSuite):
unpinned_bytes_regex =
"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
# The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
spilled_exec_option_regex = "ExecOption:.*Spilled"
+ # PLAN_ROOT_SINK's reservation limit should be set at
MAX_RESULT_SPOOLING_MEM = 32 KB.
+ plan_root_sink_reservation_limit =
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 KB"
# Fetch the runtime profile every 0.5 seconds until either the timeout is
hit, or
# PeakUnpinnedBytes shows up in the profile.
@@ -99,6 +102,8 @@ class TestResultSpooling(ImpalaTestSuite):
# At this point PLAN_ROOT_SINK must have spilled, so
spilled_exec_option_regex
# should be in the profile as well.
assert re.search(spilled_exec_option_regex, profile)
+ # Check that PLAN_ROOT_SINK reservation limit is set accordingly.
+ assert re.search(plan_root_sink_reservation_limit, profile)
result = self.client.fetch(query, handle)
assert result.data == base_result.data
finally:
@@ -408,3 +413,94 @@ class TestResultSpoolingFailpoints(ImpalaTestSuite):
vector.get_value('exec_option')['debug_action'] =
vector.get_value('debug_action')
vector.get_value('exec_option')['spool_query_results'] = 'true'
execute_query_expect_debug_action_failure(self, self._query, vector)
+
+
+class TestResultSpoolingMaxReservation(ImpalaTestSuite):
+ """These tests verify that while calculating max_reservation for spooling
these query
+ options are taken into account: MAX_ROW_SIZE, MAX_RESULT_SPOOLING_MEM and
+ DEFAULT_SPILLABLE_BUFFER_SIZE."""
+
+ # Test with denial of reservations at varying frequency.
+ # Always test with the minimal amount of spilling and running with the
absolute minimum
+ # memory requirement.
+ DEBUG_ACTION_VALUES = [None, '-1:OPEN:[email protected]']
+
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestResultSpoolingMaxReservation, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('debug_action',
+ *cls.DEBUG_ACTION_VALUES))
+
+ # Result spooling should be independent of file format, so only testing for
+ # table_format=parquet/none in order to avoid a test dimension explosion.
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ v.get_value('table_format').file_format == 'parquet' and
+ v.get_value('table_format').compression_codec == 'none')
+
+ def test_high_max_row_size(self, vector):
+ """Test that when MAX_ROW_SIZE is set, PLAN_ROOT_SINK can adjust its
max_reservation
+ even though MAX_RESULT_SPOOLING_MEM is set lower."""
+ exec_options = vector.get_value('exec_option')
+ exec_options['debug_action'] = vector.get_value('debug_action')
+ exec_options['spool_query_results'] = 'true'
+ exec_options['max_row_size'] = 10 * 1024 * 1024
+ exec_options['max_result_spooling_mem'] = 2 * 1024 * 1024
+ exec_options['default_spillable_buffer_size'] = 2 * 1024 * 1024
+
+ # Select 3 wide rows, each with size of 10MB.
+ query = "select string_col from functional.widerow " \
+ "join functional.tinyinttable where int_col < 3"
+ result = self.execute_query(query, exec_options)
+ assert result.success, "Failed to run {0} when result spooling is enabled"
\
+ .format(query)
+
+ # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
+ spilled_exec_option_regex = "ExecOption:.*Spilled"
+ assert re.search(spilled_exec_option_regex, result.runtime_profile)
+
+ # PLAN_ROOT_SINK's reservation limit should be set at 2 * MAX_ROW_SIZE.
+ plan_root_sink_reservation_limit =
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 MB"
+ assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
+
+ def test_high_default_spillable_buffer(self, vector):
+ """Test that high DEFAULT_SPILLABLE_BUFFER_SIZE wins the calculation for
+ PLAN_ROOT_SINK's max_reservation"""
+ exec_options = vector.get_value('exec_option')
+ exec_options['debug_action'] = vector.get_value('debug_action')
+ exec_options['spool_query_results'] = 'true'
+ exec_options['max_row_size'] = 8 * 1024
+ exec_options['max_result_spooling_mem'] = 8 * 1024
+ exec_options['default_spillable_buffer_size'] = 32 * 1024
+ self.__run_small_spilling_query(exec_options, "64.00 KB")
+
+ def test_high_max_result_spooling_mem(self, vector):
+ """Test that high MAX_RESULT_SPOOLING_MEM wins the calculation for
+ PLAN_ROOT_SINK's max_reservation"""
+ exec_options = vector.get_value('exec_option')
+ exec_options['debug_action'] = vector.get_value('debug_action')
+ exec_options['spool_query_results'] = 'true'
+ exec_options['max_row_size'] = 8 * 1024
+ exec_options['max_result_spooling_mem'] = 70 * 1024
+ exec_options['default_spillable_buffer_size'] = 8 * 1024
+ self.__run_small_spilling_query(exec_options, "70.00 KB")
+
+ def __run_small_spilling_query(self, exec_options, expected_limit):
+ """Given an exec_options, test that simple query below spills and
PLAN_ROOT_SINK's
+ ReservationLimit match with the expected_limit"""
+ query = "select * from functional.alltypes order by id limit 1500"
+ result = self.execute_query(query, exec_options)
+ assert result.success, "Failed to run {0} when result spooling is enabled"
\
+ .format(query)
+
+ # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
+ spilled_exec_option_regex = "ExecOption:.*Spilled"
+ assert re.search(spilled_exec_option_regex, result.runtime_profile)
+
+ # Check that PLAN_ROOT_SINK's reservation limit match.
+ plan_root_sink_reservation_limit =
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
+ .format(expected_limit)
+ assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)