Repository: incubator-quickstep
Updated Branches:
  refs/heads/quickstep-28-29 7924d35c6 -> f8d696a3d (forced update)


Single aggregationGroupBy method in StorageBlock.

- New methods for separating unary and nullary updates of aggregation states.
- Added a TODO to move a method out of the HashTableBase class.
- Added doxygen for the AggregationHandle new functions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f8d696a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f8d696a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f8d696a3

Branch: refs/heads/quickstep-28-29
Commit: f8d696a3de7d4702fffd2e89933f2476653f5ed6
Parents: 2a9efc4
Author: Harshad Deshmukh <hbdeshm...@apache.org>
Authored: Mon Sep 12 16:03:01 2016 -0500
Committer: Harshad Deshmukh <hbdeshm...@apache.org>
Committed: Mon Sep 19 14:16:58 2016 -0500

----------------------------------------------------------------------
 catalog/CatalogTypedefs.hpp                     |  2 +
 .../aggregation/AggregationConcreteHandle.cpp   |  7 +-
 expressions/aggregation/AggregationHandle.hpp   | 58 ++++++++++++-
 .../aggregation/AggregationHandleAvg.hpp        |  6 +-
 .../aggregation/AggregationHandleCount.hpp      | 15 ++--
 .../aggregation/AggregationHandleMax.hpp        |  6 +-
 .../aggregation/AggregationHandleMin.hpp        |  6 +-
 .../aggregation/AggregationHandleSum.hpp        |  6 +-
 query_optimizer/ExecutionGenerator.cpp          | 20 ++---
 .../tests/AggregationOperator_unittest.cpp      |  3 +-
 storage/AggregationOperationState.cpp           | 64 +++++++-------
 storage/AggregationOperationState.hpp           |  4 +-
 storage/FastHashTable.hpp                       | 60 +++++++------
 storage/FastHashTableFactory.hpp                | 46 ----------
 storage/HashTableBase.hpp                       | 19 +++-
 storage/StorageBlock.cpp                        | 91 ++------------------
 storage/StorageBlock.hpp                        | 25 ++----
 17 files changed, 182 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/catalog/CatalogTypedefs.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogTypedefs.hpp b/catalog/CatalogTypedefs.hpp
index f7a2d53..70bac84 100644
--- a/catalog/CatalogTypedefs.hpp
+++ b/catalog/CatalogTypedefs.hpp
@@ -49,6 +49,8 @@ constexpr int kInvalidCatalogId = -1;
 // Used to indicate no preference for a NUMA Node ID.
 constexpr numa_node_id kAnyNUMANodeID = -1;
 
+constexpr attribute_id kInvalidAttributeID = -1;
+
 /** @} */
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index ae677d9..e3fb520 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -56,13 +56,10 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
   AggregationStateFastHashTable *hash_table =
       static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
   if (key_ids.size() == 1) {
-    std::vector<std::vector<attribute_id>> args;
-    args.emplace_back(key_ids);
     hash_table->upsertValueAccessorFast(
-        args, accessor, key_ids[0], true /* check_for_null_keys */);
+        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
   } else {
-    std::vector<std::vector<attribute_id>> empty_args;
-    empty_args.resize(1);
+    std::vector<attribute_id> empty_args {kInvalidAttributeID};
     hash_table->upsertValueAccessorCompositeKeyFast(
         empty_args, accessor, key_ids, true /* check_for_null_keys */);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index d2cee6d..c4663cf 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -130,7 +130,7 @@ class AggregationHandle {
    *        A StorageBlob will be allocated to serve as the HashTable's
    *        in-memory storage.
    * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate as the ValueT.
+   *         aggregate.
    **/
   virtual AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
@@ -297,7 +297,7 @@ class AggregationHandle {
    * in-memory
    *        storage.
    * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate as the ValueT.
+   *         aggregate.
    */
   virtual AggregationStateHashTableBase* createDistinctifyHashTable(
       const HashTableImplType hash_table_impl,
@@ -356,13 +356,63 @@ class AggregationHandle {
       AggregationStateHashTableBase *aggregation_hash_table,
       std::size_t index) const = 0;
 
+  /**
+   * @brief Get the number of bytes to store the aggregation handle's state.
+   **/
   virtual std::size_t getPayloadSize() const { return 1; }
-  virtual void updateState(const std::vector<TypedValue> &arguments,
-                           std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Update the aggregation state for nullary aggregation function e.g.
+   *        COUNT(*).
+   *
+   * @note This function should be overridden by those aggregation functions
+   *       that can perform nullary operations, e.g. COUNT.
+   *
+   * @param byte_ptr The pointer where the aggregation state is stored.
+   **/
+  virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Update the aggregation state for unary aggregation function e.g.
+   *        SUM(a).
+   *
+   * @param argument The argument which will be used to update the state of the
+   *        aggregation function.
+   * @param byte_ptr The pointer where the aggregation state is stored.
+   **/
+  virtual void updateStateUnary(const TypedValue &argument,
+                                std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Merge two aggregation states for this aggregation handle.
+   *
+   * @note This function should be used with the hash table specifically meant
+   *       for aggregations only.
+   *
+   * @param src A pointer to the source aggregation state.
+   * @param dst A pointer to the destination aggregation state.
+   **/
   virtual void mergeStatesFast(const std::uint8_t *src,
                                std::uint8_t *dst) const {}
+
+  /**
+   * @brief Initialize the payload (in the aggregation hash table) for the given
+   *        aggregation handle.
+   *
+   * @param byte_ptr The pointer to the aggregation state in the hash table.
+   **/
   virtual void initPayload(std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Inform the aggregation handle to block (prohibit) updates on the
+   *        aggregation state.
+   **/
   virtual void blockUpdate() {}
+
+  /**
+   * @brief Inform the aggregation handle to allow updates on the
+   *        aggregation state.
+   **/
   virtual void allowUpdate() {}
 
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 3e49213..366ba8e 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -141,10 +141,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++(*count_ptr);
   }
 
-  inline void updateState(const std::vector<TypedValue> &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 2c6d717..9b97590 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -135,13 +135,16 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     }
   }
 
-  inline void updateState(const std::vector<TypedValue> &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      if (arguments.size())
-        iterateUnaryInlFast(arguments.front(), byte_ptr);
-      else
-        iterateNullaryInlFast(byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateNullaryInlFast(byte_ptr);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index de173c9..6c54b9d 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -112,10 +112,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     compareAndUpdateFast(max_ptr, value);
   }
 
-  inline void updateState(const std::vector<TypedValue> &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 4a0eca4..9baf736 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -114,10 +114,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     compareAndUpdateFast(min_ptr, value);
   }
 
-  inline void updateState(const std::vector<TypedValue> &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 8d719ab..18d45d9 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -133,10 +133,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     *null_ptr = false;
   }
 
-  inline void updateState(const std::vector<TypedValue> &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 130134c..968314e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1371,13 +1371,9 @@ void ExecutionGenerator::convertAggregate(
   }
 
   if (!group_by_types.empty()) {
-    // SimplifyHashTableImplTypeProto() switches the hash table implementation
-    // from SeparateChaining to SimpleScalarSeparateChaining when there is a
-    // single scalar key type with a reversible hash function.
+    // Right now, only SeparateChaining is supported.
     aggr_state_proto->set_hash_table_impl_type(
-        SimplifyHashTableImplTypeProto(
-            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-            group_by_types));
+        serialization::HashTableImplType::SEPARATE_CHAINING);
   }
 
   for (const E::AliasPtr &named_aggregate_expression : 
physical_plan->aggregate_expressions()) {
@@ -1404,15 +1400,9 @@ void ExecutionGenerator::convertAggregate(
     if (unnamed_aggregate_expression->is_distinct()) {
       const std::vector<E::ScalarPtr> &arguments = 
unnamed_aggregate_expression->getArguments();
       DCHECK_GE(arguments.size(), 1u);
-      if (group_by_types.empty() && arguments.size() == 1) {
-        aggr_state_proto->add_distinctify_hash_table_impl_types(
-            SimplifyHashTableImplTypeProto(
-                
HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-                {&arguments[0]->getValueType()}));
-      } else {
-        aggr_state_proto->add_distinctify_hash_table_impl_types(
-            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type));
-      }
+      // Right now only SeparateChaining implementation is supported.
+      aggr_state_proto->add_distinctify_hash_table_impl_types(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 0138362..6881dea 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -363,8 +363,9 @@ class AggregationOperatorTest : public ::testing::Test {
     aggr_state_proto->set_estimated_num_entries(estimated_entries);
 
     // Also need to set the HashTable implementation for GROUP BY.
+    // Right now, only SeparateChaining is supported.
     aggr_state_proto->set_hash_table_impl_type(
-        serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+        serialization::HashTableImplType::SEPARATE_CHAINING);
 
     // Create Operators.
     op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c5f59f9..073b813 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -94,13 +94,12 @@ AggregationOperationState::AggregationOperationState(
     handles_.emplace_back(new AggregationHandleDistinct());
     arguments_.push_back({});
     is_distinct_.emplace_back(false);
-    group_by_hashtable_pools_.emplace_back(
-        std::unique_ptr<HashTablePool>(new HashTablePool(estimated_num_entries,
-                                                         hash_table_impl_type,
-                                                         group_by_types,
-                                                         {1},
-                                                         handles_,
-                                                         storage_manager)));
+    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                     hash_table_impl_type,
+                                                     group_by_types,
+                                                     {1},
+                                                     handles_,
+                                                     storage_manager));
   } else {
     // Set up each individual aggregate in this operation.
     std::vector<const AggregateFunction *>::const_iterator agg_func_it =
@@ -196,13 +195,12 @@ AggregationOperationState::AggregationOperationState(
     if (!group_by_handles.empty()) {
       // Aggregation with GROUP BY: create a HashTable pool for per-group
       // states.
-      group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
-          new HashTablePool(estimated_num_entries,
-                            hash_table_impl_type,
-                            group_by_types,
-                            payload_sizes,
-                            group_by_handles,
-                            storage_manager)));
+      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                       hash_table_impl_type,
+                                                       group_by_types,
+                                                       payload_sizes,
+                                                       group_by_handles,
+                                                       storage_manager));
     }
   }
 }
@@ -444,17 +442,17 @@ void AggregationOperationState::aggregateBlockHashTable(
   // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
   // directly into the (threadsafe) shared global HashTable for this
   // aggregate.
-  DCHECK(group_by_hashtable_pools_[0] != nullptr);
+  DCHECK(group_by_hashtable_pool_ != nullptr);
   AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pools_[0]->getHashTableFast();
+      group_by_hashtable_pool_->getHashTableFast();
   DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupByFast(arguments_,
-                              group_by_list_,
-                              predicate_.get(),
-                              agg_hash_table,
-                              &reuse_matches,
-                              &reuse_group_by_vectors);
-  group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+  block->aggregateGroupBy(arguments_,
+                          group_by_list_,
+                          predicate_.get(),
+                          agg_hash_table,
+                          &reuse_matches,
+                          &reuse_group_by_vectors);
+  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }
 
 void AggregationOperationState::finalizeSingleState(
@@ -497,7 +495,7 @@ void AggregationOperationState::finalizeHashTable(
   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
 
-  auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
   if (hash_tables->size() > 1) {
     for (int hash_table_index = 0;
          hash_table_index < static_cast<int>(hash_tables->size() - 1);
@@ -512,17 +510,17 @@ void AggregationOperationState::finalizeHashTable(
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pools_[0] != nullptr);
-      auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+      DCHECK(group_by_hashtable_pool_ != nullptr);
+      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
       DCHECK(hash_tables != nullptr);
       if (hash_tables->empty()) {
         // We may have a case where hash_tables is empty, e.g. no input blocks.
         // However for aggregateOnDistinctifyHashTableForGroupBy to work
         // correctly, we should create an empty group by hash table.
         AggregationStateHashTableBase *new_hash_table =
-            group_by_hashtable_pools_[0]->getHashTableFast();
-        group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+            group_by_hashtable_pool_->getHashTableFast();
+        group_by_hashtable_pool_->returnHashTable(new_hash_table);
+        hash_tables = group_by_hashtable_pool_->getAllHashTables();
       }
       DCHECK(hash_tables->back() != nullptr);
       AggregationStateHashTableBase *agg_hash_table = 
hash_tables->back().get();
@@ -532,16 +530,16 @@ void AggregationOperationState::finalizeHashTable(
           *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
     }
 
-    auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
     DCHECK(hash_tables != nullptr);
     if (hash_tables->empty()) {
       // We may have a case where hash_tables is empty, e.g. no input blocks.
       // However for aggregateOnDistinctifyHashTableForGroupBy to work
       // correctly, we should create an empty group by hash table.
       AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pools_[0]->getHashTable();
-      group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+          group_by_hashtable_pool_->getHashTable();
+      group_by_hashtable_pool_->returnHashTable(new_hash_table);
+      hash_tables = group_by_hashtable_pool_->getAllHashTables();
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 7956bc6..cbbfc22 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -221,8 +221,8 @@ class AggregationOperationState {
   std::vector<std::unique_ptr<AggregationStateHashTableBase>>
       group_by_hashtables_;
 
-  // A vector of group by hash table pools, one for each group by clause.
-  std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
+  // The pool of group by hash tables (a single pool, not one per clause).
+  std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
 
   StorageManager *storage_manager_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index f1e8d1a..4a95cd9 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -456,7 +456,7 @@ class FastHashTable : public HashTableBase<resizable,
    *         not be inserted).
    **/
   bool upsertValueAccessorFast(
-      const std::vector<std::vector<attribute_id>> &argument_ids,
+      const std::vector<attribute_id> &argument_ids,
       ValueAccessor *accessor,
       const attribute_id key_attr_id,
       const bool check_for_null_keys);
@@ -509,7 +509,7 @@ class FastHashTable : public HashTableBase<resizable,
    *         not be inserted).
    **/
   bool upsertValueAccessorCompositeKeyFast(
-      const std::vector<std::vector<attribute_id>> &argument,
+      const std::vector<attribute_id> &argument,
       ValueAccessor *accessor,
       const std::vector<attribute_id> &key_attr_ids,
       const bool check_for_null_keys) override;
@@ -1866,13 +1866,12 @@ bool FastHashTable<resizable,
                    force_key_copy,
                    allow_duplicate_keys>::
     upsertValueAccessorFast(
-        const std::vector<std::vector<attribute_id>> &argument_ids,
+        const std::vector<attribute_id> &argument_ids,
         ValueAccessor *accessor,
         const attribute_id key_attr_id,
         const bool check_for_null_keys) {
   DEBUG_ASSERT(!allow_duplicate_keys);
   std::size_t variable_size;
-  std::vector<TypedValue> local;
   return InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
@@ -1898,13 +1897,14 @@ bool FastHashTable<resizable,
                 } else {
                   SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
                   for (unsigned int k = 0; k < num_handles_; ++k) {
-                    local.clear();
-                    if (argument_ids[k].size()) {
-                      local.emplace_back(
-                          accessor->getTypedValue(argument_ids[k].front()));
+                    if (argument_ids[k] != kInvalidAttributeID) {
+                      handles_[k]->updateStateUnary(
+                          accessor->getTypedValue(argument_ids[k]),
+                          value + payload_offsets_[k]);
+                    } else {
+                      handles_[k]->updateStateNullary(value +
+                                                      payload_offsets_[k]);
                     }
-                    handles_[k]->updateState(local,
-                                             value + payload_offsets_[k]);
                   }
                 }
               }
@@ -1929,12 +1929,14 @@ bool FastHashTable<resizable,
             } else {
               SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
               for (unsigned int k = 0; k < num_handles_; ++k) {
-                local.clear();
-                if (argument_ids[k].size()) {
-                  local.emplace_back(
-                      accessor->getTypedValue(argument_ids[k].front()));
+                if (argument_ids[k] != kInvalidAttributeID) {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(argument_ids[k]),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateNullary(value +
+                                                  payload_offsets_[k]);
                 }
-                handles_[k]->updateState(local, value + payload_offsets_[k]);
               }
             }
           }
@@ -1953,7 +1955,7 @@ bool FastHashTable<resizable,
                    force_key_copy,
                    allow_duplicate_keys>::
     upsertValueAccessorCompositeKeyFast(
-        const std::vector<std::vector<attribute_id>> &argument_ids,
+        const std::vector<attribute_id> &argument_ids,
         ValueAccessor *accessor,
         const std::vector<attribute_id> &key_attr_ids,
         const bool check_for_null_keys) {
@@ -1961,7 +1963,6 @@ bool FastHashTable<resizable,
   std::size_t variable_size;
   std::vector<TypedValue> key_vector;
   key_vector.resize(key_attr_ids.size());
-  std::vector<TypedValue> local;
   return InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
@@ -1989,13 +1990,14 @@ bool FastHashTable<resizable,
                 } else {
                   SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
                   for (unsigned int k = 0; k < num_handles_; ++k) {
-                    local.clear();
-                    if (argument_ids[k].size()) {
-                      local.emplace_back(
-                          accessor->getTypedValue(argument_ids[k].front()));
+                    if (argument_ids[k] != kInvalidAttributeID) {
+                      handles_[k]->updateStateUnary(
+                          accessor->getTypedValue(argument_ids[k]),
+                          value + payload_offsets_[k]);
+                    } else {
+                      handles_[k]->updateStateNullary(value +
+                                                      payload_offsets_[k]);
                     }
-                    handles_[k]->updateState(local,
-                                             value + payload_offsets_[k]);
                   }
                 }
               }
@@ -2022,12 +2024,14 @@ bool FastHashTable<resizable,
             } else {
               SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
               for (unsigned int k = 0; k < num_handles_; ++k) {
-                local.clear();
-                if (argument_ids[k].size()) {
-                  local.emplace_back(
-                      accessor->getTypedValue(argument_ids[k].front()));
+                if (argument_ids[k] != kInvalidAttributeID) {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(argument_ids[k]),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateNullary(value +
+                                                  payload_offsets_[k]);
                 }
-                handles_[k]->updateState(local, value + payload_offsets_[k]);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6ad3212..dc4f893 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -90,30 +90,6 @@ class FastHashTableFactory {
             serializable,
             force_key_copy,
             allow_duplicate_keys>(key_types, num_entries, payload_sizes, 
handles, storage_manager);
-      case HashTableImplType::kLinearOpenAddressing:
-/*        return new LinearOpenAddressingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, storage_manager);*/
-        return new FastSeparateChainingHashTable<
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, payload_sizes, 
handles, storage_manager);
-      case HashTableImplType::kSimpleScalarSeparateChaining:
-        return new FastSeparateChainingHashTable<
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, payload_sizes, 
handles, storage_manager);
-/*        return new SimpleScalarSeparateChainingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, storage_manager);*/
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in 
HashTableFactory::createResizable()\n";
       }
@@ -167,28 +143,6 @@ class FastHashTableFactory {
                                   hash_table_memory_size,
                                   new_hash_table,
                                   hash_table_memory_zeroed);
-      case HashTableImplType::kLinearOpenAddressing:
-/*        return new LinearOpenAddressingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types,
-                                  hash_table_memory,
-                                  hash_table_memory_size,
-                                  new_hash_table,
-                                  hash_table_memory_zeroed);*/
-      case HashTableImplType::kSimpleScalarSeparateChaining:
-/*        return new SimpleScalarSeparateChainingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types,
-                                  hash_table_memory,
-                                  hash_table_memory_size,
-                                  new_hash_table,
-                                  hash_table_memory_zeroed);*/
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType\n";
       }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index b908d6f..cd0a141 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -74,8 +74,25 @@ class HashTableBase {
  public:
   virtual ~HashTableBase() {}
 
+  /**
+   * TODO(harshad): We should remove this function from here. We are
+   * postponing it because the amount of work required is significant.
+   * The steps are as follows:
+   * 1. Replace AggregationStateHashTableBase occurrences in HashTablePool with
+   * the FastHashTable implementation (i.e. an implementation specialized for
+   * aggregation).
+   * 2. Remove createGroupByHashTable from the AggregationHandle* classes.
+   * 3. Replace AggregationStateHashTableBase occurrences in the
+   * AggregationHandle* classes with the FastHashTable implementation (i.e. an
+   * implementation specialized for aggregation).
+   * 4. Move this method to the FastHashTable class from here, so that it can
+   * be called from the AggregationHandle* classes.
+   *
+   * Optionally, we can also remove the AggregationStateHashTableBase
+   * specialization from this file.
+   **/
   virtual bool upsertValueAccessorCompositeKeyFast(
-      const std::vector<std::vector<attribute_id>> &argument,
+      const std::vector<attribute_id> &argument,
       ValueAccessor *accessor,
       const std::vector<attribute_id> &key_attr_ids,
       const bool check_for_null_keys) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 8ff18b5..ec5990f 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -415,87 +415,6 @@ AggregationState* StorageBlock::aggregate(
 }
 
 void StorageBlock::aggregateGroupBy(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
-    AggregationStateHashTableBase *hash_table,
-    std::unique_ptr<TupleIdSequence> *reuse_matches,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK_GT(group_by.size(), 0u)
-      << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
-  SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                    indices_,
-                                    indices_consistent_);
-
-  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-  std::vector<attribute_id> argument_ids;
-
-  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-  std::vector<attribute_id> key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
-  ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
-
-    attribute_id attr_id = 0;
-
-    // First, put GROUP BY keys into 'temp_result'.
-    if (reuse_group_by_vectors->empty()) {
-      // Compute GROUP BY values from group_by Scalars, and store them in
-      // reuse_group_by_vectors for reuse by other aggregates on this same
-      // block.
-      reuse_group_by_vectors->reserve(group_by.size());
-      for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-        reuse_group_by_vectors->emplace_back(
-            group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-        temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    } else {
-      // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-      DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-          << "Wrong number of reuse_group_by_vectors";
-      for (const std::unique_ptr<ColumnVector> &reuse_cv : 
*reuse_group_by_vectors) {
-        temp_result.addColumn(reuse_cv.get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    }
-
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::unique_ptr<const Scalar> &argument : arguments) {
-      temp_result.addColumn(argument->getAllValues(accessor.get(), 
&sub_blocks_ref));
-      argument_ids.push_back(attr_id++);
-    }
-  }
-
-  // Actually do aggregation into '*hash_table'.
-  handle.aggregateValueAccessorIntoHashTable(&temp_result,
-                                             argument_ids,
-                                             key_ids,
-                                             hash_table);
-}
-
-
-void StorageBlock::aggregateGroupByFast(
     const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
     const Predicate *predicate,
@@ -510,8 +429,7 @@ void StorageBlock::aggregateGroupByFast(
                                     indices_consistent_);
 
   // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-  std::vector<attribute_id> arg_ids;
-  std::vector<std::vector<attribute_id>> argument_ids;
+  std::vector<attribute_id> argument_ids;
 
   // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
   std::vector<attribute_id> key_ids;
@@ -563,12 +481,13 @@ void StorageBlock::aggregateGroupByFast(
 
     // Compute argument vectors and add them to 'temp_result'.
     for (const std::vector<std::unique_ptr<const Scalar>> &argument : 
arguments) {
-        arg_ids.clear();
         for (const std::unique_ptr<const Scalar> &args : argument) {
           temp_result.addColumn(args->getAllValues(accessor.get(), 
&sub_blocks_ref));
-          arg_ids.push_back(attr_id++);
+          argument_ids.push_back(attr_id++);
+        }
+        if (argument.empty()) {
+          argument_ids.push_back(kInvalidAttributeID);
         }
-        argument_ids.push_back(arg_ids);
      }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f8d696a3/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 8b59a3c..398008e 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -459,23 +459,14 @@ class StorageBlock : public StorageBlockBase {
    * attributes as std::vector<attribute_id> (like in selectSimple()) for fast
    * path when there are no expressions specified in the query.
    */
-  void aggregateGroupBy(const AggregationHandle &handle,
-                        const std::vector<std::unique_ptr<const Scalar>> 
&arguments,
-                        const std::vector<std::unique_ptr<const Scalar>> 
&group_by,
-                        const Predicate *predicate,
-                        AggregationStateHashTableBase *hash_table,
-                        std::unique_ptr<TupleIdSequence> *reuse_matches,
-                        std::vector<std::unique_ptr<ColumnVector>>
-                            *reuse_group_by_vectors) const;
-
-
-  void aggregateGroupByFast(const 
std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-                        const std::vector<std::unique_ptr<const Scalar>> 
&group_by,
-                        const Predicate *predicate,
-                        AggregationStateHashTableBase *hash_table,
-                        std::unique_ptr<TupleIdSequence> *reuse_matches,
-                        std::vector<std::unique_ptr<ColumnVector>>
-                            *reuse_group_by_vectors) const;
+  void aggregateGroupBy(
+      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const Predicate *predicate,
+      AggregationStateHashTableBase *hash_table,
+      std::unique_ptr<TupleIdSequence> *reuse_matches,
+      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) 
const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.


Reply via email to