Repository: incubator-impala
Updated Branches:
  refs/heads/master a306096e5 -> 54cda7858


IMPALA-4866: Hash join node does not apply limits correctly

Hash join node currently does not apply the limits correctly.
This issue gets masked most of the time since the planner sticks
an exchange node on top of most of the joins. This issue gets
exposed when NUM_NODES=1.

Change-Id: I414124f8bb6f8b2af2df468e1c23418d05a0e29f
Reviewed-on: http://gerrit.cloudera.org:8080/6778
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54cda785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54cda785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54cda785

Branch: refs/heads/master
Commit: 54cda7858513256f776b04ae43c0a241f14faa72
Parents: a306096
Author: aphadke <[email protected]>
Authored: Fri Apr 28 21:22:38 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 22 21:56:25 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc       | 25 +++++++----
 .../queries/QueryTest/nested-types-subplan.test |  2 +-
 ...ingle-node-joins-with-limits-exhaustive.test | 45 ++++++++++++++++++++
 tests/query_test/test_join_queries.py           |  6 +++
 4 files changed, 69 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc 
b/be/src/exec/partitioned-hash-join-node.cc
index 0ecfab2..f931743 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -496,7 +496,11 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* 
state, RowBatch* out_batch
 
   Status status = Status::OK();
   *eos = false;
-  while (!ReachedLimit()) {
+  // Save the number of rows in case GetNext() is called with a non-empty 
batch,
+  // which can happen in a subplan.
+  int num_rows_before = out_batch->num_rows();
+
+  while (true) {
     DCHECK(!*eos);
     DCHECK(status.ok());
     DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
@@ -565,11 +569,9 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* 
state, RowBatch* out_batch
       }
       DCHECK(status.ok());
       out_batch->CommitRows(rows_added);
-      num_rows_returned_ += rows_added;
-      if (out_batch->AtCapacity() || ReachedLimit()) break;
+      if (out_batch->AtCapacity()) break;
 
       DCHECK(current_probe_row_ == NULL);
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
     }
 
     // Try to continue from the current probe side input.
@@ -626,10 +628,20 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* 
state, RowBatch* out_batch
     break;
   }
 
-  if (ReachedLimit()) {
+  int num_rows_added = out_batch->num_rows() - num_rows_before;
+  DCHECK_GE(num_rows_added, 0);
+
+  if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
+    // Truncate the row batch if we went over the limit.
+    num_rows_added = limit_ - num_rows_returned_;
+    DCHECK_GE(num_rows_added, 0);
+    out_batch->set_num_rows(num_rows_before + num_rows_added);
     probe_batch_->TransferResourceOwnership(out_batch);
     *eos = true;
   }
+
+  num_rows_returned_ += num_rows_added;
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 
@@ -637,7 +649,6 @@ Status 
PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
   DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
-  const int start_num_rows = out_batch->num_rows();
 
   if (output_unmatched_batch_iter_.get() != NULL) {
     // There were no probe rows so we skipped building the hash table. In this 
case, all
@@ -648,8 +659,6 @@ Status 
PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
     OutputUnmatchedBuildFromHashTable(out_batch);
   }
 
-  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
 
b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
index e18f25e..205978d 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
@@ -627,7 +627,7 @@ select count(*) FROM tpch_nested_parquet.customer c, 
(SELECT ca.o_orderkey okey,
 ca.o_orderpriority opriority FROM c.c_orders ca, c.c_orders cb
 WHERE ca.o_orderkey = cb.o_orderkey limit 2) v limit 51
 ---- RESULTS
-1500000
+199975
 ---- TYPES
 bigint
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
 
b/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
new file mode 100644
index 0000000..d0ac79d
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
@@ -0,0 +1,45 @@
+====
+---- QUERY
+# IMPALA-4866: Hash join node does not apply limits correctly
+# Test that a join query applies the limits correctly when output_batch gets
+# populated in OutputNullAwareProbe. If the output_batch is full after invoking
+# this function, the ReachedLimit check won't be correctly applied without
+# this change
+set batch_size=6;
+select id, int_col, bigint_col from functional.alltypesagg a
+where int_col not in (select int_col from functional.alltypestiny t
+where a.id = t.id) limit 10995;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 10.99..\W10995\W
+====
+---- QUERY
+# Test to verify that limit_ is correctly enforced when
+# output_batch is at AtCapacity.
+set batch_size=6;
+set max_block_mgr_memory=180m;
+select * from tpch.lineitem t1 full outer join tpch.lineitem t2 on
+t1.l_orderkey = t2.l_orderkey limit 10;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 10 .
+====
+---- QUERY
+# IMPALA-4866: Hash join node does not apply limits correctly
+# Test to ensure that the limit is correctly applied on a right
+# join. Without this change this query returns 10 rows (batch_size)
+set batch_size=10;
+select straight_join t1.id, t2.id from functional.alltypes t1
+right join functional.alltypes t2 on t1.id = t2.int_col + 100000
+limit 5;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 5 .
+====
+---- QUERY
+# IMPALA-4866: Hash join node does not apply limits correctly
+# Test to ensure that the limit is correctly applied on an inner
+# join. Without this change this query returns 10 rows (batch_size)
+set batch_size=10;
+select straight_join t1.id, t2.id from functional.alltypes t1
+inner join functional.alltypes t2 on t1.id = t2.id limit 5;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 5 .
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/tests/query_test/test_join_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_join_queries.py 
b/tests/query_test/test_join_queries.py
index db1a4fa..b333a71 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -56,6 +56,12 @@ class TestJoinQueries(ImpalaTestSuite):
     new_vector.get_value('exec_option')['batch_size'] = 
vector.get_value('batch_size')
     self.run_test_case('QueryTest/joins', new_vector)
 
+  def test_single_node_joins_with_limits_exhaustive(self, vector):
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+    new_vector = copy(vector)
+    new_vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/single-node-joins-with-limits-exhaustive', 
new_vector)
+
   @SkipIfOldAggsJoins.unsupported
   def test_partitioned_joins(self, vector):
     self.run_test_case('QueryTest/joins-partitioned', vector)

Reply via email to