This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 80c1d2dba IMPALA-4530: Implement in-memory merge of quicksorted runs
80c1d2dba is described below
commit 80c1d2dbaabc78bdf1b6e4da5475bfa365cd375e
Author: noemi <[email protected]>
AuthorDate: Wed Apr 6 16:36:27 2022 +0200
IMPALA-4530: Implement in-memory merge of quicksorted runs
This change aims to decrease back-pressure in the sorter. It offers an
alternative for the in-memory run formation strategy and sorting
algorithm by introducing a new in-memory merge level between the
in-memory quicksort and the external merge phase.
Instead of forming one big run, it produces many smaller in-memory runs
(called miniruns), sorts those with quicksort, then merges them
in memory, before spilling or serving GetNext().
The external merge phase remains the same.
Works with MAX_SORT_RUN_SIZE development query option that determines
the maximum number of pages in a 'minirun'. The default value of
MAX_SORT_RUN_SIZE is 0, which keeps the original implementation of 1
big initial in-memory run. Other options are integers of 2 and above.
The recommended value is 10 or more, to avoid high fragmentation
in case of large workloads and variable length data.
Testing:
- added MAX_SORT_RUN_SIZE as an additional test dimension to
test_sort.py with values [0, 2, 20]
- additional partial sort test case (inserting into partitioned
kudu table)
- manual E2E testing
Change-Id: I58c0ae112e279b93426752895ded7b1a3791865c
Reviewed-on: http://gerrit.cloudera.org:8080/18393
Reviewed-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Csaba Ringhofer <[email protected]>
Tested-by: Csaba Ringhofer <[email protected]>
---
be/src/exec/partial-sort-node.cc | 8 +-
be/src/exec/partial-sort-node.h | 3 +
be/src/runtime/sorter-internal.h | 40 +++++-
be/src/runtime/sorter.cc | 272 +++++++++++++++++++++++++++++++++----
be/src/runtime/sorter.h | 56 ++++++--
be/src/service/query-options.cc | 9 ++
be/src/service/query-options.h | 5 +-
be/src/util/tuple-row-compare.h | 22 ++-
bin/perf_tools/perf-query.sh | 2 +-
common/thrift/ImpalaService.thrift | 10 ++
common/thrift/Query.thrift | 5 +
tests/query_test/test_sort.py | 82 +++++++++--
12 files changed, 452 insertions(+), 62 deletions(-)
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 1fadbe523..5919bb46a 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -66,6 +66,7 @@ PartialSortNode::PartialSortNode(
input_eos_(false),
sorter_eos_(true) {
runtime_profile()->AddInfoString("SortType", "Partial");
+ child_get_next_timer_ = ADD_SUMMARY_STATS_TIMER(runtime_profile(),
"ChildGetNextTime");
}
PartialSortNode::~PartialSortNode() {
@@ -143,7 +144,12 @@ Status PartialSortNode::GetNext(RuntimeState* state,
RowBatch* row_batch, bool*
if (input_batch_index_ == input_batch_->num_rows()) {
input_batch_->Reset();
input_batch_index_ = 0;
- RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(),
&input_eos_));
+ MonotonicStopWatch timer;
+ timer.Start();
+ Status status = child(0)->GetNext(state, input_batch_.get(),
&input_eos_);
+ timer.Stop();
+ RETURN_IF_ERROR(status);
+ child_get_next_timer_->UpdateCounter(timer.ElapsedTime());
}
int num_processed;
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index a02bb2fca..0f1ebe456 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -86,6 +86,9 @@ class PartialSortNode : public ExecNode {
const TupleRowComparatorConfig& tuple_row_comparator_config_;
+ /// Min, max, and avg time spent in calling GetNext on child
+ RuntimeProfile::SummaryStatsCounter* child_get_next_timer_;
+
/////////////////////////////////////////
/// BEGIN: Members that must be Reset()
diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
index 7661051ba..a974af456 100644
--- a/be/src/runtime/sorter-internal.h
+++ b/be/src/runtime/sorter-internal.h
@@ -114,13 +114,22 @@ class Sorter::Page {
///
/// Runs are either "initial runs" constructed from the sorter's input by
evaluating
/// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
-/// by merging already-sorted runs. Initial runs are sorted in-place in
memory. Once
-/// sorted, runs can be spilled to disk to free up memory. Sorted runs are
merged by
+/// by merging already-sorted runs. Initial runs are sorted in-place in memory.
+/// Once sorted, runs can be spilled to disk to free up memory. Sorted runs
are merged by
/// SortedRunMerger, either to produce the final sorted output or to produce
another
/// sorted run.
+/// By default, the size of initial runs is determined by the available
memory: the
+/// sorter tries to add batches to the run until some (memory) limit is
reached.
+/// Some query options can also limit the size of an initial (or in-memory)
run.
+/// SORT_RUN_BYTES_LIMIT triggers spilling after the size of data in the run
exceeds the
+/// given threshold (usually expressed in MB or GB).
+/// MAX_SORT_RUN_SIZE allows constructing runs up to a certain size by
limiting the
+/// number of pages in the initial runs. These smaller in-memory runs are also
referred
+/// to as 'miniruns'. Miniruns are not spilled immediately, but sorted
in-place first,
+/// and collected to be merged in memory before spilling the produced output
run to disk.
///
/// The expected calling sequence of functions is as follows:
-/// * Init() to initialize the run and allocate initial pages.
+/// * Init() or TryInit() to initialize the run and allocate initial pages.
/// * Add*Batch() to add batches of tuples to the run.
/// * FinalizeInput() to signal that no more batches will be added.
/// * If the run is unsorted, it must be sorted. After that set_sorted() must
be called.
@@ -141,13 +150,23 @@ class Sorter::Run {
/// var-len data into var_len_copy_page_.
Status Init();
+ /// Similar to Init(), except for the following differences:
+ /// It is only used to initialize miniruns (query option MAX_SORT_RUN_SIZE >
0 cases).
+ /// The first in-memory run is always initialized by calling Init(), because
that must
+ /// succeed. The following ones are initialized by TryInit().
+ /// TryInit() allocates one fixed-len page and one var-len page if
'has_var_len_slots_'
is true. There is no need for var_len_copy_page here. Sets 'allocation_failed' to
+ /// false if initialization was successful, or to true if the reservation was not
enough.
+ Status TryInit(bool* allocation_failed);
+
/// Add the rows from 'batch' starting at 'start_index' to the current run.
Returns
/// the number of rows actually added in 'num_processed'. If the run is full
(no more
/// pages can be allocated), 'num_processed' may be less than the number of
remaining
/// rows in the batch. AddInputBatch() materializes the input rows using the
/// expressions in sorter_->sort_tuple_expr_evals_, while
AddIntermediateBatch() just
/// copies rows.
- Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed);
+ Status AddInputBatch(
+ RowBatch* batch, int start_index, int* num_processed, bool*
allocation_failed);
Status AddIntermediateBatch(RowBatch* batch, int start_index, int*
num_processed);
@@ -199,6 +218,9 @@ class Sorter::Run {
bool is_finalized() const { return is_finalized_; }
bool is_sorted() const { return is_sorted_; }
void set_sorted() { is_sorted_ = true; }
+ int max_num_of_pages() const { return max_num_of_pages_; }
+ int fixed_len_size() { return fixed_len_pages_.size(); }
+ int run_size() { return fixed_len_pages_.size() + var_len_pages_.size(); }
int64_t num_tuples() const { return num_tuples_; }
/// Returns true if we have var-len pages in the run.
bool HasVarLenPages() const {
@@ -215,8 +237,8 @@ class Sorter::Run {
/// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance
and must
/// match 'initial_run_' and 'has_var_len_slots_'.
template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
- Status AddBatchInternal(
- RowBatch* batch, int start_index, int* num_processed);
+ Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed,
+ bool* allocation_failed);
/// Finalize the list of pages: delete empty final pages and unpin the
previous page
/// if the run is unpinned.
@@ -352,6 +374,12 @@ class Sorter::Run {
/// Used to implement GetNextBatch() interface required for the merger.
boost::scoped_ptr<RowBatch> buffered_batch_;
+ /// Max number of fixed-len + var-len pages in an in-memory minirun. It
defines the
+ /// length of a minirun.
+ /// The default value is 0 which means that only 1 in-memory run will be
created, and
its size will be determined by other limits e.g. memory or
sort_run_bytes_limit.
+ int max_num_of_pages_;
+
/// Members used when a run is read in GetNext().
/// The index into 'fixed_' and 'var_len_pages_' of the pages being read in
GetNext().
int fixed_len_pages_index_;
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ae885252b..93a4cb908 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -121,11 +121,12 @@ Sorter::Run::Run(Sorter* parent, TupleDescriptor*
sort_tuple_desc, bool initial_
is_pinned_(initial_run),
is_finalized_(false),
is_sorted_(!initial_run),
- num_tuples_(0) {}
+ num_tuples_(0),
+ max_num_of_pages_(initial_run ? parent->inmem_run_max_pages_ : 0) {}
Status Sorter::Run::Init() {
- int num_to_create = 1 + has_var_len_slots_
- + (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_);
+ int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ &&
initial_run_ &&
+ (sorter_->enable_spilling_ && max_num_of_pages_ == 0));
int64_t required_mem = num_to_create * sorter_->page_len_;
if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
return Status(Substitute(
@@ -152,9 +153,32 @@ Status Sorter::Run::Init() {
return Status::OK();
}
+Status Sorter::Run::TryInit(bool* allocation_failed) {
+ *allocation_failed = true;
+ DCHECK_GT(sorter_->inmem_run_max_pages_, 0);
+ // No need for additional copy page because var-len data is not reordered.
+ // The in-memory merger can copy var-len data directly from the in-memory
runs,
+ // which are kept until the merge is finished
+ int num_to_create = 1 + has_var_len_slots_;
+ int64_t required_mem = num_to_create * sorter_->page_len_;
+ if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+ return Status::OK();
+ }
+
+ RETURN_IF_ERROR(AddPage(&fixed_len_pages_));
+ if (has_var_len_slots_) {
+ RETURN_IF_ERROR(AddPage(&var_len_pages_));
+ }
+ if (initial_run_) {
+ sorter_->initial_runs_counter_->Add(1);
+ }
+ *allocation_failed = false;
+ return Status::OK();
+}
+
template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
Status Sorter::Run::AddBatchInternal(
- RowBatch* batch, int start_index, int* num_processed) {
+ RowBatch* batch, int start_index, int* num_processed, bool*
allocation_failed) {
DCHECK(!is_finalized_);
DCHECK(!fixed_len_pages_.empty());
DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
@@ -226,6 +250,7 @@ Status Sorter::Run::AddBatchInternal(
// There was not enough space in the last var-len page for this
tuple, and
// the run could not be extended. Return the fixed-len allocation
and exit.
cur_fixed_len_page->FreeBytes(sort_tuple_size_);
+ *allocation_failed = true;
return Status::OK();
}
}
@@ -246,13 +271,21 @@ Status Sorter::Run::AddBatchInternal(
// If there are still rows left to process, get a new page for the
fixed-length
// tuples. If the run is already too long, return.
+ if (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() >=
max_num_of_pages_){
+ *allocation_failed = false;
+ return Status::OK();
+ }
if (cur_input_index < batch->num_rows()) {
bool added;
RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added));
- if (!added) return Status::OK();
+ if (!added) {
+ *allocation_failed = true;
+ return Status::OK();
+ }
cur_fixed_len_page = &fixed_len_pages_.back();
}
}
+ *allocation_failed = false;
return Status::OK();
}
@@ -773,22 +806,28 @@ int64_t Sorter::Run::TotalBytes() const {
return total_bytes;
}
-Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int*
num_processed) {
+Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int*
num_processed,
+ bool* allocation_failed) {
DCHECK(initial_run_);
if (has_var_len_slots_) {
- return AddBatchInternal<true, true>(batch, start_index, num_processed);
+ return AddBatchInternal<true, true>(
+ batch, start_index, num_processed, allocation_failed);
} else {
- return AddBatchInternal<false, true>(batch, start_index, num_processed);
+ return AddBatchInternal<false, true>(
+ batch, start_index, num_processed, allocation_failed);
}
}
Status Sorter::Run::AddIntermediateBatch(
RowBatch* batch, int start_index, int* num_processed) {
DCHECK(!initial_run_);
+ bool allocation_failed = false;
if (has_var_len_slots_) {
- return AddBatchInternal<true, false>(batch, start_index, num_processed);
+ return AddBatchInternal<true, false>(
+ batch, start_index, num_processed, &allocation_failed);
} else {
- return AddBatchInternal<false, false>(batch, start_index, num_processed);
+ return AddBatchInternal<false, false>(
+ batch, start_index, num_processed, &allocation_failed);
}
}
@@ -909,8 +948,10 @@ Sorter::Sorter(const TupleRowComparatorConfig&
tuple_row_comparator_config,
initial_runs_counter_(nullptr),
num_merges_counter_(nullptr),
in_mem_sort_timer_(nullptr),
+ in_mem_merge_timer_(nullptr),
sorted_data_size_(nullptr),
- run_sizes_(nullptr) {
+ run_sizes_(nullptr),
+ inmem_run_max_pages_(state->query_options().max_sort_run_size) {
switch (tuple_row_comparator_config.sorting_order_) {
case TSortingOrder::LEXICAL:
compare_less_than_.reset(
@@ -922,13 +963,13 @@ Sorter::Sorter(const TupleRowComparatorConfig&
tuple_row_comparator_config,
default:
DCHECK(false);
}
-
if (estimated_input_size > 0) ComputeSpillEstimate(estimated_input_size);
}
Sorter::~Sorter() {
DCHECK(sorted_runs_.empty());
DCHECK(merging_runs_.empty());
+ DCHECK(sorted_inmem_runs_.empty());
DCHECK(unsorted_run_ == nullptr);
DCHECK(merge_output_run_ == nullptr);
}
@@ -977,6 +1018,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
initial_runs_counter_ = ADD_COUNTER(profile_, "RunsCreated", TUnit::UNIT);
}
in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
+ in_mem_merge_timer_ = ADD_TIMER(profile_, "InMemoryMergeTime");
sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun",
TUnit::UNIT);
@@ -1016,12 +1058,23 @@ Status Sorter::AddBatch(RowBatch* batch) {
RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed));
cur_batch_index += num_processed;
+
if (MustSortAndSpill(cur_batch_index, batch->num_rows())) {
- // The current run is full. Sort it, spill it and begin the next one.
- int64_t unsorted_run_bytes = unsorted_run_->TotalBytes();
RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
- RETURN_IF_ERROR(SortCurrentInputRun());
- RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+ int64_t unsorted_run_bytes = unsorted_run_->TotalBytes();
+
+ if (inmem_run_max_pages_ == 0) {
+ // The current run is full. Sort it and spill it.
+ RETURN_IF_ERROR(SortCurrentInputRun());
+ sorted_runs_.push_back(unsorted_run_);
+ unsorted_run_ = nullptr;
+ RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+ } else {
+ // The memory is full with miniruns. Sort, merge and spill them.
+ RETURN_IF_ERROR(MergeAndSpill());
+ }
+
+ // After we freed memory by spilling, initialize the next run.
unsorted_run_ =
run_pool_.Add(new Run(this,
output_row_desc_->tuple_descriptors()[0], true));
RETURN_IF_ERROR(unsorted_run_->Init());
@@ -1058,6 +1111,70 @@ int64_t Sorter::GetSortRunBytesLimit() const {
}
}
+Status Sorter::InitializeNewMinirun(bool* allocation_failed) {
+ // The minirun reached its size limit (max_num_of_pages).
+ // We should sort the run, append to the sorted miniruns, and start a new
minirun.
+ DCHECK(!*allocation_failed && unsorted_run_->run_size() ==
inmem_run_max_pages_);
+
+ // When the first minirun is full, and there are more tuples to come, we
first
+ // need to ensure that these in-memory miniruns can be merged later, by
trying
+ // to reserve pages for the output run of the in-memory merger. Only if this
+ // initialization was successful, can we move on to create the 2nd inmem run.
+ // If it fails and/or only 1 inmem_run could fit into memory, start spilling.
+ // No need to initialize 'merge_output_run_' if spilling is disabled,
because the
+ // output will be read directly from the merger in GetNext().
+ if (enable_spilling_ && sorted_inmem_runs_.empty()) {
+ DCHECK(merge_output_run_ == nullptr) << "Should have finished previous
merge.";
+ merge_output_run_ = run_pool_.Add(
+ new Run(this, output_row_desc_->tuple_descriptors()[0], false));
+ RETURN_IF_ERROR(merge_output_run_->TryInit(allocation_failed));
+ if (*allocation_failed) {
+ return Status::OK();
+ }
+ }
+ RETURN_IF_ERROR(SortCurrentInputRun());
+ sorted_inmem_runs_.push_back(unsorted_run_);
+ unsorted_run_ =
+ run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0],
true));
+ RETURN_IF_ERROR(unsorted_run_->TryInit(allocation_failed));
+ if (*allocation_failed) {
+ unsorted_run_->CloseAllPages();
+ }
+ return Status::OK();
+}
+
+Status Sorter::MergeAndSpill() {
+ // The last minirun might have been created just before we ran out of memory.
+ // In this case it should not be sorted and merged.
+ if (unsorted_run_->run_size() == 0){
+ unsorted_run_ = nullptr;
+ } else {
+ RETURN_IF_ERROR(SortCurrentInputRun());
+ sorted_inmem_runs_.push_back(unsorted_run_);
+ }
+
+ // If only 1 run was created, do not merge.
+ if (sorted_inmem_runs_.size() == 1) {
+ sorted_runs_.push_back(sorted_inmem_runs_.back());
+ sorted_inmem_runs_.clear();
+ DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
+ RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+ // If 'merge_output_run_' was initialized but no merge was executed,
+ // set it back to nullptr.
+ if (merge_output_run_ != nullptr){
+ merge_output_run_->CloseAllPages();
+ merge_output_run_ = nullptr;
+ }
+ } else {
+ DCHECK(merge_output_run_ != nullptr) << "Should have reserved memory for
the merger.";
+ RETURN_IF_ERROR(MergeInMemoryRuns());
+ DCHECK(merge_output_run_ == nullptr) << "Should have finished previous
merge.";
+ }
+
+ DCHECK(sorted_inmem_runs_.empty());
+ return Status::OK();
+}
+
bool Sorter::MustSortAndSpill(const int rows_added, const int batch_num_rows) {
if (rows_added < batch_num_rows) {
return true;
@@ -1086,7 +1203,28 @@ void Sorter::TryLowerMemUpToSortRunBytesLimit() {
Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int*
num_processed) {
DCHECK(batch != nullptr);
- RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index,
num_processed));
+ bool allocation_failed = false;
+
+ RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index,
num_processed,
+ &allocation_failed));
+
+ if (inmem_run_max_pages_ > 0) {
+ start_index += *num_processed;
+ // We try to add the entire input batch. If it does not fit into 1 minirun,
+ // initialize a new one.
+ while (!allocation_failed && start_index < batch->num_rows()) {
+ RETURN_IF_ERROR(InitializeNewMinirun(&allocation_failed));
+ if (allocation_failed) {
+ break;
+ }
+ int processed = 0;
+ RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index,
&processed,
+ &allocation_failed));
+ start_index += processed;
+ *num_processed += processed;
+ }
+ }
+
// Clear any temporary allocations made while materializing the sort tuples.
expr_results_pool_.Clear();
return Status::OK();
@@ -1096,6 +1234,45 @@ Status Sorter::InputDone() {
// Sort the tuples in the last run.
RETURN_IF_ERROR(SortCurrentInputRun());
+ if (inmem_run_max_pages_ > 0) {
+ sorted_inmem_runs_.push_back(unsorted_run_);
+ unsorted_run_ = nullptr;
+ if (!HasSpilledRuns()) {
+ if (sorted_inmem_runs_.size() == 1) {
+ DCHECK(sorted_inmem_runs_.back()->is_pinned());
+ DCHECK(merge_output_run_ == nullptr);
+ RETURN_IF_ERROR(sorted_inmem_runs_.back()->PrepareRead());
+ return Status::OK();
+ }
+ if (enable_spilling_) {
+ DCHECK(merge_output_run_ != nullptr);
+ merge_output_run_->CloseAllPages();
+ merge_output_run_ = nullptr;
+ }
+ // 'merge_output_run_' is not initialized for partial sort, because the
output
+ // will be read directly from the merger.
+ DCHECK(enable_spilling_ || merge_output_run_ == nullptr);
+ return CreateMerger(sorted_inmem_runs_.size(), false);
+ }
+ DCHECK(enable_spilling_);
+
+ if (sorted_inmem_runs_.size() == 1) {
+ sorted_runs_.push_back(sorted_inmem_runs_.back());
+ sorted_inmem_runs_.clear();
+ DCHECK_GT(sorted_runs_.back()->run_size(), 0);
+ RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+ } else {
+ RETURN_IF_ERROR(MergeInMemoryRuns());
+ }
+ DCHECK(sorted_inmem_runs_.empty());
+ // Merge intermediate runs until we have a final merge set-up.
+ return MergeIntermediateRuns();
+ } else {
+ sorted_runs_.push_back(unsorted_run_);
+ }
+
+ unsorted_run_ = nullptr;
+
if (sorted_runs_.size() == 1) {
// The entire input fit in one run. Read sorted rows in GetNext() directly
from the
// in-memory sorted run.
@@ -1114,10 +1291,19 @@ Status Sorter::InputDone() {
return MergeIntermediateRuns();
}
+
Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
- if (sorted_runs_.size() == 1) {
+ if (sorted_inmem_runs_.size() == 1 && !HasSpilledRuns()) {
+ DCHECK(sorted_inmem_runs_.back()->is_pinned());
+ return sorted_inmem_runs_.back()->GetNext<false>(output_batch, eos);
+ } else if (inmem_run_max_pages_ == 0 && sorted_runs_.size() == 1) {
DCHECK(sorted_runs_.back()->is_pinned());
return sorted_runs_.back()->GetNext<false>(output_batch, eos);
+ } else if (inmem_run_max_pages_ > 0 && !HasSpilledRuns()) {
+ RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
+ // Clear any temporary allocations made by the merger.
+ expr_results_pool_.Clear();
+ return Status::OK();
} else {
RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
// Clear any temporary allocations made by the merger.
@@ -1144,6 +1330,7 @@ void Sorter::Close(RuntimeState* state) {
}
void Sorter::CleanupAllRuns() {
+ Run::CleanupRuns(&sorted_inmem_runs_);
Run::CleanupRuns(&sorted_runs_);
Run::CleanupRuns(&merging_runs_);
if (unsorted_run_ != nullptr) unsorted_run_->CloseAllPages();
@@ -1160,10 +1347,8 @@ Status Sorter::SortCurrentInputRun() {
SCOPED_TIMER(in_mem_sort_timer_);
RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
}
- sorted_runs_.push_back(unsorted_run_);
sorted_data_size_->Add(unsorted_run_->TotalBytes());
run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
- unsorted_run_ = nullptr;
RETURN_IF_CANCELLED(state_);
return Status::OK();
@@ -1236,8 +1421,31 @@ int Sorter::GetNumOfRunsForMerge() const {
return max_runs_in_next_merge;
}
+Status Sorter::MergeInMemoryRuns() {
+ DCHECK_GE(sorted_inmem_runs_.size(), 2);
+ DCHECK_GT(sorted_inmem_runs_.back()->run_size(), 0);
+
+ // No need to allocate more memory before doing in-memory merges, because the
+ // buffers of the in-memory runs are already open and they fit into memory.
+ // The merge output run is already initialized, too.
+
+ DCHECK(merge_output_run_ != nullptr) << "Should have initialized output run
for merge.";
+ RETURN_IF_ERROR(CreateMerger(sorted_inmem_runs_.size(), false));
+ {
+ SCOPED_TIMER(in_mem_merge_timer_);
+ RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
+ }
+ spilled_runs_counter_->Add(1);
+ sorted_runs_.push_back(merge_output_run_);
+ DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
+ merge_output_run_ = nullptr;
+ DCHECK(sorted_inmem_runs_.empty());
+ return Status::OK();
+}
+
Status Sorter::MergeIntermediateRuns() {
DCHECK_GE(sorted_runs_.size(), 2);
+ DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
// Attempt to allocate more memory before doing intermediate merges. This may
// be possible if other operators have relinquished memory after the sort
has built
@@ -1248,7 +1456,7 @@ Status Sorter::MergeIntermediateRuns() {
int num_of_runs_to_merge = GetNumOfRunsForMerge();
DCHECK(merge_output_run_ == nullptr) << "Should have finished previous
merge.";
- RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge));
+ RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge, true));
// If CreateMerger() consumed all the sorted runs, we have set up the
final merge.
if (sorted_runs_.empty()) return Status::OK();
@@ -1263,9 +1471,16 @@ Status Sorter::MergeIntermediateRuns() {
return Status::OK();
}
-Status Sorter::CreateMerger(int num_runs) {
+Status Sorter::CreateMerger(int num_runs, bool external) {
+ std::deque<impala::Sorter::Run *>* runs_to_merge;
+
+ if (external) {
+ DCHECK_GE(sorted_runs_.size(), 2);
+ runs_to_merge = &sorted_runs_;
+ } else {
+ runs_to_merge = &sorted_inmem_runs_;
+ }
DCHECK_GE(num_runs, 2);
- DCHECK_GE(sorted_runs_.size(), 2);
// Clean up the runs from the previous merge.
Run::CleanupRuns(&merging_runs_);
@@ -1273,24 +1488,25 @@ Status Sorter::CreateMerger(int num_runs) {
// from the runs being merged. This is unnecessary overhead that is not
required if we
// correctly transfer resources.
merger_.reset(
- new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_,
true,
+ new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_,
external,
codegend_heapify_helper_fn_));
vector<function<Status (RowBatch**)>> merge_runs;
merge_runs.reserve(num_runs);
for (int i = 0; i < num_runs; ++i) {
- Run* run = sorted_runs_.front();
+ Run* run = runs_to_merge->front();
RETURN_IF_ERROR(run->PrepareRead());
// Run::GetNextBatch() is used by the merger to retrieve a batch of rows
to merge
// from this run.
merge_runs.emplace_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
- sorted_runs_.pop_front();
+ runs_to_merge->pop_front();
merging_runs_.push_back(run);
}
RETURN_IF_ERROR(merger_->Prepare(merge_runs));
-
- num_merges_counter_->Add(1);
+ if (external) {
+ num_merges_counter_->Add(1);
+ }
return Status::OK();
}
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index c83bf81cc..39617e8e3 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -105,7 +105,7 @@ class Sorter {
/// 'enable_spilling' should be set to false to reduce the number of
requested buffers
/// if the caller will use AddBatchNoSpill().
/// 'codegend_sort_helper_fn' is a reference to the codegen version of
- /// the Sorter::TupleSorter::SortHelp() method.
+ /// the Sorter::TupleSorter::SortHelper() method.
/// 'estimated_input_size' is the total rows in bytes that are estimated to
get added
/// into this sorter. This is used to decide if sorter needs to proactively
spill for
/// the first run. -1 value means estimate is unavailable.
@@ -166,20 +166,39 @@ class Sorter {
/// Return true if the sorter has any spilled runs.
bool HasSpilledRuns() const;
+ /// The logic in AddBatchNoSpill() that handles the different cases when
+ /// AddBatchInternal() returns without processing the entire batch.
+ /// In this case we need to initialize a new minirun for the incoming rows.
+ /// Tries initializing a new minirun. If the initialization was successful,
+ /// allocation_failed is set to false, otherwise (e.g. a memory limit is
hit) true.
+ Status InitializeNewMinirun(bool* allocation_failed) WARN_UNUSED_RESULT;
+
private:
class Page;
class Run;
- /// Minimum value for sot_run_bytes_limit query option.
+ /// Minimum value for sort_run_bytes_limit query option.
static const int64_t MIN_SORT_RUN_BYTES_LIMIT = 32 << 20; // 32 MB
+ /// Merges multiple smaller runs in sorted_inmem_runs_ into a single larger
merged
+ /// run that can be spilled page by page during the process, until all pages
from
+ /// sorted_inmem_runs are consumed and freed.
+ /// Always performs one-level merge and spills the output run to disk.
Therefore there
+ /// is no need to deep-copy the input, since the pages containing var-len
data
+ /// remain in the memory until the end of the in-memory merge.
+ /// TODO: delete pages that are consumed during merge asap.
+ Status MergeInMemoryRuns() WARN_UNUSED_RESULT;
+
/// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign
it to
/// 'merger_'. 'num_runs' indicates how many runs should be covered by the
current
- /// merging attempt. Returns error if memory allocation fails during in
- /// Run::PrepareRead(). The runs to be merged are removed from
'sorted_runs_'. The
+ /// merging attempt. Returns error if memory allocation fails in
Run::PrepareRead().
+ /// The runs to be merged are removed from 'sorted_runs_'.
+ /// If 'external' is set to true, it performs an external merge, and the
/// Sorter sets the 'deep_copy_input' flag to true for the merger, since the
pages
/// containing input run data will be deleted as input runs are read.
- Status CreateMerger(int num_runs) WARN_UNUSED_RESULT;
+ /// If 'external' is false, it creates an in-memory merger for the in-memory
miniruns.
+ /// In this case, 'deep_copy_input' is set to false.
+ Status CreateMerger(int num_runs, bool external) WARN_UNUSED_RESULT;
/// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single
larger
/// merged run until there are few enough runs to be merged with a single
merger.
@@ -193,6 +212,10 @@ class Sorter {
/// and adding them to 'merged_run'.
Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT;
+ /// Handles cases in AddBatch where a memory limit is reached and spilling
must start.
+ /// Used only in case of multiple in-memory runs set by query option
MAX_SORT_RUN_SIZE.
+ inline Status MergeAndSpill() WARN_UNUSED_RESULT;
+
/// Called once there no more rows to be added to 'unsorted_run_'. Sorts
/// 'unsorted_run_' and appends it to the list of sorted runs.
Status SortCurrentInputRun() WARN_UNUSED_RESULT;
@@ -266,8 +289,8 @@ class Sorter {
const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>&
codegend_heapify_helper_fn_;
/// A default codegened function pointer storing nullptr, which is used when
the
- /// merger is not needed. Used as a default value in constructor, when the
CodegenFnPtr
- /// is not provided.
+ /// merger is not needed. Used as a default value in the constructor, when
the
+ /// CodegenFnPtr is not provided.
static const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>
default_heapify_helper_fn_;
/// Client used to allocate pages from the buffer pool. Not owned.
@@ -302,9 +325,13 @@ class Sorter {
/// When it is added to sorted_runs_, it is set to NULL.
Run* unsorted_run_;
- /// List of sorted runs that have been produced but not merged.
unsorted_run_ is added
- /// to this list after an in-memory sort. Sorted runs produced by
intermediate merges
- /// are also added to this list during the merge. Runs are added to the
object pool.
+ /// List of quicksorted miniruns before merging in memory.
+ std::deque<Run*> sorted_inmem_runs_;
+
+ /// List of sorted runs that have been produced but not merged.
'unsorted_run_' is
+ /// added to this list after an in-memory sort. Sorted runs produced by
intermediate
+ /// merges are also added to this list during the merge. Runs are added to
the
+ /// object pool.
std::deque<Run*> sorted_runs_;
/// Merger object (intermediate or final) currently used to produce sorted
runs.
@@ -345,6 +372,9 @@ class Sorter {
/// Time spent sorting initial runs in memory.
RuntimeProfile::Counter* in_mem_sort_timer_;
+ /// Time spent merging initial miniruns in memory.
+ RuntimeProfile::Counter* in_mem_merge_timer_;
+
/// Total size of the initial runs in bytes.
RuntimeProfile::Counter* sorted_data_size_;
@@ -353,6 +383,12 @@ class Sorter {
/// Flag to enforce sort_run_bytes_limit.
bool enforce_sort_run_bytes_limit_ = false;
+
+ /// Maximum number of fixed-length + variable-length pages in an in-memory
run set by
+ /// Query Option MAX_SORT_RUN_SIZE.
+ /// The default value is 0 which means that only 1 in-memory run will be
created, and
+ /// its size will be determined by other limits, e.g. memory or
sort_run_bytes_limit.
+ int inmem_run_max_pages_ = 0;
};
} // namespace impala
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index da959234d..af0a74463 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1099,6 +1099,15 @@ Status impala::SetQueryOption(const string& key, const
string& value,
query_options->__set_max_fragment_instances_per_node(max_num);
break;
}
+ case TImpalaQueryOptions::MAX_SORT_RUN_SIZE: {
+ int32_t int32_t_val = 0;
+ RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+ option, value, &int32_t_val));
+ RETURN_IF_ERROR(
+ QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 1));
+ query_options->__set_max_sort_run_size(int32_t_val);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" <<
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index db47664ec..f88e5fc74 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE
\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),
\
- TImpalaQueryOptions::MAX_FRAGMENT_INSTANCES_PER_NODE + 1);
\
+ TImpalaQueryOptions::MAX_SORT_RUN_SIZE + 1);
\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded,
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)
\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)
\
@@ -291,7 +291,8 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(join_selectivity_correlation_factor,
JOIN_SELECTIVITY_CORRELATION_FACTOR, \
TQueryOptionLevel::ADVANCED)
\
QUERY_OPT_FN(max_fragment_instances_per_node,
MAX_FRAGMENT_INSTANCES_PER_NODE, \
- TQueryOptionLevel::ADVANCED);
+ TQueryOptionLevel::ADVANCED);
\
+ QUERY_OPT_FN(max_sort_run_size, MAX_SORT_RUN_SIZE,
TQueryOptionLevel::DEVELOPMENT) ;
/// Enforce practical limits on some query options to avoid undesired query
state.
static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index c17253cc3..70d59d2c9 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -59,6 +59,8 @@ class ComparatorWrapper {
}
};
+class TupleRowComparator;
+
/// TupleRowComparatorConfig contains the static state initialized from its
corresponding
/// thrift structure. It serves as an input for creating instances of the
/// TupleRowComparator class.
@@ -102,8 +104,8 @@ class TupleRowComparatorConfig {
std::vector<int8_t> nulls_first_;
/// Codegened version of TupleRowComparator::Compare().
- typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator*
const*,
- const TupleRow*, const TupleRow*);
+ typedef int (*CompareFn)(const TupleRowComparator*, ScalarExprEvaluator*
const*,
+ ScalarExprEvaluator* const*, const TupleRow*, const TupleRow*);
CodegenFnPtr<CompareFn> codegend_compare_fn_;
private:
@@ -156,8 +158,19 @@ class TupleRowComparator {
/// hot loops.
bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const {
return Compare(
- ordering_expr_evals_lhs_.data(),
ordering_expr_evals_rhs_.data(), lhs, rhs)
- < 0;
+ ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs,
rhs) < 0;
+ }
+
+ bool ALWAYS_INLINE LessCodegend(const TupleRow* lhs, const TupleRow* rhs)
const {
+ if (codegend_compare_fn_non_atomic_ != nullptr) {
+ return codegend_compare_fn_non_atomic_(this,
+ ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(),
lhs, rhs) < 0;
+ } else {
+ TupleRowComparatorConfig::CompareFn fn = codegend_compare_fn_.load();
+ if (fn != nullptr) codegend_compare_fn_non_atomic_ = fn;
+ }
+ return Compare(
+ ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs,
rhs) < 0;
}
bool ALWAYS_INLINE Less(const Tuple* lhs, const Tuple* rhs) const {
@@ -197,6 +210,7 @@ class TupleRowComparator {
/// Reference to the codegened function pointer owned by the
TupleRowComparatorConfig
/// object that was used to create this instance.
const CodegenFnPtr<TupleRowComparatorConfig::CompareFn>&
codegend_compare_fn_;
+ mutable TupleRowComparatorConfig::CompareFn codegend_compare_fn_non_atomic_
= nullptr;
private:
/// Interpreted implementation of Compare().
diff --git a/bin/perf_tools/perf-query.sh b/bin/perf_tools/perf-query.sh
index 8e44491d6..3e6ace7d1 100755
--- a/bin/perf_tools/perf-query.sh
+++ b/bin/perf_tools/perf-query.sh
@@ -64,7 +64,7 @@ sudo echo "test sudo"
sudo perf record -F 99 -g -a &
perf_pid=$!
-~/Impala/bin/impala-shell.sh -q "$1"
+${IMPALA_HOME}/bin/impala-shell.sh -q "$1"
# Send interrupt to 'perf record'. We need to issue 'kill' in a new
session/process
# group via 'setsid', otherwise 'perf record' won't get the signal (because
it's
diff --git a/common/thrift/ImpalaService.thrift
b/common/thrift/ImpalaService.thrift
index e501d9cc2..26651a00f 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -785,6 +785,16 @@ enum TImpalaQueryOptions {
// PROCESSING_COST_MIN_THREADS option has higher value.
// Valid values are in [1, 128]. Default to 128.
MAX_FRAGMENT_INSTANCES_PER_NODE = 156
+
+ // Configures the in-memory sort algorithm used in the sorter. Determines the
+ // maximum number of pages in an initial in-memory run (fixed + variable
length).
+ // 0 means unlimited, which will create 1 big run with no in-memory merge
phase.
+ // Setting any other value can create multiple miniruns which leads to
an
+ // in-memory merge phase. The minimum value in that case is 2.
+ // Generally, with larger workloads the recommended value is 10 or more to
avoid
+ // high fragmentation of variable length data.
+ MAX_SORT_RUN_SIZE = 157;
+
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index f482d21b5..2e0157a96 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -614,6 +614,7 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
150: optional bool disable_codegen_cache = false;
+
151: optional TCodeGenCacheMode codegen_cache_mode =
TCodeGenCacheMode.NORMAL;
// See comment in ImpalaService.thrift
@@ -633,6 +634,10 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
157: optional i32 max_fragment_instances_per_node =
MAX_FRAGMENT_INSTANCES_PER_NODE
+
+ // Configures the in-memory sort algorithm used in the sorter.
+ // See comment in ImpalaService.thrift
+ 158: optional i32 max_sort_run_size = 0;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and
external
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index e536b0134..82e43a78f 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -21,6 +21,10 @@ from copy import copy, deepcopy
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.test_vector import ImpalaTestDimension
+
+# Run sizes (number of pages per run) in sorter
+MAX_SORT_RUN_SIZE = [0, 2, 20]
def split_result_rows(result):
@@ -55,6 +59,8 @@ class TestQueryFullSort(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestQueryFullSort, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+ *MAX_SORT_RUN_SIZE))
if cls.exploration_strategy() == 'core':
cls.ImpalaTestMatrix.add_constraint(lambda v:\
@@ -229,10 +235,21 @@ class TestQueryFullSort(ImpalaTestSuite):
class TestRandomSort(ImpalaTestSuite):
@classmethod
def get_workload(self):
- return 'functional'
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestRandomSort, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+ *MAX_SORT_RUN_SIZE))
+
+ if cls.exploration_strategy() == 'core':
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ v.get_value('table_format').file_format == 'parquet')
- def test_order_by_random(self):
+ def test_order_by_random(self, vector):
"""Tests that 'order by random()' works as expected."""
+ exec_option = copy(vector.get_value('exec_option'))
# "order by random()" with different seeds should produce different
orderings.
seed_query = "select * from functional.alltypestiny order by random(%s)"
results_seed0 = self.execute_query(seed_query % "0")
@@ -242,8 +259,8 @@ class TestRandomSort(ImpalaTestSuite):
# Include "random()" in the select list to check that it's sorted
correctly.
results = transpose_results(self.execute_query(
- "select random() as r from functional.alltypessmall order by r").data,
- lambda x: float(x))
+ "select random() as r from functional.alltypessmall order by r",
+ exec_option).data, lambda x: float(x))
assert(results[0] == sorted(results[0]))
# Like above, but with a limit.
@@ -254,22 +271,40 @@ class TestRandomSort(ImpalaTestSuite):
# "order by random()" inside an inline view.
query = "select r from (select random() r from functional.alltypessmall) v
order by r"
- results = transpose_results(self.execute_query(query).data, lambda x:
float(x))
+ results = transpose_results(self.execute_query(query, exec_option).data,
+ lambda x: float(x))
assert (results == sorted(results))
- def test_analytic_order_by_random(self):
+ def test_analytic_order_by_random(self, vector):
"""Tests that a window function over 'order by random()' works as
expected."""
+ exec_option = copy(vector.get_value('exec_option'))
# Since we use the same random seed, the results should be returned in
order.
query = """select last_value(rand(2)) over (order by rand(2)) from
functional.alltypestiny"""
- results = transpose_results(self.execute_query(query).data, lambda x:
float(x))
+ results = transpose_results(self.execute_query(query, exec_option).data,
+ lambda x: float(x))
assert (results == sorted(results))
+
class TestPartialSort(ImpalaTestSuite):
"""Test class to do functional validation of partial sorts."""
- def test_partial_sort_min_reservation(self, unique_database):
+ @classmethod
+ def get_workload(self):
+ return 'tpch'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestPartialSort, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+ *MAX_SORT_RUN_SIZE))
+
+ if cls.exploration_strategy() == 'core':
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ v.get_value('table_format').file_format == 'parquet')
+
+ def test_partial_sort_min_reservation(self, vector, unique_database):
"""Test that the partial sort node can operate if it only gets its minimum
memory reservation."""
table_name = "%s.kudu_test" % unique_database
@@ -277,10 +312,36 @@ class TestPartialSort(ImpalaTestSuite):
"debug_action", "-1:OPEN:[email protected]")
self.execute_query("""create table %s (col0 string primary key)
partition by hash(col0) partitions 8 stored as kudu""" % table_name)
+ exec_option = copy(vector.get_value('exec_option'))
result = self.execute_query(
- "insert into %s select string_col from functional.alltypessmall" %
table_name)
+ "insert into %s select string_col from functional.alltypessmall" %
table_name,
+ exec_option)
assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile
+ def test_partial_sort_kudu_insert(self, vector, unique_database):
+ table_name = "%s.kudu_partial_sort_test" % unique_database
+ self.execute_query("""create table %s (l_linenumber INT, l_orderkey BIGINT,
+ l_partkey BIGINT, l_shipdate STRING, l_quantity DECIMAL(12,2),
+ l_comment STRING, PRIMARY KEY(l_linenumber, l_orderkey) )
+ PARTITION BY RANGE (l_linenumber)
+ (
+ PARTITION VALUE = 1,
+ PARTITION VALUE = 2,
+ PARTITION VALUE = 3,
+ PARTITION VALUE = 4,
+ PARTITION VALUE = 5,
+ PARTITION VALUE = 6,
+ PARTITION VALUE = 7
+ )
+ STORED AS KUDU""" % table_name)
+ exec_option = copy(vector.get_value('exec_option'))
+ result = self.execute_query(
+ """insert into %s SELECT l_linenumber, l_orderkey, l_partkey,
l_shipdate,
+ l_quantity, l_comment FROM tpch.lineitem limit 300000""" % table_name,
+ exec_option)
+ assert "NumModifiedRows: 300000" in result.runtime_profile,
result.runtime_profile
+ assert "NumRowErrors: 0" in result.runtime_profile, result.runtime_profile
+
class TestArraySort(ImpalaTestSuite):
"""Tests where there are arrays in the sorting tuple."""
@@ -292,7 +353,8 @@ class TestArraySort(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestArraySort, cls).add_test_dimensions()
-
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+ *MAX_SORT_RUN_SIZE))
# The table we use is a parquet table.
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')