IMPALA-5788: Fix agg node crash when grouping by nondeterministic exprs

Fixed a bug where Impala crashes while executing an aggregation query that uses nondeterministic grouping expressions. The crash happens when the aggregation node rebuilds a spilled partition that can fit in memory. Because the grouping expressions are nondeterministic, rows can be re-hashed to a partition other than the spilled one.
Testing: Added a query test to verify successful execution. Change-Id: Ibdb09239577b3f0a19d710b0d148e882b0b73e23 Reviewed-on: http://gerrit.cloudera.org:8080/7714 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b6c02972 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b6c02972 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b6c02972 Branch: refs/heads/master Commit: b6c02972d6bb8dc4c62ef806c6145acae95842ad Parents: c871e00 Author: Bikramjeet Vig <[email protected]> Authored: Wed Aug 16 17:45:06 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 23 03:59:02 2017 +0000 ---------------------------------------------------------------------- be/src/exec/partitioned-aggregation-node.cc | 23 +++++++++++++++++++- be/src/exec/partitioned-aggregation-node.h | 11 ++++++---- .../queries/QueryTest/spilling.test | 13 +++++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index a0fed41..b1d54a6 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -1152,6 +1152,17 @@ Status PartitionedAggregationNode::CreateHashPartitions( } hash_tbls_[i] = partition->hash_tbl.get(); } + // In this case we did not have to repartition, so ensure that while building the hash + // table all rows will be inserted into the partition at 'single_partition_idx' in case + // a non deterministic grouping expression causes a row to hash to a different + // 
partition index. + if (single_partition_idx != -1) { + Partition* partition = hash_partitions_[single_partition_idx]; + for (int i = 0; i < PARTITION_FANOUT; ++i) { + hash_partitions_[i] = partition; + hash_tbls_[i] = partition->hash_tbl.get(); + } + } COUNTER_ADD(partitions_created_, num_partitions_created); if (!is_streaming_preagg_) { @@ -1390,7 +1401,13 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) { } DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to " << "reclaim memory: " << buffer_pool_client_.DebugString(); - hash_tbls_[partition_idx] = NULL; + // Remove references to the destroyed hash table from 'hash_tbls_'. + // Additionally, we might be dealing with a rebuilt spilled partition, where all + // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_' + // remains consistent in that case. + for (int i = 0; i < PARTITION_FANOUT; ++i) { + if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr; + } return hash_partitions_[partition_idx]->Spill(more_aggregate_rows); } @@ -1402,6 +1419,10 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { for (int i = 0; i < hash_partitions_.size(); ++i) { Partition* partition = hash_partitions_[i]; if (partition == nullptr) continue; + // We might be dealing with a rebuilt spilled partition, where all partitions are + // pointing to a single in-memory partition, so make sure we only proceed for the + // right partition. 
+ if(i != partition->idx) continue; int64_t aggregated_rows = 0; if (partition->aggregated_row_stream != nullptr) { aggregated_rows = partition->aggregated_row_stream->num_rows(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 210400e..fa8674c 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -336,10 +336,14 @@ class PartitionedAggregationNode : public ExecNode { /// Object pool that holds the Partition objects in hash_partitions_. boost::scoped_ptr<ObjectPool> partition_pool_; - /// Current partitions we are partitioning into. + /// Current partitions we are partitioning into. IMPALA-5788: For the case where we + /// rebuild a spilled partition that fits in memory, all pointers in this vector will + /// point to a single in-memory partition. std::vector<Partition*> hash_partitions_; - /// Cache for hash tables in 'hash_partitions_'. + /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we + /// rebuild a spilled partition that fits in memory, all pointers in this array will + /// point to the hash table that is a part of a single in-memory partition. HashTable* hash_tbls_[PARTITION_FANOUT]; /// All partitions that have been spilled and need further processing. @@ -623,8 +627,7 @@ class PartitionedAggregationNode : public ExecNode { /// Initializes hash_partitions_. 'level' is the level for the partitions to create. /// If 'single_partition_idx' is provided, it must be a number in range - /// [0, PARTITION_FANOUT), and only that partition is created - the others are - /// initialized to NULL. + /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it. /// Also sets ht_ctx_'s level to 'level'. 
Status CreateHashPartitions( int level, int single_partition_idx = -1) WARN_UNUSED_RESULT; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/testdata/workloads/functional-query/queries/QueryTest/spilling.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test index b6f4f12..3868e4f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test @@ -343,3 +343,16 @@ bigint,bigint,bigint,int,decimal,decimal,decimal,decimal,string,string,string,st 1382,156162,6163,5,31.00,37762.96,0.07,0.03,'R','F','1993-10-26','1993-10-15','1993-11-09','TAKE BACK RETURN','FOB','hely regular dependencies. f' 1509,186349,3904,6,31.00,44495.54,0.04,0.03,'A','F','1993-07-14','1993-08-21','1993-08-06','COLLECT COD','SHIP','ic deposits cajole carefully. quickly bold ' ==== +---- QUERY +# Test spilling aggregation when grouping by nondeterministic expression +set buffer_pool_limit=5m; +set num_nodes=1; +select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment +from tpch_parquet.lineitem +group by 1, 2, 3, 4, 5, random() +limit 5 +---- RUNTIME_PROFILE +row_regex: .*Query State: FINISHED.* +row_regex: .*Query Status: OK.* +row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\) +====
