http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp 
b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
index 85c3bf3..6e6d188 100644
--- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
@@ -31,6 +31,8 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleMin.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/FastHashTableFactory.hpp"
 #include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
@@ -69,54 +71,59 @@ class AggregationHandleMinTest : public ::testing::Test {
   // Helper method that calls AggregationHandleMin::iterateUnaryInl() to
   // aggregate 'value' into '*state'.
   void iterateHandle(AggregationState *state, const TypedValue &value) {
-    static_cast<const 
AggregationHandleMin&>(*aggregation_handle_min_).iterateUnaryInl(
-        static_cast<AggregationStateMin*>(state),
-        value);
+    static_cast<const AggregationHandleMin &>(*aggregation_handle_min_)
+        .iterateUnaryInl(static_cast<AggregationStateMin *>(state), value);
   }
 
   void initializeHandle(const Type &type) {
     aggregation_handle_min_.reset(
-        AggregateFunctionFactory::Get(AggregationID::kMin).createHandle(
-            std::vector<const Type*>(1, &type)));
+        AggregateFunctionFactory::Get(AggregationID::kMin)
+            .createHandle(std::vector<const Type *>(1, &type)));
     aggregation_handle_min_state_.reset(
         aggregation_handle_min_->createInitialState());
   }
 
   static bool ApplyToTypesTest(TypeID typeID) {
-    const Type &type = (typeID == kChar || typeID == kVarChar) ?
-        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
-        TypeFactory::GetType(typeID);
+    const Type &type =
+        (typeID == kChar || typeID == kVarChar)
+            ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+            : TypeFactory::GetType(typeID);
 
-    return AggregateFunctionFactory::Get(AggregationID::kMin).canApplyToTypes(
-        std::vector<const Type*>(1, &type));
+    return AggregateFunctionFactory::Get(AggregationID::kMin)
+        .canApplyToTypes(std::vector<const Type *>(1, &type));
   }
 
   static bool ResultTypeForArgumentTypeTest(TypeID input_type_id,
                                             TypeID output_type_id) {
-    const Type *result_type
-        = 
AggregateFunctionFactory::Get(AggregationID::kMin).resultTypeForArgumentTypes(
-            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    const Type *result_type =
+        AggregateFunctionFactory::Get(AggregationID::kMin)
+            .resultTypeForArgumentTypes(std::vector<const Type *>(
+                1, &TypeFactory::GetType(input_type_id)));
     return (result_type->getTypeID() == output_type_id);
   }
 
   template <typename CppType>
-  static void CheckMinValue(
-      CppType expected,
-      const AggregationHandle &handle,
-      const AggregationState &state) {
+  static void CheckMinValue(CppType expected,
+                            const AggregationHandle &handle,
+                            const AggregationState &state) {
     EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
   }
 
-  static void CheckMinString(
-      const std::string &expected,
-      const AggregationHandle &handle,
-      const AggregationState &state) {
+  template <typename CppType>
+  static void CheckMinValue(CppType expected, const TypedValue &value) {
+    EXPECT_EQ(expected, value.getLiteral<CppType>());
+  }
+
+  static void CheckMinString(const std::string &expected,
+                             const AggregationHandle &handle,
+                             const AggregationState &state) {
     TypedValue value = handle.finalize(state);
 
     ASSERT_EQ(expected.length(), value.getAsciiStringLength());
-    EXPECT_EQ(0, std::strncmp(expected.c_str(),
-                              static_cast <const char *>(value.getDataPtr()),
-                              value.getAsciiStringLength()));
+    EXPECT_EQ(0,
+              std::strncmp(expected.c_str(),
+                           static_cast<const char *>(value.getDataPtr()),
+                           value.getAsciiStringLength()));
   }
 
   // Static templated method to initialize data types.
@@ -129,7 +136,9 @@ class AggregationHandleMinTest : public ::testing::Test {
   void checkAggregationMinGeneric() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_min_->finalize(*aggregation_handle_min_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
+            .isNull());
 
     typename GenericType::cpptype val;
     typename GenericType::cpptype min;
@@ -141,16 +150,18 @@ class AggregationHandleMinTest : public ::testing::Test {
         if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
           SetDataType(i * kNumSamples + j - 10, &val);
         } else {
-          SetDataType(static_cast<float>(i * kNumSamples + j - 10)/10, &val);
+          SetDataType(static_cast<float>(i * kNumSamples + j - 10) / 10, &val);
         }
-        iterateHandle(aggregation_handle_min_state_.get(), 
type.makeValue(&val));
+        iterateHandle(aggregation_handle_min_state_.get(),
+                      type.makeValue(&val));
         if (min > val) {
           min = val;
         }
       }
     }
     iterateHandle(aggregation_handle_min_state_.get(), type.makeNullValue());
-    CheckMinValue<typename GenericType::cpptype>(min, 
*aggregation_handle_min_, *aggregation_handle_min_state_);
+    CheckMinValue<typename GenericType::cpptype>(
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
 
     // Test mergeStates().
     std::unique_ptr<AggregationState> merge_state(
@@ -164,7 +175,7 @@ class AggregationHandleMinTest : public ::testing::Test {
         if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
           SetDataType(i * kNumSamples + j - 20, &val);
         } else {
-          SetDataType(static_cast<float>(i * kNumSamples + j - 20)/10, &val);
+          SetDataType(static_cast<float>(i * kNumSamples + j - 20) / 10, &val);
         }
         iterateHandle(merge_state.get(), type.makeValue(&val));
         if (min > val) {
@@ -175,14 +186,14 @@ class AggregationHandleMinTest : public ::testing::Test {
     aggregation_handle_min_->mergeStates(*merge_state,
                                          aggregation_handle_min_state_.get());
     CheckMinValue<typename GenericType::cpptype>(
-        min,
-        *aggregation_handle_min_,
-        *aggregation_handle_min_state_);
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
 
   template <typename GenericType>
-  ColumnVector *createColumnVectorGeneric(const Type &type, typename 
GenericType::cpptype *min) {
-    NativeColumnVector *column = new NativeColumnVector(type, kIterations * 
kNumSamples + 3);
+  ColumnVector* createColumnVectorGeneric(const Type &type,
+                                          typename GenericType::cpptype *min) {
+    NativeColumnVector *column =
+        new NativeColumnVector(type, kIterations * kNumSamples + 3);
 
     typename GenericType::cpptype val;
     SetDataType(1000, min);
@@ -193,7 +204,7 @@ class AggregationHandleMinTest : public ::testing::Test {
         if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
           SetDataType(i * kNumSamples + j - 10, &val);
         } else {
-          SetDataType(static_cast<float>(i * kNumSamples + j - 10)/10, &val);
+          SetDataType(static_cast<float>(i * kNumSamples + j - 10) / 10, &val);
         }
         column->appendTypedValue(type.makeValue(&val));
         if (*min > val) {
@@ -201,7 +212,7 @@ class AggregationHandleMinTest : public ::testing::Test {
         }
       }
       // One NULL in the middle.
-      if (i == kIterations/2) {
+      if (i == kIterations / 2) {
         column->appendTypedValue(type.makeNullValue());
       }
     }
@@ -214,11 +225,14 @@ class AggregationHandleMinTest : public ::testing::Test {
   void checkAggregationMinGenericColumnVector() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_min_->finalize(*aggregation_handle_min_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
+            .isNull());
 
     typename GenericType::cpptype min;
     std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, 
&min));
+    column_vectors.emplace_back(
+        createColumnVectorGeneric<GenericType>(type, &min));
 
     std::unique_ptr<AggregationState> cv_state(
         aggregation_handle_min_->accumulateColumnVectors(column_vectors));
@@ -226,15 +240,12 @@ class AggregationHandleMinTest : public ::testing::Test {
     // Test the state generated directly by accumulateColumnVectors(), and also
     // test after merging back.
     CheckMinValue<typename GenericType::cpptype>(
-        min,
-        *aggregation_handle_min_,
-        *cv_state);
+        min, *aggregation_handle_min_, *cv_state);
 
-    aggregation_handle_min_->mergeStates(*cv_state, 
aggregation_handle_min_state_.get());
+    aggregation_handle_min_->mergeStates(*cv_state,
+                                         aggregation_handle_min_state_.get());
     CheckMinValue<typename GenericType::cpptype>(
-        min,
-        *aggregation_handle_min_,
-        *aggregation_handle_min_state_);
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -242,29 +253,29 @@ class AggregationHandleMinTest : public ::testing::Test {
   void checkAggregationMinGenericValueAccessor() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_min_->finalize(*aggregation_handle_min_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
+            .isNull());
 
-    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new 
ColumnVectorsValueAccessor());
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+        new ColumnVectorsValueAccessor());
 
     typename GenericType::cpptype min;
     accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &min));
 
     std::unique_ptr<AggregationState> va_state(
-        aggregation_handle_min_->accumulateValueAccessor(accessor.get(),
-                                                         
std::vector<attribute_id>(1, 0)));
+        aggregation_handle_min_->accumulateValueAccessor(
+            accessor.get(), std::vector<attribute_id>(1, 0)));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
     CheckMinValue<typename GenericType::cpptype>(
-        min,
-        *aggregation_handle_min_,
-        *va_state);
+        min, *aggregation_handle_min_, *va_state);
 
-    aggregation_handle_min_->mergeStates(*va_state, 
aggregation_handle_min_state_.get());
+    aggregation_handle_min_->mergeStates(*va_state,
+                                         aggregation_handle_min_state_.get());
     CheckMinValue<typename GenericType::cpptype>(
-        min,
-        *aggregation_handle_min_,
-        *aggregation_handle_min_state_);
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -272,11 +283,13 @@ class AggregationHandleMinTest : public ::testing::Test {
   void checkAggregationMinString() {
     const StringType &type = StringType::Instance(10, true);
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_min_->finalize(*aggregation_handle_min_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
+            .isNull());
 
     std::unique_ptr<UncheckedComparator> fast_comparator_;
     
fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess)
-                           .makeUncheckedComparatorForTypes(type, type));
+                               .makeUncheckedComparatorForTypes(type, type));
     std::string string_literal;
     std::string min = "z";
     int val;
@@ -288,15 +301,19 @@ class AggregationHandleMinTest : public ::testing::Test {
         oss << "test" << val;
         string_literal = oss.str();
 
-        iterateHandle(aggregation_handle_min_state_.get(), 
type.makeValue(string_literal.c_str(),
-                                                        
string_literal.length() + 1).ensureNotReference());
-        if (fast_comparator_->compareDataPtrs(string_literal.c_str(), 
min.c_str())) {
+        iterateHandle(
+            aggregation_handle_min_state_.get(),
+            type.makeValue(string_literal.c_str(), string_literal.length() + 1)
+                .ensureNotReference());
+        if (fast_comparator_->compareDataPtrs(string_literal.c_str(),
+                                              min.c_str())) {
           min = string_literal;
         }
       }
     }
     iterateHandle(aggregation_handle_min_state_.get(), type.makeNullValue());
-    CheckMinString(min, *aggregation_handle_min_, 
*aggregation_handle_min_state_);
+    CheckMinString(
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
 
     // Test mergeStates().
     std::unique_ptr<AggregationState> merge_state(
@@ -314,24 +331,27 @@ class AggregationHandleMinTest : public ::testing::Test {
 
         iterateHandle(
             merge_state.get(),
-            type.makeValue(string_literal.c_str(),
-                           string_literal.length() + 1).ensureNotReference());
-        if (fast_comparator_->compareDataPtrs(string_literal.c_str(), 
min.c_str())) {
+            type.makeValue(string_literal.c_str(), string_literal.length() + 1)
+                .ensureNotReference());
+        if (fast_comparator_->compareDataPtrs(string_literal.c_str(),
+                                              min.c_str())) {
           min = string_literal;
         }
       }
     }
     aggregation_handle_min_->mergeStates(*merge_state,
                                          aggregation_handle_min_state_.get());
-    CheckMinString(min, *aggregation_handle_min_, 
*aggregation_handle_min_state_);
+    CheckMinString(
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
 
   template <typename ColumnVectorType>
-  ColumnVector *createColumnVectorString(const Type &type, std::string *min) {
-    ColumnVectorType *column = new ColumnVectorType(type, kIterations * 
kNumSamples + 3);
+  ColumnVector* createColumnVectorString(const Type &type, std::string *min) {
+    ColumnVectorType *column =
+        new ColumnVectorType(type, kIterations * kNumSamples + 3);
     std::unique_ptr<UncheckedComparator> fast_comparator_;
     
fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess)
-                           .makeUncheckedComparatorForTypes(type, type));
+                               .makeUncheckedComparatorForTypes(type, type));
     std::string string_literal;
     *min = "z";
     int val;
@@ -343,14 +363,16 @@ class AggregationHandleMinTest : public ::testing::Test {
         oss << "test" << val;
         string_literal = oss.str();
 
-        column->appendTypedValue(type.makeValue(string_literal.c_str(), 
string_literal.length() + 1)
-            .ensureNotReference());
-        if (fast_comparator_->compareDataPtrs(string_literal.c_str(), 
min->c_str())) {
+        column->appendTypedValue(
+            type.makeValue(string_literal.c_str(), string_literal.length() + 1)
+                .ensureNotReference());
+        if (fast_comparator_->compareDataPtrs(string_literal.c_str(),
+                                              min->c_str())) {
           *min = string_literal;
         }
       }
       // One NULL in the middle.
-      if (i == kIterations/2) {
+      if (i == kIterations / 2) {
         column->appendTypedValue(type.makeNullValue());
       }
     }
@@ -363,25 +385,26 @@ class AggregationHandleMinTest : public ::testing::Test {
   void checkAggregationMinStringColumnVector() {
     const StringType &type = StringType::Instance(10, true);
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_min_->finalize(*aggregation_handle_min_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
+            .isNull());
 
     std::string min;
     std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    
column_vectors.emplace_back(createColumnVectorString<ColumnVectorType>(type, 
&min));
+    column_vectors.emplace_back(
+        createColumnVectorString<ColumnVectorType>(type, &min));
 
     std::unique_ptr<AggregationState> cv_state(
         aggregation_handle_min_->accumulateColumnVectors(column_vectors));
 
     // Test the state generated directly by accumulateColumnVectors(), and also
     // test after merging back.
-    CheckMinString(min,
-                   *aggregation_handle_min_,
-                   *cv_state);
-
-    aggregation_handle_min_->mergeStates(*cv_state, 
aggregation_handle_min_state_.get());
-    CheckMinString(min,
-                   *aggregation_handle_min_,
-                   *aggregation_handle_min_state_);
+    CheckMinString(min, *aggregation_handle_min_, *cv_state);
+
+    aggregation_handle_min_->mergeStates(*cv_state,
+                                         aggregation_handle_min_state_.get());
+    CheckMinString(
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -389,26 +412,27 @@ class AggregationHandleMinTest : public ::testing::Test {
   void checkAggregationMinStringValueAccessor() {
     const StringType &type = StringType::Instance(10, true);
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_min_->finalize(*aggregation_handle_min_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
+            .isNull());
 
     std::string min;
-    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new 
ColumnVectorsValueAccessor());
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+        new ColumnVectorsValueAccessor());
     accessor->addColumn(createColumnVectorString<ColumnVectorType>(type, 
&min));
 
     std::unique_ptr<AggregationState> va_state(
-        aggregation_handle_min_->accumulateValueAccessor(accessor.get(),
-                                                         
std::vector<attribute_id>(1, 0)));
+        aggregation_handle_min_->accumulateValueAccessor(
+            accessor.get(), std::vector<attribute_id>(1, 0)));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
-    CheckMinString(min,
-                   *aggregation_handle_min_,
-                   *va_state);
-
-    aggregation_handle_min_->mergeStates(*va_state, 
aggregation_handle_min_state_.get());
-    CheckMinString(min,
-                   *aggregation_handle_min_,
-                   *aggregation_handle_min_state_);
+    CheckMinString(min, *aggregation_handle_min_, *va_state);
+
+    aggregation_handle_min_->mergeStates(*va_state,
+                                         aggregation_handle_min_state_.get());
+    CheckMinString(
+        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -419,9 +443,7 @@ class AggregationHandleMinTest : public ::testing::Test {
 
 template <>
 void AggregationHandleMinTest::CheckMinValue<float>(
-    float val,
-    const AggregationHandle &handle,
-    const AggregationState &state) {
+    float val, const AggregationHandle &handle, const AggregationState &state) 
{
   EXPECT_FLOAT_EQ(val, handle.finalize(state).getLiteral<float>());
 }
 
@@ -434,17 +456,20 @@ void AggregationHandleMinTest::CheckMinValue<double>(
 }
 
 template <>
-void AggregationHandleMinTest::SetDataType<DatetimeLit>(int value, DatetimeLit 
*data) {
+void AggregationHandleMinTest::SetDataType<DatetimeLit>(int value,
+                                                        DatetimeLit *data) {
   data->ticks = value;
 }
 
 template <>
-void AggregationHandleMinTest::SetDataType<DatetimeIntervalLit>(int value, 
DatetimeIntervalLit *data) {
+void AggregationHandleMinTest::SetDataType<DatetimeIntervalLit>(
+    int value, DatetimeIntervalLit *data) {
   data->interval_ticks = value;
 }
 
 template <>
-void AggregationHandleMinTest::SetDataType<YearMonthIntervalLit>(int value, 
YearMonthIntervalLit *data) {
+void AggregationHandleMinTest::SetDataType<YearMonthIntervalLit>(
+    int value, YearMonthIntervalLit *data) {
   data->months = value;
 }
 
@@ -575,50 +600,67 @@ TEST_F(AggregationHandleMinDeathTest, WrongTypeTest) {
   double double_val = 0;
   float float_val = 0;
 
-  iterateHandle(aggregation_handle_min_state_.get(), 
int_non_null_type.makeValue(&int_val));
+  iterateHandle(aggregation_handle_min_state_.get(),
+                int_non_null_type.makeValue(&int_val));
 
-  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(), 
long_type.makeValue(&long_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(), 
double_type.makeValue(&double_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(), 
float_type.makeValue(&float_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(), 
char_type.makeValue("asdf", 5)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(), 
varchar_type.makeValue("asdf", 5)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(),
+                             long_type.makeValue(&long_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(),
+                             double_type.makeValue(&double_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(),
+                             float_type.makeValue(&float_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(),
+                             char_type.makeValue("asdf", 5)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_min_state_.get(),
+                             varchar_type.makeValue("asdf", 5)),
+               "");
 
   // Test mergeStates() with incorrectly typed handles.
   std::unique_ptr<AggregationHandle> aggregation_handle_min_long(
-      AggregateFunctionFactory::Get(AggregationID::kMin).createHandle(
-          std::vector<const Type*>(1, &long_type)));
+      AggregateFunctionFactory::Get(AggregationID::kMin)
+          .createHandle(std::vector<const Type *>(1, &long_type)));
   std::unique_ptr<AggregationState> aggregation_state_min_merge_long(
       aggregation_handle_min_long->createInitialState());
-  static_cast<const 
AggregationHandleMin&>(*aggregation_handle_min_long).iterateUnaryInl(
-      
static_cast<AggregationStateMin*>(aggregation_state_min_merge_long.get()),
-      long_type.makeValue(&long_val));
-  
EXPECT_DEATH(aggregation_handle_min_->mergeStates(*aggregation_state_min_merge_long,
-                                                    
aggregation_handle_min_state_.get()),
-               "");
+  static_cast<const AggregationHandleMin &>(*aggregation_handle_min_long)
+      .iterateUnaryInl(static_cast<AggregationStateMin *>(
+                           aggregation_state_min_merge_long.get()),
+                       long_type.makeValue(&long_val));
+  EXPECT_DEATH(
+      aggregation_handle_min_->mergeStates(*aggregation_state_min_merge_long,
+                                           
aggregation_handle_min_state_.get()),
+      "");
 
   std::unique_ptr<AggregationHandle> aggregation_handle_min_double(
-      AggregateFunctionFactory::Get(AggregationID::kMin).createHandle(
-          std::vector<const Type*>(1, &double_type)));
+      AggregateFunctionFactory::Get(AggregationID::kMin)
+          .createHandle(std::vector<const Type *>(1, &double_type)));
   std::unique_ptr<AggregationState> aggregation_state_min_merge_double(
       aggregation_handle_min_double->createInitialState());
-  static_cast<const 
AggregationHandleMin&>(*aggregation_handle_min_double).iterateUnaryInl(
-      
static_cast<AggregationStateMin*>(aggregation_state_min_merge_double.get()),
-      double_type.makeValue(&double_val));
-  
EXPECT_DEATH(aggregation_handle_min_->mergeStates(*aggregation_state_min_merge_double,
-                                                    
aggregation_handle_min_state_.get()),
-               "");
+  static_cast<const AggregationHandleMin &>(*aggregation_handle_min_double)
+      .iterateUnaryInl(static_cast<AggregationStateMin *>(
+                           aggregation_state_min_merge_double.get()),
+                       double_type.makeValue(&double_val));
+  EXPECT_DEATH(
+      aggregation_handle_min_->mergeStates(*aggregation_state_min_merge_double,
+                                           
aggregation_handle_min_state_.get()),
+      "");
 
   std::unique_ptr<AggregationHandle> aggregation_handle_min_float(
-      AggregateFunctionFactory::Get(AggregationID::kMin).createHandle(
-          std::vector<const Type*>(1, &float_type)));
+      AggregateFunctionFactory::Get(AggregationID::kMin)
+          .createHandle(std::vector<const Type *>(1, &float_type)));
   std::unique_ptr<AggregationState> aggregation_state_min_merge_float(
       aggregation_handle_min_float->createInitialState());
-  static_cast<const 
AggregationHandleMin&>(*aggregation_handle_min_float).iterateUnaryInl(
-      
static_cast<AggregationStateMin*>(aggregation_state_min_merge_float.get()),
-      float_type.makeValue(&float_val));
-  
EXPECT_DEATH(aggregation_handle_min_->mergeStates(*aggregation_state_min_merge_float,
-                                                    
aggregation_handle_min_state_.get()),
-               "");
+  static_cast<const AggregationHandleMin &>(*aggregation_handle_min_float)
+      .iterateUnaryInl(static_cast<AggregationStateMin *>(
+                           aggregation_state_min_merge_float.get()),
+                       float_type.makeValue(&float_val));
+  EXPECT_DEATH(
+      aggregation_handle_min_->mergeStates(*aggregation_state_min_merge_float,
+                                           
aggregation_handle_min_state_.get()),
+      "");
 }
 #endif
 
@@ -643,25 +685,28 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
   initializeHandle(int_non_null_type);
   storage_manager_.reset(new StorageManager("./test_min_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      aggregation_handle_min_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &int_non_null_type),
           10,
+          {aggregation_handle_min_.get()->getPayloadSize()},
+          {aggregation_handle_min_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      aggregation_handle_min_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &int_non_null_type),
           10,
+          {aggregation_handle_min_.get()->getPayloadSize()},
+          {aggregation_handle_min_.get()},
           storage_manager_.get()));
 
-  AggregationStateHashTable<AggregationStateMin> 
*destination_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateMin> *>(
+  AggregationStateFastHashTable *destination_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(
           destination_hash_table.get());
 
-  AggregationStateHashTable<AggregationStateMin> *source_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateMin> *>(
-          source_hash_table.get());
+  AggregationStateFastHashTable *source_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
 
   AggregationHandleMin *aggregation_handle_min_derived =
       static_cast<AggregationHandleMin *>(aggregation_handle_min_.get());
@@ -726,35 +771,52 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
   EXPECT_EQ(exclusive_key_source_min_val.getLiteral<int>(), actual_val);
 
   // Add the key-state pairs to the hash tables.
-  source_hash_table_derived->putCompositeKey(common_key,
-                                             *common_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      common_key, *common_key_destination_state);
-  source_hash_table_derived->putCompositeKey(exclusive_source_key,
-                                             *exclusive_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      exclusive_destination_key, *exclusive_key_destination_state);
+  unsigned char buffer[100];
+  buffer[0] = '\0';
+  memcpy(buffer + 1,
+         common_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_min_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         common_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_min_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_min_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_min_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+                                                      buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  aggregation_handle_min_->mergeGroupByHashTables(*source_hash_table,
-                                                  
destination_hash_table.get());
+  AggregationOperationState::mergeGroupByHashTables(
+      source_hash_table.get(), destination_hash_table.get());
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckMinValue<int>(
       common_key_source_min_val.getLiteral<int>(),
-      *aggregation_handle_min_derived,
-      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+      aggregation_handle_min_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(common_key) +
+          1));
   CheckMinValue<int>(exclusive_key_destination_min_val.getLiteral<int>(),
-                     *aggregation_handle_min_derived,
-                     *(destination_hash_table_derived->getSingleCompositeKey(
-                         exclusive_destination_key)));
+                     
aggregation_handle_min_derived->finalizeHashTableEntryFast(
+                         destination_hash_table_derived->getSingleCompositeKey(
+                             exclusive_destination_key) +
+                         1));
   CheckMinValue<int>(exclusive_key_source_min_val.getLiteral<int>(),
-                     *aggregation_handle_min_derived,
-                     *(source_hash_table_derived->getSingleCompositeKey(
-                         exclusive_source_key)));
+                     
aggregation_handle_min_derived->finalizeHashTableEntryFast(
+                         source_hash_table_derived->getSingleCompositeKey(
+                             exclusive_source_key) +
+                         1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp 
b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 0e35151..1d1c084 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -28,6 +28,8 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleSum.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/FastHashTableFactory.hpp"
 #include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
@@ -52,51 +54,56 @@
 
 namespace quickstep {
 
-class AggregationHandleSumTest : public::testing::Test {
+class AggregationHandleSumTest : public ::testing::Test {
  protected:
   static const int kNumSamples = 1000;
 
   // Helper method that calls AggregationHandleSum::iterateUnaryInl() to
   // aggregate 'value' into '*state'.
   void iterateHandle(AggregationState *state, const TypedValue &value) {
-    static_cast<const 
AggregationHandleSum&>(*aggregation_handle_sum_).iterateUnaryInl(
-        static_cast<AggregationStateSum*>(state),
-        value);
+    static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_)
+        .iterateUnaryInl(static_cast<AggregationStateSum *>(state), value);
   }
 
   void initializeHandle(const Type &type) {
     aggregation_handle_sum_.reset(
-        AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
-            std::vector<const Type*>(1, &type)));
+        AggregateFunctionFactory::Get(AggregationID::kSum)
+            .createHandle(std::vector<const Type *>(1, &type)));
     aggregation_handle_sum_state_.reset(
         aggregation_handle_sum_->createInitialState());
   }
 
   static bool ApplyToTypesTest(TypeID typeID) {
-    const Type &type = (typeID == kChar || typeID == kVarChar) ?
-        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
-        TypeFactory::GetType(typeID);
+    const Type &type =
+        (typeID == kChar || typeID == kVarChar)
+            ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+            : TypeFactory::GetType(typeID);
 
-    return AggregateFunctionFactory::Get(AggregationID::kSum).canApplyToTypes(
-        std::vector<const Type*>(1, &type));
+    return AggregateFunctionFactory::Get(AggregationID::kSum)
+        .canApplyToTypes(std::vector<const Type *>(1, &type));
   }
 
   static bool ResultTypeForArgumentTypeTest(TypeID input_type_id,
                                             TypeID output_type_id) {
-    const Type *result_type
-        = 
AggregateFunctionFactory::Get(AggregationID::kSum).resultTypeForArgumentTypes(
-            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    const Type *result_type =
+        AggregateFunctionFactory::Get(AggregationID::kSum)
+            .resultTypeForArgumentTypes(std::vector<const Type *>(
+                1, &TypeFactory::GetType(input_type_id)));
     return (result_type->getTypeID() == output_type_id);
   }
 
   template <typename CppType>
-  static void CheckSumValue(
-      CppType expected,
-      const AggregationHandle &target,
-      const AggregationState &state) {
+  static void CheckSumValue(CppType expected,
+                            const AggregationHandle &target,
+                            const AggregationState &state) {
     EXPECT_EQ(expected, target.finalize(state).getLiteral<CppType>());
   }
 
+  template <typename CppType>
+  static void CheckSumValue(CppType expected, const TypedValue &value) {
+    EXPECT_EQ(expected, value.getLiteral<CppType>());
+  }
+
   // Static templated method to set a meaningful to data types.
   template <typename CppType>
   static void SetDataType(int value, CppType *data) {
@@ -108,7 +115,9 @@ class AggregationHandleSumTest : public::testing::Test {
     const GenericType &type = GenericType::Instance(true);
 
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+            .isNull());
 
     typename GenericType::cpptype val;
     typename PrecisionType::cpptype sum;
@@ -119,13 +128,14 @@ class AggregationHandleSumTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       iterateHandle(aggregation_handle_sum_state_.get(), type.makeValue(&val));
       sum += val;
     }
     iterateHandle(aggregation_handle_sum_state_.get(), type.makeNullValue());
-    CheckSumValue<typename PrecisionType::cpptype>(sum, 
*aggregation_handle_sum_, *aggregation_handle_sum_state_);
+    CheckSumValue<typename PrecisionType::cpptype>(
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
 
     // Test mergeStates().
     std::unique_ptr<AggregationState> merge_state(
@@ -138,7 +148,7 @@ class AggregationHandleSumTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       iterateHandle(merge_state.get(), type.makeValue(&val));
       sum += val;
@@ -146,13 +156,11 @@ class AggregationHandleSumTest : public::testing::Test {
     aggregation_handle_sum_->mergeStates(*merge_state,
                                          aggregation_handle_sum_state_.get());
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *aggregation_handle_sum_state_);
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
 
   template <typename GenericType, typename Output>
-  ColumnVector *createColumnVectorGeneric(const Type &type, Output *sum) {
+  ColumnVector* createColumnVectorGeneric(const Type &type, Output *sum) {
     NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
 
     typename GenericType::cpptype val;
@@ -163,12 +171,12 @@ class AggregationHandleSumTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       column->appendTypedValue(type.makeValue(&val));
       *sum += val;
       // One NULL in the middle.
-      if (i == kNumSamples/2) {
+      if (i == kNumSamples / 2) {
         column->appendTypedValue(type.makeNullValue());
       }
     }
@@ -182,12 +190,15 @@ class AggregationHandleSumTest : public::testing::Test {
     const GenericType &type = GenericType::Instance(true);
 
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+            .isNull());
 
     typename PrecisionType::cpptype sum;
     std::vector<std::unique_ptr<ColumnVector>> column_vectors;
     column_vectors.emplace_back(
-        createColumnVectorGeneric<GenericType, typename 
PrecisionType::cpptype>(type, &sum));
+        createColumnVectorGeneric<GenericType, typename 
PrecisionType::cpptype>(
+            type, &sum));
 
     std::unique_ptr<AggregationState> cv_state(
         aggregation_handle_sum_->accumulateColumnVectors(column_vectors));
@@ -195,15 +206,12 @@ class AggregationHandleSumTest : public::testing::Test {
     // Test the state generated directly by accumulateColumnVectors(), and also
     // test after merging back.
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *cv_state);
+        sum, *aggregation_handle_sum_, *cv_state);
 
-    aggregation_handle_sum_->mergeStates(*cv_state, 
aggregation_handle_sum_state_.get());
+    aggregation_handle_sum_->mergeStates(*cv_state,
+                                         aggregation_handle_sum_state_.get());
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *aggregation_handle_sum_state_);
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -212,29 +220,30 @@ class AggregationHandleSumTest : public::testing::Test {
     const GenericType &type = GenericType::Instance(true);
 
     initializeHandle(type);
-    
EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+            .isNull());
 
     typename PrecisionType::cpptype sum;
-    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new 
ColumnVectorsValueAccessor());
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+        new ColumnVectorsValueAccessor());
     accessor->addColumn(
-        createColumnVectorGeneric<GenericType, typename 
PrecisionType::cpptype>(type, &sum));
+        createColumnVectorGeneric<GenericType, typename 
PrecisionType::cpptype>(
+            type, &sum));
 
     std::unique_ptr<AggregationState> va_state(
-        aggregation_handle_sum_->accumulateValueAccessor(accessor.get(),
-                                                         
std::vector<attribute_id>(1, 0)));
+        aggregation_handle_sum_->accumulateValueAccessor(
+            accessor.get(), std::vector<attribute_id>(1, 0)));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *va_state);
+        sum, *aggregation_handle_sum_, *va_state);
 
-    aggregation_handle_sum_->mergeStates(*va_state, 
aggregation_handle_sum_state_.get());
+    aggregation_handle_sum_->mergeStates(*va_state,
+                                         aggregation_handle_sum_state_.get());
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *aggregation_handle_sum_state_);
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -247,9 +256,7 @@ const int AggregationHandleSumTest::kNumSamples;
 
 template <>
 void AggregationHandleSumTest::CheckSumValue<float>(
-    float val,
-    const AggregationHandle &handle,
-    const AggregationState &state) {
+    float val, const AggregationHandle &handle, const AggregationState &state) 
{
   EXPECT_FLOAT_EQ(val, handle.finalize(state).getLiteral<float>());
 }
 
@@ -262,12 +269,14 @@ void AggregationHandleSumTest::CheckSumValue<double>(
 }
 
 template <>
-void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(int value, 
DatetimeIntervalLit *data) {
+void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(
+    int value, DatetimeIntervalLit *data) {
   data->interval_ticks = value;
 }
 
 template <>
-void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(int value, 
YearMonthIntervalLit *data) {
+void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(
+    int value, YearMonthIntervalLit *data) {
   data->months = value;
 }
 
@@ -314,11 +323,13 @@ TEST_F(AggregationHandleSumTest, 
DoubleTypeColumnVectorTest) {
 }
 
 TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<DatetimeIntervalType, 
DatetimeIntervalType>();
+  checkAggregationSumGenericColumnVector<DatetimeIntervalType,
+                                         DatetimeIntervalType>();
 }
 
 TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<YearMonthIntervalType, 
YearMonthIntervalType>();
+  checkAggregationSumGenericColumnVector<YearMonthIntervalType,
+                                         YearMonthIntervalType>();
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -339,11 +350,13 @@ TEST_F(AggregationHandleSumTest, 
DoubleTypeValueAccessorTest) {
 }
 
 TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeValueAccessorTest) {
-  checkAggregationSumGenericValueAccessor<DatetimeIntervalType, 
DatetimeIntervalType>();
+  checkAggregationSumGenericValueAccessor<DatetimeIntervalType,
+                                          DatetimeIntervalType>();
 }
 
 TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) {
-  checkAggregationSumGenericValueAccessor<YearMonthIntervalType, 
YearMonthIntervalType>();
+  checkAggregationSumGenericValueAccessor<YearMonthIntervalType,
+                                          YearMonthIntervalType>();
 }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -373,38 +386,53 @@ TEST_F(AggregationHandleSumDeathTest, WrongTypeTest) {
   float float_val = 0;
 
   // Passes.
-  iterateHandle(aggregation_handle_sum_state_.get(), 
int_non_null_type.makeValue(&int_val));
+  iterateHandle(aggregation_handle_sum_state_.get(),
+                int_non_null_type.makeValue(&int_val));
 
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), 
long_type.makeValue(&long_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), 
double_type.makeValue(&double_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), 
float_type.makeValue(&float_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), 
char_type.makeValue("asdf", 5)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), 
varchar_type.makeValue("asdf", 5)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             long_type.makeValue(&long_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             double_type.makeValue(&double_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             float_type.makeValue(&float_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             char_type.makeValue("asdf", 5)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             varchar_type.makeValue("asdf", 5)),
+               "");
 
   // Test mergeStates() with incorrectly typed handles.
   std::unique_ptr<AggregationHandle> aggregation_handle_sum_double(
-      AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
-          std::vector<const Type*>(1, &double_type)));
+      AggregateFunctionFactory::Get(AggregationID::kSum)
+          .createHandle(std::vector<const Type *>(1, &double_type)));
   std::unique_ptr<AggregationState> aggregation_state_sum_merge_double(
       aggregation_handle_sum_double->createInitialState());
-  static_cast<const 
AggregationHandleSum&>(*aggregation_handle_sum_double).iterateUnaryInl(
-      
static_cast<AggregationStateSum*>(aggregation_state_sum_merge_double.get()),
-      double_type.makeValue(&double_val));
-  
EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double,
-                                                    
aggregation_handle_sum_state_.get()),
-               "");
+  static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_double)
+      .iterateUnaryInl(static_cast<AggregationStateSum *>(
+                           aggregation_state_sum_merge_double.get()),
+                       double_type.makeValue(&double_val));
+  EXPECT_DEATH(
+      aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double,
+                                           
aggregation_handle_sum_state_.get()),
+      "");
 
   std::unique_ptr<AggregationHandle> aggregation_handle_sum_float(
-      AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
-          std::vector<const Type*>(1, &float_type)));
+      AggregateFunctionFactory::Get(AggregationID::kSum)
+          .createHandle(std::vector<const Type *>(1, &float_type)));
   std::unique_ptr<AggregationState> aggregation_state_sum_merge_float(
       aggregation_handle_sum_float->createInitialState());
-  static_cast<const 
AggregationHandleSum&>(*aggregation_handle_sum_float).iterateUnaryInl(
-      
static_cast<AggregationStateSum*>(aggregation_state_sum_merge_float.get()),
-      float_type.makeValue(&float_val));
-  
EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float,
-                                                    
aggregation_handle_sum_state_.get()),
-               "");
+  static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_float)
+      .iterateUnaryInl(static_cast<AggregationStateSum *>(
+                           aggregation_state_sum_merge_float.get()),
+                       float_type.makeValue(&float_val));
+  EXPECT_DEATH(
+      aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float,
+                                           
aggregation_handle_sum_state_.get()),
+      "");
 }
 #endif
 
@@ -425,8 +453,10 @@ TEST_F(AggregationHandleSumTest, 
ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kLong));
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, 
kDatetimeInterval));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, 
kYearMonthInterval));
+  EXPECT_TRUE(
+      ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(
+      ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
 }
 
 TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
@@ -434,25 +464,28 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   initializeHandle(long_non_null_type);
   storage_manager_.reset(new StorageManager("./test_sum_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      aggregation_handle_sum_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
+          {aggregation_handle_sum_.get()->getPayloadSize()},
+          {aggregation_handle_sum_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      aggregation_handle_sum_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
+          {aggregation_handle_sum_.get()->getPayloadSize()},
+          {aggregation_handle_sum_.get()},
           storage_manager_.get()));
 
-  AggregationStateHashTable<AggregationStateSum> 
*destination_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+  AggregationStateFastHashTable *destination_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(
           destination_hash_table.get());
 
-  AggregationStateHashTable<AggregationStateSum> *source_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateSum> *>(
-          source_hash_table.get());
+  AggregationStateFastHashTable *source_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
 
   AggregationHandleSum *aggregation_handle_sum_derived =
       static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
@@ -471,7 +504,8 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   const std::int64_t common_key_destination_sum = 4000;
   TypedValue common_key_destination_sum_val(common_key_destination_sum);
 
-  const std::int64_t merged_common_key = common_key_source_sum + 
common_key_destination_sum;
+  const std::int64_t merged_common_key =
+      common_key_source_sum + common_key_destination_sum;
   TypedValue common_key_merged_val(merged_common_key);
 
   const std::int64_t exclusive_key_source_sum = 100;
@@ -496,59 +530,82 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   // Create sum value states for keys.
   
aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(),
                                                   common_key_source_sum_val);
-  std::int64_t actual_val = 
aggregation_handle_sum_->finalize(*common_key_source_state)
-                       .getLiteral<std::int64_t>();
+  std::int64_t actual_val =
+      aggregation_handle_sum_->finalize(*common_key_source_state)
+          .getLiteral<std::int64_t>();
   EXPECT_EQ(common_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
 
   aggregation_handle_sum_derived->iterateUnaryInl(
       common_key_destination_state.get(), common_key_destination_sum_val);
   actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state)
                    .getLiteral<std::int64_t>();
-  EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), 
actual_val);
+  EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(),
+            actual_val);
 
   aggregation_handle_sum_derived->iterateUnaryInl(
       exclusive_key_destination_state.get(), 
exclusive_key_destination_sum_val);
   actual_val =
       aggregation_handle_sum_->finalize(*exclusive_key_destination_state)
           .getLiteral<std::int64_t>();
-  EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), 
actual_val);
+  EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+            actual_val);
 
   aggregation_handle_sum_derived->iterateUnaryInl(
       exclusive_key_source_state.get(), exclusive_key_source_sum_val);
   actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state)
                    .getLiteral<std::int64_t>();
-  EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), 
actual_val);
+  EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+            actual_val);
 
   // Add the key-state pairs to the hash tables.
-  source_hash_table_derived->putCompositeKey(common_key,
-                                             *common_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      common_key, *common_key_destination_state);
-  source_hash_table_derived->putCompositeKey(exclusive_source_key,
-                                             *exclusive_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      exclusive_destination_key, *exclusive_key_destination_state);
+  unsigned char buffer[100];
+  buffer[0] = '\0';
+  memcpy(buffer + 1,
+         common_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         common_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+                                                      buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table,
-                                                  
destination_hash_table.get());
+  AggregationOperationState::mergeGroupByHashTables(
+      source_hash_table.get(), destination_hash_table.get());
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckSumValue<std::int64_t>(
       common_key_merged_val.getLiteral<std::int64_t>(),
-      *aggregation_handle_sum_derived,
-      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
-  
CheckSumValue<std::int64_t>(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
-                     *aggregation_handle_sum_derived,
-                     *(destination_hash_table_derived->getSingleCompositeKey(
-                         exclusive_destination_key)));
-  
CheckSumValue<std::int64_t>(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
-                     *aggregation_handle_sum_derived,
-                     *(source_hash_table_derived->getSingleCompositeKey(
-                         exclusive_source_key)));
+      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(common_key) +
+          1));
+  CheckSumValue<std::int64_t>(
+      exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(
+              exclusive_destination_key) +
+          1));
+  CheckSumValue<std::int64_t>(
+      exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+          source_hash_table_derived->getSingleCompositeKey(
+              exclusive_source_key) +
+          1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp 
b/query_optimizer/ExecutionGenerator.cpp
index 130134c..968314e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1371,13 +1371,9 @@ void ExecutionGenerator::convertAggregate(
   }
 
   if (!group_by_types.empty()) {
-    // SimplifyHashTableImplTypeProto() switches the hash table implementation
-    // from SeparateChaining to SimpleScalarSeparateChaining when there is a
-    // single scalar key type with a reversible hash function.
+    // Right now, only SeparateChaining is supported.
     aggr_state_proto->set_hash_table_impl_type(
-        SimplifyHashTableImplTypeProto(
-            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-            group_by_types));
+        serialization::HashTableImplType::SEPARATE_CHAINING);
   }
 
   for (const E::AliasPtr &named_aggregate_expression : 
physical_plan->aggregate_expressions()) {
@@ -1404,15 +1400,9 @@ void ExecutionGenerator::convertAggregate(
     if (unnamed_aggregate_expression->is_distinct()) {
       const std::vector<E::ScalarPtr> &arguments = 
unnamed_aggregate_expression->getArguments();
       DCHECK_GE(arguments.size(), 1u);
-      if (group_by_types.empty() && arguments.size() == 1) {
-        aggr_state_proto->add_distinctify_hash_table_impl_types(
-            SimplifyHashTableImplTypeProto(
-                
HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-                {&arguments[0]->getValueType()}));
-      } else {
-        aggr_state_proto->add_distinctify_hash_table_impl_types(
-            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type));
-      }
+      // Right now only SeparateChaining implementation is supported.
+      aggr_state_proto->add_distinctify_hash_table_impl_types(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp 
b/relational_operators/tests/AggregationOperator_unittest.cpp
index 0138362..6881dea 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -363,8 +363,9 @@ class AggregationOperatorTest : public ::testing::Test {
     aggr_state_proto->set_estimated_num_entries(estimated_entries);
 
     // Also need to set the HashTable implementation for GROUP BY.
+    // Right now, only SeparateChaining is supported.
     aggr_state_proto->set_hash_table_impl_type(
-        serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+        serialization::HashTableImplType::SEPARATE_CHAINING);
 
     // Create Operators.
     op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));

Reply via email to