Repository: incubator-quickstep
Updated Branches:
  refs/heads/collision-free-agg 963a60428 -> 12b112f85


Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/12b112f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/12b112f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/12b112f8

Branch: refs/heads/collision-free-agg
Commit: 12b112f851608c7f423cb9bca780b1be0489e55d
Parents: 963a604
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Tue Jan 31 23:39:41 2017 -0600
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Tue Jan 31 23:39:41 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   | 23 ++++----
 .../aggregation/AggregationConcreteHandle.hpp   | 14 ++---
 expressions/aggregation/AggregationHandle.hpp   | 11 ++--
 .../aggregation/AggregationHandleSum.cpp        |  4 +-
 query_optimizer/ExecutionGenerator.cpp          |  2 +-
 .../InitializeAggregationStateOperator.cpp      |  2 +-
 storage/AggregationOperationState.cpp           | 56 +++++++++++++++++---
 storage/AggregationOperationState.hpp           |  3 +-
 .../CollisionFreeAggregationStateHashTable.cpp  |  3 +-
 9 files changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index c3d133a..d28aa6e 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -48,21 +48,16 @@ AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashT
 }
 
 void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
+    const std::vector<attribute_id> &argument_ids,
+    const std::vector<attribute_id> &key_attr_ids,
+    AggregationStateHashTableBase *distinctify_hash_table,
     ValueAccessor *accessor,
-    const std::vector<attribute_id> &key_ids,
-    AggregationStateHashTableBase *distinctify_hash_table) const {
-  // If the key-value pair is already there, we don't need to update the value,
-  // which should always be "true". I.e. the value is just a placeholder.
-//  AggregationStateFastHashTable *hash_table =
-//      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
-//  if (key_ids.size() == 1) {
-//    hash_table->upsertValueAccessorFast(
-//        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
-//  } else {
-//    std::vector<attribute_id> empty_args {kInvalidAttributeID};
-//    hash_table->upsertValueAccessorCompositeKeyFast(
-//        empty_args, accessor, key_ids, true /* check_for_null_keys */);
-//  }
+    ColumnVectorsValueAccessor *aux_accessor) const {
+  std::vector<attribute_id> combined_ids(key_attr_ids);
+  combined_ids.insert(combined_ids.end(), argument_ids.begin(), argument_ids.end());
+
+  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table)
+      ->upsertValueAccessor({}, combined_ids, accessor, aux_accessor);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 04be232..c49f597 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -110,9 +110,11 @@ class AggregationConcreteHandle : public AggregationHandle {
       StorageManager *storage_manager) const override;
 
   void insertValueAccessorIntoDistinctifyHashTable(
+      const std::vector<attribute_id> &argument_ids,
+      const std::vector<attribute_id> &key_attr_ids,
+      AggregationStateHashTableBase *distinctify_hash_table,
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
-      AggregationStateHashTableBase *distinctify_hash_table) const override;
+      ColumnVectorsValueAccessor *aux_accessor = nullptr) const override;
 
   void blockUpdate() override {
     block_update_ = true;
@@ -127,11 +129,11 @@ class AggregationConcreteHandle : public AggregationHandle {
       : AggregationHandle(agg_id) {}
 
   template <typename HandleT, typename StateT>
-  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper(
       const AggregationStateHashTableBase &distinctify_hash_table) const;
 
   template <typename HandleT, typename HashTableT>
-  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+  void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *hash_table,
       std::size_t index) const;
@@ -200,7 +202,7 @@ class HashTableAggregateFinalizer {
 
 template <typename HandleT, typename StateT>
 StateT* AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+    aggregateOnDistinctifyHashTableForSingleUnaryHelper(
         const AggregationStateHashTableBase &distinctify_hash_table) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
   StateT *state = static_cast<StateT *>(createInitialState());
@@ -226,7 +228,7 @@ StateT* AggregationConcreteHandle::
 
 template <typename HandleT, typename HashTableT>
 void AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+    aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
         const AggregationStateHashTableBase &distinctify_hash_table,
         AggregationStateHashTableBase *aggregation_hash_table,
         std::size_t index) const {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index bc9c27f..fd7f7af 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -249,11 +249,6 @@ class AggregationHandle {
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    * as keys into the distinctify hash table.
    *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
-   *        together with the attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
    * @param distinctify_hash_table The HashTable to store the GROUP BY
    *        expressions and the aggregation arguments together as hash table
    *        keys and a bool constant \c true as hash table value (So the hash
@@ -261,9 +256,11 @@ class AggregationHandle {
    *        by calling createDistinctifyHashTable();
    */
   virtual void insertValueAccessorIntoDistinctifyHashTable(
+      const std::vector<attribute_id> &argument_ids,
+      const std::vector<attribute_id> &key_attr_ids,
+      AggregationStateHashTableBase *distinctify_hash_table,
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
-      AggregationStateHashTableBase *distinctify_hash_table) const = 0;
+      ColumnVectorsValueAccessor *aux_accessor = nullptr) const = 0;
 
   /**
    * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 29a986f..ce36e79 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -152,7 +152,7 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
 
 AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
       AggregationHandleSum,
       AggregationStateSum>(distinctify_hash_table);
 }
@@ -161,7 +161,7 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
     AggregationStateHashTableBase *aggregation_hash_table,
     std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
       AggregationHandleSum,
       PackedPayloadSeparateChainingAggregationStateHashTable>(
           distinctify_hash_table, aggregation_hash_table, index);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index d32505b..76522dc 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1657,7 +1657,7 @@ void ExecutionGenerator::convertAggregate(
 
     execution_plan_->addDirectDependency(aggregation_operator_index,
                                          initialize_aggregation_state_operator_index,
-                                         true);
+                                         true /* is_pipeline_breaker */);
   }
 
   // Create InsertDestination proto.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/relational_operators/InitializeAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp
index dfee459..b041aef 100644
--- a/relational_operators/InitializeAggregationStateOperator.cpp
+++ b/relational_operators/InitializeAggregationStateOperator.cpp
@@ -53,7 +53,7 @@ bool InitializeAggregationStateOperator::getAllWorkOrders(
     }
     started_ = true;
   }
-  return started_;
+  return true;
 }
 
 bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 1bc5832..6e7d2ae 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -136,6 +136,9 @@ AggregationOperationState::AggregationOperationState(
     std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
         args_it = arguments.begin();
     std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
+    std::vector<HashTableImplType>::const_iterator
+        distinctify_hash_table_impl_types_it =
+            distinctify_hash_table_impl_types.begin();
     for (; agg_func_it != aggregate_functions.end();
          ++agg_func_it, ++args_it, ++is_distinct_it) {
       // Get the Types of this aggregate's arguments so that we can create an
@@ -174,13 +177,35 @@ AggregationOperationState::AggregationOperationState(
         // Aggregation with GROUP BY: combined payload is partially updated in
         // the presence of DISTINCT.
         if (*is_distinct_it) {
-          LOG(FATAL) << "Distinct aggregation not supported";
+          handles_.back()->blockUpdate();
         }
         group_by_handles.emplace_back(handles_.back().get());
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
       }
+
+      // Initialize the corresponding distinctify hash table if this is a
+      // DISTINCT aggregation.
+      if (*is_distinct_it) {
+        std::vector<const Type *> key_types(group_by_types_);
+        key_types.insert(
+            key_types.end(), argument_types.begin(), argument_types.end());
+        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+        // estimating the number of entries in the distinctify hash table.
+        // We need to estimate for each distinct aggregation an
+        // estimated_num_distinct_keys value during query optimization.
+        distinctify_hashtables_.emplace_back(
+            AggregationStateHashTableFactory::CreateResizable(
+                *distinctify_hash_table_impl_types_it,
+                key_types,
+                estimated_num_entries,
+                {},
+                storage_manager));
+        ++distinctify_hash_table_impl_types_it;
+      } else {
+        distinctify_hashtables_.emplace_back(nullptr);
+      }
     }
 
     // Aggregation with GROUP BY: create a HashTable pool.
@@ -439,14 +464,23 @@ void AggregationOperationState::aggregateBlockSingleState(
     const auto &argument_ids = argument_ids_[agg_idx];
     const auto &handle = handles_[agg_idx];
 
-    AggregationState *state;
-    if (argument_ids.empty()) {
-      // Special case. This is a nullary aggregate (i.e. COUNT(*)).
-      state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
-                                                           : matches->size());
+    AggregationState *state = nullptr;
+    if (is_distinct_[agg_idx]) {
+      handle->insertValueAccessorIntoDistinctifyHashTable(
+          argument_ids,
+          {},
+          distinctify_hashtables_[agg_idx].get(),
+          accessor.get(),
+          &non_trivial_results);
     } else {
-      // Have the AggregationHandle actually do the aggregation.
-      state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
+      if (argument_ids.empty()) {
+        // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+        state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
+                                                             : matches->size());
+      } else {
+        // Have the AggregationHandle actually do the aggregation.
+        state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
+      }
     }
     local_state.emplace_back(state);
   }
@@ -616,6 +650,12 @@ void AggregationOperationState::finalizeSingleState(
   std::vector<TypedValue> attribute_values;
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    if (is_distinct_[agg_idx]) {
+      single_states_[agg_idx].reset(
+          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
+              *distinctify_hashtables_[agg_idx]));
+    }
+
     attribute_values.emplace_back(
         handles_[agg_idx]->finalize(*single_states_[agg_idx]));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 44803fc..5ee675a 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -261,8 +261,7 @@ class AggregationOperationState {
   std::vector<const Type *> group_by_types_;
 
   // Hash table for obtaining distinct (i.e. unique) arguments.
-//  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-//      distinctify_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
 
   // Per-aggregate global states for aggregation without GROUP BY.
   std::vector<std::unique_ptr<AggregationState>> single_states_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
index 15d4dfe..39560cc 100644
--- a/storage/CollisionFreeAggregationStateHashTable.cpp
+++ b/storage/CollisionFreeAggregationStateHashTable.cpp
@@ -111,7 +111,8 @@ CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
   }
 
   memory_size_ = required_memory;
-  num_init_partitions_ = std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL);
+  num_init_partitions_ =
+      std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
 }
 
 CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {

Reply via email to