This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 88e0e4e8b IMPALA-13334: Fix test_sort.py DCHECK hit when
max_sort_run_size>0
88e0e4e8b is described below
commit 88e0e4e8baa97f7fded12230b14232dc85cf6d79
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Mon Sep 9 16:18:56 2024 +0200
IMPALA-13334: Fix test_sort.py DCHECK hit when max_sort_run_size>0
test_sort.py declared the 'max_sort_run_size' query option, but it
silently did not exercise it. Fixing the query option declaration
in IMPALA-13349 using helper function add_exec_option_dimension()
revealed a DCHECK failure in sorter.cc. In some cases the length
of an in-memory run could exceed 'max_sort_run_size' by 1 page.
This patch fixes the DCHECK failure by strictly enforcing the
'max_sort_run_size' limit.
Memory limits were also adjusted in test_sort.py according to
the memory usage of the different sort run sizes.
Additionally, the 'MAX_SORT_RUN_SIZE' query option's valid range was
relaxed. Instead of throwing an error, negative values also disable
the run size limitation, just as the default: '0'.
Testing:
- E2E tests in test_sort.py
- set test
Change-Id: I943d8edcc87df168448a174d6c9c6b46fe960eae
Reviewed-on: http://gerrit.cloudera.org:8080/21777
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/sorter-internal.h | 6 +++
be/src/runtime/sorter.cc | 28 ++++++++---
be/src/service/query-options-test.cc | 10 ++++
be/src/service/query-options.cc | 6 +--
common/thrift/ImpalaService.thrift | 14 +++---
.../functional-query/queries/QueryTest/set.test | 5 ++
.../sort-reservation-usage-single-node.test | 4 +-
tests/query_test/test_sort.py | 56 ++++++++++++++--------
8 files changed, 91 insertions(+), 38 deletions(-)
diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
index dba50f109..f746b5bf7 100644
--- a/be/src/runtime/sorter-internal.h
+++ b/be/src/runtime/sorter-internal.h
@@ -335,6 +335,12 @@ class Sorter::Run {
bool ConvertOffsetsForCollectionChildren(const CollectionValue& cv,
const SlotDescriptor& slot_desc) WARN_UNUSED_RESULT;
+ /// Only initial in-memory runs' size can be limited by the
'MAX_SORT_RUN_SIZE' query
+ /// option. Returns true if the initial in-memory run reached its maximum
capacity in
+ /// pages (fixed-len + var-len pages).
+ template <bool INITIAL_RUN>
+ bool IR_ALWAYS_INLINE MaxSortRunSizeReached();
+
int NumOpenPages(const vector<Page>& pages);
/// Close all open pages and clear vector.
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 07aecc66a..dee4c749d 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -125,8 +125,8 @@ Sorter::Run::Run(Sorter* parent, TupleDescriptor*
sort_tuple_desc, bool initial_
max_num_of_pages_(initial_run ? parent->inmem_run_max_pages_ : 0) {}
Status Sorter::Run::Init() {
- int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ &&
initial_run_ &&
- (sorter_->enable_spilling_ && max_num_of_pages_ == 0));
+ int num_to_create = 1 + has_var_len_slots_
+ + (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_);
int64_t required_mem = num_to_create * sorter_->page_len_;
if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
return Status(Substitute(
@@ -243,8 +243,14 @@ Status Sorter::Run::AddBatchInternal(
DCHECK_GT(var_len_pages_.size(), 0);
Page* cur_var_len_page = &var_len_pages_.back();
if (cur_var_len_page->BytesRemaining() < total_var_len) {
- bool added;
- RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
+ bool added = false;
+ if (MaxSortRunSizeReached<INITIAL_RUN>()) {
+ cur_fixed_len_page->FreeBytes(sort_tuple_size_);
+ *allocation_failed = false;
+ return Status::OK();
+ } else {
+ RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
+ }
if (added) {
cur_var_len_page = &var_len_pages_.back();
} else {
@@ -272,7 +278,7 @@ Status Sorter::Run::AddBatchInternal(
// If there are still rows left to process, get a new page for the
fixed-length
// tuples. If the run is already too long, return.
- if (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() >=
max_num_of_pages_){
+ if (MaxSortRunSizeReached<INITIAL_RUN>()) {
*allocation_failed = false;
return Status::OK();
}
@@ -290,6 +296,13 @@ Status Sorter::Run::AddBatchInternal(
return Status::OK();
}
+template <bool INITIAL_RUN>
+bool Sorter::Run::MaxSortRunSizeReached() {
+ DCHECK_EQ(INITIAL_RUN, initial_run_);
+ DCHECK(!INITIAL_RUN || max_num_of_pages_ == 0 || run_size() <=
max_num_of_pages_);
+ return (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() ==
max_num_of_pages_);
+}
+
bool IsValidStructInSortingTuple(const ColumnType& struct_type) {
DCHECK(struct_type.IsStructType());
for (const ColumnType& child_type : struct_type.children) {
@@ -1016,8 +1029,7 @@ Sorter::Sorter(const TupleRowComparatorConfig&
tuple_row_comparator_config,
in_mem_sort_timer_(nullptr),
in_mem_merge_timer_(nullptr),
sorted_data_size_(nullptr),
- run_sizes_(nullptr),
- inmem_run_max_pages_(state->query_options().max_sort_run_size) {
+ run_sizes_(nullptr) {
switch (tuple_row_comparator_config.sorting_order_) {
case TSortingOrder::LEXICAL:
compare_less_than_.reset(
@@ -1029,6 +1041,8 @@ Sorter::Sorter(const TupleRowComparatorConfig&
tuple_row_comparator_config,
default:
DCHECK(false);
}
+ int max_sort_run_size = state->query_options().max_sort_run_size;
+ inmem_run_max_pages_ = max_sort_run_size >= 2 ? max_sort_run_size : 0;
if (estimated_input_size > 0) ComputeSpillEstimate(estimated_input_size);
}
diff --git a/be/src/service/query-options-test.cc
b/be/src/service/query-options-test.cc
index ca2072a45..3be966ed8 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -446,6 +446,16 @@ TEST(QueryOptions, SetSpecialOptions) {
TestError("-0.1");
TestError("Not a number!");
}
+ // MAX_SORT_RUN_SIZE should not be 1.
+ {
+ OptionDef<int32_t> key_def = MAKE_OPTIONDEF(max_sort_run_size);
+ auto TestOk = MakeTestOkFn(options, key_def);
+ auto TestError = MakeTestErrFn(options, key_def);
+ TestOk("-1", -1);
+ TestOk("0", 0);
+ TestError("1");
+ TestOk("2", 2);
+ }
}
void VerifyFilterTypes(const set<TRuntimeFilterType::type>& types,
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index beb104893..72b4f39c3 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1168,10 +1168,8 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type
option, const string& va
}
case TImpalaQueryOptions::MAX_SORT_RUN_SIZE: {
int32_t int32_t_val = 0;
- RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
- option, value, &int32_t_val));
- RETURN_IF_ERROR(
- QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 1));
+ RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value,
&int32_t_val));
+ RETURN_IF_ERROR(QueryOptionValidator<int32_t>::NotEquals(option,
int32_t_val, 1));
query_options->__set_max_sort_run_size(int32_t_val);
break;
}
diff --git a/common/thrift/ImpalaService.thrift
b/common/thrift/ImpalaService.thrift
index 5c70d1019..ca7b7f25b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -797,13 +797,13 @@ enum TImpalaQueryOptions {
// Valid values are in [1, 128]. Default to 128.
MAX_FRAGMENT_INSTANCES_PER_NODE = 156
- // Configures the in-memory sort algorithm used in the sorter. Determines the
- // maximum number of pages in an initial in-memory run (fixed + variable
length).
- // 0 means unlimited, which will create 1 big run with no in-memory merge
phase.
- // Setting any other other value can create multiple miniruns which leads to
an
- // in-memory merge phase. The minimum value in that case is 2.
- // Generally, with larger workloads the recommended value is 10 or more to
avoid
- // high fragmentation of variable length data.
+ // Configures the in-memory sort algorithm used in the sorter. Determines
the maximum
+ // number of pages in an initial in-memory run (fixed + variable length).
+ // Maximizing the sort run size can help mitigate back-pressure in the
sorter. It
+ // creates multiple miniruns and merges them in-memory. The run size must be
at least 2,
+ // but 10 or more are recommended to avoid high fragmentation of variable
length data.
+ // Setting 0 or a negative value disables the run size limitation.
+ // Defaults to 0 (disabled).
MAX_SORT_RUN_SIZE = 157;
// Allowing implicit casts with loss of precision, adds the capability to use
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test
b/testdata/workloads/functional-query/queries/QueryTest/set.test
index b042bd937..68d6941af 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -305,3 +305,8 @@ set PARQUET_LATE_MATERIALIZATION_THRESHOLD=0;
---- CATCH
Invalid value for query option PARQUET_LATE_MATERIALIZATION_THRESHOLD: Value
can't be 0
====
+---- QUERY
+set MAX_SORT_RUN_SIZE=1;
+---- CATCH
+Invalid value for query option MAX_SORT_RUN_SIZE: Value can't be 1, actual
value: 1
+====
\ No newline at end of file
diff --git
a/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
b/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
index 167278eaa..19100d7ee 100644
--- a/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
+++ b/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
@@ -5,8 +5,10 @@
# does not give up memory to the second sort. Scans the text formatted file so
that
# the scan uses less reservation.
# num_nodes = 1 is set for this file by the python test.
+set mt_dop=1;
+set max_scan_range_length=2MB;
set scratch_limit=0;
-set buffer_pool_limit=35m;
+set buffer_pool_limit=$BUFFER_POOL_LIMIT;
set default_spillable_buffer_size=64kb;
SELECT *
FROM (SELECT
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index 000329d1b..b6df5087c 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -22,13 +22,20 @@ from copy import copy
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfNotHdfsMinicluster
from tests.common.test_dimensions import (
- # TODO: uncomment once IMPALA-13334 resolved.
- # add_exec_option_dimension,
+ add_exec_option_dimension,
create_exec_option_dimension)
-# Run sizes (number of pages per run) in sorter
-# TODO: uncomment once IMPALA-13334 resolved.
-# MAX_SORT_RUN_SIZE = [0, 2, 20]
+"""Run sizes (number of pages per run) in sorter.
+Values:
+ 0: there is no limit on the size of an in-memory run. The sorter will
allocate memory
+ to fit the data until it encounters some memory limit.
+ 2: an in-memory run can be at most 2 pages. This is the smallest possible
size of a run:
+ at least 1 page for fix-len data and 1 page for var-len data.
+ 20: an in-memory run can be at most 20 pages.
+Too small in-memory runs with var-len data can cause memory fragmentation,
therefore
+different memory or spilling limits are needed to trigger the same scenarios
in some test
+cases."""
+MAX_SORT_RUN_SIZE = [0, 2, 20]
def split_result_rows(result):
@@ -65,8 +72,7 @@ class TestQueryFullSort(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestQueryFullSort, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
- # TODO: uncomment once IMPALA-13334 resolved.
- # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+ add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
if cls.exploration_strategy() == 'core':
cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -104,9 +110,11 @@ class TestQueryFullSort(ImpalaTestSuite):
a query is, the more sort runs are likely to be produced and spilled.
Case 1 : 0 SpilledRuns, because all rows fit within the maximum
reservation.
sort_run_bytes_limit is not enforced.
- Case 2 : 4 SpilledRuns, because sort node estimate that spill is
inevitable.
+ Case 2 : 4 or 5 SpilledRuns, because sort node estimates that spill is
inevitable.
So all runs are capped to 130m, including the first one."""
- options = [('2g', '100m', '0'), ('400m', '130m', '4')]
+ # max_sort_run_size > 0 will spill more in Case 2.
+ options = [('2g', '100m', '0'),
+ ('400m', '130m', ('5' if exec_option['max_sort_run_size'] > 0 else
'4'))]
for (mem_limit, sort_run_bytes_limit, spilled_runs) in options:
exec_option['mem_limit'] = mem_limit
exec_option['sort_run_bytes_limit'] = sort_run_bytes_limit
@@ -156,7 +164,12 @@ class TestQueryFullSort(ImpalaTestSuite):
exec_option = copy(vector.get_value('exec_option'))
exec_option['disable_outermost_topn'] = 1
- exec_option['mem_limit'] = "134m"
+ # With max_sort_run_size=2 (2 pages per run) the varlen data is more
fragmented and
+ # requires a higher limit to maintain "TotalMergesPerformed: 1" assertion.
+ if exec_option['max_sort_run_size'] == 2:
+ exec_option['mem_limit'] = "144m"
+ else:
+ exec_option['mem_limit'] = "134m"
table_format = vector.get_value('table_format')
query_result = self.execute_query(query, exec_option,
table_format=table_format)
@@ -228,8 +241,15 @@ class TestQueryFullSort(ImpalaTestSuite):
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
def test_sort_reservation_usage(self, vector):
"""Tests for sorter reservation usage.
- Run with num_nodes=1 to make execution more deterministic."""
- self.run_test_case('sort-reservation-usage-single-node', vector)
+ If max_sort_run_size > 0, the larger the run size, the sooner the sorter
can give up
+ memory to the next node."""
+ if vector.get_value('exec_option')['max_sort_run_size'] == 2:
+ # Increase buffer_limit to maintain such that query never spill.
+ buffer_pool_limit = '27m'
+ else:
+ buffer_pool_limit = '14m'
+ self.run_test_case('sort-reservation-usage-single-node', vector,
+ test_file_vars={'$BUFFER_POOL_LIMIT':
buffer_pool_limit})
class TestRandomSort(ImpalaTestSuite):
@@ -240,8 +260,7 @@ class TestRandomSort(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestRandomSort, cls).add_test_dimensions()
- # TODO: uncomment once IMPALA-13334 resolved.
- # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+ add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
if cls.exploration_strategy() == 'core':
cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -296,8 +315,7 @@ class TestPartialSort(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestPartialSort, cls).add_test_dimensions()
- # TODO: uncomment once IMPALA-13334 resolved.
- # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+ add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
if cls.exploration_strategy() == 'core':
cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -354,8 +372,7 @@ class TestArraySort(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestArraySort, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
- # TODO: uncomment once IMPALA-13334 resolved.
- # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+ add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
# The table we use is a parquet table.
cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -384,7 +401,8 @@ class TestArraySort(ImpalaTestSuite):
table_format = vector.get_value('table_format')
query_result = self.execute_query(query, exec_option,
table_format=table_format)
- assert "SpilledRuns: 3" in query_result.runtime_profile
+ # Check that spilling was successful.
+ assert re.search(r'\s+\- SpilledRuns: [1-9]', query_result.runtime_profile)
# Split result rows (strings) into columns.
result = split_result_rows(query_result.data)