IMPALA-3524: Don't process spilled partitions with 0 probe rows

In the partitioned hash join node, if a spilled partition has no probe
rows, building the hash table is unnecessary.

For some join types (right outer, right anti, and full outer), we still
need to process the build side to output unmatched rows (in this case, all
rows, since there were no probe rows to match).
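
The decision can be summarized with the following self-contained sketch. The enum and
helper names here are hypothetical and for illustration only; the actual logic is the
new branch in CleanUpHashPartitions() and the early return added to
PrepareSpilledPartitionForProbe() in the diff below.

  #include <cstdint>

  enum class JoinOp { INNER, LEFT_OUTER, RIGHT_OUTER, RIGHT_ANTI, FULL_OUTER };

  // Join types that must emit build rows which found no matching probe row.
  bool NeedToProcessUnmatchedBuildRows(JoinOp op) {
    return op == JoinOp::RIGHT_OUTER || op == JoinOp::RIGHT_ANTI
        || op == JoinOp::FULL_OUTER;
  }

  enum class SpilledPartitionAction {
    CLOSE,             // no probe rows, no unmatched-build output needed: free it now
    OUTPUT_ALL_BUILD,  // no probe rows, but every build row is unmatched: stream the
                       // build rows out directly, skipping the hash table build
    BUILD_HASH_TABLE   // probe rows exist: build (or repartition) as before
  };

  // What to do with a spilled partition, given how many probe rows it collected.
  SpilledPartitionAction ChooseAction(int64_t num_probe_rows, JoinOp op) {
    if (num_probe_rows > 0) return SpilledPartitionAction::BUILD_HASH_TABLE;
    return NeedToProcessUnmatchedBuildRows(op) ? SpilledPartitionAction::OUTPUT_ALL_BUILD
                                               : SpilledPartitionAction::CLOSE;
  }

In the patch, the CLOSE case corresponds to closing the build and probe partitions in
CleanUpHashPartitions(), and the OUTPUT_ALL_BUILD case to the new OutputAllBuild() path,
which reads the spilled build rows back without ever constructing a hash table.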

Testing: Added some cases to spilling.test. Manually tested these cases
for performance, and they all show around a 6% improvement.

Change-Id: I175b32dd9031e51218b38c37693ac3e31dfab47b
Reviewed-on: http://gerrit.cloudera.org:8080/5389
Reviewed-by: Jim Apple <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6a9df540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6a9df540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6a9df540

Branch: refs/heads/master
Commit: 6a9df540967e07b09524268d0cc52b7d10835676
Parents: bdd39f6
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Mon Dec 5 15:37:06 2016 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Feb 6 20:22:33 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc       | 156 +++++++++++++++----
 be/src/exec/partitioned-hash-join-node.h        |  52 +++++--
 .../queries/QueryTest/spilling.test             |  62 ++++++++
 .../tpch/queries/tpch-outer-joins.test          |   3 -
 4 files changed, 233 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 6073486..0c91b47 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -135,6 +135,8 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
+  num_hash_table_builds_skipped_ =
+      ADD_COUNTER(runtime_profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
   AddCodegenDisabledMessage(state);
   return Status::OK();
 }
@@ -201,6 +203,8 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   CloseAndDeletePartitions();
   builder_->Reset();
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
+  output_unmatched_batch_.reset();
+  output_unmatched_batch_iter_.reset();
   return ExecNode::Reset(state);
 }
 
@@ -238,6 +242,8 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (ht_ctx_ != NULL) ht_ctx_->Close();
   nulls_build_batch_.reset();
+  output_unmatched_batch_.reset();
+  output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions();
   if (builder_ != NULL) builder_->Close(state);
   Expr::Close(build_expr_ctxs_, state);
@@ -330,13 +336,17 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
       // In case of right-outer, right-anti and full-outer joins, we move this partition
       // to the list of partitions that we need to output their unmatched build rows.
       DCHECK(output_build_partitions_.empty());
-      DCHECK(input_partition_->build_partition()->hash_tbl() != NULL)
-          << " id: " << id_
-          << " Build: " << 
input_partition_->build_partition()->build_rows()->num_rows()
-          << " Probe: " << probe_rows->num_rows() << endl
-          << GetStackTrace();
-      hash_tbl_iterator_ =
-          input_partition_->build_partition()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
+      DCHECK(output_unmatched_batch_iter_.get() == NULL);
+      if (input_partition_->build_partition()->hash_tbl() != NULL) {
+        hash_tbl_iterator_ =
+            input_partition_->build_partition()->hash_tbl()->FirstUnmatched(
+                ht_ctx_.get());
+      } else {
+        output_unmatched_batch_.reset(new RowBatch(
+            child(1)->row_desc(), runtime_state_->batch_size(), builder_->mem_tracker()));
+        output_unmatched_batch_iter_.reset(
+            new RowBatch::Iterator(output_unmatched_batch_.get(), 0));
+      }
       output_build_partitions_.push_back(input_partition_->build_partition());
     } else {
       // In any other case, just close the input build partition.
@@ -365,6 +375,24 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
   spilled_partitions_.pop_front();
   PhjBuilder::Partition* build_partition = input_partition_->build_partition();
   DCHECK(build_partition->is_spilled());
+  if (input_partition_->probe_rows()->num_rows() == 0) {
+    // If there are no probe rows, there's no need to build the hash table, and
+    // only partitions with NeedToProcessUnmatchedBuildRows() will have been added
+    // to 'spilled_partitions_' in CleanUpHashPartitions().
+    DCHECK(NeedToProcessUnmatchedBuildRows());
+    bool got_read_buffer = false;
+    RETURN_IF_ERROR(input_partition_->build_partition()->build_rows()->PrepareForRead(
+        false, &got_read_buffer));
+    if (!got_read_buffer) {
+      return mem_tracker()->MemLimitExceeded(
+          runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+    }
+
+    *got_partition = true;
+    UpdateState(PROBING_SPILLED_PARTITION);
+    COUNTER_ADD(num_hash_table_builds_skipped_, 1);
+    return Status::OK();
+  }
 
   // Make sure we have a buffer to read the probe rows before we build the hash table.
   RETURN_IF_ERROR(input_partition_->PrepareForRead());
@@ -614,27 +642,82 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
   return Status::OK();
 }
 
-void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
+Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
   DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
+  const int start_num_rows = out_batch->num_rows();
+
+  if (output_unmatched_batch_iter_.get() != NULL) {
+    // There were no probe rows so we skipped building the hash table. In this case, all
+    // build rows of the partition are unmatched.
+    RETURN_IF_ERROR(OutputAllBuild(out_batch));
+  } else {
+    // We built and processed the hash table, so sweep over it and output unmatched rows.
+    RETURN_IF_ERROR(OutputUnmatchedBuildFromHashTable(out_batch));
+  }
+
+  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
+  // This will only be called for partitions that are added to 'output_build_partitions_'
+  // in NextSpilledProbeRowBatch(), which adds one partition that is then processed until
+  // it is done by the loop in GetNext(). So, there must be exactly one partition in
+  // 'output_build_partitions_' here.
+  DCHECK_EQ(output_build_partitions_.size(), 1);
+  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
+  const int num_conjuncts = conjunct_ctxs_.size();
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
+
+  bool eos = false;
+  while (!eos && !out_batch->AtCapacity()) {
+    if (output_unmatched_batch_iter_->AtEnd()) {
+      output_unmatched_batch_->TransferResourceOwnership(out_batch);
+      output_unmatched_batch_->Reset();
+      RETURN_IF_ERROR(output_build_partitions_.front()->build_rows()->GetNext(
+          output_unmatched_batch_.get(), &eos));
+      output_unmatched_batch_iter_.reset(
+          new RowBatch::Iterator(output_unmatched_batch_.get(), 0));
+    }
+
+    for (; !output_unmatched_batch_iter_->AtEnd() && !out_batch->AtCapacity();
+         output_unmatched_batch_iter_->Next()) {
+      OutputBuildRow(out_batch, output_unmatched_batch_iter_->Get(), &out_batch_iterator);
+      if (ExecNode::EvalConjuncts(
+              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
+        out_batch->CommitLastRow();
+        out_batch_iterator.Next();
+      }
+    }
+  }
+
+  // If we reached eos and finished the last batch, then there are no other unmatched
+  // build rows for this partition. In that case we need to close the partition.
+  // Otherwise, we reached out_batch capacity and we need to continue to output
+  // unmatched build rows, without closing the partition.
+  if (eos && output_unmatched_batch_iter_->AtEnd()) {
+    output_build_partitions_.front()->Close(out_batch);
+    output_build_partitions_.pop_front();
+    DCHECK(output_build_partitions_.empty());
+    output_unmatched_batch_iter_.reset();
+  }
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   const int num_conjuncts = conjunct_ctxs_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  const int start_num_rows = out_batch->num_rows();
 
   while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) {
     // Output remaining unmatched build rows.
     if (!hash_tbl_iterator_.IsMatched()) {
-      TupleRow* build_row = hash_tbl_iterator_.GetRow();
-      DCHECK(build_row != NULL);
-      if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
-        out_batch->CopyRow(build_row, out_batch_iterator.Get());
-      } else {
-        CreateOutputRow(out_batch_iterator.Get(), NULL, build_row);
-      }
-      if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts,
-          out_batch_iterator.Get())) {
+      OutputBuildRow(out_batch, hash_tbl_iterator_.GetRow(), &out_batch_iterator);
+      if (ExecNode::EvalConjuncts(
+              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
         out_batch->CommitLastRow();
         out_batch_iterator.Next();
       }
@@ -646,9 +729,9 @@ void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
 
   // If we reached the end of the hash table, then there are no other unmatched build
   // rows for this partition. In that case we need to close the partition, and move to
-  // the next. If we have not reached the end of the hash table, it means that we reached
-  // out_batch capacity and we need to continue to output unmatched build rows, without
-  // closing the partition.
+  // the next. If we have not reached the end of the hash table, it means that we
+  // reached out_batch capacity and we need to continue to output unmatched build rows,
+  // without closing the partition.
   if (hash_tbl_iterator_.AtEnd()) {
     output_build_partitions_.front()->Close(out_batch);
     output_build_partitions_.pop_front();
@@ -658,9 +741,17 @@ void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
           output_build_partitions_.front()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
     }
   }
+  return Status::OK();
+}
 
-  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+void PartitionedHashJoinNode::OutputBuildRow(
+    RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator) {
+  DCHECK(build_row != NULL);
+  if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
+    out_batch->CopyRow(build_row, out_batch_iterator->Get());
+  } else {
+    CreateOutputRow(out_batch_iterator->Get(), NULL, build_row);
+  }
 }
 
 Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
@@ -970,13 +1061,24 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
       RETURN_IF_ERROR(
           probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
 
-      // Push newly created partitions at the front. This means a depth first walk
-      // (more finely partitioned partitions are processed first). This allows us
-      // to delete blocks earlier and bottom out the recursion earlier.
-      spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+      if (probe_partition->probe_rows()->num_rows() != 0
+          || NeedToProcessUnmatchedBuildRows()) {
+        // Push newly created partitions at the front. This means a depth first walk
+        // (more finely partitioned partitions are processed first). This allows us
+        // to delete blocks earlier and bottom out the recursion earlier.
+        spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+      } else {
+        // There's no more processing to do for this partition, and since there were no
+        // probe rows we didn't return any rows that reference memory from these
+        // partitions, so just free the resources.
+        build_partition->Close(NULL);
+        probe_partition->Close(NULL);
+        COUNTER_ADD(num_hash_table_builds_skipped_, 1);
+      }
     } else {
       DCHECK(probe_partition == NULL);
       if (NeedToProcessUnmatchedBuildRows()) {
+        DCHECK(output_unmatched_batch_iter_.get() == NULL);
         if (output_build_partitions_.empty()) {
           hash_tbl_iterator_ = build_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 504dc7b..6d4e7f4 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -282,12 +282,29 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
       RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
 
-  /// Sweep the hash_tbl_ of the partition that is at the front of
-  /// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
-  /// rows. If reaches the end of the hash table it closes that partition, removes it from
-  /// output_build_partitions_ and moves hash_tbl_iterator_ to the beginning of the
-  /// new partition at the front of output_build_partitions_.
-  void OutputUnmatchedBuild(RowBatch* out_batch);
+  /// Used when NeedToProcessUnmatchedBuildRows() is true. Writes all unmatched rows from
+  /// 'output_build_partitions_' to 'out_batch', up to 'out_batch' capacity.
+  Status OutputUnmatchedBuild(RowBatch* out_batch);
+
+  /// Called by OutputUnmatchedBuild() when there isn't a hash table built, which happens
+  /// when a spilled partition had 0 probe rows. In this case, all of the build rows are
+  /// unmatched and we can iterate over the entire build side of the partition, which will
+  /// be the only partition in 'output_build_partitions_'. If it reaches the end of the
+  /// partition, it closes that partition and removes it from 'output_build_partitions_'.
+  Status OutputAllBuild(RowBatch* out_batch);
+
+  /// Called by OutputUnmatchedBuild when there is a hash table built. Sweeps the
+  /// 'hash_tbl_' of the partition that is at the front of 'output_build_partitions_',
+  /// using 'hash_tbl_iterator_' and outputs any unmatched build rows. If it reaches the
+  /// end of the hash table it closes that partition, removes it from
+  /// 'output_build_partitions_' and moves 'hash_tbl_iterator_' to the beginning of the
+  /// new partition at the front of 'output_build_partitions_'.
+  Status OutputUnmatchedBuildFromHashTable(RowBatch* out_batch);
+
+  /// Writes 'build_row' to 'out_batch' at the position of 'out_batch_iterator' in a
+  /// 'join_op_' specific way.
+  void OutputBuildRow(
+      RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator);
 
   /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing.
   Status InitNullAwareProbePartition();
@@ -338,10 +355,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Moves onto the next spilled partition and initializes 'input_partition_'. This
   /// function processes the entire build side of 'input_partition_' and when this
   /// function returns, we are ready to consume the probe side of 'input_partition_'.
-  /// If the build side's hash table fits in memory, we will construct input_partition_'s
-  /// hash table. If it does not, meaning we need to repartition, this function will
-  /// repartition the build rows into 'builder->hash_partitions_' and prepare for
-  /// repartitioning the partition's probe rows.
+  /// If the build side's hash table fits in memory and there are probe rows, we will
+  /// construct input_partition_'s hash table. If it does not fit, meaning we need to
+  /// repartition, this function will repartition the build rows into
+  /// 'builder->hash_partitions_' and prepare for repartitioning the partition's probe
+  /// rows. If there are no probe rows, we just prepare the build side to be read by
+  /// OutputUnmatchedBuild().
   Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition);
 
   /// Calls Close() on every probe partition, destroys the partitions and cleans up any
@@ -389,6 +408,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Time spent evaluating other_join_conjuncts for NAAJ.
   RuntimeProfile::Counter* null_aware_eval_timer_;
 
+  /// Number of partitions which had zero probe rows and we therefore didn't build the
+  /// hash table.
+  RuntimeProfile::Counter* num_hash_table_builds_skipped_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -451,6 +474,15 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// outputting.
   int64_t null_probe_output_idx_;
 
+  /// Used by OutputAllBuild() to iterate over the entire build side tuple stream of the
+  /// current partition.
+  std::unique_ptr<RowBatch> output_unmatched_batch_;
+
+  /// Stores an iterator into 'output_unmatched_batch_' to start from on the next call to
+  /// OutputAllBuild(), or NULL if there are no partitions without hash tables needing to
+  /// be processed by OutputUnmatchedBuild().
+  std::unique_ptr<RowBatch::Iterator> output_unmatched_batch_iter_;
+
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index 91b425e..89668e8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -628,3 +628,65 @@ BIGINT,BIGINT,BIGINT,INT,DECIMAL,DECIMAL,DECIMAL,DECIMAL,STRING,STRING,STRING,ST
 row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Tests for the case where a spilled partition has 0 probe rows and so we don't build the
+# hash table in a partitioned hash join.
+# INNER JOIN
+set max_block_mgr_memory=10m;
+select straight_join count(*)
+from
+lineitem a, lineitem b
+where
+a.l_partkey = 1 and
+a.l_orderkey = b.l_orderkey;
+---- TYPES
+BIGINT
+---- RESULTS
+173
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, NULL AWARE LEFT ANTI JOIN
+set max_block_mgr_memory=10m;
+select straight_join count(*)
+from
+lineitem a
+where
+a.l_partkey not in (select l_partkey from lineitem where l_partkey > 10)
+and a.l_partkey < 1000;
+---- TYPES
+BIGINT
+---- RESULTS
+287
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, RIGHT OUTER JOIN
+set max_block_mgr_memory=100m;
+select straight_join count(*)
+from
+supplier right outer join lineitem on s_suppkey = l_suppkey
+where s_acctbal > 0 and s_acctbal < 10;
+---- TYPES
+BIGINT
+---- RESULTS
+12138
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, RIGHT ANTI JOIN
+set max_block_mgr_memory=30m;
+with x as (select * from supplier limit 10)
+select straight_join count(*)
+from
+x right anti join lineitem on s_suppkey + 100 = l_suppkey;
+---- TYPES
+BIGINT
+---- RESULTS
+5995258
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/testdata/workloads/tpch/queries/tpch-outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/tpch-outer-joins.test b/testdata/workloads/tpch/queries/tpch-outer-joins.test
index 0da2850..a189a7a 100644
--- a/testdata/workloads/tpch/queries/tpch-outer-joins.test
+++ b/testdata/workloads/tpch/queries/tpch-outer-joins.test
@@ -29,9 +29,6 @@ SELECT straight_join * FROM orders o
 RIGHT OUTER JOIN lineitem l ON o.o_orderkey =  if(l.l_orderkey % 2 = 0, 0, l.l_orderkey)
 ORDER BY l_receiptdate, l_orderkey, l_shipdate
 limit 10
----- CATCH: ANY_OF
-Repartitioning did not reduce the size of a spilled partition
-Memory limit exceeded
 ====
 ---- QUERY
 # Regression test for IMPALA-2612. The following query will cause CastToChar
