Repository: incubator-impala Updated Branches: refs/heads/master b14ca6d09 -> 37ec25396
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index 397e41c..16b55c2 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -46,30 +46,31 @@ class RowBatch; /// AddBatch(), InputDone() and GetNext() must be called in that order. // /// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks -/// called a run. The maximum size of a run is determined by the maximum available buffers -/// in the block manager. After the run is full, it is sorted in memory, unpinned and the -/// next run is collected. The variable-length column data (e.g. string slots) in the -/// materialized sort tuples are stored in separate sequence of blocks from the tuples -/// themselves. -/// When the blocks containing tuples in a run are unpinned, the var-len slot pointers are -/// converted to offsets from the start of the first var-len data block. When a block is -/// read back, these offsets are converted back to pointers. +/// called a run. The maximum size of a run is determined by the number of blocks that +/// can be pinned by the Sorter. After the run is full, it is sorted in memory, unpinned +/// and the next run is constructed. The variable-length column data (e.g. string slots) +/// in the materialized sort tuples are stored in a separate sequence of blocks from the +/// tuples themselves. When the blocks containing tuples in a run are unpinned, the +/// var-len slot pointers are converted to offsets from the start of the first var-len +/// data block. When a block is read back, these offsets are converted back to pointers. /// The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the /// same schema as the materialized sort tuples. // -/// After the input is consumed, the sorter is left with one or more sorted runs. 
The -/// client calls GetNext(output_batch) to retrieve batches of sorted rows. If there are -/// multiple runs, the runs are merged using SortedRunMerger to produce a stream of sorted -/// tuples. At least one block per run (two if there are var-length slots) must be pinned -/// in memory during a merge, so multiple merges may be necessary if the number of runs is -/// too large. During a merge, rows from multiple sorted input runs are compared and copied -/// into a single larger run. One input batch is created to hold tuple rows for each -/// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from -/// the output of the merge. -// +/// After the input is consumed, the sorter is left with one or more sorted runs. If +/// there are multiple runs, the runs are merged using SortedRunMerger. At least one +/// block per run (two if there are var-length slots) must be pinned in memory during +/// a merge, so multiple merges may be necessary if the number of runs is too large. +/// First a series of intermediate merges are performed, until the number of runs is +/// small enough to do a single final merge that returns batches of sorted rows to the +/// caller of GetNext(). +/// /// If there is a single sorted run (i.e. no merge required), only tuple rows are -/// copied into the output batch supplied by GetNext, and the data itself is left in +/// copied into the output batch supplied by GetNext(), and the data itself is left in /// pinned blocks held by the sorter. +/// +/// When merges are performed, one input batch is created to hold tuple rows for each +/// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from +/// the output of the merge. // /// Note that Init() must be called right after the constructor. // @@ -115,19 +116,12 @@ class Sorter { /// may or may not have been called. 
Status Reset(); - /// Estimate the memory overhead in bytes for an intermediate merge, based on the - /// maximum number of memory buffers available for the sort, the row descriptor for - /// the sorted tuples and the batch size used (in rows). - /// This is a pessimistic estimate of the memory needed by the sorter in addition to the - /// memory used by the block buffer manager. The memory overhead is 0 if the input fits - /// in memory. Merges incur additional memory overhead because row batches are created - /// to hold tuple rows from the input runs, and the merger itself deep-copies - /// sort-merged rows into its output batch. - static uint64_t EstimateMergeMem(uint64_t available_blocks, RowDescriptor* row_desc, - int merge_batch_size); + /// Close the Sorter and free resources. + void Close(); private: class Run; + class TupleIterator; class TupleSorter; /// Create a SortedRunMerger from the first 'num_runs' sorted runs in sorted_runs_ and @@ -144,9 +138,12 @@ class Sorter { /// containing deep copied rows is used for the output of each intermediate merge. Status MergeIntermediateRuns(); - /// Sorts unsorted_run_ and appends it to the list of sorted runs. Deletes any empty - /// blocks at the end of the run. Updates the sort bytes counter if necessary. - Status SortRun(); + /// Called once there are no more rows to be added to 'unsorted_run_'. Sorts + /// 'unsorted_run_' and appends it to the list of sorted runs. + Status SortCurrentInputRun(); + + /// Helper that cleans up all runs in the sorter. + void CleanupAllRuns(); /// Runtime state instance used to check for cancellation. Not owned. RuntimeState* const state_; @@ -186,7 +183,7 @@ class Sorter { /// List of sorted runs that have been produced but not merged. unsorted_run_ is added /// to this list after an in-memory sort. Sorted runs produced by intermediate merges - /// are also added to this list. Runs are added to the object pool. + /// are also added to this list during the merge. 
Runs are added to the object pool. std::deque<Run*> sorted_runs_; /// Merger object (intermediate or final) currently used to produce sorted runs. @@ -206,9 +203,21 @@ class Sorter { /// Runtime profile and counters for this sorter instance. RuntimeProfile* profile_; + + /// Number of initial runs created. RuntimeProfile::Counter* initial_runs_counter_; + + /// Number of runs that were unpinned and may have spilled to disk, including initial + /// and intermediate runs. + RuntimeProfile::Counter* spilled_runs_counter_; + + /// Number of merges of sorted runs. RuntimeProfile::Counter* num_merges_counter_; + + /// Time spent sorting initial runs in memory. RuntimeProfile::Counter* in_mem_sort_timer_; + + /// Total size of the initial runs in bytes. RuntimeProfile::Counter* sorted_data_size_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/tuple-row.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tuple-row.h b/be/src/runtime/tuple-row.h index 29d0f92..319a37c 100644 --- a/be/src/runtime/tuple-row.h +++ b/be/src/runtime/tuple-row.h @@ -28,7 +28,7 @@ namespace impala { /// together make up a row. class TupleRow { public: - Tuple* ALWAYS_INLINE GetTuple(int tuple_idx) { + Tuple* ALWAYS_INLINE GetTuple(int tuple_idx) const { return tuples_[tuple_idx]; } @@ -37,7 +37,7 @@ class TupleRow { } /// Create a deep copy of this TupleRow. DeepCopy will allocate from the pool. - TupleRow* DeepCopy(const std::vector<TupleDescriptor*>& descs, MemPool* pool) { + TupleRow* DeepCopy(const std::vector<TupleDescriptor*>& descs, MemPool* pool) const { int size = descs.size() * sizeof(Tuple*); TupleRow* result = reinterpret_cast<TupleRow*>(pool->Allocate(size)); DeepCopy(result, descs, pool, false); @@ -51,7 +51,7 @@ class TupleRow { /// tuple memory and that memory will be reused. Otherwise, new tuples will be allocated /// and stored in 'dst'. 
void DeepCopy(TupleRow* dst, const std::vector<TupleDescriptor*>& descs, MemPool* pool, - bool reuse_tuple_mem) { + bool reuse_tuple_mem) const { for (int i = 0; i < descs.size(); ++i) { if (this->GetTuple(i) != NULL) { if (reuse_tuple_mem && dst->GetTuple(i) != NULL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/util/tuple-row-compare.h ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h index d20d864..13101a9 100644 --- a/be/src/util/tuple-row-compare.h +++ b/be/src/util/tuple-row-compare.h @@ -16,6 +16,7 @@ #ifndef IMPALA_UTIL_TUPLE_ROW_COMPARE_H_ #define IMPALA_UTIL_TUPLE_ROW_COMPARE_H_ +#include "common/compiler-util.h" #include "exec/sort-exec-exprs.h" #include "exprs/expr.h" #include "exprs/expr-context.h" @@ -91,7 +92,7 @@ class TupleRowComparator { /// than rhs, or 0 if they are equal. All exprs (key_exprs_lhs_ and key_exprs_rhs_) must /// have been prepared and opened before calling this, i.e. 'sort_key_exprs' in the /// constructor must have been opened. - int Compare(TupleRow* lhs, TupleRow* rhs) const { + int Compare(const TupleRow* lhs, const TupleRow* rhs) const { DCHECK_EQ(key_expr_ctxs_lhs_.size(), key_expr_ctxs_rhs_.size()); for (int i = 0; i < key_expr_ctxs_lhs_.size(); ++i) { void* lhs_value = key_expr_ctxs_lhs_[i]->GetValue(lhs); @@ -114,14 +115,16 @@ class TupleRowComparator { /// Returns true if lhs is strictly less than rhs. /// All exprs (key_exprs_lhs_ and key_exprs_rhs_) must have been prepared and opened /// before calling this. - bool Less(TupleRow* lhs, TupleRow* rhs) const { + /// Force inlining because it tends not to be always inlined at callsites, even in + /// hot loops. + bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const { int result = codegend_compare_fn_ == NULL ? 
Compare(lhs, rhs) : (*codegend_compare_fn_)(&key_expr_ctxs_lhs_[0], &key_expr_ctxs_rhs_[0], lhs, rhs); if (result < 0) return true; return false; } - bool Less(Tuple* lhs, Tuple* rhs) const { + bool ALWAYS_INLINE Less(const Tuple* lhs, const Tuple* rhs) const { TupleRow* lhs_row = reinterpret_cast<TupleRow*>(&lhs); TupleRow* rhs_row = reinterpret_cast<TupleRow*>(&rhs); return Less(lhs_row, rhs_row); @@ -150,8 +153,8 @@ class TupleRowComparator { /// TupleRowComparator is copied before the module is compiled, the copy will still have /// its function pointer set to NULL. The function pointer is allocated from the runtime /// state's object pool so that its lifetime will be >= that of any copies. - typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, TupleRow*, - TupleRow*); + typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, const TupleRow*, + const TupleRow*); CompareFn* codegend_compare_fn_; }; @@ -200,7 +203,7 @@ struct RowEqualityChecker { } } - bool Equal(TupleRow* x, TupleRow* y) { + bool Equal(const TupleRow* x, const TupleRow* y) { for (int i = 0; i < tuple_checkers_.size(); ++i) { Tuple* x_tuple = x->GetTuple(i); Tuple* y_tuple = y->GetTuple(i); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test index 26dde07..02feb31 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test +++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test @@ -1583,16 +1583,34 @@ from functional.alltypestiny order by id int, bigint, bigint, double ==== ---- QUERY -# Regression test for IMPALA-2265, IMPALA-2559. 
The mem_limit is tuned to reproduce the -# issue when running this query against functional_parquet. -SET mem_limit=132m; +# Regression test for IMPALA-2265, IMPALA-2559. The max_block_mgr_memory is tuned to +# reproduce the issue when running this query against functional_parquet. +SET max_block_mgr_memory=36m; SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10; -SET mem_limit=; ---- CATCH Memory limit exceeded ==== ---- QUERY +# Check that the above query can succeed with more buffers. +SET max_block_mgr_memory=64m; +SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col +FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10; +---- TYPES +SMALLINT +---- RESULTS +NULL +NULL +NULL +NULL +NULL +NULL +NULL +NULL +NULL +NULL +==== +---- QUERY # IMPALA-2906: Tests that an analytic sort does not attempt to materialize # a constant expr wrapped in a TupleIsNullPredicate that is not executable # at the analytic sort. 
Here 'y' is wrapped in a TupleIsNullPredicate, and http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test index c2fbde5..8000ad3 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test +++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test @@ -30,4 +30,5 @@ STRING,STRING,STRING 'Faaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa','','' ---- RUNTIME_PROFILE row_regex: .* TotalMergesPerformed: [^0] .* +row_regex: .* SpilledRuns: [^0] .* ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/sort.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/sort.test b/testdata/workloads/functional-query/queries/QueryTest/sort.test index b957939..0fe7efa 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/sort.test +++ b/testdata/workloads/functional-query/queries/QueryTest/sort.test @@ -4107,3 +4107,37 @@ select 1 from functional.alltypestiny order by 1 ---- TYPES TINYINT ==== +---- QUERY +# Test for corner case: fixed length data but only NULL var-len slots. +select id, cast(NULL as string) str +from functional.alltypestiny +order by 2, 1 +---- RESULTS +0,'NULL' +1,'NULL' +2,'NULL' +3,'NULL' +4,'NULL' +5,'NULL' +6,'NULL' +7,'NULL' +---- TYPES +INT, STRING +==== +---- QUERY +# Test for corner case: fixed length data but only zero-length var len data. 
+select id, '' +from functional.alltypestiny +order by 2 desc, 1 desc +---- RESULTS +7,'' +6,'' +5,'' +4,'' +3,'' +2,'' +1,'' +0,'' +---- TYPES +INT, STRING +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/spilling.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test index c35ddec..f462435 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test @@ -427,6 +427,7 @@ limit 20; CHAR ---- RUNTIME_PROFILE # Verify that the sort actually spilled +row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) ==== ---- QUERY @@ -465,6 +466,7 @@ limit 20; CHAR ---- RUNTIME_PROFILE # Verify that the sort actually spilled +row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) ==== ---- QUERY @@ -504,6 +506,7 @@ limit 20; CHAR ---- RUNTIME_PROFILE # Verify that the sort actually spilled +row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) ==== ---- QUERY @@ -543,6 +546,7 @@ limit 20; STRING ---- RUNTIME_PROFILE # Verify that the sort actually spilled +row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) ==== ---- QUERY @@ -582,5 +586,6 @@ limit 20; STRING ---- RUNTIME_PROFILE # Verify that the sort actually spilled +row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/tests/query_test/test_sort.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py 
index ad99aef..281957a 100644 --- a/tests/query_test/test_sort.py +++ b/tests/query_test/test_sort.py @@ -77,7 +77,6 @@ class TestQueryFullSort(ImpalaTestSuite): assert(result[0] == sorted(result[0])) def test_sort_union(self, vector): - pytest.xfail(reason="IMPALA-1346") query = """select o_orderdate, o_custkey, o_comment from (select * from orders union select * from orders union all select * from orders) as i order by o_orderdate limit 100000""" @@ -116,3 +115,27 @@ class TestQueryFullSort(ImpalaTestSuite): query, exec_option, table_format=table_format).data) numeric_results = [int(val) for val in result[0]] assert(numeric_results == sorted(numeric_results)) + + def test_spill_empty_strings(self, vector): + """Test corner case of spilling sort with only empty strings. Spilling with var len + slots typically means the sort must reorder blocks and convert pointers, but this case + has to be handled differently because there are no var len blocks to point into.""" + + query = """ + select empty, l_orderkey, l_partkey, l_suppkey, + l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax + from (select substr(l_comment, 1000, 0) empty, * from lineitem) t + order by empty, l_orderkey, l_partkey, l_suppkey, l_linenumber + limit 100000 + """ + + exec_option = vector.get_value('exec_option') + exec_option['disable_outermost_topn'] = 1 + exec_option['max_block_mgr_memory'] = "256m" + exec_option['num_nodes'] = "1" + table_format = vector.get_value('table_format') + + result = transpose_results(self.execute_query( + query, exec_option, table_format=table_format).data) + assert(result[0] == sorted(result[0])) +
