Repository: incubator-quickstep
Updated Branches:
  refs/heads/untyped-agg 9ccd5a311 -> c41451dbd


Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c41451db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c41451db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c41451db

Branch: refs/heads/untyped-agg
Commit: c41451dbd01fbc11609f996fe9357e091eb49307
Parents: 9ccd5a3
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Tue Oct 18 09:53:43 2016 -0500
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Tue Oct 18 09:53:43 2016 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationHandleAvg.cpp        | 270 ++++++++-----------
 .../aggregation/AggregationHandleAvg.hpp        |  33 ++-
 .../aggregation/AggregationHandleCount.cpp      |   6 +-
 .../aggregation/AggregationHandleSum.cpp        |   1 +
 storage/AggregationOperationState.cpp           |  63 ++---
 storage/AggregationOperationState.hpp           |   1 +
 storage/AggregationStateHashTable.hpp           | 157 +++++++----
 storage/AggregationStateManager.hpp             |  48 ++--
 storage/HashTableBase.hpp                       |   3 +
 storage/StorageBlock.cpp                        |  91 +++----
 storage/StorageBlock.hpp                        |   6 +-
 11 files changed, 341 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 47f3f41..e90f10f 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -24,8 +24,6 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
 #include "types/TypeID.hpp"
@@ -40,165 +38,113 @@ namespace quickstep {
 
 class StorageManager;
 
-AggregationHandleAvg::AggregationHandleAvg(const Type &type) {}
-//    : argument_type_(type), block_update_(false) {
-//  // We sum Int as Long and Float as Double so that we have more headroom when
-//  // adding many values.
-//  TypeID type_precision_id;
-//  switch (type.getTypeID()) {
-//    case kInt:
-//    case kLong:
-//      type_precision_id = kLong;
-//      break;
-//    case kFloat:
-//    case kDouble:
-//      type_precision_id = kDouble;
-//      break;
-//    default:
-//      type_precision_id = type.getTypeID();
-//      break;
-//  }
-//
-//  const Type &sum_type = TypeFactory::GetType(type_precision_id);
-//  blank_state_.sum_ = sum_type.makeZeroValue();
-//  blank_state_.count_ = 0;
-//
-//  // Make operators to do arithmetic:
-//  // Add operator for summing argument values.
-//  fast_add_operator_.reset(
-//      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-//          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
-//  // Add operator for merging states.
-//  merge_add_operator_.reset(
-//      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-//          .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
-//  // Divide operator for dividing sum by count to get final average.
-//  divide_operator_.reset(
-//      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
-//          .makeUncheckedBinaryOperatorForTypes(sum_type,
-//                                               TypeFactory::GetType(kDouble)));
-//
-//  // Result is nullable, because AVG() over 0 values (or all NULL values) is
-//  // NULL.
-//  result_type_ =
-//      &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
-//            .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
-//            ->getNullableVersion());
-//}
-//
-//AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
-//    const HashTableImplType hash_table_impl,
-//    const std::vector<const Type *> &group_by_types,
-//    const std::size_t estimated_num_groups,
-//    StorageManager *storage_manager) const {
-//  return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
-//      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-//}
-//
-//AggregationState* AggregationHandleAvg::accumulateColumnVectors(
-//    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-//  DCHECK_EQ(1u, column_vectors.size())
-//      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
-//
-//  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-//  std::size_t count = 0;
-//  state->sum_ = fast_add_operator_->accumulateColumnVector(
-//      state->sum_, *column_vectors.front(), &count);
-//  state->count_ = count;
-//  return state;
-//}
-//
-//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-//AggregationState* AggregationHandleAvg::accumulateValueAccessor(
-//    ValueAccessor *accessor,
-//    const std::vector<attribute_id> &accessor_ids) const {
-//  DCHECK_EQ(1u, accessor_ids.size())
-//      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
-//
-//  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-//  std::size_t count = 0;
-//  state->sum_ = fast_add_operator_->accumulateValueAccessor(
-//      state->sum_, accessor, accessor_ids.front(), &count);
-//  state->count_ = count;
-//  return state;
-//}
-//#endif
-//
-//void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
-//    ValueAccessor *accessor,
-//    const std::vector<attribute_id> &argument_ids,
-//    const std::vector<attribute_id> &group_by_key_ids,
-//    AggregationStateHashTableBase *hash_table) const {
-//  DCHECK_EQ(1u, argument_ids.size())
-//      << "Got wrong number of arguments for AVG: " << argument_ids.size();
-//}
-//
-//void AggregationHandleAvg::mergeStates(const AggregationState &source,
-//                                       AggregationState *destination) const {
-//  const AggregationStateAvg &avg_source =
-//      static_cast<const AggregationStateAvg &>(source);
-//  AggregationStateAvg *avg_destination =
-//      static_cast<AggregationStateAvg *>(destination);
-//
-//  SpinMutexLock lock(avg_destination->mutex_);
-//  avg_destination->count_ += avg_source.count_;
-//  avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
-//      avg_destination->sum_, avg_source.sum_);
-//}
-//
-//void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
-//                                           std::uint8_t *destination) const {
-//  const TypedValue *src_sum_ptr =
-//      reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
-//  const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
-//      source + blank_state_.count_offset_);
-//  TypedValue *dst_sum_ptr =
-//      reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
-//  std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
-//      destination + blank_state_.count_offset_);
-//  (*dst_count_ptr) += (*src_count_ptr);
-//  *dst_sum_ptr =
-//      merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
-//}
-//
-//TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
-//  const AggregationStateAvg &agg_state =
-//      static_cast<const AggregationStateAvg &>(state);
-//  if (agg_state.count_ == 0) {
-//    // AVG() over no values is NULL.
-//    return result_type_->makeNullValue();
-//  } else {
-//    // Divide sum by count to get final average.
-//    return divide_operator_->applyToTypedValues(
-//        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
-//  }
-//}
-//
-//ColumnVector* AggregationHandleAvg::finalizeHashTable(
-//    const AggregationStateHashTableBase &hash_table,
-//    std::vector<std::vector<TypedValue>> *group_by_keys,
-//    int index) const {
-//  return finalizeHashTableHelperFast<AggregationHandleAvg,
-//                                     AggregationStateFastHashTable>(
-//      *result_type_, hash_table, group_by_keys, index);
-//}
-//
-//AggregationState*
-//AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
-//    const AggregationStateHashTableBase &distinctify_hash_table) const {
-//  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-//      AggregationHandleAvg,
-//      AggregationStateAvg>(distinctify_hash_table);
-//}
-//
-//void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
-//    const AggregationStateHashTableBase &distinctify_hash_table,
-//    AggregationStateHashTableBase *aggregation_hash_table,
-//    std::size_t index) const {
-//  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-//      AggregationHandleAvg,
-//      AggregationStateFastHashTable>(
-//      distinctify_hash_table, aggregation_hash_table, index);
-//}
+AggregationHandleAvg::AggregationHandleAvg(const Type &argument_type) {
+  // We sum Int as Long and Float as Double so that we have more headroom when
+  // adding many values.
+  TypeID type_precision_id;
+  switch (argument_type.getTypeID()) {
+    case kInt:
+    case kLong:
+      type_precision_id = kLong;
+      break;
+    case kFloat:
+    case kDouble:
+      type_precision_id = kDouble;
+      break;
+    default:
+      type_precision_id = argument_type.getTypeID();
+      break;
+  }
+
+  const Type &sum_type = TypeFactory::GetType(type_precision_id);
+  const Type &count_type = TypeFactory::GetType(CountType::kStaticTypeID);
+
+  const std::size_t sum_state_size = sum_type.maximumByteLength();
+  count_offset_ = sum_state_size;
+  state_size_ = sum_state_size + sizeof(CountCppType);
+
+  blank_state_.reset(state_size_, false);
+  sum_type.makeZeroValue(blank_state_.get());
+  count_type.makeZeroValue(static_cast<char *>(blank_state_.get()) + sum_state_size);
+
+  tv_blank_sum_ = sum_type.makeZeroValue();
+
+  // Make operators to do arithmetic:
+  // Add operator for summing argument values.
+  accumulate_add_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type));
+  const auto accumulate_add_functor = accumulate_add_operator_->getMergeFunctor();
+  accumulate_functor_ = [sum_state_size, accumulate_add_functor](
+      void *state, const void *value) {
+    accumulate_add_functor(state, value);
+    void *count_ptr = static_cast<char *>(state) + sum_state_size;
+    ++(*static_cast<CountCppType *>(count_ptr));
+  };
+
+  // Add operator for merging states.
+  merge_add_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+  const auto merge_add_functor = merge_add_operator_->getMergeFunctor();
+  merge_functor_ = [sum_state_size, merge_add_functor](
+      void *destination_state, const void *source_state) {
+    merge_add_functor(destination_state, source_state);
+    void *destination_count_ptr =
+        static_cast<char *>(destination_state) + sum_state_size;
+    const void *source_count_ptr =
+        static_cast<const char *>(source_state) + sum_state_size;
+    *static_cast<CountCppType *>(destination_count_ptr) +=
+        *static_cast<const CountCppType *>(source_count_ptr);
+  };
+
+  // Divide operator for dividing sum by count to get final average.
+  divide_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+          .makeUncheckedBinaryOperatorForTypes(sum_type,
+                                               TypeFactory::GetType(kDouble)));
+  const auto divide_functor = divide_operator_->getFunctor();
+  finalize_functor_ = [sum_state_size, divide_functor](
+      void *result, const void *state) {
+    const void *count_ptr = static_cast<const char *>(state) + sum_state_size;
+    const double count = *static_cast<const CountCppType *>(count_ptr);
+    divide_functor(result, state, &count);
+  };
+
+  result_type_ =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+          .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble));
+}
+
+void AggregationHandleAvg::accumulateColumnVectors(
+    void *state,
+    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+  DCHECK_EQ(1u, column_vectors.size())
+      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+
+  std::size_t count = 0;
+  TypedValue cv_sum = accumulate_add_operator_->accumulateColumnVector(
+      tv_blank_sum_, *column_vectors.front(), &count);
+  cv_sum.copyInto(state);
+  void *count_ptr = static_cast<char *>(state) + count_offset_;
+  *static_cast<CountCppType *>(count_ptr) = count;
+}
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+void AggregationHandleAvg::accumulateValueAccessor(
+    void *state,
+    ValueAccessor *accessor,
+    const std::vector<attribute_id> &accessor_ids) const {
+  DCHECK_EQ(1u, accessor_ids.size())
+      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+
+  std::size_t count = 0;
+  TypedValue cv_sum = accumulate_add_operator_->accumulateValueAccessor(
+      tv_blank_sum_, accessor, accessor_ids.front(), &count);
+  cv_sum.copyInto(state);
+  void *count_ptr = static_cast<char *>(state) + count_offset_;
+  *static_cast<CountCppType *>(count_ptr) = count;
+}
+#endif
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index cc5adc8..416cf53 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "types/LongType.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
@@ -52,6 +53,17 @@ class AggregationHandleAvg : public AggregationHandle {
  public:
   ~AggregationHandleAvg() override {}
 
+  void accumulateColumnVectors(
+      void *state,
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  void accumulateValueAccessor(
+      void *state,
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &accessor_ids) const override;
+#endif
+
  private:
   friend class AggregateFunctionAvg;
 
@@ -60,14 +72,19 @@ class AggregationHandleAvg : public AggregationHandle {
    *
    * @param type Type of the avg value.
    **/
-  explicit AggregationHandleAvg(const Type &type);
-
-//  const Type &argument_type_;
-//  const Type *result_type_;
-//  AggregationStateAvg blank_state_;
-//  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
-//  std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
-//  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+  explicit AggregationHandleAvg(const Type &argument_type);
+
+  typedef LongType CountType;
+  typedef CountType::cpptype CountCppType;
+
+    // TODO: temporary
+  TypedValue tv_blank_sum_;
+
+  std::size_t count_offset_;
+
+  std::unique_ptr<UncheckedBinaryOperator> accumulate_add_operator_;
+  std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
+  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index c095a82..12e2b86 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -49,7 +49,10 @@ class ValueAccessor;
 template <bool count_star, bool nullable_type>
 AggregationHandleCount<count_star, nullable_type>::AggregationHandleCount() {
   state_size_ = sizeof(ResultCppType);
-  blank_state_.reset(state_size_, true);
+  blank_state_.reset(state_size_, false);
+
+  result_type_ = &TypeFactory::GetType(ResultType::kStaticTypeID);
+  result_type_->makeZeroValue(blank_state_.get());
 
   accumulate_functor_ = [](void *state, const void *value) {
     *static_cast<ResultCppType *>(state) += 1;
@@ -65,7 +68,6 @@ AggregationHandleCount<count_star, nullable_type>::AggregationHandleCount() {
         *static_cast<const ResultCppType *>(state);
   };
 
-  result_type_ = &TypeFactory::GetType(ResultType::kStaticTypeID);
 }
 
 template <bool count_star, bool nullable_type>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 4e77ed0..61a0774 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -66,6 +66,7 @@ AggregationHandleSum::AggregationHandleSum(const Type &argument_type) {
   state_size_ = sum_type.maximumByteLength();
   blank_state_.reset(state_size_, false);
 
+  sum_type.makeZeroValue(blank_state_.get());
   tv_blank_state_ = sum_type.makeZeroValue();
 
   // Make operators to do arithmetic:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 50e7c06..89b7b72 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -83,15 +83,15 @@ AggregationOperationState::AggregationOperationState(
     group_by_types.emplace_back(&group_by_element->getType());
   }
 
-  std::vector<AggregationHandle *> group_by_handles;
-  group_by_handles.clear();
+  std::vector<AggregationHandle *> aggregation_handles;
+  aggregation_handles.clear();
 
   // Set up each individual aggregate in this operation.
   for (std::size_t i = 0; i < aggregate_functions.size(); ++i) {
     // Get the Types of this aggregate's arguments so that we can create an
     // AggregationHandle.
     std::vector<const Type *> argument_types;
-    for (const std::unique_ptr<const Scalar> &argument : arguments[i]) {
+    for (const std::unique_ptr<const Scalar> &argument : arguments_[i]) {
       argument_types.emplace_back(&argument->getType());
     }
 
@@ -107,9 +107,9 @@ AggregationOperationState::AggregationOperationState(
 
     if (!group_by_list_.empty()) {
       // TODO(jianqiao): handle DISTINCT aggregation.
-      // if (is_distinct[i]) {
+      // if (is_distinct_[i]) {
       // }
-      group_by_handles.emplace_back(handles_.back());
+      aggregation_handles.emplace_back(handles_.back());
     } else {
       // Aggregation without GROUP BY: create a single global state.
       single_states_.emplace_back(handles_.back()->createInitialState());
@@ -119,8 +119,8 @@ AggregationOperationState::AggregationOperationState(
       // relation. If so, remember the attribute IDs so that we can do copy
       // elision when actually performing the aggregation.
       std::vector<attribute_id> local_arguments_as_attributes;
-      local_arguments_as_attributes.reserve(arguments[i].size());
-      for (const std::unique_ptr<const Scalar> &argument : arguments[i]) {
+      local_arguments_as_attributes.reserve(arguments_[i].size());
+      for (const std::unique_ptr<const Scalar> &argument : arguments_[i]) {
         const attribute_id argument_id =
             argument->getAttributeIdForValueAccessor();
         if (argument_id == -1) {
@@ -139,12 +139,21 @@ AggregationOperationState::AggregationOperationState(
     }
   }
 
-  if (!group_by_handles.empty()) {
+  if (!group_by_list_.empty()) {
+    // TODO: handle non-fast-path case
+    for (const auto &group_by_attribute : group_by_list_) {
+      const attribute_id attr_id =
+          group_by_attribute->getAttributeIdForValueAccessor();
+      CHECK_NE(attr_id, kInvalidAttributeID);
+
+      group_by_attribute_ids_.emplace_back(attr_id);
+    }
+
     // Aggregation with GROUP BY: create a HashTable pool for per-group states.
     group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
                                                      hash_table_impl_type,
                                                      group_by_types,
-                                                     group_by_handles,
+                                                     aggregation_handles,
                                                      storage_manager));
   }
 }
@@ -349,24 +358,14 @@ void AggregationOperationState::aggregateBlockHashTable(
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
 
-  // If there is a filter predicate, 'reuse_matches' holds the set of matching
-  // tuples so that it can be reused across multiple aggregates (i.e. we only
-  // pay the cost of evaluating the predicate once).
-  std::unique_ptr<TupleIdSequence> reuse_matches;
-
-  // This holds values of all the GROUP BY attributes so that the can be reused
-  // across multiple aggregates (i.e. we only pay the cost of evaluatin the
-  // GROUP BY expressions once).
-  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
-
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
-      // values and the aggregation arguments together as keys directly into the
-      // (threadsafe) shared global distinctify HashTable for this aggregate.
-      // TODO(jianqiao): handle DISTINCT aggregation.
-    }
-  }
+//  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+//    if (is_distinct_[agg_idx]) {
+//      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+//      // values and the aggregation arguments together as keys directly into the
+//      // (threadsafe) shared global distinctify HashTable for this aggregate.
+//      // TODO(jianqiao): handle DISTINCT aggregation.
+//    }
+//  }
 
   // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
   // directly into the (threadsafe) shared global HashTable for this
@@ -376,11 +375,10 @@ void AggregationOperationState::aggregateBlockHashTable(
   DCHECK(agg_hash_table != nullptr);
 
   block->aggregateGroupBy(arguments_,
-                          group_by_list_,
+                          group_by_attribute_ids_,
                           predicate_.get(),
-                          agg_hash_table,
-                          &reuse_matches,
-                          &reuse_group_by_vectors);
+                          agg_hash_table);
+
   group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }
 
@@ -437,6 +435,9 @@ void AggregationOperationState::finalizeHashTable(
     mergeGroupByHashTables(final_hash_table.get(), hash_table.get());
   }
 
+//  static_cast<ThreadPrivateAggregationStateHashTable *>(
+//      final_hash_table.get())->print();
+
   // Bulk-insert the complete result.
   std::unique_ptr<AggregationResultIterator> results(
       final_hash_table->createResultIterator());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 9fa3bd2..869f391 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -191,6 +191,7 @@ class AggregationOperationState {
   const CatalogRelationSchema &input_relation_;
   std::unique_ptr<const Predicate> predicate_;
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
+  std::vector<attribute_id> group_by_attribute_ids_;
 
   // Each individual aggregate in this operation has an AggregationHandle and
   // some number of Scalar arguments.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationStateHashTable.hpp b/storage/AggregationStateHashTable.hpp
index 85a6bdc..ee69725 100644
--- a/storage/AggregationStateHashTable.hpp
+++ b/storage/AggregationStateHashTable.hpp
@@ -46,6 +46,7 @@
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFunctors.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/Alignment.hpp"
 #include "utility/InlineMemcpy.hpp"
 #include "utility/Macros.hpp"
@@ -112,65 +113,119 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB
   }
 
   bool upsertValueAccessor(ValueAccessor *accessor,
+                           ColumnVectorsValueAccessor *temp_accessor,
                            const attribute_id key_attr_id,
                            const std::vector<attribute_id> &argument_ids) override {
-    if (key_manager_.isKeyNullable()) {
-      return upsertValueAccessorInternal<true>(
-          accessor, key_attr_id, argument_ids);
+//    if (key_manager_.isKeyNullable()) {
+//      return upsertValueAccessorInternal<true>(
+//          accessor, key_attr_id, argument_ids);
+//    } else {
+//      return upsertValueAccessorInternal<false>(
+//          accessor, key_attr_id, argument_ids);
+//    }
+    return true;
+  }
+//
+//  template <bool check_for_null_keys>
+//  bool upsertValueAccessorInternal(ValueAccessor *accessor,
+//                                   const attribute_id key_attr_id,
+//                                   const std::vector<attribute_id> &argument_ids) {
+//    return InvokeOnAnyValueAccessor(
+//        accessor,
+//        [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+//      accessor->beginIteration();
+//      while (accessor->next()) {
+//        const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id);
+//        if (check_for_null_keys && key == nullptr) {
+//          continue;
+//        }
+//        void *bucket = locateBucket(key);
+//        payload_manager_.template updateStates<check_for_null_keys>(
+//            bucket, accessor, argument_ids);
+//      }
+//      return true;
+//    });
+//  }
+
+  bool upsertValueAccessorCompositeKey(ValueAccessor *accessor,
+                                       ColumnVectorsValueAccessor *temp_accessor,
+                                       const std::vector<attribute_id> &key_attr_ids,
+                                       const std::vector<attribute_id> &argument_ids) override {
+//    if (key_attr_ids.size() == 1) {
+//      upsertValueAccessor(accessor,
+//                          key_attr_ids.front(),
+//                          argument_ids);
+//    }
+
+    if (temp_accessor == nullptr) {
+      if (key_manager_.isKeyNullable()) {
+        return upsertValueAccessorCompositeKeyInternal<true>(
+            accessor, key_attr_ids, argument_ids);
+      } else {
+        return upsertValueAccessorCompositeKeyInternal<false>(
+            accessor, key_attr_ids, argument_ids);
+      }
     } else {
-      return upsertValueAccessorInternal<false>(
-          accessor, key_attr_id, argument_ids);
+      if (key_manager_.isKeyNullable()) {
+        return upsertValueAccessorCompositeKeyInternal<true>(
+            accessor, temp_accessor, key_attr_ids, argument_ids);
+      } else {
+        return upsertValueAccessorCompositeKeyInternal<false>(
+            accessor, temp_accessor, key_attr_ids, argument_ids);
+      }
     }
   }
 
   template <bool check_for_null_keys>
-  bool upsertValueAccessorInternal(ValueAccessor *accessor,
-                                   const attribute_id key_attr_id,
-                                   const std::vector<attribute_id> &argument_ids) {
+  bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor,
+                                               const std::vector<attribute_id> &key_attr_ids,
+                                               const std::vector<attribute_id> &argument_ids) {
     return InvokeOnAnyValueAccessor(
         accessor,
         [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
       accessor->beginIteration();
+      void *prealloc_bucket = allocateBucket();
       while (accessor->next()) {
-        const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id);
-        if (check_for_null_keys && key == nullptr) {
-          continue;
-        }
-        bool is_empty;
-        void *bucket = locateBucket(key, &is_empty);
-        if (is_empty) {
-          payload_manager_.initializeStates(bucket);
+        if (check_for_null_keys) {
+          const bool is_null =
+              key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket(
+                  accessor,
+                  key_attr_ids,
+                  prealloc_bucket);
+          if (is_null) {
+            continue;
+          }
         } else {
-          payload_manager_.template updateStates<check_for_null_keys>(
-              bucket, accessor, argument_ids);
+          key_manager_.writeUntypedKeyFromValueAccessorToBucket(
+              accessor,
+              key_attr_ids,
+              prealloc_bucket);
         }
+        void *bucket = locateBucketWithPrealloc(&prealloc_bucket);
+        payload_manager_.updateStates(
+            bucket, accessor, argument_ids);
       }
+      // Reclaim the last unused bucket
+      --buckets_allocated_;
       return true;
     });
-  }
-
-  bool upsertValueAccessorCompositeKey(ValueAccessor *accessor,
-                                       const std::vector<attribute_id> &key_attr_ids,
-                                       const std::vector<attribute_id> &argument_ids) override {
-    if (key_manager_.isKeyNullable()) {
-      return upsertValueAccessorCompositeKeyInternal<true>(
-          accessor, key_attr_ids, argument_ids);
-    } else {
-      return upsertValueAccessorCompositeKeyInternal<false>(
-          accessor, key_attr_ids, argument_ids);
-    }
+    return true;
   }
 
   template <bool check_for_null_keys>
   bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor,
+                                               ColumnVectorsValueAccessor *temp_accessor,
                                                const std::vector<attribute_id> &key_attr_ids,
                                                const std::vector<attribute_id> &argument_ids) {
     return InvokeOnAnyValueAccessor(
         accessor,
         [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
       accessor->beginIteration();
+      temp_accessor->beginIteration();
       void *prealloc_bucket = allocateBucket();
       while (accessor->next()) {
+        temp_accessor->next();
+
         if (check_for_null_keys) {
           const bool is_null =
               key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket(
@@ -186,30 +241,29 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB
               key_attr_ids,
               prealloc_bucket);
         }
-        void *bucket = locateBucketWithPrealloc(prealloc_bucket);
-        if (bucket == prealloc_bucket) {
-          payload_manager_.initializeStates(bucket);
-          prealloc_bucket = allocateBucket();
-        } else {
-          payload_manager_.template updateStates<check_for_null_keys>(
-              bucket, accessor, argument_ids);
-        }
+
+        void *bucket = locateBucketWithPrealloc(&prealloc_bucket);
+        payload_manager_.updateStates(
+            bucket, accessor, temp_accessor, argument_ids);
       }
       // Reclaim the last unused bucket
       --buckets_allocated_;
       return true;
     });
+    return true;
   }
 
   void mergeHashTable(const ThreadPrivateAggregationStateHashTable *source_hash_table) {
     source_hash_table->forEachKeyAndStates(
         [&](const void *source_key, const void *source_states) -> void {
-          bool is_empty;
-          void *bucket = locateBucket(source_key, &is_empty);
-          if (is_empty) {
+          auto slot_it = slots_.find(source_key);
+          if (slot_it == slots_.end()) {
+            void *bucket = allocateBucket();
+            key_manager_.writeUntypedKeyToBucket(source_key, bucket);
             payload_manager_.copyStates(bucket, source_states);
+            slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket);
           } else {
-            payload_manager_.mergeStates(bucket, source_states);
+            payload_manager_.mergeStates(slot_it->second, source_states);
           }
         });
   }
@@ -233,26 +287,28 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB
     return static_cast<char *>(buckets_) + bucket_id * bucket_size_;
   }
 
-  inline void* locateBucket(const void *key, bool *is_empty) {
+  inline void* locateBucket(const void *key) {
     auto slot_it = slots_.find(key);
     if (slot_it == slots_.end()) {
       void *bucket = allocateBucket();
       key_manager_.writeUntypedKeyToBucket(key, bucket);
+      payload_manager_.initializeStates(bucket);
       slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket);
-      *is_empty = true;
       return bucket;
     } else {
-      *is_empty = false;
       return slot_it->second;
     }
   }
 
-  inline void* locateBucketWithPrealloc(void *prealloc_bucket) {
-    const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket);
+  inline void* locateBucketWithPrealloc(void **prealloc_bucket) {
+    void *bucket = *prealloc_bucket;
+    const void *key = key_manager_.getUntypedKeyComponent(bucket);
     auto slot_it = slots_.find(key);
     if (slot_it == slots_.end()) {
-      slots_.emplace(key, prealloc_bucket);
-      return prealloc_bucket;
+      payload_manager_.initializeStates(bucket);
+      slots_.emplace(key, bucket);
+      *prealloc_bucket = allocateBucket();
+      return bucket;
     } else {
       return slot_it->second;
     }
@@ -295,7 +351,7 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB
     std::cerr << "Buckets: \n";
     for (const auto &pair : slots_) {
       std::cerr << pair.first << " -- " << pair.second << "\n";
-      std::cerr << *static_cast<const int *>(pair.second) << "\n";
+      std::cerr << *static_cast<const std::uint64_t *>(pair.second) << "\n";
     }
   }
 
@@ -335,4 +391,3 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
-

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationStateManager.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationStateManager.hpp b/storage/AggregationStateManager.hpp
index 98dca90..81a2db8 100644
--- a/storage/AggregationStateManager.hpp
+++ b/storage/AggregationStateManager.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "threading/SpinMutex.hpp"
 #include "threading/SpinSharedMutex.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/InlineMemcpy.hpp"
 #include "utility/Macros.hpp"
 #include "utility/ScopedBuffer.hpp"
@@ -71,8 +72,7 @@ class AggregationStateManager {
       new(initial_states_.get()) Mutex;
     }
     for (std::size_t i = 0; i < handles_.size(); ++i) {
-      handles_[i]->initializeState(
-          static_cast<char *>(initial_states_.get()) + state_offsets_[i]);
+      handles_[i]->initializeState(getStateComponent(initial_states_.get(), i));
     }
   }
 
@@ -88,33 +88,39 @@ class AggregationStateManager {
     copyStates(states, initial_states_.get());
   }
 
-  template <bool check_for_null_keys, typename ValueAccessorT>
-  inline void updateState(void *states,
-                          ValueAccessorT *accessor,
-                          const attribute_id argument_id) const {
-    // TODO: templates on whether to check invalid attribute id
-    DCHECK_NE(argument_id, kInvalidAttributeID);
-
-    const void *value =
-        accessor->template getUntypedValue<check_for_null_keys>(argument_id);
-    if (check_for_null_keys && value == nullptr) {
-      return;
+  template <typename ValueAccessorT>
+  inline void updateStates(void *states,
+                           ValueAccessorT *accessor,
+                           const std::vector<attribute_id> &argument_ids) const {
+    for (std::size_t i = 0; i < argument_ids.size(); ++i) {
+      // TODO: templates on whether to check invalid attribute id
+      const void *value;
+      const attribute_id argument_id = argument_ids[i];
+      if (argument_id == kInvalidAttributeID) {
+        value = nullptr;
+      } else {
+        value = accessor->template getUntypedValue<false>(argument_id);
+       // TODO: check null
+      }
+      accumulate_functors_[i](getStateComponent(states, i), value);
     }
-    accumulate_functors_.front()(states, value);
   }
 
-  template <bool check_for_null_keys, typename ValueAccessorT>
+  template <typename ValueAccessorT>
   inline void updateStates(void *states,
                            ValueAccessorT *accessor,
+                           ColumnVectorsValueAccessor *temp_accessor,
                            const std::vector<attribute_id> &argument_ids) const {
     for (std::size_t i = 0; i < argument_ids.size(); ++i) {
       // TODO: templates on whether to check invalid attribute id
-      DCHECK_NE(argument_ids[i], kInvalidAttributeID);
-
-      const void *value =
-          accessor->template getUntypedValue<check_for_null_keys>(argument_ids[i]);
-      if (check_for_null_keys && value == nullptr) {
-        return;
+      const void *value = nullptr;
+      const attribute_id argument_id = argument_ids[i];
+      if (argument_id >= 0) {
+        value = accessor->template getUntypedValue<false>(argument_id);
+        // TODO: check null
+      } else if (argument_id != kInvalidAttributeID){
+        value = temp_accessor->template getUntypedValue<false>(-(argument_id+2));
+        // TODO: check null
       }
       accumulate_functors_[i](getStateComponent(states, i), value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index f823494..04e82a0 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -29,6 +29,7 @@
 namespace quickstep {
 
 class AggregationResultIterator;
+class ColumnVectorsValueAccessor;
 
 /** \addtogroup Storage
  *  @{
@@ -88,10 +89,12 @@ class AggregationStateHashTableBase {
   virtual ~AggregationStateHashTableBase() {}
 
   virtual bool upsertValueAccessor(ValueAccessor *accessor,
+                                   ColumnVectorsValueAccessor *temp_accessor,
                                    const attribute_id key_attr_id,
                                    const std::vector<attribute_id> &argument_ids) = 0;
 
   virtual bool upsertValueAccessorCompositeKey(ValueAccessor *accessor,
+                                               ColumnVectorsValueAccessor *temp_accessor,
                                                const std::vector<attribute_id> &key_attr_ids,
                                                const std::vector<attribute_id> &argument_ids) = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index abb17f1..ca4f897 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -431,12 +431,10 @@ ScopedBuffer StorageBlock::aggregate(
 
 void StorageBlock::aggregateGroupBy(
     const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const std::vector<attribute_id> &group_by_attribute_ids,
     const Predicate *predicate,
-    AggregationStateHashTableBase *hash_table,
-    std::unique_ptr<TupleIdSequence> *reuse_matches,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK_GT(group_by.size(), 0u)
+    AggregationStateHashTableBase *hash_table) const {
+  DCHECK_GT(group_by_attribute_ids.size(), 0u)
       << "Called aggregateGroupBy() with zero GROUP BY expressions";
 
   SubBlocksReference sub_blocks_ref(*tuple_store_,
@@ -446,68 +444,43 @@ void StorageBlock::aggregateGroupBy(
   // IDs of 'arguments' as attributes in the ValueAccessor we create below.
   std::vector<attribute_id> argument_ids;
 
-  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-  std::vector<attribute_id> key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
+  std::unique_ptr<TupleIdSequence> matches;
+  std::unique_ptr<ValueAccessor> accessor;
   ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
 
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
+  if (predicate) {
+    // Create a filtered ValueAccessor that only iterates over predicate
+    // matches.
+    matches.reset(getMatchesForPredicate(predicate));
+    accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+  } else {
+    // Create a ValueAccessor that iterates over all tuples in this block
+    accessor.reset(tuple_store_->createValueAccessor());
+  }
 
-    attribute_id attr_id = 0;
-
-    // First, put GROUP BY keys into 'temp_result'.
-    if (reuse_group_by_vectors->empty()) {
-      // Compute GROUP BY values from group_by Scalars, and store them in
-      // reuse_group_by_vectors for reuse by other aggregates on this same
-      // block.
-      reuse_group_by_vectors->reserve(group_by.size());
-      for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-        reuse_group_by_vectors->emplace_back(
-            group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-        temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-        key_ids.push_back(attr_id++);
-      }
+  attribute_id temp_result_attribute_id = 0;
+  for (const auto &argument_vec :  arguments) {
+    CHECK_LE(argument_vec.size(), 1);
+
+    if (argument_vec.size() == 0) {
+      argument_ids.emplace_back(kInvalidAttributeID);
     } else {
-      // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-      DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-          << "Wrong number of reuse_group_by_vectors";
-      for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-        temp_result.addColumn(reuse_cv.get(), false);
-        key_ids.push_back(attr_id++);
+      const auto &argument = argument_vec.front();
+      const attribute_id argument_id =
+          argument->getAttributeIdForValueAccessor();
+      if (argument_id != kInvalidAttributeID) {
+        argument_ids.emplace_back(argument_id);
+      } else {
+        temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
+        argument_ids.push_back(-(temp_result_attribute_id + 2));
+        ++temp_result_attribute_id;
       }
     }
-
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
-        for (const std::unique_ptr<const Scalar> &args : argument) {
-          temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
-          argument_ids.push_back(attr_id++);
-        }
-        if (argument.empty()) {
-          argument_ids.push_back(kInvalidAttributeID);
-        }
-     }
   }
 
-  hash_table->upsertValueAccessorCompositeKey(&temp_result,
-                                              key_ids,
+  hash_table->upsertValueAccessorCompositeKey(accessor.get(),
+                                              temp_result_attribute_id == 0 ? nullptr : &temp_result,
+                                              group_by_attribute_ids,
                                               argument_ids);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 08d81d0..a4e8448 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -462,11 +462,9 @@ class StorageBlock : public StorageBlockBase {
    */
   void aggregateGroupBy(
       const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<attribute_id> &group_by_attribute_ids,
       const Predicate *predicate,
-      AggregationStateHashTableBase *hash_table,
-      std::unique_ptr<TupleIdSequence> *reuse_matches,
-      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+      AggregationStateHashTableBase *hash_table) const;
 
   /**
    * @brief Perform an UPDATE query over the tuples in this StorageBlock.


Reply via email to