Repository: incubator-impala
Updated Branches:
  refs/heads/master b14ca6d09 -> 37ec25396


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 397e41c..16b55c2 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -46,30 +46,31 @@ class RowBatch;
 /// AddBatch(), InputDone() and GetNext() must be called in that order.
 //
 /// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks
-/// called a run. The maximum size of a run is determined by the maximum available buffers
-/// in the block manager. After the run is full, it is sorted in memory, unpinned and the
-/// next run is collected.  The variable-length column data (e.g. string slots) in the
-/// materialized sort tuples are stored in separate sequence of blocks from the tuples
-/// themselves.
-/// When the blocks containing tuples in a run are unpinned, the var-len slot pointers are
-/// converted to offsets from the start of the first var-len data block. When a block is
-/// read back, these offsets are converted back to pointers.
+/// called a run. The maximum size of a run is determined by the number of blocks that
+/// can be pinned by the Sorter. After the run is full, it is sorted in memory, unpinned
+/// and the next run is constructed. The variable-length column data (e.g. string slots)
+/// in the materialized sort tuples are stored in a separate sequence of blocks from the
+/// tuples themselves.  When the blocks containing tuples in a run are unpinned, the
+/// var-len slot pointers are converted to offsets from the start of the first var-len
+/// data block. When a block is read back, these offsets are converted back to pointers.
 /// The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the
 /// same schema as the materialized sort tuples.
 //
-/// After the input is consumed, the sorter is left with one or more sorted runs. The
-/// client calls GetNext(output_batch) to retrieve batches of sorted rows. If there are
-/// multiple runs, the runs are merged using SortedRunMerger to produce a stream of sorted
-/// tuples. At least one block per run (two if there are var-length slots) must be pinned
-/// in memory during a merge, so multiple merges may be necessary if the number of runs is
-/// too large. During a merge, rows from multiple sorted input runs are compared and copied
-/// into a single larger run. One input batch is created to hold tuple rows for each
-/// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from
-/// the output of the merge.
-//
+/// After the input is consumed, the sorter is left with one or more sorted runs. If
+/// there are multiple runs, the runs are merged using SortedRunMerger. At least one
+/// block per run (two if there are var-length slots) must be pinned in memory during
+/// a merge, so multiple merges may be necessary if the number of runs is too large.
+/// First a series of intermediate merges are performed, until the number of runs is
+/// small enough to do a single final merge that returns batches of sorted rows to the
+/// caller of GetNext().
+///
 /// If there is a single sorted run (i.e. no merge required), only tuple rows are
-/// copied into the output batch supplied by GetNext, and the data itself is left in
+/// copied into the output batch supplied by GetNext(), and the data itself is left in
 /// pinned blocks held by the sorter.
+///
+/// When merges are performed, one input batch is created to hold tuple rows for each
+/// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from
+/// the output of the merge.
 //
 /// Note that Init() must be called right after the constructor.
 //
@@ -115,19 +116,12 @@ class Sorter {
   /// may or may not have been called.
   Status Reset();
 
-  /// Estimate the memory overhead in bytes for an intermediate merge, based on the
-  /// maximum number of memory buffers available for the sort, the row descriptor for
-  /// the sorted tuples and the batch size used (in rows).
-  /// This is a pessimistic estimate of the memory needed by the sorter in addition to the
-  /// memory used by the block buffer manager. The memory overhead is 0 if the input fits
-  /// in memory. Merges incur additional memory overhead because row batches are created
-  /// to hold tuple rows from the input runs, and the merger itself deep-copies
-  /// sort-merged rows into its output batch.
-  static uint64_t EstimateMergeMem(uint64_t available_blocks, RowDescriptor* row_desc,
-      int merge_batch_size);
+  /// Close the Sorter and free resources.
+  void Close();
 
  private:
   class Run;
+  class TupleIterator;
   class TupleSorter;
 
   /// Create a SortedRunMerger from the first 'num_runs' sorted runs in sorted_runs_ and
@@ -144,9 +138,12 @@ class Sorter {
   /// containing deep copied rows is used for the output of each intermediate merge.
   Status MergeIntermediateRuns();
 
-  /// Sorts unsorted_run_ and appends it to the list of sorted runs. Deletes any empty
-  /// blocks at the end of the run. Updates the sort bytes counter if necessary.
-  Status SortRun();
+  /// Called once there are no more rows to be added to 'unsorted_run_'. Sorts
+  /// 'unsorted_run_' and appends it to the list of sorted runs.
+  Status SortCurrentInputRun();
+
+  /// Helper that cleans up all runs in the sorter.
+  void CleanupAllRuns();
 
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;
@@ -186,7 +183,7 @@ class Sorter {
 
   /// List of sorted runs that have been produced but not merged. unsorted_run_ is added
   /// to this list after an in-memory sort. Sorted runs produced by intermediate merges
-  /// are also added to this list. Runs are added to the object pool.
+  /// are also added to this list during the merge. Runs are added to the object pool.
   std::deque<Run*> sorted_runs_;
 
   /// Merger object (intermediate or final) currently used to produce sorted runs.
@@ -206,9 +203,21 @@ class Sorter {
 
   /// Runtime profile and counters for this sorter instance.
   RuntimeProfile* profile_;
+
+  /// Number of initial runs created.
   RuntimeProfile::Counter* initial_runs_counter_;
+
+  /// Number of runs that were unpinned and may have spilled to disk, including initial
+  /// and intermediate runs.
+  RuntimeProfile::Counter* spilled_runs_counter_;
+
+  /// Number of merges of sorted runs.
   RuntimeProfile::Counter* num_merges_counter_;
+
+  /// Time spent sorting initial runs in memory.
   RuntimeProfile::Counter* in_mem_sort_timer_;
+
+  /// Total size of the initial runs in bytes.
   RuntimeProfile::Counter* sorted_data_size_;
 };
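
The Sorter header comment above describes the core external-sort pattern: accumulate
rows into bounded in-memory runs, sort each run, unpin/spill it, then merge the sorted
runs. Purely as illustration, here is a minimal, self-contained C++ sketch of that
pattern. None of the names below come from Impala: std::vector stands in for pinned
BufferedBlockMgr blocks, the priority queue stands in for SortedRunMerger, and the
single-pass merge glosses over intermediate merges and var-len data handling.

// Hypothetical sketch of the run-then-merge pattern (not Impala code).
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <utility>
#include <vector>

using Run = std::vector<int64_t>;

// Build sorted runs: fill a run until the (pretend) pinned-block budget is
// reached, sort it in place, then "unpin" it by moving it into 'runs'.
std::vector<Run> BuildSortedRuns(const std::vector<int64_t>& input,
                                 std::size_t max_run_size) {
  std::vector<Run> runs;
  Run current;
  for (int64_t v : input) {
    current.push_back(v);
    if (current.size() == max_run_size) {
      std::sort(current.begin(), current.end());
      runs.push_back(std::move(current));
      current.clear();
    }
  }
  if (!current.empty()) {
    std::sort(current.begin(), current.end());
    runs.push_back(std::move(current));
  }
  return runs;
}

// Merge all runs in one pass with a min-heap. A real external sorter caps the
// number of runs per merge (at least one pinned block per run), producing
// intermediate runs until a single final merge is possible.
Run MergeRuns(const std::vector<Run>& runs) {
  using Entry = std::pair<int64_t, std::size_t>;  // (value, index of source run)
  auto cmp = [](const Entry& a, const Entry& b) { return a.first > b.first; };
  std::priority_queue<Entry, std::vector<Entry>, decltype(cmp)> heap(cmp);
  std::vector<std::size_t> pos(runs.size(), 0);
  for (std::size_t i = 0; i < runs.size(); ++i) {
    if (!runs[i].empty()) heap.push({runs[i][0], i});
  }
  Run out;
  while (!heap.empty()) {
    auto [value, idx] = heap.top();
    heap.pop();
    out.push_back(value);
    if (++pos[idx] < runs[idx].size()) heap.push({runs[idx][pos[idx]], idx});
  }
  return out;
}

In the real Sorter the number of runs per merge is limited by how many blocks can be
pinned at once, which is why a series of intermediate merges may run before the final
merge that feeds batches to GetNext().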
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/tuple-row.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple-row.h b/be/src/runtime/tuple-row.h
index 29d0f92..319a37c 100644
--- a/be/src/runtime/tuple-row.h
+++ b/be/src/runtime/tuple-row.h
@@ -28,7 +28,7 @@ namespace impala {
 /// together make up a row.
 class TupleRow {
  public:
-  Tuple* ALWAYS_INLINE GetTuple(int tuple_idx) {
+  Tuple* ALWAYS_INLINE GetTuple(int tuple_idx) const {
     return tuples_[tuple_idx];
   }
 
@@ -37,7 +37,7 @@ class TupleRow {
   }
 
   /// Create a deep copy of this TupleRow.  DeepCopy will allocate from  the pool.
-  TupleRow* DeepCopy(const std::vector<TupleDescriptor*>& descs, MemPool* pool) {
+  TupleRow* DeepCopy(const std::vector<TupleDescriptor*>& descs, MemPool* pool) const {
     int size = descs.size() * sizeof(Tuple*);
     TupleRow* result = reinterpret_cast<TupleRow*>(pool->Allocate(size));
     DeepCopy(result, descs, pool, false);
@@ -51,7 +51,7 @@ class TupleRow {
   /// tuple memory and that memory will be reused.  Otherwise, new tuples will be allocated
   /// and stored in 'dst'.
   void DeepCopy(TupleRow* dst, const std::vector<TupleDescriptor*>& descs, MemPool* pool,
-      bool reuse_tuple_mem) {
+      bool reuse_tuple_mem) const {
     for (int i = 0; i < descs.size(); ++i) {
       if (this->GetTuple(i) != NULL) {
         if (reuse_tuple_mem && dst->GetTuple(i) != NULL) {
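
As a usage-level illustration of the DeepCopy(dst, descs, pool, reuse_tuple_mem)
contract touched above (now const-qualified), here is a simplified C++ sketch. Row,
Pool and tuple_sizes are hypothetical stand-ins for Impala's TupleRow, MemPool and
TupleDescriptors; only the reuse-vs-allocate behaviour is the point.

// Hypothetical sketch of the deep-copy contract (not Impala code).
#include <cstddef>
#include <cstring>
#include <vector>

struct Pool {
  std::vector<std::vector<char>> chunks;
  char* Allocate(std::size_t n) {
    chunks.emplace_back(n);
    return chunks.back().data();
  }
};

struct Row {
  std::vector<char*> tuples;  // one fixed-length tuple pointer per descriptor

  // Copies this row's tuples into 'dst'. With reuse_tuple_mem == true, dst is
  // assumed to already own tuple buffers of the right size and they are
  // overwritten in place; otherwise fresh buffers are allocated from 'pool'.
  void DeepCopy(Row* dst, const std::vector<std::size_t>& tuple_sizes, Pool* pool,
                bool reuse_tuple_mem) const {
    if (dst->tuples.size() < tuple_sizes.size()) {
      dst->tuples.resize(tuple_sizes.size(), nullptr);
    }
    for (std::size_t i = 0; i < tuple_sizes.size(); ++i) {
      if (tuples[i] == nullptr) continue;  // mirror the NULL-tuple check above
      if (!(reuse_tuple_mem && dst->tuples[i] != nullptr)) {
        dst->tuples[i] = pool->Allocate(tuple_sizes[i]);
      }
      std::memcpy(dst->tuples[i], tuples[i], tuple_sizes[i]);
    }
  }
};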

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index d20d864..13101a9 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -16,6 +16,7 @@
 #ifndef IMPALA_UTIL_TUPLE_ROW_COMPARE_H_
 #define IMPALA_UTIL_TUPLE_ROW_COMPARE_H_
 
+#include "common/compiler-util.h"
 #include "exec/sort-exec-exprs.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
@@ -91,7 +92,7 @@ class TupleRowComparator {
   /// than rhs, or 0 if they are equal. All exprs (key_exprs_lhs_ and key_exprs_rhs_) must
   /// have been prepared and opened before calling this, i.e. 'sort_key_exprs' in the
   /// constructor must have been opened.
-  int Compare(TupleRow* lhs, TupleRow* rhs) const {
+  int Compare(const TupleRow* lhs, const TupleRow* rhs) const {
     DCHECK_EQ(key_expr_ctxs_lhs_.size(), key_expr_ctxs_rhs_.size());
     for (int i = 0; i < key_expr_ctxs_lhs_.size(); ++i) {
       void* lhs_value = key_expr_ctxs_lhs_[i]->GetValue(lhs);
@@ -114,14 +115,16 @@ class TupleRowComparator {
   /// Returns true if lhs is strictly less than rhs.
   /// All exprs (key_exprs_lhs_ and key_exprs_rhs_) must have been prepared and opened
   /// before calling this.
-  bool Less(TupleRow* lhs, TupleRow* rhs) const {
+  /// Force inlining because it tends not to be always inlined at callsites, even in
+  /// hot loops.
+  bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const {
     int result = codegend_compare_fn_ == NULL ? Compare(lhs, rhs) :
         (*codegend_compare_fn_)(&key_expr_ctxs_lhs_[0], &key_expr_ctxs_rhs_[0], lhs, rhs);
     if (result < 0) return true;
     return false;
   }
 
-  bool Less(Tuple* lhs, Tuple* rhs) const {
+  bool ALWAYS_INLINE Less(const Tuple* lhs, const Tuple* rhs) const {
     TupleRow* lhs_row = reinterpret_cast<TupleRow*>(&lhs);
     TupleRow* rhs_row = reinterpret_cast<TupleRow*>(&rhs);
     return Less(lhs_row, rhs_row);
@@ -150,8 +153,8 @@ class TupleRowComparator {
   /// TupleRowComparator is copied before the module is compiled, the copy will still have
   /// its function pointer set to NULL. The function pointer is allocated from the runtime
   /// state's object pool so that its lifetime will be >= that of any copies.
-  typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, TupleRow*,
-      TupleRow*);
+  typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, const TupleRow*,
+      const TupleRow*);
   CompareFn* codegend_compare_fn_;
 };
 
@@ -200,7 +203,7 @@ struct RowEqualityChecker {
     }
   }
 
-  bool Equal(TupleRow* x, TupleRow* y) {
+  bool Equal(const TupleRow* x, const TupleRow* y) {
     for (int i = 0; i < tuple_checkers_.size(); ++i) {
       Tuple* x_tuple = x->GetTuple(i);
       Tuple* y_tuple = y->GetTuple(i);
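
For context on the Compare()/Less() changes above, here is a minimal C++ sketch of that
comparator shape: a lexicographic Compare() over the sort keys plus a Less() wrapper
that prefers an optional "compiled" comparison function, plugged into std::sort.
RowLite and CompareFn are illustrative stand-ins, not Impala's TupleRow/ExprContext/
codegen machinery.

// Hypothetical sketch of a lexicographic comparator with a compiled fast path.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct RowLite { std::vector<int64_t> keys; };

class RowComparator {
 public:
  using CompareFn = int (*)(const RowLite&, const RowLite&);

  explicit RowComparator(CompareFn compiled_fn = nullptr) : compiled_fn_(compiled_fn) {}

  // Returns <0, 0 or >0, comparing keys in order (ascending only, for brevity).
  int Compare(const RowLite& lhs, const RowLite& rhs) const {
    for (std::size_t i = 0; i < lhs.keys.size(); ++i) {
      if (lhs.keys[i] < rhs.keys[i]) return -1;
      if (lhs.keys[i] > rhs.keys[i]) return 1;
    }
    return 0;
  }

  // Strict weak ordering for std::sort; uses the compiled path when present,
  // mirroring the codegend_compare_fn_ check above.
  bool Less(const RowLite& lhs, const RowLite& rhs) const {
    int result = compiled_fn_ == nullptr ? Compare(lhs, rhs) : compiled_fn_(lhs, rhs);
    return result < 0;
  }

 private:
  CompareFn compiled_fn_;  // stand-in for the codegen'd function pointer
};

int main() {
  std::vector<RowLite> rows = {{{3, 1}}, {{1, 2}}, {{3, 0}}};
  RowComparator cmp;
  std::sort(rows.begin(), rows.end(),
            [&cmp](const RowLite& a, const RowLite& b) { return cmp.Less(a, b); });
  return 0;
}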

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 26dde07..02feb31 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -1583,16 +1583,34 @@ from functional.alltypestiny order by id
 int, bigint, bigint, double
 ====
 ---- QUERY
-# Regression test for IMPALA-2265, IMPALA-2559. The mem_limit is tuned to reproduce the
-# issue when running this query against functional_parquet.
-SET mem_limit=132m;
+# Regression test for IMPALA-2265, IMPALA-2559. The max_block_mgr_memory is tuned to
+# reproduce the issue when running this query against functional_parquet.
+SET max_block_mgr_memory=36m;
 SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col
 FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10;
-SET mem_limit=;
 ---- CATCH
 Memory limit exceeded
 ====
 ---- QUERY
+# Check that the above query can succeed with more buffers.
+SET max_block_mgr_memory=64m;
+SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col
+FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10;
+---- TYPES
+SMALLINT
+---- RESULTS
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+====
+---- QUERY
 # IMPALA-2906: Tests that an analytic sort does not attempt to materialize
 # a constant expr wrapped in a TupleIsNullPredicate that is not executable
 # at the analytic sort. Here 'y' is wrapped in a TupleIsNullPredicate, and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test
index c2fbde5..8000ad3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-large-sorts.test
@@ -30,4 +30,5 @@ STRING,STRING,STRING
 'Faaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa','',''
 ---- RUNTIME_PROFILE
 row_regex: .* TotalMergesPerformed: [^0] .*
+row_regex: .* SpilledRuns: [^0] .*
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/sort.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/sort.test b/testdata/workloads/functional-query/queries/QueryTest/sort.test
index b957939..0fe7efa 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/sort.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/sort.test
@@ -4107,3 +4107,37 @@ select 1 from functional.alltypestiny order by 1
 ---- TYPES
 TINYINT
 ====
+---- QUERY
+# Test for corner case: fixed length data but only NULL var-len slots.
+select id, cast(NULL as string) str
+from functional.alltypestiny
+order by 2, 1
+---- RESULTS
+0,'NULL'
+1,'NULL'
+2,'NULL'
+3,'NULL'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+---- TYPES
+INT, STRING
+====
+---- QUERY
+# Test for corner case: fixed length data but only zero-length var len data.
+select id, ''
+from functional.alltypestiny
+order by 2 desc, 1 desc
+---- RESULTS
+7,''
+6,''
+5,''
+4,''
+3,''
+2,''
+1,''
+0,''
+---- TYPES
+INT, STRING
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index c35ddec..f462435 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -427,6 +427,7 @@ limit 20;
 CHAR
 ---- RUNTIME_PROFILE
 # Verify that the sort actually spilled
+row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
@@ -465,6 +466,7 @@ limit 20;
 CHAR
 ---- RUNTIME_PROFILE
 # Verify that the sort actually spilled
+row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
@@ -504,6 +506,7 @@ limit 20;
 CHAR
 ---- RUNTIME_PROFILE
 # Verify that the sort actually spilled
+row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
@@ -543,6 +546,7 @@ limit 20;
 STRING
 ---- RUNTIME_PROFILE
 # Verify that the sort actually spilled
+row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
@@ -582,5 +586,6 @@ limit 20;
 STRING
 ---- RUNTIME_PROFILE
 # Verify that the sort actually spilled
+row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index ad99aef..281957a 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -77,7 +77,6 @@ class TestQueryFullSort(ImpalaTestSuite):
     assert(result[0] == sorted(result[0]))
 
   def test_sort_union(self, vector):
-    pytest.xfail(reason="IMPALA-1346")
     query = """select o_orderdate, o_custkey, o_comment from (select * from 
orders union
     select * from orders union all select * from orders) as i
     order by o_orderdate limit 100000"""
@@ -116,3 +115,27 @@ class TestQueryFullSort(ImpalaTestSuite):
       query, exec_option, table_format=table_format).data)
     numeric_results = [int(val) for val in result[0]]
     assert(numeric_results == sorted(numeric_results))
+
+  def test_spill_empty_strings(self, vector):
+    """Test corner case of spilling sort with only empty strings. Spilling 
with var len
+    slots typically means the sort must reorder blocks and convert pointers, 
but this case
+    has to be handled differently because there are no var len blocks to point 
into."""
+
+    query = """
+    select empty, l_orderkey, l_partkey, l_suppkey,
+        l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax
+    from (select substr(l_comment, 1000, 0) empty, * from lineitem) t
+    order by empty, l_orderkey, l_partkey, l_suppkey, l_linenumber
+    limit 100000
+    """
+
+    exec_option = vector.get_value('exec_option')
+    exec_option['disable_outermost_topn'] = 1
+    exec_option['max_block_mgr_memory'] = "256m"
+    exec_option['num_nodes'] = "1"
+    table_format = vector.get_value('table_format')
+
+    result = transpose_results(self.execute_query(
+      query, exec_option, table_format=table_format).data)
+    assert(result[0] == sorted(result[0]))
+
