Created method for partition-aware finalize aggregate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/928f6eb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/928f6eb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/928f6eb7 Branch: refs/heads/partitioned-aggregation Commit: 928f6eb70f95d5040aa4d76e4eb7317448c1df09 Parents: b020f20 Author: Harshad Deshmukh <hbdeshm...@apache.org> Authored: Thu Aug 18 16:54:38 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Fri Sep 16 13:23:47 2016 -0500 ---------------------------------------------------------------------- storage/AggregationOperationState.cpp | 69 +++++++++++++++++++++++++++++- storage/AggregationOperationState.hpp | 33 +++++++++++++- 2 files changed, 98 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/928f6eb7/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index c39e98a..6b4a672 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -566,8 +566,14 @@ void AggregationOperationState::finalizeHashTable( } AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); - ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *agg_hash_table, &group_by_keys, agg_idx); + // TODO(harshad) - Modify the finalizeHashTable() function called below such + // that group_by_keys is a single ColumnVectorValueAccessor in which there + // is one ColumnVector per group by key. If we do that, the code below + // for reorganizing group_by_keys can be removed. 
+ ColumnVector* agg_result_col = + handles_[agg_idx]->finalizeHashTable(*agg_hash_table, + &group_by_keys, + agg_idx); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } @@ -618,4 +624,63 @@ void AggregationOperationState::finalizeHashTable( output_destination->bulkInsertTuples(&complete_result); } +void AggregationOperationState::finalizeAggregatePartitioned( + const std::size_t partition_id, InsertDestination *output_destination) { + // Each element of 'group_by_keys' is a vector of values for a particular + // group (which is also the prefix of the finalized Tuple for that group). + std::vector<std::vector<TypedValue>> group_by_keys; + + // Collect per-aggregate finalized values. + std::vector<std::unique_ptr<ColumnVector>> final_values; + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + AggregationStateHashTableBase *hash_table = + partitioned_group_by_hashtable_pool_->getHashTable(partition_id); + ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( + *hash_table, &group_by_keys, agg_idx); + if (agg_result_col != nullptr) { + final_values.emplace_back(agg_result_col); + } + } + + // Reorganize 'group_by_keys' in column-major order so that we can make a + // ColumnVectorsValueAccessor to bulk-insert results. + // + // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal + // if there is only one aggregate. The need to do this should hopefully go + // away when we work out storing composite structures for multiple aggregates + // in a single HashTable. 
+ std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; + std::size_t group_by_element_idx = 0; + for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { + const Type &group_by_type = group_by_element->getType(); + if (NativeColumnVector::UsableForType(group_by_type)) { + NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size()); + group_by_cvs.emplace_back(element_cv); + for (std::vector<TypedValue> &group_key : group_by_keys) { + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + } + } else { + IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size()); + group_by_cvs.emplace_back(element_cv); + for (std::vector<TypedValue> &group_key : group_by_keys) { + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + } + } + ++group_by_element_idx; + } + + // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys + // and the finalized aggregates. + ColumnVectorsValueAccessor complete_result; + for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) { + complete_result.addColumn(group_by_cv.release()); + } + for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) { + complete_result.addColumn(final_value_cv.release()); + } + + // Bulk-insert the complete result. 
+ output_destination->bulkInsertTuples(&complete_result); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/928f6eb7/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 7e8acb5..37d77e3 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -168,8 +168,37 @@ class AggregationOperationState { **/ void finalizeAggregate(InsertDestination *output_destination); - static void mergeGroupByHashTables(AggregationStateHashTableBase *src, - AggregationStateHashTableBase *dst); + /** + * @brief Generate the final results for the aggregates managed by this + * AggregationOperationState, for the given partition and write them + * out to StorageBlock(s). + * + * @param partition_id The Partition ID for which the finalize has to be + * performed. + * @param output_destination An InsertDestination where the finalized output + * tuple(s) from this aggregate are to be written. + **/ + void finalizeAggregatePartitioned(const std::size_t partition_id, + InsertDestination *output_destination); + + bool isAggregatePartitioned() const { + return is_aggregate_partitioned_; + } + + /** + * @brief Get the number of partitions used for the aggregation. + * + * @note This is relevant only when is_aggregate_partitioned_ is true. + * + * @return The number of partitions used for the aggregation. Default is 1. + **/ + std::size_t getNumPartitions() const { + if (is_aggregate_partitioned_) { + return partitioned_group_by_hashtable_pool_->getNumPartitions(); + } else { + return 1; + } + } int dflag;