Created method for partition aware finalize aggregate.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/157ce784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/157ce784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/157ce784

Branch: refs/heads/partitioned-aggregation
Commit: 157ce784123dc514c4a8f4894cb1e6ad3570f98f
Parents: d28f99a
Author: Harshad Deshmukh <hbdeshm...@apache.org>
Authored: Thu Aug 18 16:54:38 2016 -0500
Committer: Harshad Deshmukh <hbdeshm...@apache.org>
Committed: Tue Sep 20 12:57:06 2016 -0500

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp | 69 +++++++++++++++++++++++++++++-
 storage/AggregationOperationState.hpp | 33 +++++++++++++-
 2 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/157ce784/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 7f63004..faf2c0b 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -565,8 +565,14 @@ void AggregationOperationState::finalizeHashTable(
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);
-    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *agg_hash_table, &group_by_keys, agg_idx);
+    // TODO(harshad) - Modify the finalizeHashTable() function called below such
+    // that group_by_keys is a single ColumnVectorValueAccessor in which there
+    // is one ColumnVector per group by key. If we do that, the code below
+    // for reorganizing group_by_keys can be removed.
+    ColumnVector* agg_result_col =
+        handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
+                                             &group_by_keys,
+                                              agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
@@ -617,4 +623,63 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+void AggregationOperationState::finalizeAggregatePartitioned(
+    const std::size_t partition_id, InsertDestination *output_destination) {
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  // Collect per-aggregate finalized values.
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    AggregationStateHashTableBase *hash_table =
+        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *hash_table, &group_by_keys, agg_idx);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
+    }
+  }
+
+  // Reorganize 'group_by_keys' in column-major order so that we can make a
+  // ColumnVectorsValueAccessor to bulk-insert results.
+  //
+  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+  // if there is only one aggregate. The need to do this should hopefully go
+  // away when we work out storing composite structures for multiple aggregates
+  // in a single HashTable.
+  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+  std::size_t group_by_element_idx = 0;
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+    const Type &group_by_type = group_by_element->getType();
+    if (NativeColumnVector::UsableForType(group_by_type)) {
+      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    } else {
+      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    }
+    ++group_by_element_idx;
+  }
+
+  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+  // and the finalized aggregates.
+  ColumnVectorsValueAccessor complete_result;
+  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+    complete_result.addColumn(group_by_cv.release());
+  }
+  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+    complete_result.addColumn(final_value_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/157ce784/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 384d943..880375f 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -168,8 +168,37 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
+  /**
+   * @brief Generate the final results for the aggregates managed by this
+   *        AggregationOperationState, for the given partition and write them
+   *        out to StorageBlock(s).
+   *
+   * @param partition_id The Partition ID for which the finalize has to be
+   *        performed.
+   * @param output_destination An InsertDestination where the finalized output
+   *        tuple(s) from this aggregate are to be written.
+   **/
+  void finalizeAggregatePartitioned(const std::size_t partition_id,
+                                    InsertDestination *output_destination);
+
+  bool isAggregatePartitioned() const {
+    return is_aggregate_partitioned_;
+  }
+
+  /**
+   * @brief Get the number of partitions used for the aggregation.
+   *
+   * @note This is relevant only when is_aggregate_partitioned_ is true.
+   *
+   * @return The number of partitions used for the aggregation. Default is 1.
+   **/
+  std::size_t getNumPartitions() const {
+    if (is_aggregate_partitioned_) {
+      return partitioned_group_by_hashtable_pool_->getNumPartitions();
+    } else {
+      return 1;
+    }
+  }
 
   int dflag;
 

Reply via email to