Fixed 4 failures on unit tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1b7f098d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1b7f098d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1b7f098d

Branch: refs/heads/partitioned-aggregation
Commit: 1b7f098d789933e8a0be099860ec575e5aec4dcc
Parents: 539cb0a
Author: rathijit <rathi...@node-2.aggregation.quickstep-pg0.wisc.cloudlab.us>
Authored: Fri Aug 5 06:00:12 2016 -0500
Committer: Harshad Deshmukh <hbdeshm...@apache.org>
Committed: Fri Sep 9 10:50:25 2016 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   | 14 +++---
 .../aggregation/AggregationConcreteHandle.hpp   | 41 ++++++++++++++--
 expressions/aggregation/AggregationHandle.hpp   |  6 ++-
 .../aggregation/AggregationHandleAvg.cpp        | 14 +++---
 .../aggregation/AggregationHandleAvg.hpp        | 15 +++++-
 .../aggregation/AggregationHandleCount.cpp      |  7 +--
 .../aggregation/AggregationHandleCount.hpp      | 19 ++++++--
 .../aggregation/AggregationHandleDistinct.cpp   |  2 +-
 .../aggregation/AggregationHandleDistinct.hpp   |  2 +-
 .../aggregation/AggregationHandleMax.cpp        | 14 +++---
 .../aggregation/AggregationHandleMax.hpp        | 13 ++++-
 .../aggregation/AggregationHandleMin.cpp        | 14 +++---
 .../aggregation/AggregationHandleMin.hpp        | 15 +++++-
 .../aggregation/AggregationHandleSum.cpp        | 15 +++---
 .../aggregation/AggregationHandleSum.hpp        | 15 +++++-
 storage/AggregationOperationState.cpp           | 51 +++++++++++---------
 storage/CMakeLists.txt                          |  1 -
 storage/FastHashTable.hpp                       | 41 +++++++++++++++-
 storage/FastSeparateChainingHashTable.hpp       | 16 +++---
 19 files changed, 221 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 1efe010..ac5148b 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -52,17 +52,17 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
     AggregationStateHashTableBase *distinctify_hash_table) const {
   // If the key-value pair is already there, we don't need to update the value,
   // which should always be "true". I.e. the value is just a placeholder.
-//  const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
+  //  const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
 
   AggregationStateFastHashTable *hash_table =
       static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
   if (key_ids.size() == 1) {
-// TODO(rathijit): fix
-//    hash_table->upsertValueAccessor(accessor,
-//                                    key_ids[0],
-//                                    true /* check_for_null_keys */,
-//                                    true /* initial_value */,
-//                                    &noop_upserter);
+    std::vector<std::vector<attribute_id>> args;
+    args.emplace_back(key_ids);
+    hash_table->upsertValueAccessorFast(args,
+                                    accessor,
+                                    key_ids[0],
+                                    true /* check_for_null_keys */);
   } else {
     std::vector<std::vector<attribute_id>> empty_args;
     empty_args.resize(1);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index d332ec9..609937a 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -27,6 +27,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -278,6 +279,11 @@ class AggregationConcreteHandle : public AggregationHandle {
       const AggregationStateHashTableBase &distinctify_hash_table) const;
 
   template <typename HandleT,
+            typename StateT>
+  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+      const AggregationStateHashTableBase &distinctify_hash_table) const;
+
+  template <typename HandleT,
             typename StateT,
             typename HashTableT>
   void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
@@ -289,7 +295,7 @@ class AggregationConcreteHandle : public AggregationHandle {
             typename HashTableT>
   void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *hash_table) const;
+      AggregationStateHashTableBase *hash_table, int index) const;
 
 
   template <typename HandleT,
@@ -494,6 +500,31 @@ StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnary
 }
 
 template <typename HandleT,
+          typename StateT>
+StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+    const AggregationStateHashTableBase &distinctify_hash_table) const {
+  const HandleT& handle = static_cast<const HandleT&>(*this);
+  StateT *state = static_cast<StateT*>(createInitialState());
+
+  // A lambda function which will be called on each key from the distinctify
+  // hash table.
+  const auto aggregate_functor = [&handle, &state](const TypedValue &key,
+                                                   const std::uint8_t &dumb_placeholder) {
+    // For each (unary) key in the distinctify hash table, aggregate the key
+    // into "state".
+    handle.iterateUnaryInl(state, key);
+  };
+
+  const AggregationStateFastHashTable &hash_table =
+      static_cast<const AggregationStateFastHashTable &>(distinctify_hash_table);
+  // Invoke the lambda function "aggregate_functor" on each key from the distinctify
+  // hash table.
+  hash_table.forEach(&aggregate_functor);
+
+  return state;
+}
+
+template <typename HandleT,
           typename StateT,
           typename HashTableT>
 void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
@@ -534,13 +565,13 @@ template <typename HandleT,
           typename HashTableT>
 void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
+    AggregationStateHashTableBase *aggregation_hash_table, int index) const {
   const HandleT& handle = static_cast<const HandleT&>(*this);
   HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table);
 
   // A lambda function which will be called on each key-value pair from the
   // distinctify hash table.
-  const auto aggregate_functor = [&handle, &target_hash_table](
+  const auto aggregate_functor = [&handle, &target_hash_table, &index](
       std::vector<TypedValue> &key,
       const bool &dumb_placeholder) {
     // For each (composite) key vector in the distinctify hash table with size N.
@@ -552,10 +583,10 @@ void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHe
     // An upserter as lambda function for aggregating the argument into its
     // GROUP BY group's entry inside aggregation_hash_table.
     const auto upserter = [&handle, &argument](std::uint8_t *state) {
-      handle.iterateUnaryInlFast(argument, state+sizeof(SpinMutex));
+      handle.iterateUnaryInlFast(argument, state);
     };
 
-    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter);
+    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
   };
 
   const HashTableT &source_hash_table =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 92cd6a7..7c9e544 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -347,7 +347,7 @@ class AggregationHandle {
    */
   virtual void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const = 0;
+      AggregationStateHashTableBase *aggregation_hash_table, int index) const = 0;
 
   /**
    * @brief Merge two GROUP BY hash tables in one.
@@ -362,11 +362,13 @@ class AggregationHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const = 0;
 
-  virtual size_t getPayloadSize() const {return 8;}
+  virtual size_t getPayloadSize() const {return 1;}
   virtual void setPayloadOffset(std::size_t) {}
   virtual void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) {}
   virtual void mergeStatesFast(const uint8_t *src, uint8_t *dst) const {}
   virtual void initPayload(uint8_t *byte_ptr) {}
+  virtual void BlockUpdate() {}
+  virtual void AllowUpdate() {}
 
  protected:
   AggregationHandle() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index f38c628..383a81f 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -42,7 +42,7 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleAvg::AggregationHandleAvg(const Type &type)
-    : argument_type_(type) {
+    : argument_type_(type), block_update(false) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -206,7 +206,7 @@ ColumnVector* AggregationHandleAvg::finalizeHashTable(
 
 AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleAvg,
       AggregationStateAvg>(
           distinctify_hash_table);
@@ -214,14 +214,12 @@ AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle
 
 void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleAvg,
-      AggregationStateAvg,
-      AggregationStateHashTable<AggregationStateAvg>>(
+      AggregationStateFastHashTable>(
           distinctify_hash_table,
-          blank_state_,
-          aggregation_hash_table);
+          aggregation_hash_table, index);
 }
 
 void AggregationHandleAvg::mergeGroupByHashTables(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index e187d22..15835e0 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -123,7 +123,7 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++state->count_;
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
     if (value.isNull()) return;
     TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
@@ -133,9 +133,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   }
 
   inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+     if (block_update) return;
      iterateUnaryInlFast(arguments.front(), byte_ptr);
   }
 
+  void BlockUpdate() override {
+      block_update = true;
+  }
+
+  void AllowUpdate() override {
+      block_update = false;
+  }
+
   void initPayload(uint8_t *byte_ptr) override {
     TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
     std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset);
@@ -208,7 +217,7 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
 
   void mergeGroupByHashTables(
       const AggregationStateHashTableBase &source_hash_table,
@@ -235,6 +244,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
   std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
+  bool block_update;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index f1eadf1..3a333ef 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -196,7 +196,7 @@ AggregationState* AggregationHandleCount<count_star, nullable_type>
     ::aggregateOnDistinctifyHashTableForSingle(
         const AggregationStateHashTableBase &distinctify_hash_table) const {
   DCHECK_EQ(count_star, false);
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleCount<count_star, nullable_type>,
       AggregationStateCount>(
           distinctify_hash_table);
@@ -206,13 +206,14 @@ template <bool count_star, bool nullable_type>
 void AggregationHandleCount<count_star, nullable_type>
     ::aggregateOnDistinctifyHashTableForGroupBy(
         const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table) const {
+        AggregationStateHashTableBase *aggregation_hash_table, int index) const {
   DCHECK_EQ(count_star, false);
   aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleCount<count_star, nullable_type>,
       AggregationStateFastHashTable>(
           distinctify_hash_table,
-          aggregation_hash_table);
+          aggregation_hash_table,
+          index);
 }
 
 template <bool count_star, bool nullable_type>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index ed21c41..a95cae5 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -135,15 +135,24 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   }
 
   inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+     if (block_update) return;
      if (arguments.size())
          iterateUnaryInlFast(arguments.front(), byte_ptr);
      else
          iterateNullaryInlFast(byte_ptr);
   }
 
+  void BlockUpdate() override {
+     block_update = true;
+  }
+
+  void AllowUpdate() override {
+     block_update = false;
+  }
+
   void initPayload(uint8_t *byte_ptr) override {
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    *count_ptr = 0;
+     std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+     *count_ptr = 0;
   }
 
   AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
@@ -208,7 +217,7 @@ class AggregationHandleCount : public AggregationConcreteHandle {
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
 
   void mergeGroupByHashTables(
       const AggregationStateHashTableBase &source_hash_table,
@@ -224,9 +233,11 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   /**
    * @brief Constructor.
    **/
-  AggregationHandleCount() {
+  AggregationHandleCount() : block_update(false) {
   }
 
+  bool block_update;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index 2b9391a..a5fc095 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -72,7 +72,7 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable(
                                                const bool &dumb_placeholder) -> void {
     group_by_keys->emplace_back(std::move(group_by_key));
   };
-  static_cast<const AggregationStateHashTable<bool>&>(hash_table).forEachCompositeKey(&keys_retriever);
+  static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKey(&keys_retriever);
 
   return nullptr;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 0a3acb3..f6ef0c7 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -90,7 +90,7 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *groupby_hash_table) const override {
+      AggregationStateHashTableBase *groupby_hash_table, int index) const override {
     LOG(FATAL) << "AggregationHandleDistinct does not support "
                << "aggregateOnDistinctifyHashTableForGroupBy().";
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 2080a03..c11fcc8 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -39,7 +39,7 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMax::AggregationHandleMax(const Type &type)
-    : type_(type) {
+    : type_(type), block_update(false) {
   fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kGreater)
                          .makeUncheckedComparatorForTypes(type,
                                                           type.getNonNullableVersion()));
@@ -135,7 +135,7 @@ ColumnVector* AggregationHandleMax::finalizeHashTable(
 
 AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleMax,
       AggregationStateMax>(
           distinctify_hash_table);
@@ -143,14 +143,12 @@ AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle
 
 void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleMax,
-      AggregationStateMax,
-      AggregationStateHashTable<AggregationStateMax>>(
+      AggregationStateFastHashTable>(
           distinctify_hash_table,
-          AggregationStateMax(type_),
-          aggregation_hash_table);
+          aggregation_hash_table, index);
 }
 
 void AggregationHandleMax::mergeGroupByHashTables(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 3c06fc4..82d6ebb 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -113,9 +113,18 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   }
 
   inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    if (block_update) return;
     iterateUnaryInlFast(arguments.front(), byte_ptr);
   }
 
+  void BlockUpdate() override {
+      block_update = true;
+  }
+
+  void AllowUpdate() override {
+      block_update = false;
+  }
+
   void initPayload(uint8_t *byte_ptr) override {
     TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
     TypedValue t1 = (type_.getNullableVersion().makeNullValue());
@@ -175,7 +184,7 @@ class AggregationHandleMax : public AggregationConcreteHandle {
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
 
   void mergeGroupByHashTables(
       const AggregationStateHashTableBase &source_hash_table,
@@ -221,6 +230,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
+  bool block_update;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index 9d5be72..70d6c1b 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -39,7 +39,7 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMin::AggregationHandleMin(const Type &type)
-    : type_(type) {
+    : type_(type), block_update(false) {
   fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess)
                          .makeUncheckedComparatorForTypes(type,
                                                           type.getNonNullableVersion()));
@@ -136,7 +136,7 @@ ColumnVector* AggregationHandleMin::finalizeHashTable(
 
 AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleMin,
       AggregationStateMin>(
           distinctify_hash_table);
@@ -144,14 +144,12 @@ AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle
 
 void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleMin,
-      AggregationStateMin,
-      AggregationStateHashTable<AggregationStateMin>>(
+      AggregationStateFastHashTable>(
           distinctify_hash_table,
-          AggregationStateMin(type_),
-          aggregation_hash_table);
+          aggregation_hash_table, index);
 }
 
 void AggregationHandleMin::mergeGroupByHashTables(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 6329cd7..0f5e3a1 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -110,16 +110,25 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     compareAndUpdate(state, value);
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
       DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
       TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
       compareAndUpdateFast(min_ptr, value);
   }
 
   inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    if (block_update) return;
     iterateUnaryInlFast(arguments.front(), byte_ptr);
   }
 
+  void BlockUpdate() override {
+      block_update = true;
+  }
+
+  void AllowUpdate() override {
+      block_update = false;
+  }
+
   void initPayload(uint8_t *byte_ptr) override {
     TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
     TypedValue t1 = (type_.getNullableVersion().makeNullValue());
@@ -178,7 +187,7 @@ class AggregationHandleMin : public AggregationConcreteHandle {
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
 
   void mergeGroupByHashTables(
       const AggregationStateHashTableBase &source_hash_table,
@@ -223,6 +232,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
+  bool block_update;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 7a16605..534db30 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -43,7 +43,7 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleSum::AggregationHandleSum(const Type &type)
-    : argument_type_(type) {
+    : argument_type_(type), block_update(false) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -184,7 +184,7 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
 
 AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleSum,
       AggregationStateSum>(
           distinctify_hash_table);
@@ -192,14 +192,13 @@ AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle
 
 void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleSum,
-      AggregationStateSum,
-      AggregationStateHashTable<AggregationStateSum>>(
+      AggregationStateFastHashTable>(
           distinctify_hash_table,
-          blank_state_,
-          aggregation_hash_table);
+          aggregation_hash_table,
+          index);
 }
 
 void AggregationHandleSum::mergeGroupByHashTables(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 79f8331..3a2252d 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -118,7 +118,7 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     state->null_ = false;
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
     if (value.isNull()) return;
     TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
@@ -128,9 +128,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   }
 
   inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t 
*byte_ptr) override {
+     if (block_update) return;
      iterateUnaryInlFast(arguments.front(), byte_ptr);
   }
 
+  void BlockUpdate() override {
+      block_update = true;
+  }
+
+  void AllowUpdate() override {
+      block_update = false;
+  }
+
   void initPayload(uint8_t *byte_ptr) override {
     TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + 
blank_state_.sum_offset);
     bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + 
blank_state_.null_offset);
@@ -188,7 +197,7 @@ class AggregationHandleSum : public AggregationConcreteHandle {
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table, int index) const 
override;
 
   void mergeGroupByHashTables(
       const AggregationStateHashTableBase &source_hash_table,
@@ -214,6 +223,8 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
   std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
 
+  bool block_update;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 7d6d179..833b707 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -105,7 +105,8 @@ AggregationOperationState::AggregationOperationState(
         new HashTablePool(estimated_num_entries,
                           hash_table_impl_type,
                           group_by_types,
-                          handles_.back(),
+                          {1},
+                          handles_,
                           storage_manager)));
   } else {
     // Set up each individual aggregate in this operation.
@@ -142,8 +143,11 @@ AggregationOperationState::AggregationOperationState(
                               group_by_types,
                               handles_.back().get(),
                               storage_manager)));*/
+         if (*is_distinct_it) {
+            handles_.back()->BlockUpdate();
+         }
          group_by_handles.emplace_back(handles_.back());
-         payload_sizes.emplace_back(handles_.back()->getPayloadSize());
+         payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
 
       } else {
         // Aggregation without GROUP BY: create a single global state.
@@ -186,26 +190,26 @@ AggregationOperationState::AggregationOperationState(
                 estimated_num_entries,
                 storage_manager));*/
 
-std::vector<AggregationHandle *> local;
-local.emplace_back(handles_.back());
+        std::vector<AggregationHandle *> local;
+        // local.emplace_back(handles_.back());
+        local.clear();
         distinctify_hashtables_.emplace_back(
-AggregationStateFastHashTableFactory::CreateResizable(
+        AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
                 key_types,
                 estimated_num_entries,
                 {0},
                 local,
                 storage_manager));
-
         ++distinctify_hash_table_impl_types_it;
       } else {
         distinctify_hashtables_.emplace_back(nullptr);
       }
     }
 
-      if (!group_by_handles.empty()) {
-        // Aggregation with GROUP BY: create a HashTable pool for per-group states.
-        group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+    if (!group_by_handles.empty()) {
+      // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+      group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
             new HashTablePool(estimated_num_entries,
                               hash_table_impl_type,
                               group_by_types,
@@ -213,7 +217,7 @@ AggregationStateFastHashTableFactory::CreateResizable(
                               group_by_handles,
                               storage_manager)));
       }
-  }
+    }
 }
 
 AggregationOperationState* AggregationOperationState::ReconstructFromProto(
@@ -442,13 +446,15 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                &reuse_group_by_vectors);
-    } else {
-      // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-      // directly into the (threadsafe) shared global HashTable for this
-      // aggregate.
-      DCHECK(group_by_hashtable_pools_[0] != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = 
group_by_hashtable_pools_[0]->getHashTableFast();
-      DCHECK(agg_hash_table != nullptr);
+    }
+  }
+
+  // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+  // directly into the (threadsafe) shared global HashTable for this
+  // aggregate.
+  DCHECK(group_by_hashtable_pools_[0] != nullptr);
+  AggregationStateHashTableBase *agg_hash_table = 
group_by_hashtable_pools_[0]->getHashTableFast();
+  DCHECK(agg_hash_table != nullptr);
  /*     block->aggregateGroupBy(*handles_[agg_idx],
                               arguments_[agg_idx],
                               group_by_list_,
@@ -456,16 +462,13 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
                               agg_hash_table,
                               &reuse_matches,
                               &reuse_group_by_vectors);*/
-      block->aggregateGroupByFast(arguments_,
+  block->aggregateGroupByFast(arguments_,
                               group_by_list_,
                               predicate_.get(),
                               agg_hash_table,
                               &reuse_matches,
                               &reuse_group_by_vectors);
-      group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
-      break;
-    }
-  }
+  group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
 }
 
 void AggregationOperationState::finalizeSingleState(InsertDestination 
*output_destination) {
@@ -541,9 +544,11 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
       DCHECK(hash_tables->back() != nullptr);
       AggregationStateHashTableBase *agg_hash_table = 
hash_tables->back().get();
       DCHECK(agg_hash_table != nullptr);
+      handles_[agg_idx]->AllowUpdate();
       handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
           *distinctify_hashtables_[agg_idx],
-          agg_hash_table);
+          agg_hash_table,
+          agg_idx);
     }
 
     auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index b6f2ef9..79a5b87 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -631,7 +631,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastHashTable
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_StorageBlob

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 12e447f..cba039a 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -35,7 +35,6 @@
 #include "storage/TupleReference.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
-#include "expressions/aggregation/AggregationHandleAvg.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -434,6 +433,11 @@ class FastHashTable : public HashTableBase<resizable,
                           const uint8_t *init_value_ptr,
                           FunctorT *functor);
 
+  template <typename FunctorT>
+  bool upsertCompositeKeyFast(const std::vector<TypedValue> &key,
+                          const uint8_t *init_value_ptr,
+                          FunctorT *functor, int index);
+
   bool upsertCompositeKeyNewFast(const std::vector<TypedValue> &key,
                           const uint8_t *init_value_ptr,
                           const uint8_t *source_state);
@@ -1851,6 +1855,41 @@ template <bool resizable,
           bool serializable,
           bool force_key_copy,
           bool allow_duplicate_keys>
+template <typename FunctorT>
+bool FastHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::upsertCompositeKeyFast(const std::vector<TypedValue> &key,
+                         const std::uint8_t *init_value_ptr,
+                         FunctorT *functor, int index) {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  const std::size_t variable_size = 
calculateVariableLengthCompositeKeyCopySize(key);
+  if (resizable) {
+    for (;;) {
+      {
+        SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+        uint8_t *value = upsertCompositeKeyInternalFast(key, init_value_ptr, 
variable_size);
+        if (value != nullptr) {
+          (*functor)(value+payload_offsets_[index]);
+          return true;
+        }
+      }
+      resize(0, variable_size);
+    }
+  } else {
+    uint8_t *value = upsertCompositeKeyInternalFast(key, init_value_ptr, 
variable_size);
+    if (value == nullptr) {
+      return false;
+    } else {
+      (*functor)(value+payload_offsets_[index]);
+      return true;
+    }
+  }
+}
+
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
 bool FastHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
     ::upsertCompositeKeyNewFast(const std::vector<TypedValue> &key,
                          const std::uint8_t *init_value_ptr,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b7f098d/storage/FastSeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastSeparateChainingHashTable.hpp b/storage/FastSeparateChainingHashTable.hpp
index 64c4979..756d6e5 100644
--- a/storage/FastSeparateChainingHashTable.hpp
+++ b/storage/FastSeparateChainingHashTable.hpp
@@ -308,8 +308,11 @@ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_dup
           key_manager_(this->key_types_, kValueOffset + 
this->total_payload_size_),
           bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
   init_payload_ = static_cast<std::uint8_t 
*>(calloc(this->total_payload_size_, 1));
-  for (auto handle : handles)
-      handle->initPayload(init_payload_);
+  int k = 0;
+  for (auto handle : handles) {
+      handle->initPayload(init_payload_+this->payload_offsets_[k]);
+      k++;
+  }
   // Bucket size always rounds up to the alignment requirement of the atomic
   // size_t "next" pointer at the front or a ValueT, whichever is larger.
   //
@@ -437,8 +440,7 @@ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_dup
               true),
           kBucketAlignment(alignof(std::atomic<std::size_t>) < 
alignof(uint8_t) ? alignof(uint8_t)
                                                   : 
alignof(std::atomic<std::size_t>)),
-          kValueOffset((((sizeof(std::atomic<std::size_t>) + 
sizeof(std::size_t) - 1) /
-                                           alignof(uint8_t)) + 1) * 
alignof(uint8_t)),
+          kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
           key_manager_(this->key_types_, kValueOffset + sizeof(uint8_t)),
           bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
   // Bucket size always rounds up to the alignment requirement of the atomic
@@ -1046,7 +1048,6 @@ uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy,
     else
         memcpy(value, init_value_ptr, this->total_payload_size_);
 
-
   // Update the previous chain pointer to point to the new bucket.
   pending_chain_ptr->store(pending_chain_ptr_finish_value, 
std::memory_order_release);
 
@@ -1168,10 +1169,11 @@ uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy,
 //  uint8_t *value;
 //  value = static_cast<unsigned char*>(bucket) + kValueOffset;
     uint8_t *value = static_cast<unsigned char*>(bucket) + kValueOffset;
-    if (init_value_ptr == nullptr)
+    if (init_value_ptr == nullptr) {
         memcpy(value, init_payload_, this->total_payload_size_);
-    else
+    } else {
         memcpy(value, init_value_ptr, this->total_payload_size_);
+    }
 
   // Update the previous chain pointer to point to the new bucket.
   pending_chain_ptr->store(pending_chain_ptr_finish_value, 
std::memory_order_release);


Reply via email to