Repository: incubator-impala
Updated Branches:
  refs/heads/master a306096e5 -> 54cda7858
IMPALA-4866: Hash join node does not apply limits correctly

The hash join node currently does not apply limits correctly. The issue
is masked most of the time because the planner places an exchange node
on top of most joins; it is exposed when NUM_NODES=1.

Change-Id: I414124f8bb6f8b2af2df468e1c23418d05a0e29f
Reviewed-on: http://gerrit.cloudera.org:8080/6778
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54cda785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54cda785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54cda785

Branch: refs/heads/master
Commit: 54cda7858513256f776b04ae43c0a241f14faa72
Parents: a306096
Author: aphadke <[email protected]>
Authored: Fri Apr 28 21:22:38 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 22 21:56:25 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc       | 25 +++++++----
 .../queries/QueryTest/nested-types-subplan.test |  2 +-
 ...ingle-node-joins-with-limits-exhaustive.test | 45 ++++++++++++++++++++
 tests/query_test/test_join_queries.py           |  6 +++
 4 files changed, 69 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 0ecfab2..f931743 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -496,7 +496,11 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
   Status status = Status::OK();
   *eos = false;
 
-  while (!ReachedLimit()) {
+  // Save the number of rows in case GetNext() is called with a non-empty batch,
+  // which can happen in a subplan.
+  int num_rows_before = out_batch->num_rows();
+
+  while (true) {
     DCHECK(!*eos);
     DCHECK(status.ok());
     DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
@@ -565,11 +569,9 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
       }
       DCHECK(status.ok());
       out_batch->CommitRows(rows_added);
-      num_rows_returned_ += rows_added;
-      if (out_batch->AtCapacity() || ReachedLimit()) break;
+      if (out_batch->AtCapacity()) break;
 
       DCHECK(current_probe_row_ == NULL);
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
     }
 
     // Try to continue from the current probe side input.
@@ -626,10 +628,20 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     break;
   }
 
-  if (ReachedLimit()) {
+  int num_rows_added = out_batch->num_rows() - num_rows_before;
+  DCHECK_GE(num_rows_added, 0);
+
+  if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
+    // Truncate the row batch if we went over the limit.
+    num_rows_added = limit_ - num_rows_returned_;
+    DCHECK_GE(num_rows_added, 0);
+    out_batch->set_num_rows(num_rows_before + num_rows_added);
     probe_batch_->TransferResourceOwnership(out_batch);
     *eos = true;
   }
+
+  num_rows_returned_ += num_rows_added;
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
 
@@ -637,7 +649,6 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
   DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
-  const int start_num_rows = out_batch->num_rows();
 
   if (output_unmatched_batch_iter_.get() != NULL) {
     // There were no probe rows so we skipped building the hash table. In this case, all
@@ -648,8 +659,6 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
     OutputUnmatchedBuildFromHashTable(out_batch);
   }
 
-  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
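
The shape of the fix, for readers skimming the hunks above: instead of checking
ReachedLimit() at the top of the probe loop (which misses rows appended later in
the same GetNext() call, for example when OutputNullAwareProbe or the
unmatched-build path fills the batch), the node now counts how many rows the
current call added, truncates the batch if that would push it past limit_, and
updates num_rows_returned_ exactly once at the end. A minimal standalone sketch
of that pattern, assuming hypothetical SimpleBatch/LimitClamper types in place of
Impala's real RowBatch/ExecNode classes:

  #include <cstdint>
  #include <vector>

  // Hypothetical stand-in for a row batch; only the row count matters here.
  struct SimpleBatch {
    std::vector<int> rows;
    int num_rows() const { return static_cast<int>(rows.size()); }
    void set_num_rows(int n) { rows.resize(n); }  // Drops any surplus rows.
  };

  // Illustrative sketch of the "clamp once at the end" limit handling.
  class LimitClamper {
   public:
    explicit LimitClamper(int64_t limit) : limit_(limit) {}

    // Called once at the end of a GetNext()-style call. 'num_rows_before' is the
    // batch's row count on entry, since a caller (e.g. a subplan) may pass in a
    // non-empty batch.
    void ApplyLimit(SimpleBatch* out_batch, int num_rows_before, bool* eos) {
      int num_rows_added = out_batch->num_rows() - num_rows_before;
      if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
        // Keep only the rows that still fit under the limit, then signal eos.
        num_rows_added = static_cast<int>(limit_ - num_rows_returned_);
        out_batch->set_num_rows(num_rows_before + num_rows_added);
        *eos = true;
      }
      num_rows_returned_ += num_rows_added;  // Updated exactly once per call.
    }

   private:
    const int64_t limit_;            // -1 means no limit.
    int64_t num_rows_returned_ = 0;  // Rows handed to the caller so far.
  };

  int main() {
    SimpleBatch batch;
    LimitClamper clamper(5);   // LIMIT 5
    bool eos = false;
    batch.rows.assign(10, 0);  // The operator produced a full 10-row batch.
    clamper.ApplyLimit(&batch, /*num_rows_before=*/0, &eos);
    // batch.num_rows() is now 5 and eos is true: the surplus rows were dropped.
    return (batch.num_rows() == 5 && eos) ? 0 : 1;
  }

Because the truncation runs after every output path (probe output, null-aware
output, unmatched build rows), the per-path num_rows_returned_ updates that the
diff deletes are no longer needed.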

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
index e18f25e..205978d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
@@ -627,7 +627,7 @@ select count(*) FROM tpch_nested_parquet.customer c,
 (SELECT ca.o_orderkey okey, ca.o_orderpriority opriority FROM c.c_orders ca,
 c.c_orders cb WHERE ca.o_orderkey = cb.o_orderkey limit 2) v limit 51
 ---- RESULTS
-1500000
+199975
 ---- TYPES
 bigint
 ====


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
new file mode 100644
index 0000000..d0ac79d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-joins-with-limits-exhaustive.test
@@ -0,0 +1,45 @@
+====
+---- QUERY
+# IMPALA-4866: Hash join node does not apply limits correctly
+# Test that a join query applies the limits correctly when output_batch gets
+# populated in OutputNullAwareProbe. If the output_batch is full after invoking
+# this function, the ReachedLimit check won't be correctly applied without this
+# change.
+set batch_size=6;
+select id, int_col, bigint_col from functional.alltypesagg a
+where int_col not in (select int_col from functional.alltypestiny t
+where a.id = t.id) limit 10995;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 10.99..\W10995\W
+====
+---- QUERY
+# Test to verify that limit_ is correctly enforced when output_batch
+# is AtCapacity.
+set batch_size=6;
+set max_block_mgr_memory=180m;
+select * from tpch.lineitem t1 full outer join tpch.lineitem t2 on
+t1.l_orderkey = t2.l_orderkey limit 10;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 10 .
+====
+---- QUERY
+# IMPALA-4866: Hash join node does not apply limits correctly
+# Test to ensure that the limit is correctly applied on a right
+# join. Without this change this query returns 10 rows (batch_size).
+set batch_size=10;
+select straight_join t1.id, t2.id from functional.alltypes t1
+right join functional.alltypes t2 on t1.id = t2.int_col + 100000
+limit 5;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 5 .
+====
+---- QUERY
+# IMPALA-4866: Hash join node does not apply limits correctly
+# Test to ensure that the limit is correctly applied on an inner
+# join. Without this change this query returns 10 rows (batch_size).
+set batch_size=10;
+select straight_join t1.id, t2.id from functional.alltypes t1
+inner join functional.alltypes t2 on t1.id = t2.id limit 5;
+---- RUNTIME_PROFILE
+row_regex: .*RowsProduced: 5 .
+====
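
The RUNTIME_PROFILE sections above assert on the join's RowsProduced counter: each
row_regex pattern is matched against the text of the query's runtime profile, so a
query with "limit 5" must report exactly 5 produced rows. A rough standalone
illustration of that kind of check, using a hypothetical helper and made-up profile
snippets rather than the real Python test harness or verbatim Impala profile output:

  #include <iostream>
  #include <regex>
  #include <string>

  // Returns true if any part of the profile text matches the pattern,
  // roughly what a row_regex assertion checks.
  bool ProfileMatches(const std::string& profile, const std::string& pattern) {
    return std::regex_search(profile, std::regex(pattern));
  }

  int main() {
    // Made-up profile fragments: with the fix a "limit 5" join reports 5
    // produced rows; without it a full 10-row batch (batch_size) slips through.
    const std::string fixed_profile = "HASH_JOIN_NODE (id=2):\n  RowsProduced: 5 (5)\n";
    const std::string buggy_profile = "HASH_JOIN_NODE (id=2):\n  RowsProduced: 10 (10)\n";
    std::cout << ProfileMatches(fixed_profile, ".*RowsProduced: 5 .") << "\n";  // prints 1
    std::cout << ProfileMatches(buggy_profile, ".*RowsProduced: 5 .") << "\n";  // prints 0
    return 0;
  }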

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54cda785/tests/query_test/test_join_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py
index db1a4fa..b333a71 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -56,6 +56,12 @@ class TestJoinQueries(ImpalaTestSuite):
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
     self.run_test_case('QueryTest/joins', new_vector)
 
+  def test_single_node_joins_with_limits_exhaustive(self, vector):
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+    new_vector = copy(vector)
+    new_vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/single-node-joins-with-limits-exhaustive', new_vector)
+
   @SkipIfOldAggsJoins.unsupported
   def test_partitioned_joins(self, vector):
     self.run_test_case('QueryTest/joins-partitioned', vector)
