Repository: incubator-quickstep
Updated Branches:
  refs/heads/common-subexpression c27f5beb8 -> 675dcf8de


Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/675dcf8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/675dcf8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/675dcf8d

Branch: refs/heads/common-subexpression
Commit: 675dcf8de09a341f6b1a9a0cc0622d27bc8a1ff3
Parents: c27f5be
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Wed Apr 19 15:11:24 2017 -0500
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Wed Apr 19 15:11:24 2017 -0500

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          |   2 +-
 query_optimizer/PhysicalGenerator.cpp           |   2 +-
 .../rules/ReuseAggregateExpressions.cpp         |  50 +-
 .../rules/ReuseAggregateExpressions.hpp         |   5 +-
 storage/AggregationOperationState.cpp           |  16 +-
 storage/AggregationOperationState.hpp           |   2 +-
 storage/CMakeLists.txt                          |  14 +-
 storage/HashTable.proto                         |   2 +-
 storage/HashTableBase.hpp                       |   2 +-
 storage/HashTableFactory.hpp                    |  10 +-
 storage/ThreadPrivateCompactKeyHashTable.cpp    |  35 ++
 storage/ThreadPrivateCompactKeyHashTable.hpp    | 483 +++++++++++++++++++
 storage/ThreadPrivateNumericHashTable.cpp       |   0
 storage/ThreadPrivateNumericHashTable.hpp       | 483 -------------------
 14 files changed, 588 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 0304e2e..8f44cc1 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1590,7 +1590,7 @@ void ExecutionGenerator::convertAggregate(
               physical_plan, estimated_num_groups)) {
         std::cout << "Use two phase numeric\n";
         aggr_state_proto->set_hash_table_impl_type(
-            serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC);
+            serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY);
       } else {
         // Otherwise, use SeparateChaining.
         std::cout << "Use normal\n";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 427c365..07bd024 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -149,7 +149,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new ReorderColumns());
   }
 
-  rules.emplace_back(new ReuseAggregateExpressions());
+  rules.emplace_back(new ReuseAggregateExpressions(optimizer_context_));
 
   rules.emplace_back(new FuseAggregateJoin());
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/rules/ReuseAggregateExpressions.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.cpp b/query_optimizer/rules/ReuseAggregateExpressions.cpp
index 77dfd1e..3036833 100644
--- a/query_optimizer/rules/ReuseAggregateExpressions.cpp
+++ b/query_optimizer/rules/ReuseAggregateExpressions.cpp
@@ -26,7 +26,9 @@
 #include <vector>
 
 #include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
@@ -81,13 +83,19 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
   for (std::size_t i = 0; i < agg_exprs.size(); ++i) {
     DCHECK(agg_exprs[i]->expression()->getExpressionType()
                == E::ExpressionType::kAggregateFunction);
-    const E::AggregateFunctionPtr agg_func =
+    const E::AggregateFunctionPtr agg_expr =
         std::static_pointer_cast<const E::AggregateFunction>(
             agg_exprs[i]->expression());
-    const AggregationID agg_id = agg_func->getAggregate().getAggregationID();
+
+    // Skip DISTINCT aggregations.
+    if (agg_expr->is_distinct()) {
+      continue;
+    }
+
+    const AggregationID agg_id = agg_expr->getAggregate().getAggregationID();
+    const std::vector<E::ScalarPtr> &arguments = agg_expr->getArguments();
 
     // Currently we only consider aggregate functions with 0 or 1 argument.
-    const std::vector<E::ScalarPtr> &arguments = agg_func->getArguments();
     if (agg_id == AggregationID::kCount) {
       if (arguments.empty()) {
         count_star_info.emplace_front(i);
@@ -124,11 +132,10 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
     // First, check whether AVG can be reduced to SUM/COUNT.
     bool is_avg_processed = false;
 
-    const auto sum_it = ref_map.find(AggregationID::kSum);
     const auto avg_it = ref_map.find(AggregationID::kAvg);
-
-    if (avg_it != ref_map.end() && sum_it != ref_map.end()) {
+    if (avg_it != ref_map.end()) {
       std::size_t count_ref = kInvalidRef;
+
       if (it.first->getValueType().isNullable()) {
         const auto count_it = ref_map.find(AggregationID::kCount);
         if (count_it != ref_map.end()) {
@@ -140,8 +147,10 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
       }
 
       if (count_ref != kInvalidRef) {
-        DCHECK(!sum_it->second.empty());
-        const std::size_t sum_ref = sum_it->second.front();
+        const auto sum_it = ref_map.find(AggregationID::kSum);
+        const std::size_t sum_ref =
+            sum_it == ref_map.end() ? kInvalidRef : sum_it->second.front();
+
         for (const std::size_t idx : avg_it->second) {
           agg_refs[idx].reset(new AggregateReference(sum_ref, count_ref));
         }
@@ -205,11 +214,34 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
           break;
         }
         case AggregateReference::kAvg: {
+          E::AttributeReferencePtr sum_attr;
+          if (agg_ref->first_ref == kInvalidRef) {
+            const E::AggregateFunctionPtr avg_expr =
+                std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+            const AggregateFunction &sum_func =
+                AggregateFunctionFactory::Get(AggregationID::kSum);
+            const E::AggregateFunctionPtr sum_expr =
+                E::AggregateFunction::Create(sum_func,
+                                             avg_expr->getArguments(),
+                                             avg_expr->is_vector_aggregate(),
+                                             avg_expr->is_distinct());
+            new_agg_exprs.emplace_back(
+                E::Alias::Create(optimizer_context_->nextExprId(),
+                                 sum_expr,
+                                 agg_expr->attribute_name(),
+                                 agg_expr->attribute_alias()));
+
+            sum_attr = E::ToRef(new_agg_exprs.back());
+          } else {
+            sum_attr = agg_attrs[agg_ref->first_ref];
+          }
+
           const BinaryOperation &divide_op =
               BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
           const E::BinaryExpressionPtr avg_expr =
               E::BinaryExpression::Create(divide_op,
-                                          agg_attrs[agg_ref->first_ref],
+                                          sum_attr,
                                           agg_attrs[agg_ref->second_ref]);
           new_select_exprs.emplace_back(
               E::Alias::Create(agg_expr->id(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/rules/ReuseAggregateExpressions.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.hpp b/query_optimizer/rules/ReuseAggregateExpressions.hpp
index 0b18023..5b78e46 100644
--- a/query_optimizer/rules/ReuseAggregateExpressions.hpp
+++ b/query_optimizer/rules/ReuseAggregateExpressions.hpp
@@ -42,7 +42,8 @@ class OptimizerContext;
 
 class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> {
  public:
-  ReuseAggregateExpressions() {}
+  ReuseAggregateExpressions(OptimizerContext *optimizer_context)
+      : optimizer_context_(optimizer_context) {}
 
   std::string getName() const override {
     return "ReuseAggregateExpressions";
@@ -84,6 +85,8 @@ class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> {
     const std::size_t second_ref;
   };
 
+  OptimizerContext *optimizer_context_;
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
 
   DISALLOW_COPY_AND_ASSIGN(ReuseAggregateExpressions);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index d6f21bc..6337b5e 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -49,7 +49,7 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/SubBlocksReference.hpp"
-#include "storage/ThreadPrivateNumericHashTable.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
@@ -99,7 +99,7 @@ AggregationOperationState::AggregationOperationState(
       case HashTableImplType::kCollisionFreeVector:
         is_aggregate_collision_free_ = true;
         break;
-      case HashTableImplType::kThreadPrivateNumeric:
+      case HashTableImplType::kThreadPrivateCompactKey:
         break;
       default:
         is_aggregate_partitioned_ = checkAggregatePartitioned(
@@ -725,8 +725,8 @@ void AggregationOperationState::finalizeHashTable(
       case HashTableImplType::kSeparateChaining:
         finalizeHashTableImplThreadPrivatePackedPayload(output_destination);
         break;
-      case HashTableImplType::kThreadPrivateNumeric:
-        finalizeHashTableImplThreadPrivateNumeric(output_destination);
+      case HashTableImplType::kThreadPrivateCompactKey:
+        finalizeHashTableImplThreadPrivateCompactKey(output_destination);
         break;
       default:
         LOG(FATAL) << "Not supported";
@@ -963,7 +963,7 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::finalizeHashTableImplThreadPrivateNumeric(
+void AggregationOperationState::finalizeHashTableImplThreadPrivateCompactKey(
     InsertDestination *output_destination) {
   auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
   DCHECK(hash_tables != nullptr);
@@ -971,13 +971,13 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivateNumeric(
     return;
   }
 
-  std::unique_ptr<ThreadPrivateNumericHashTable> final_hash_table(
-      static_cast<ThreadPrivateNumericHashTable*>(hash_tables->back().release()));
+  std::unique_ptr<ThreadPrivateCompactKeyHashTable> final_hash_table(
+      static_cast<ThreadPrivateCompactKeyHashTable*>(hash_tables->back().release()));
   for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
     std::unique_ptr<AggregationStateHashTableBase> hash_table(
         hash_tables->at(i).release());
     final_hash_table->merge(
-        static_cast<const ThreadPrivateNumericHashTable*>(hash_table.get()));
+        static_cast<const ThreadPrivateCompactKeyHashTable*>(hash_table.get()));
     hash_table->destroyPayload();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e666a68..207c4f0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -259,7 +259,7 @@ class AggregationOperationState {
   void finalizeHashTableImplThreadPrivatePackedPayload(
       InsertDestination *output_destination);
 
-  void finalizeHashTableImplThreadPrivateNumeric(
+  void finalizeHashTableImplThreadPrivateCompactKey(
       InsertDestination *output_destination);
 
   std::size_t getMemoryConsumptionBytesHelper(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index b971240..0f610eb 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -250,9 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h
 add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp)
 add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp)
 add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp)
-add_library(quickstep_storage_ThreadPrivateNumericHashTable
-            ThreadPrivateNumericHashTable.cpp
-            ThreadPrivateNumericHashTable.hpp)
+add_library(quickstep_storage_ThreadPrivateCompactKeyHashTable
+            ThreadPrivateCompactKeyHashTable.cpp
+            ThreadPrivateCompactKeyHashTable.hpp)
 add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp)
 add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
@@ -292,7 +292,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_storage_SubBlocksReference
-                      quickstep_storage_ThreadPrivateNumericHashTable
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
@@ -728,7 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
                       quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
-                      quickstep_storage_ThreadPrivateNumericHashTable
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -1044,7 +1044,7 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry
 target_link_libraries(quickstep_storage_SubBlocksReference
                       glog
                       quickstep_utility_PtrVector)
-target_link_libraries(quickstep_storage_ThreadPrivateNumericHashTable
+target_link_libraries(quickstep_storage_ThreadPrivateCompactKeyHashTable
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
@@ -1183,7 +1183,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_storage_SubBlockTypeRegistryMacros
                       quickstep_storage_SubBlocksReference
-                      quickstep_storage_ThreadPrivateNumericHashTable
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 80e363c..ed383df 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -26,7 +26,7 @@ enum HashTableImplType {
   LINEAR_OPEN_ADDRESSING = 1;
   SEPARATE_CHAINING = 2;
   SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
-  THREAD_PRIVATE_NUMERIC = 4;
+  THREAD_PRIVATE_COMPACT_KEY = 4;
 }
 
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index c3cbddf..6982a3f 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -45,7 +45,7 @@ enum class HashTableImplType {
   kLinearOpenAddressing,
   kSeparateChaining,
   kSimpleScalarSeparateChaining,
-  kThreadPrivateNumeric
+  kThreadPrivateCompactKey
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 52f4d5f..cb1f16f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -32,7 +32,7 @@
 #include "storage/PackedPayloadHashTable.hpp"
 #include "storage/SeparateChainingHashTable.hpp"
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
-#include "storage/ThreadPrivateNumericHashTable.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
 #include "utility/BloomFilter.hpp"
@@ -124,8 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
       return HashTableImplType::kSeparateChaining;
     case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING:
       return HashTableImplType::kSimpleScalarSeparateChaining;
-    case serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC:
-      return HashTableImplType::kThreadPrivateNumeric;
+    case serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY:
+      return HashTableImplType::kThreadPrivateCompactKey;
     default: {
       LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n";
     }
@@ -372,8 +372,8 @@ class AggregationStateHashTableFactory {
       case HashTableImplType::kSeparateChaining:
         return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);
-      case HashTableImplType::kThreadPrivateNumeric:
-        return new ThreadPrivateNumericHashTable(
+      case HashTableImplType::kThreadPrivateCompactKey:
+        return new ThreadPrivateCompactKeyHashTable(
             key_types, num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in "

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateCompactKeyHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.cpp b/storage/ThreadPrivateCompactKeyHashTable.cpp
new file mode 100644
index 0000000..226188b
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.cpp
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
+
+#include <cstddef>
+#include <cstdint>
+
+#include "expressions/aggregation/AggregationID.hpp"
+#include "types/TypeID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+
+
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateCompactKeyHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.hpp b/storage/ThreadPrivateCompactKeyHashTable.hpp
new file mode 100644
index 0000000..a22fdab
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.hpp
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ThreadPrivateCompactKeyHashTable : public AggregationStateHashTableBase {
+ public:
+  ThreadPrivateCompactKeyHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager)
+      : key_types_(key_types),
+        handles_(handles),
+        bucket_size_(0),
+        num_buckets_(num_entries),
+        buckets_allocated_(0) {
+    for (const Type *key_type : key_types) {
+      DCHECK(!key_type->isVariableLength());
+
+      const std::size_t key_size = key_type->maximumByteLength();
+      DCHECK(key_size == 1u || key_size == 2u || key_size == 4u || key_size == 8u);
+
+      key_sizes_.emplace_back(key_size);
+    }
+
+    for (const AggregationHandle *handle : handles) {
+      state_offsets_.emplace_back(bucket_size_);
+
+      const std::vector<const Type*> arg_types = handle->getArgumentTypes();
+      DCHECK_LE(arg_types.size(), 1u);
+
+      std::size_t state_size = 0;
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          state_size = sizeof(std::int64_t);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, arg_types.size());
+          switch (arg_types.front()->getTypeID()) {
+            case TypeID::kInt:  // Fall through
+            case TypeID::kLong:
+              state_size = sizeof(std::int64_t);
+              break;
+            case TypeID::kFloat:  // Fall through
+            case TypeID::kDouble:
+              state_size = sizeof(double);
+              break;
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      bucket_size_ += state_size;
+    }
+
+    keys_.reset(sizeof(std::uint64_t) * num_buckets_);
+    buckets_.reset(bucket_size_ * num_buckets_);
+  }
+
+  ~ThreadPrivateCompactKeyHashTable() override {}
+
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kThreadPrivateCompactKey;
+  }
+
+  void destroyPayload() override {}
+
+  std::size_t getMemoryConsumptionBytes() const override {
+    return num_buckets_ * (bucket_size_ + sizeof(std::uint64_t));
+  }
+
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override {
+    ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+    ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+
+    DCHECK(base_accessor != nullptr);
+    const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
+
+    ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples);
+    std::uint64_t *key_codes = static_cast<std::uint64_t*>(buffer.get());
+    std::size_t key_code_offset = 0;
+    for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
+      const auto &key_attr_id = key_attr_ids[i];
+      ValueAccessor *accessor =
+          key_attr_id.source == ValueAccessorSource::kBase
+              ? base_accessor
+              : derived_accessor;
+      DCHECK(accessor != nullptr);
+
+      const std::size_t key_size = key_sizes_[i];
+      switch (key_size) {
+        case 1u:
+          ConstructKeyCode<std::uint8_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 2u:
+          ConstructKeyCode<std::uint16_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 4u:
+          ConstructKeyCode<std::uint32_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 8u:
+          ConstructKeyCode<std::uint64_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+
+      key_code_offset += key_size;
+    }
+
+    std::vector<BucketIndex> bucket_indices;
+    bucket_indices.reserve(num_tuples);
+    std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get());
+    for (std::size_t i = 0; i < num_tuples; ++i) {
+      const std::size_t code = key_codes[i];
+      const auto index_it = index_.find(code);
+      if (index_it == index_.end()) {
+        // TODO: Resize if overflow
+        index_.emplace(code, buckets_allocated_);
+        bucket_indices.emplace_back(buckets_allocated_);
+        keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Dispatch
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          upsertValueAccessorCount(bucket_indices, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, argument_ids[i].size());
+          const auto &argument_id = argument_ids[i].front();
+          ValueAccessor *accessor =
+              argument_id.source == ValueAccessorSource::kBase
+                  ? base_accessor
+                  : derived_accessor;
+          DCHECK(accessor != nullptr);
+
+          DCHECK_EQ(1u, handle->getArgumentTypes().size());
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt: {
+              upsertValueAccessorSum<int, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kLong: {
+              upsertValueAccessorSum<std::int64_t, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kFloat: {
+              upsertValueAccessorSum<float, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kDouble: {
+              upsertValueAccessorSum<double, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * @brief Merge the aggregation states of another hash table into this one.
+   *
+   * Every key code stored in \p other is looked up in this table's index_;
+   * codes that are not present yet are appended as new buckets. Afterwards
+   * the per-handle states of \p other are added into the mapped buckets of
+   * this table. Source states are located with this table's bucket_size_
+   * and state_offsets_, so both tables are expected to share one layout.
+   *
+   * @note Growing the table is not implemented. Running out of buckets is
+   *       reported with LOG(FATAL) instead of writing past the end of the
+   *       key and bucket storage.
+   *
+   * @param other The source hash table. It is only read.
+   **/
+  void merge(const ThreadPrivateCompactKeyHashTable *other) {
+    // dst_bucket_indices[i] is the bucket of this table that receives the
+    // state stored in bucket i of the source table.
+    std::vector<BucketIndex> dst_bucket_indices;
+    dst_bucket_indices.reserve(other->buckets_allocated_);
+    std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get());
+
+    const char *src_buckets_start =
+        static_cast<const char*>(other->buckets_.get());
+    const std::uint64_t *src_keys =
+        static_cast<const std::uint64_t*>(other->keys_.get());
+
+    for (std::size_t i = 0; i < other->buckets_allocated_; ++i) {
+      const std::uint64_t code = src_keys[i];
+      const auto index_it = index_.find(code);
+
+      if (index_it == index_.end()) {
+        // TODO: Resize if overflow. Until resizing exists, stop here rather
+        // than write beyond the num_buckets_ slots.
+        if (buckets_allocated_ >= num_buckets_) {
+          LOG(FATAL) << "Hash table overflow while merging: all "
+                     << num_buckets_ << " buckets are in use";
+        }
+        index_.emplace(code, buckets_allocated_);
+        dst_bucket_indices.emplace_back(buckets_allocated_);
+        dst_keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        dst_bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Dispatch on the aggregate function and argument type to pick the
+    // in-memory state type (64-bit integer or double) to add up.
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          mergeStateSum<std::int64_t>(
+              dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt:  // Fall through
+            case kLong: {
+              mergeStateSum<std::int64_t>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            case kFloat:  // Fall through
+            case kDouble: {
+              mergeStateSum<double>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+  }
+
+  /**
+   * @brief Debug helper: write the entry count and raw state values to
+   *        std::cout, one per line.
+   *
+   * NOTE(review): the bucket storage is read as a flat array of doubles and
+   * only the first buckets_allocated_ elements are printed. That appears to
+   * line up with one value per bucket only when bucket_size_ equals
+   * sizeof(double) -- confirm before relying on the output.
+   **/
+  void print() const {
+    const double *cursor = static_cast<const double*>(buckets_.get());
+    const double *const limit = cursor + buckets_allocated_;
+
+    std::cout << "num_entries = " << buckets_allocated_ << "\n";
+    while (cursor != limit) {
+      std::cout << *cursor << "\n";
+      ++cursor;
+    }
+  }
+
+  /**
+   * @brief Append the contents of this hash table to \p output as columns.
+   *
+   * One NativeColumnVector is added per group-by key (in key_types_ order),
+   * followed by one per aggregation handle (in handles_ order). Every column
+   * is created with buckets_allocated_ as its length argument.
+   *
+   * @param output The accessor that receives the columns via addColumn().
+   **/
+  void finalize(ColumnVectorsValueAccessor *output) const {
+    // Key columns. code_offset is the running byte position of the current
+    // key inside each stored key code.
+    std::size_t code_offset = 0;
+    for (std::size_t k = 0; k < key_types_.size(); ++k) {
+      std::unique_ptr<NativeColumnVector> key_cv(
+          new NativeColumnVector(*key_types_[k], buckets_allocated_));
+      NativeColumnVector *key_out = key_cv.get();
+
+      const std::size_t width = key_sizes_[k];
+      if (width == 1u) {
+        finalizeKey<std::uint8_t>(code_offset, key_out);
+      } else if (width == 2u) {
+        finalizeKey<std::uint16_t>(code_offset, key_out);
+      } else if (width == 4u) {
+        finalizeKey<std::uint32_t>(code_offset, key_out);
+      } else if (width == 8u) {
+        finalizeKey<std::uint64_t>(code_offset, key_out);
+      } else {
+        LOG(FATAL) << "Not implemented";
+      }
+
+      output->addColumn(key_cv.release());
+      code_offset += width;
+    }
+
+    // Aggregate columns. The aggregate function and its argument type select
+    // the stored state type that is copied out.
+    for (std::size_t h = 0; h < handles_.size(); ++h) {
+      const AggregationHandle *handle = handles_[h];
+      std::unique_ptr<NativeColumnVector> state_cv(
+          new NativeColumnVector(*handle->getResultType(),
+                                 buckets_allocated_));
+      NativeColumnVector *state_out = state_cv.get();
+      const std::size_t state_offset = state_offsets_[h];
+
+      const auto agg_id = handle->getAggregationID();
+      if (agg_id == AggregationID::kCount) {
+        finalizeStateSum<std::int64_t, std::int64_t>(state_offset, state_out);
+      } else if (agg_id == AggregationID::kSum) {
+        const Type *argument_type = handle->getArgumentTypes().front();
+        switch (argument_type->getTypeID()) {
+          case kInt:  // Fall through
+          case kLong:
+            finalizeStateSum<std::int64_t, std::int64_t>(
+                state_offset, state_out);
+            break;
+          case kFloat:  // Fall through
+          case kDouble:
+            finalizeStateSum<double, double>(state_offset, state_out);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+      } else {
+        LOG(FATAL) << "Not implemented";
+      }
+
+      output->addColumn(state_cv.release());
+    }
+  }
+
+ private:
+  using BucketIndex = std::uint32_t;
+
+  /**
+   * @brief Copy one key attribute of every tuple into the key-code buffer.
+   *
+   * For each tuple visited by \p accessor, the attribute value is read as a
+   * KeyT and stored \p offset bytes into that tuple's key code. Consecutive
+   * key codes are sizeof(std::uint64_t) bytes apart.
+   *
+   * @param offset Byte position of this key inside each key code.
+   * @param attr_id The attribute to read from \p accessor.
+   * @param accessor The value accessor to iterate over.
+   * @param key_code_start Start of the key-code buffer (one code per tuple).
+   **/
+  template <typename KeyT>
+  inline static void ConstructKeyCode(const std::size_t offset,
+                                      const attribute_id attr_id,
+                                      ValueAccessor *accessor,
+                                      void *key_code_start) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+      char *slot = static_cast<char*>(key_code_start) + offset;
+      for (typed_accessor->beginIteration();
+           typed_accessor->next();
+           slot += sizeof(std::uint64_t)) {
+        const void *value =
+            typed_accessor->template getUntypedValue<false>(attr_id);
+        *reinterpret_cast<KeyT*>(slot) = *static_cast<const KeyT*>(value);
+      }
+    });
+  }
+
+  /**
+   * @brief Increment the 64-bit counter state of every listed bucket by one.
+   *
+   * @param bucket_indices One bucket index per input tuple. A bucket that is
+   *        listed several times is incremented once per occurrence.
+   * @param state_offset Byte offset of the counter inside a bucket.
+   **/
+  inline void upsertValueAccessorCount(
+      const std::vector<BucketIndex> &bucket_indices,
+      const std::size_t state_offset) {
+    char *state_start = static_cast<char*>(buckets_.get()) + state_offset;
+    for (const BucketIndex idx : bucket_indices) {
+      char *state_ptr = state_start + bucket_size_ * idx;
+      *reinterpret_cast<std::int64_t*>(state_ptr) += 1;
+    }
+  }
+
+  /**
+   * @brief Add one argument value per tuple into the sum state of the
+   *        tuple's bucket.
+   *
+   * The n-th tuple visited by \p accessor is accumulated into the bucket
+   * given by bucket_indices[n]. Values are read as ArgumentT and added to a
+   * state stored as StateT.
+   *
+   * @param bucket_indices One bucket index per tuple, in iteration order.
+   * @param state_offset Byte offset of the sum state inside a bucket.
+   * @param attr_id The argument attribute to read from \p accessor.
+   * @param accessor The value accessor to iterate over.
+   **/
+  template <typename ArgumentT, typename StateT>
+  inline void upsertValueAccessorSum(
+      const std::vector<BucketIndex> &bucket_indices,
+      const std::size_t state_offset,
+      const attribute_id attr_id,
+      ValueAccessor *accessor) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      accessor->beginIteration();
+
+      char *state_start = static_cast<char*>(buckets_.get()) + state_offset;
+      // loc counts the tuples visited so far and selects the matching entry
+      // of bucket_indices.
+      std::size_t loc = 0;
+      while (accessor->next()) {
+        char *state_ptr = state_start + bucket_size_ * bucket_indices[loc];
+        *reinterpret_cast<StateT*>(state_ptr) +=
+            *static_cast<const ArgumentT*>(
+                accessor->template getUntypedValue<false>(attr_id));
+        ++loc;
+      }
+    });
+  }
+
+  /**
+   * @brief Add the states of a source bucket array into this table.
+   *
+   * The state of source bucket i is added into the bucket of this table
+   * given by dst_bucket_indices[i]. Source and destination buckets are both
+   * addressed with this table's bucket_size_.
+   *
+   * @param dst_bucket_indices Destination bucket for each source bucket.
+   * @param src_buckets_start Start of the source bucket array.
+   * @param state_offset Byte offset of the state inside a bucket.
+   **/
+  template <typename StateT>
+  inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
+                            const void *src_buckets_start,
+                            const std::size_t state_offset) {
+    char *dst_state_start = static_cast<char*>(buckets_.get()) + state_offset;
+    const char* src_state_start =
+        static_cast<const char*>(src_buckets_start) + state_offset;
+    for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) {
+      char *dst_state_ptr =
+          dst_state_start + bucket_size_ * dst_bucket_indices[i];
+      const char *src_state_ptr = src_state_start + bucket_size_ * i;
+      *reinterpret_cast<StateT*>(dst_state_ptr) +=
+          *reinterpret_cast<const StateT*>(src_state_ptr);
+    }
+  }
+
+  /**
+   * @brief Copy one key component of every allocated bucket into a column.
+   *
+   * The component is read as a KeyT located \p offset bytes into each
+   * stored key code; key codes are sizeof(std::uint64_t) bytes apart. One
+   * value is written per bucket through getPtrForDirectWrite().
+   *
+   * @param offset Byte position of the key component inside a key code.
+   * @param output_cv The column vector that receives the values.
+   **/
+  template <typename KeyT>
+  inline void finalizeKey(const std::size_t offset,
+                          NativeColumnVector *output_cv) const {
+    const char *const first_key =
+        static_cast<const char*>(keys_.get()) + offset;
+    for (std::size_t bucket = 0; bucket < buckets_allocated_; ++bucket) {
+      const char *src = first_key + bucket * sizeof(std::uint64_t);
+      KeyT *dst = static_cast<KeyT*>(output_cv->getPtrForDirectWrite());
+      *dst = *reinterpret_cast<const KeyT*>(src);
+    }
+  }
+
+  /**
+   * @brief Copy one aggregation state of every allocated bucket into a
+   *        column.
+   *
+   * The state is read as a StateT located \p state_offset bytes into each
+   * bucket and stored as a ResultT. One value is written per bucket through
+   * getPtrForDirectWrite().
+   *
+   * @param state_offset Byte offset of the state inside a bucket.
+   * @param output_cv The column vector that receives the values.
+   **/
+  template <typename StateT, typename ResultT>
+  inline void finalizeStateSum(const std::size_t state_offset,
+                               NativeColumnVector *output_cv) const {
+    const char *const first_state =
+        static_cast<const char*>(buckets_.get()) + state_offset;
+    for (std::size_t bucket = 0; bucket < buckets_allocated_; ++bucket) {
+      const char *src = first_state + bucket * bucket_size_;
+      ResultT *dst = static_cast<ResultT*>(output_cv->getPtrForDirectWrite());
+      *dst = *reinterpret_cast<const StateT*>(src);
+    }
+  }
+
+  const std::vector<const Type*> key_types_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::vector<std::size_t> key_sizes_;
+  std::vector<std::size_t> state_offsets_;
+  std::size_t bucket_size_;
+
+  std::unordered_map<std::uint64_t, BucketIndex> index_;
+
+  std::size_t num_buckets_;
+  std::size_t buckets_allocated_;
+
+  ScopedBuffer keys_;
+  ScopedBuffer buckets_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateCompactKeyHashTable);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateNumericHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateNumericHashTable.cpp 
b/storage/ThreadPrivateNumericHashTable.cpp
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateNumericHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateNumericHashTable.hpp 
b/storage/ThreadPrivateNumericHashTable.hpp
deleted file mode 100644
index 2991900..0000000
--- a/storage/ThreadPrivateNumericHashTable.hpp
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_
-
-#include <algorithm>
-#include <cstddef>
-#include <cstdint>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "types/Type.hpp"
-#include "types/TypeID.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/Macros.hpp"
-#include "utility/ScopedBuffer.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ThreadPrivateNumericHashTable : public AggregationStateHashTableBase {
- public:
-  ThreadPrivateNumericHashTable(
-      const std::vector<const Type *> &key_types,
-      const std::size_t num_entries,
-      const std::vector<AggregationHandle *> &handles,
-      StorageManager *storage_manager)
-      : key_types_(key_types),
-        handles_(handles),
-        bucket_size_(0),
-        num_buckets_(num_entries),
-        buckets_allocated_(0) {
-    for (const Type *key_type : key_types) {
-      DCHECK(!key_type->isVariableLength());
-
-      const std::size_t key_size = key_type->maximumByteLength();
-      DCHECK(key_size == 1u || key_size == 2u || key_size == 4u || key_size == 
8u);
-
-      key_sizes_.emplace_back(key_size);
-    }
-
-    for (const AggregationHandle *handle : handles) {
-      state_offsets_.emplace_back(bucket_size_);
-
-      const std::vector<const Type*> arg_types = handle->getArgumentTypes();
-      DCHECK_LE(arg_types.size(), 1u);
-
-      std::size_t state_size = 0;
-      switch (handle->getAggregationID()) {
-        case AggregationID::kCount: {
-          state_size = sizeof(std::int64_t);
-          break;
-        }
-        case AggregationID::kSum: {
-          DCHECK_EQ(1u, arg_types.size());
-          switch (arg_types.front()->getTypeID()) {
-            case TypeID::kInt:  // Fall through
-            case TypeID::kLong:
-              state_size = sizeof(std::int64_t);
-              break;
-            case TypeID::kFloat:  // Fall through
-            case TypeID::kDouble:
-              state_size = sizeof(double);
-              break;
-            default:
-              LOG(FATAL) << "Not implemented";
-          }
-          break;
-        }
-        default:
-          LOG(FATAL) << "Not implemented";
-      }
-      bucket_size_ += state_size;
-    }
-
-    keys_.reset(sizeof(std::uint64_t) * num_buckets_);
-    buckets_.reset(bucket_size_ * num_buckets_);
-  }
-
-  ~ThreadPrivateNumericHashTable() override {}
-
-  HashTableImplType getImplType() const override {
-    return HashTableImplType::kThreadPrivateNumeric;
-  }
-
-  void destroyPayload() override {}
-
-  std::size_t getMemoryConsumptionBytes() const override {
-    return num_buckets_ * (bucket_size_ + sizeof(std::uint64_t));
-  }
-
-  inline std::size_t numEntries() const {
-    return buckets_allocated_;
-  }
-
-  bool upsertValueAccessorCompositeKey(
-      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-      const std::vector<MultiSourceAttributeId> &key_attr_ids,
-      const ValueAccessorMultiplexer &accessor_mux) override {
-    ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
-    ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
-
-    DCHECK(base_accessor != nullptr);
-    const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
-
-    ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples);
-    std::uint64_t *key_codes = static_cast<std::uint64_t*>(buffer.get());
-    std::size_t key_code_offset = 0;
-    for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
-      const auto &key_attr_id = key_attr_ids[i];
-      ValueAccessor *accessor =
-          key_attr_id.source == ValueAccessorSource::kBase
-              ? base_accessor
-              : derived_accessor;
-      DCHECK(accessor != nullptr);
-
-      const std::size_t key_size = key_sizes_[i];
-      switch (key_size) {
-        case 1u:
-          ConstructKeyCode<std::uint8_t>(
-              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
-          break;
-        case 2u:
-          ConstructKeyCode<std::uint16_t>(
-              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
-          break;
-        case 4u:
-          ConstructKeyCode<std::uint32_t>(
-              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
-          break;
-        case 8u:
-          ConstructKeyCode<std::uint64_t>(
-              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
-          break;
-        default:
-          LOG(FATAL) << "Not implemented";
-      }
-
-      key_code_offset += key_size;
-    }
-
-    std::vector<BucketIndex> bucket_indices;
-    bucket_indices.reserve(num_tuples);
-    std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get());
-    for (std::size_t i = 0; i < num_tuples; ++i) {
-      const std::size_t code = key_codes[i];
-      const auto index_it = index_.find(code);
-      if (index_it == index_.end()) {
-        // TODO: Resize if overflow
-        index_.emplace(code, buckets_allocated_);
-        bucket_indices.emplace_back(buckets_allocated_);
-        keys[buckets_allocated_] = code;
-        ++buckets_allocated_;
-      } else {
-        bucket_indices.emplace_back(index_it->second);
-      }
-    }
-
-    // Dispatch
-    for (std::size_t i = 0; i < handles_.size(); ++i) {
-      const AggregationHandle *handle = handles_[i];
-      switch (handle->getAggregationID()) {
-        case AggregationID::kCount: {
-          upsertValueAccessorCount(bucket_indices, state_offsets_[i]);
-          break;
-        }
-        case AggregationID::kSum: {
-          DCHECK_EQ(1u, argument_ids[i].size());
-          const auto &argument_id = argument_ids[i].front();
-          ValueAccessor *accessor =
-              argument_id.source == ValueAccessorSource::kBase
-                  ? base_accessor
-                  : derived_accessor;
-          DCHECK(accessor != nullptr);
-
-          DCHECK_EQ(1u, handle->getArgumentTypes().size());
-          const Type *argument_type = handle->getArgumentTypes().front();
-          switch (argument_type->getTypeID()) {
-            case kInt: {
-              upsertValueAccessorSum<int, std::int64_t>(
-                  bucket_indices, state_offsets_[i], argument_id.attr_id, 
accessor);
-              break;
-            }
-            case kLong: {
-              upsertValueAccessorSum<std::int64_t, std::int64_t>(
-                  bucket_indices, state_offsets_[i], argument_id.attr_id, 
accessor);
-              break;
-            }
-            case kFloat: {
-              upsertValueAccessorSum<float, double>(
-                  bucket_indices, state_offsets_[i], argument_id.attr_id, 
accessor);
-              break;
-            }
-            case kDouble: {
-              upsertValueAccessorSum<double, double>(
-                  bucket_indices, state_offsets_[i], argument_id.attr_id, 
accessor);
-              break;
-            }
-            default:
-              LOG(FATAL) << "Not implemented";
-          }
-          break;
-        }
-        default:
-          LOG(FATAL) << "Not implemented";
-      }
-    }
-
-    return true;
-  }
-
-  void merge(const ThreadPrivateNumericHashTable *other) {
-    std::vector<BucketIndex> dst_bucket_indices;
-    std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get());
-
-    const char *src_buckets_start =
-        static_cast<const char*>(other->buckets_.get());
-    const std::uint64_t *src_keys =
-        static_cast<const std::uint64_t*>(other->keys_.get());
-
-    for (std::size_t i = 0; i < other->buckets_allocated_; ++i) {
-      const std::uint64_t code = src_keys[i];
-      const auto index_it = index_.find(code);
-
-      if (index_it == index_.end()) {
-        // TODO: Resize if overflow
-        index_.emplace(code, buckets_allocated_);
-        dst_bucket_indices.emplace_back(buckets_allocated_);
-        dst_keys[buckets_allocated_] = code;
-        ++buckets_allocated_;
-      } else {
-        dst_bucket_indices.emplace_back(index_it->second);
-      }
-    }
-
-    // Dispatch
-    for (std::size_t i = 0; i < handles_.size(); ++i) {
-      const AggregationHandle *handle = handles_[i];
-      switch (handle->getAggregationID()) {
-        case AggregationID::kCount: {
-          mergeStateSum<std::int64_t>(
-              dst_bucket_indices, src_buckets_start, state_offsets_[i]);
-          break;
-        }
-        case AggregationID::kSum: {
-          const Type *argument_type = handle->getArgumentTypes().front();
-          switch (argument_type->getTypeID()) {
-            case kInt:  // Fall through
-            case kLong: {
-              mergeStateSum<std::int64_t>(
-                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
-              break;
-            }
-            case kFloat:  // Fall through
-            case kDouble: {
-              mergeStateSum<double>(
-                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
-              break;
-            }
-            default:
-              LOG(FATAL) << "Not implemented";
-          }
-          break;
-        }
-        default:
-          LOG(FATAL) << "Not implemented";
-      }
-    }
-  }
-
-  void print() const {
-    std::cout << "num_entries = " << buckets_allocated_ << "\n";
-    const double *values = static_cast<const double*>(buckets_.get());
-    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
-      std::cout << values[i] << "\n";
-    }
-  }
-
-  void finalize(ColumnVectorsValueAccessor *output) const {
-    std::size_t key_offset = 0;
-    for (std::size_t i = 0; i < key_types_.size(); ++i) {
-      const Type &key_type = *key_types_[i];
-      std::unique_ptr<NativeColumnVector> native_cv(
-          new NativeColumnVector(key_type, buckets_allocated_));
-
-      const std::size_t key_size = key_sizes_[i];
-      switch (key_size) {
-        case 1u:
-          finalizeKey<std::uint8_t>(key_offset, native_cv.get());
-          break;
-        case 2u:
-          finalizeKey<std::uint16_t>(key_offset, native_cv.get());
-          break;
-        case 4u:
-          finalizeKey<std::uint32_t>(key_offset, native_cv.get());
-          break;
-        case 8u:
-          finalizeKey<std::uint64_t>(key_offset, native_cv.get());
-          break;
-        default:
-          LOG(FATAL) << "Not implemented";
-      }
-      output->addColumn(native_cv.release());
-
-      key_offset += key_size;
-    }
-
-    // Dispatch
-    for (std::size_t i = 0; i < handles_.size(); ++i) {
-      const AggregationHandle *handle = handles_[i];
-      const Type &result_type = *handle->getResultType();
-      std::unique_ptr<NativeColumnVector> native_cv(
-          new NativeColumnVector(result_type, buckets_allocated_));
-
-      switch (handle->getAggregationID()) {
-        case AggregationID::kCount: {
-          finalizeStateSum<std::int64_t, std::int64_t>(
-              state_offsets_[i], native_cv.get());
-          break;
-        }
-        case AggregationID::kSum: {
-          const Type *argument_type = handle->getArgumentTypes().front();
-          switch (argument_type->getTypeID()) {
-            case kInt:  // Fall through
-            case kLong: {
-              finalizeStateSum<std::int64_t, std::int64_t>(
-                  state_offsets_[i], native_cv.get());
-              break;
-            }
-            case kFloat:  // Fall through
-            case kDouble: {
-              finalizeStateSum<double, double>(
-                  state_offsets_[i], native_cv.get());
-              break;
-            }
-            default:
-              LOG(FATAL) << "Not implemented";
-          }
-          break;
-        }
-        default:
-          LOG(FATAL) << "Not implemented";
-      }
-      output->addColumn(native_cv.release());
-    }
-  }
-
- private:
-  using BucketIndex = std::uint32_t;
-
-  template <typename KeyT>
-  inline static void ConstructKeyCode(const std::size_t offset,
-                                      const attribute_id attr_id,
-                                      ValueAccessor *accessor,
-                                      void *key_code_start) {
-    InvokeOnAnyValueAccessor(
-        accessor,
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      char *key_code_ptr = static_cast<char*>(key_code_start) + offset;
-      accessor->beginIteration();
-      while (accessor->next()) {
-        *reinterpret_cast<KeyT*>(key_code_ptr) =
-            *static_cast<const KeyT*>(
-                accessor->template getUntypedValue<false>(attr_id));
-        key_code_ptr += sizeof(std::uint64_t);
-      }
-    });
-  }
-
-  inline void upsertValueAccessorCount(const std::vector<BucketIndex> 
&bucket_indices,
-                                       const std::size_t state_offset) {
-    char *state_start = static_cast<char*>(buckets_.get()) + state_offset;
-    for (const BucketIndex idx : bucket_indices) {
-      char *state_ptr = state_start + bucket_size_ * idx;
-      *reinterpret_cast<std::int64_t*>(state_ptr) += 1;
-    }
-  }
-
-  template <typename ArgumentT, typename StateT>
-  inline void upsertValueAccessorSum(const std::vector<BucketIndex> 
&bucket_indices,
-                                     const std::size_t state_offset,
-                                     const attribute_id attr_id,
-                                     ValueAccessor *accessor) {
-    InvokeOnAnyValueAccessor(
-        accessor,
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      accessor->beginIteration();
-
-      char *state_start = static_cast<char*>(buckets_.get()) + state_offset;
-      std::size_t loc = 0;
-      while (accessor->next()) {
-        char *state_ptr = state_start + bucket_size_ * bucket_indices[loc];
-        *reinterpret_cast<StateT*>(state_ptr) +=
-            *static_cast<const ArgumentT*>(
-                accessor->template getUntypedValue<false>(attr_id));
-        ++loc;
-      }
-    });
-  }
-
-  template <typename StateT>
-  inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
-                            const void *src_buckets_start,
-                            const std::size_t state_offset) {
-    char *dst_state_start = static_cast<char*>(buckets_.get()) + state_offset;
-    const char* src_state_start =
-        static_cast<const char*>(src_buckets_start) + state_offset;
-    for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) {
-      char *dst_state_ptr = dst_state_start + bucket_size_ * 
dst_bucket_indices[i];
-      const char *src_state_ptr = src_state_start + bucket_size_ * i;
-      *reinterpret_cast<StateT*>(dst_state_ptr) +=
-          *reinterpret_cast<const StateT*>(src_state_ptr);
-    }
-  }
-
-  template <typename KeyT>
-  inline void finalizeKey(const std::size_t offset,
-                          NativeColumnVector *output_cv) const {
-    const char *key_ptr = static_cast<const char*>(keys_.get()) + offset;
-    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
-      *static_cast<KeyT*>(output_cv->getPtrForDirectWrite()) =
-          *reinterpret_cast<const KeyT*>(key_ptr);
-      key_ptr += sizeof(std::uint64_t);
-    }
-  }
-
-  template <typename StateT, typename ResultT>
-  inline void finalizeStateSum(const std::size_t state_offset,
-                               NativeColumnVector *output_cv) const {
-    char *state_ptr = static_cast<char*>(buckets_.get()) + state_offset;
-    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
-      *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) =
-          *reinterpret_cast<const StateT*>(state_ptr);
-      state_ptr += bucket_size_;
-    }
-  }
-
-  const std::vector<const Type*> key_types_;
-  const std::vector<AggregationHandle *> handles_;
-
-  std::vector<std::size_t> key_sizes_;
-  std::vector<std::size_t> state_offsets_;
-  std::size_t bucket_size_;
-
-  std::unordered_map<std::uint64_t, BucketIndex> index_;
-
-  std::size_t num_buckets_;
-  std::size_t buckets_allocated_;
-
-  ScopedBuffer keys_;
-  ScopedBuffer buckets_;
-
-  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateNumericHashTable);
-};
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_

Reply via email to