IMPALA-3375: Improve TopN performance with a trivial Compare object.

The C++ standard requires that priority_queue operations behave as
wrappers to {push,pop,make,sort}_heap, which take their comparator
object by value. This is expensive for objects like
TupleRowComparator.

This patch creates a wrapper type that is trivial to copy. It also
renames the operator()s of stateful comparator types to prevent their
accidental use in STL functions that take comparators by value.

This speeds up primitive_topn_bigint by 39% locally with scale factor
13 and 31% on the 16-node cluster with scale factor 300. It speeds up
primitive_top-n_all by 13% locally and 7% on the 16-node cluster.

Change-Id: I24755227b5bbbca6ad7c7d31d9bb8e132ca89e11
Reviewed-on: http://gerrit.cloudera.org:8080/2936
Reviewed-by: Jim Apple <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b204d5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b204d5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b204d5de

Branch: refs/heads/master
Commit: b204d5dea3212a81602aaf148a1960a0ecd47ccf
Parents: bff194c
Author: Jim Apple <[email protected]>
Authored: Tue Apr 19 19:00:33 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:17:58 2016 -0700

----------------------------------------------------------------------
 be/src/exec/topn-node.cc                  |  7 +++--
 be/src/exec/topn-node.h                   |  5 ++--
 be/src/runtime/sorted-run-merger.cc       | 12 ++++-----
 be/src/runtime/sorted-run-merger.h        |  6 ++---
 be/src/runtime/sorter.cc                  | 26 +++++++++----------
 be/src/service/impala-server-callbacks.cc |  2 +-
 be/src/service/impala-server.cc           |  2 +-
 be/src/service/impala-server.h            |  2 ++
 be/src/util/tuple-row-compare.h           | 36 +++++++++++++++++++++-----
 9 files changed, 61 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index c6ef247..a0ad825 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -73,9 +73,8 @@ Status TopNNode::Prepare(RuntimeState* state) {
     codegen_enabled = codegen_status.ok();
   }
   AddCodegenExecOption(codegen_enabled, codegen_status);
-  priority_queue_.reset(
-      new priority_queue<Tuple*, vector<Tuple*>, TupleRowComparator>(
-          *tuple_row_less_than_));
+  priority_queue_.reset(new priority_queue<Tuple*, vector<Tuple*>,
+      ComparatorWrapper<TupleRowComparator> >(*tuple_row_less_than_));
   materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   return Status::OK();
 }
@@ -176,7 +175,7 @@ void TopNNode::InsertTupleRow(TupleRow* input_row) {
     Tuple* top_tuple = priority_queue_->top();
     tmp_tuple_->MaterializeExprs<false>(input_row, *materialized_tuple_desc_,
             sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), NULL);
-    if ((*tuple_row_less_than_)(tmp_tuple_, top_tuple)) {
+    if (tuple_row_less_than_->Less(tmp_tuple_, top_tuple)) {
       // TODO: DeepCopy() will allocate new buffers for the string data. This 
needs
       // to be fixed to use a freelist
       tmp_tuple_->DeepCopy(top_tuple, *materialized_tuple_desc_, 
tuple_pool_.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/exec/topn-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index d51f9c9..a9e0bd9 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -99,9 +99,8 @@ class TopNNode : public ExecNode {
   /// The stl priority queue doesn't support a max size, so to get that 
functionality,
   /// the order of the queue is the opposite of what the ORDER BY clause 
specifies, such
   /// that the top of the queue is the last sorted element.
-  boost::scoped_ptr<
-      std::priority_queue<Tuple*, std::vector<Tuple*>, TupleRowComparator> >
-          priority_queue_;
+  boost::scoped_ptr<std::priority_queue<Tuple*, std::vector<Tuple*>,
+      ComparatorWrapper<TupleRowComparator> > > priority_queue_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc 
b/be/src/runtime/sorted-run-merger.cc
index d773937..ea7a689 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -100,8 +100,8 @@ void SortedRunMerger::Heapify(int parent_index) {
   int least_child;
   // Find the least child of parent.
   if (right_index >= min_heap_.size() ||
-      compare_less_than_(min_heap_[left_index]->current_row(),
-          min_heap_[right_index]->current_row())) {
+      comparator_.Less(
+          min_heap_[left_index]->current_row(), 
min_heap_[right_index]->current_row())) {
     least_child = left_index;
   } else {
     least_child = right_index;
@@ -109,16 +109,16 @@ void SortedRunMerger::Heapify(int parent_index) {
 
   // If the parent is out of place, swap it with the least child and invoke
   // Heapify recursively.
-  if (compare_less_than_(min_heap_[least_child]->current_row(),
-      min_heap_[parent_index]->current_row())) {
+  if (comparator_.Less(min_heap_[least_child]->current_row(),
+          min_heap_[parent_index]->current_row())) {
     iter_swap(min_heap_.begin() + least_child, min_heap_.begin() + 
parent_index);
     Heapify(least_child);
   }
 }
 
-SortedRunMerger::SortedRunMerger(const TupleRowComparator& compare_less_than,
+SortedRunMerger::SortedRunMerger(const TupleRowComparator& comparator,
     RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
-  : compare_less_than_(compare_less_than),
+  : comparator_(comparator),
     input_row_desc_(row_desc),
     deep_copy_input_(deep_copy_input) {
   get_next_timer_ = ADD_TIMER(profile, "MergeGetNext");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h 
b/be/src/runtime/sorted-run-merger.h
index cb68ee1..d622fd9 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -46,7 +46,7 @@ class SortedRunMerger {
   /// batch being returned.
   typedef boost::function<Status (RowBatch**)> RunBatchSupplier;
 
-  SortedRunMerger(const TupleRowComparator& compare_less_than, RowDescriptor* 
row_desc,
+  SortedRunMerger(const TupleRowComparator& comparator, RowDescriptor* 
row_desc,
       RuntimeProfile* profile, bool deep_copy_input);
 
   /// Prepare this merger to merge and return rows from the sorted runs in 
'input_runs'.
@@ -72,13 +72,13 @@ class SortedRunMerger {
   /// stored in a 0-indexed array, the 0-th element is the minimum element in 
the heap,
   /// and the children of the element at index i are 2*i+1 and 2*i+2. The heap 
property is
   /// that row of the parent element is <= the rows of the child elements 
according to the
-  /// comparator compare_less_than_.
+  /// comparator comparator_.
   /// The BatchedRowSupplier objects used in the min_heap_ are owned by this
   /// SortedRunMerger instance.
   std::vector<BatchedRowSupplier*> min_heap_;
 
   /// Row comparator. Returns true if lhs < rhs.
-  TupleRowComparator compare_less_than_;
+  TupleRowComparator comparator_;
 
   /// Descriptor for the rows provided by the input runs. Owned by the 
exec-node through
   /// which this merger was created.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 7de1b25..4640374 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -221,7 +221,7 @@ class Sorter::Run {
 /// instance to check for cancellation during an in-memory sort.
 class Sorter::TupleSorter {
  public:
-  TupleSorter(const TupleRowComparator& less_than_comp, int64_t block_size,
+  TupleSorter(const TupleRowComparator& comparator, int64_t block_size,
       int tuple_size, RuntimeState* state);
 
   ~TupleSorter();
@@ -324,8 +324,8 @@ class Sorter::TupleSorter {
   /// Offset in bytes of the last tuple in a block, calculated from block and 
tuple sizes.
   const int last_tuple_block_offset_;
 
-  /// Tuple comparator that returns true if lhs < rhs.
-  const TupleRowComparator less_than_comp_;
+  /// Tuple comparator with method Less() that returns true if lhs < rhs.
+  const TupleRowComparator comparator_;
 
   /// Runtime state instance to check for cancellation. Not owned.
   RuntimeState* const state_;
@@ -862,7 +862,7 @@ Sorter::TupleSorter::TupleSorter(const TupleRowComparator& 
comp, int64_t block_s
   : tuple_size_(tuple_size),
     block_capacity_(block_size / tuple_size),
     last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
-    less_than_comp_(comp),
+    comparator_(comp),
     state_(state) {
   temp_tuple_buffer_ = new uint8_t[tuple_size];
   temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_);
@@ -902,8 +902,8 @@ void Sorter::TupleSorter::InsertionSort(const 
TupleIterator& first,
     TupleIterator iter = insert_iter;
     iter.Prev();
     uint8_t* copy_to = insert_iter.current_tuple_;
-    while (less_than_comp_(temp_tuple_row_,
-        reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
+    while (comparator_.Less(
+        temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
       memcpy(copy_to, iter.current_tuple_, tuple_size_);
       copy_to = iter.current_tuple_;
       // Break if 'iter' has reached the first row, meaning that 
temp_tuple_row_
@@ -924,12 +924,12 @@ Sorter::TupleSorter::TupleIterator 
Sorter::TupleSorter::Partition(TupleIterator
   last.Prev();
   while (true) {
     // Search for the first and last out-of-place elements, and swap them.
-    while (less_than_comp_(reinterpret_cast<TupleRow*>(&first.current_tuple_),
-        temp_tuple_row_)) {
+    while (comparator_.Less(
+        reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) {
       first.Next();
     }
-    while (less_than_comp_(temp_tuple_row_,
-        reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
+    while (comparator_.Less(
+        temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
       last.Prev();
     }
 
@@ -1002,9 +1002,9 @@ Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, 
Tuple* t2, Tuple* t3) {
   TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
   TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
 
-  bool t1_lt_t2 = less_than_comp_(tr1, tr2);
-  bool t2_lt_t3 = less_than_comp_(tr2, tr3);
-  bool t1_lt_t3 = less_than_comp_(tr1, tr3);
+  bool t1_lt_t2 = comparator_.Less(tr1, tr2);
+  bool t2_lt_t3 = comparator_.Less(tr2, tr3);
+  bool t1_lt_t3 = comparator_.Less(tr1, tr3);
 
   if (t1_lt_t2) {
     // t1 < t2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/service/impala-server-callbacks.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server-callbacks.cc 
b/be/src/service/impala-server-callbacks.cc
index 27806f5..88d4fa0 100644
--- a/be/src/service/impala-server-callbacks.cc
+++ b/be/src/service/impala-server-callbacks.cc
@@ -294,7 +294,7 @@ void ImpalaServer::QueryStateToJson(const 
ImpalaServer::QueryStateRecord& record
 
 void ImpalaServer::QueryStateUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
-  set<QueryStateRecord, QueryStateRecord> sorted_query_records;
+  set<QueryStateRecord, QueryStateRecordLessThan> sorted_query_records;
   {
     lock_guard<mutex> l(query_exec_state_map_lock_);
     for(const QueryExecStateMap::value_type& exec_state: 
query_exec_state_map_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 2d314b2..25ef571 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1602,7 +1602,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const 
QueryExecState& exec_stat
   last_active_time = exec_state.last_active();
 }
 
-bool ImpalaServer::QueryStateRecord::operator() (
+bool ImpalaServer::QueryStateRecordLessThan::operator() (
     const QueryStateRecord& lhs, const QueryStateRecord& rhs) const {
   if (lhs.start_time == rhs.start_time) return lhs.id < rhs.id;
   return lhs.start_time < rhs.start_time;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 339da4d..ce30725 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -622,7 +622,9 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
 
     /// Default constructor used only when participating in collections
     QueryStateRecord() { }
+  };
 
+  struct QueryStateRecordLessThan {
     /// Comparator that sorts by start time.
     bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) 
const;
   };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b204d5de/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 75406ad..41035df 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -26,6 +26,30 @@
 
 namespace impala {
 
+/// A wrapper around types Comparator with a Less() method. This wrapper 
allows the use of
+/// type Comparator with STL containers which expect a type like std::less<T>, 
which uses
+/// operator() instead of Less() and is cheap to copy.
+///
+/// The C++ standard requires that std::priority_queue operations behave as 
wrappers to
+/// {push,pop,make,sort}_heap, which take their comparator object by value. 
Therefore, it
+/// is inefficient to use comparator objects that have expensive construction,
+/// destruction, and copying with std::priority_queue.
+///
+/// ComparatorWrapper takes a reference to an object of type Comparator, 
rather than
+/// copying that object. ComparatorWrapper<Comparator>(comp) is not safe to 
use beyond the
+/// lifetime of comp.
+template <typename Comparator>
+class ComparatorWrapper {
+  const Comparator& comp_;
+ public:
+  ComparatorWrapper(const Comparator& comp) : comp_(comp) {}
+
+  template <typename T>
+  bool operator()(const T& lhs, const T& rhs) const {
+    return comp_.Less(lhs, rhs);
+  }
+};
+
 /// Compares two TupleRows based on a set of exprs, in order.
 class TupleRowComparator {
  public:
@@ -89,17 +113,17 @@ class TupleRowComparator {
   /// Returns true if lhs is strictly less than rhs.
   /// All exprs (key_exprs_lhs_ and key_exprs_rhs_) must have been prepared 
and opened
   /// before calling this.
-  bool operator() (TupleRow* lhs, TupleRow* rhs) const {
+  bool Less(TupleRow* lhs, TupleRow* rhs) const {
     int result = codegend_compare_fn_ == NULL ? Compare(lhs, rhs) :
         (*codegend_compare_fn_)(&key_expr_ctxs_lhs_[0], 
&key_expr_ctxs_rhs_[0], lhs, rhs);
     if (result < 0) return true;
     return false;
   }
 
-  bool operator() (Tuple* lhs, Tuple* rhs) const {
+  bool Less(Tuple* lhs, Tuple* rhs) const {
     TupleRow* lhs_row = reinterpret_cast<TupleRow*>(&lhs);
     TupleRow* rhs_row = reinterpret_cast<TupleRow*>(&rhs);
-    return (*this)(lhs_row, rhs_row);
+    return Less(lhs_row, rhs_row);
   }
 
  private:
@@ -131,7 +155,7 @@ struct TupleEqualityChecker {
   TupleEqualityChecker(TupleDescriptor* tuple_desc) : tuple_desc_(tuple_desc) {
   }
 
-  bool operator() (Tuple* x, Tuple* y) {
+  bool Equal(Tuple* x, Tuple* y) {
     const std::vector<SlotDescriptor*>& slots = tuple_desc_->slots();
     for (int i = 0; i < slots.size(); ++i) {
       SlotDescriptor* slot = slots[i];
@@ -169,11 +193,11 @@ struct RowEqualityChecker {
     }
   }
 
-  bool operator() (TupleRow* x, TupleRow* y) {
+  bool Equal(TupleRow* x, TupleRow* y) {
     for (int i = 0; i < tuple_checkers_.size(); ++i) {
       Tuple* x_tuple = x->GetTuple(i);
       Tuple* y_tuple = y->GetTuple(i);
-      if (!tuple_checkers_[i](x_tuple, y_tuple)) return false;
+      if (!tuple_checkers_[i].Equal(x_tuple, y_tuple)) return false;
     }
 
     return true;

Reply via email to