Repository: incubator-quickstep
Updated Branches:
  refs/heads/collision-free-agg 60519429e -> dee650f64


Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dee650f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dee650f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dee650f6

Branch: refs/heads/collision-free-agg
Commit: dee650f64d7209aff329b4e79dce8737ff229727
Parents: 6051942
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Sat Feb 4 15:33:25 2017 -0600
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Sat Feb 4 15:33:25 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |   2 +-
 query_optimizer/ExecutionGenerator.cpp          |  24 +-
 query_optimizer/ExecutionGenerator.hpp          |  14 +-
 .../FinalizeAggregationOperator.cpp             |   2 +-
 .../InitializeAggregationOperator.cpp           |   2 +-
 storage/AggregationOperationState.cpp           |  98 ++++----
 storage/AggregationOperationState.hpp           |  34 ++-
 storage/CollisionFreeVectorTable.cpp            |  28 ++-
 storage/CollisionFreeVectorTable.hpp            | 231 ++++++++++++++-----
 storage/HashTableBase.hpp                       |  10 +-
 storage/HashTableFactory.hpp                    |  27 ++-
 storage/PackedPayloadHashTable.cpp              |   2 +-
 storage/PackedPayloadHashTable.hpp              | 213 ++++++++++++++++-
 storage/ValueAccessorMultiplexer.hpp            |  42 ++++
 14 files changed, 570 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index ca2e35d..73b826f 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -58,7 +58,7 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
   }
 
   static_cast<PackedPayloadHashTable *>(distinctify_hash_table)
-      ->upsertValueAccessor({}, concatenated_ids, accessor_mux);
+      ->upsertValueAccessorCompositeKey({}, concatenated_ids, accessor_mux);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index c047b67..a512e6f 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -377,7 +377,7 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
 bool ExecutionGenerator::canUseCollisionFreeAggregation(
     const P::AggregatePtr &aggregate,
     const std::size_t estimated_num_groups,
-    std::size_t *exact_num_groups) const {
+    std::size_t *max_num_groups) const {
   // Supports only single group-by key.
   if (aggregate->grouping_expressions().size() != 1) {
     return false;
@@ -421,9 +421,13 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation(
       return false;
   }
 
-  // TODO
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the upbound (e.g. by checking memory size) instead of
+  //    hardcoding it here.
+  const std::int64_t kGroupSizeUpbound = 1000000000;
   if (min_cpp_value < 0 ||
-      max_cpp_value > 1000000000 ||
+      max_cpp_value > kGroupSizeUpbound ||
       max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
     return false;
   }
@@ -436,6 +440,7 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation(
       return false;
     }
 
+    // TODO(jianqiao): Support AggregationID::AVG.
     switch (agg_func->getAggregate().getAggregationID()) {
       case AggregationID::kCount:  // Fall through
       case AggregationID::kSum:
@@ -450,7 +455,16 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation(
     }
 
     if (arguments.size() == 1) {
-      switch (arguments.front()->getValueType().getTypeID()) {
+      const Type &arg_type = arguments.front()->getValueType();
+
+      // TODO(jianqiao): we need a bit more work in CollisionFreeVectorTable to
+      // support nullable argument types. That is, we need a bit vector there
+      // for each aggregation handle to indicate whether the state is NULL.
+      if (arg_type.isNullable()) {
+        return false;
+      }
+
+      switch (arg_type.getTypeID()) {
         case TypeID::kInt:  // Fall through
         case TypeID::kLong:
         case TypeID::kFloat:
@@ -462,7 +476,7 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation(
     }
   }
 
-  *exact_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index b52fe97..987f11a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -205,9 +205,21 @@ class ExecutionGenerator {
    */
   std::string getNewRelationName();
 
+  /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the collision-free aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @param max_num_groups If collision-free aggregation is applicable, the
+   *        pointed content of this pointer will be set as the maximum possible
+   *        number of groups that the collision-free hash table needs to hold.
+   * @return A bool value indicating whether collision-free aggregation can be
+   *         used to evaluate \p aggregate.
+   */
   bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
                                       const std::size_t estimated_num_groups,
-                                      std::size_t *exact_num_groups) const;
+                                      std::size_t *max_num_groups) const;
 
   /**
    * @brief Sets up the info of the CatalogRelation represented by TableReference.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 72beb60..c80c575 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -45,7 +45,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
         query_context->getAggregationState(aggr_state_index_);
     DCHECK(agg_state != nullptr);
     for (std::size_t part_id = 0;
-         part_id < agg_state->getNumPartitions();
+         part_id < agg_state->getNumFinalizationPartitions();
          ++part_id) {
       container->addNormalWorkOrder(
           new FinalizeAggregationWorkOrder(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 3da719d..162f909 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -72,7 +72,7 @@ bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContain
 }
 
 void InitializeAggregationWorkOrder::execute() {
-  state_->initializeState(partition_id_);
+  state_->initialize(partition_id_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 007447f..0461b9e 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -197,8 +197,8 @@ AggregationOperationState::AggregationOperationState(
     }
   }
 
-  // Aggregation with GROUP BY: create a HashTable pool.
   if (!group_by_key_ids_.empty()) {
+    // Aggregation with GROUP BY: create the hash table (pool).
     if (is_aggregate_collision_free_) {
       collision_free_hashtable_.reset(
           AggregationStateHashTableFactory::CreateResizable(
@@ -216,11 +216,12 @@ AggregationOperationState::AggregationOperationState(
                                        group_by_handles,
                                        storage_manager));
     } else {
-      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                       hash_table_impl_type,
-                                                       group_by_types_,
-                                                       group_by_handles,
-                                                       storage_manager));
+      group_by_hashtable_pool_.reset(
+          new HashTablePool(estimated_num_entries,
+                            hash_table_impl_type,
+                            group_by_types_,
+                            group_by_handles,
+                            storage_manager));
     }
   }
 }
@@ -352,36 +353,6 @@ bool AggregationOperationState::ProtoIsValid(
   return true;
 }
 
-std::size_t AggregationOperationState::getNumPartitions() const {
-  if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeVectorTable *>(
-        collision_free_hashtable_.get())->getNumFinalizationPartitions();
-  } else if (is_aggregate_partitioned_) {
-    return partitioned_group_by_hashtable_pool_->getNumPartitions();
-  } else  {
-    return 1u;
-  }
-}
-
-std::size_t AggregationOperationState::getNumInitializationPartitions() const {
-  if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeVectorTable *>(
-        collision_free_hashtable_.get())->getNumInitializationPartitions();
-  } else {
-    return 0u;
-  }
-}
-
-void AggregationOperationState::initializeState(const std::size_t partition_id) {
-  if (is_aggregate_collision_free_) {
-    static_cast<CollisionFreeVectorTable *>(
-        collision_free_hashtable_.get())->initialize(partition_id);
-  } else {
-    LOG(FATAL) << "AggregationOperationState::initializeState() "
-               << "is not supported by this aggregation";
-  }
-}
-
 bool AggregationOperationState::checkAggregatePartitioned(
     const std::size_t estimated_num_groups,
     const std::vector<bool> &is_distinct,
@@ -404,7 +375,8 @@ bool AggregationOperationState::checkAggregatePartitioned(
     return false;
   }
 
-  // Currently we require that all the group-by keys are ScalarAttributes.
+  // Currently we require that all the group-by keys are ScalarAttributes for
+  // the convenience of implementing copy elision.
   // TODO(jianqiao): relax this requirement.
   for (const auto &group_by_element : group_by) {
     if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
@@ -420,6 +392,36 @@ bool AggregationOperationState::checkAggregatePartitioned(
   return false;
 }
 
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->getNumInitializationPartitions();
+  } else {
+    return 0u;
+  }
+}
+
+std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->getNumFinalizationPartitions();
+  } else if (is_aggregate_partitioned_) {
+    return partitioned_group_by_hashtable_pool_->getNumPartitions();
+  } else  {
+    return 1u;
+  }
+}
+
+void AggregationOperationState::initialize(const std::size_t partition_id) {
+  if (is_aggregate_collision_free_) {
+    static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->initialize(partition_id);
+  } else {
+    LOG(FATAL) << "AggregationOperationState::initializeState() "
+               << "is not supported by this aggregation";
+  }
+}
+
 void AggregationOperationState::aggregateBlock(const block_id input_block,
                                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   BlockReference block(
@@ -521,7 +523,6 @@ void AggregationOperationState::mergeGroupByHashTables(
 
 void AggregationOperationState::aggregateBlockHashTable(
     const ValueAccessorMultiplexer &accessor_mux) {
-  // TODO
   if (is_aggregate_collision_free_) {
     aggregateBlockHashTableImplCollisionFree(accessor_mux);
   } else if (is_aggregate_partitioned_) {
@@ -535,9 +536,9 @@ void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
     const ValueAccessorMultiplexer &accessor_mux) {
   DCHECK(collision_free_hashtable_ != nullptr);
 
-  collision_free_hashtable_->upsertValueAccessor(argument_ids_,
-                                                 group_by_key_ids_,
-                                                 accessor_mux);
+  collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
+                                                             group_by_key_ids_,
+                                                             accessor_mux);
 }
 
 void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
@@ -593,9 +594,9 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
 
       ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
       partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessor(argument_ids_,
-                                group_by_key_ids_,
-                                local_mux);
+          ->upsertValueAccessorCompositeKey(argument_ids_,
+                                            group_by_key_ids_,
+                                            local_mux);
     }
   });
 }
@@ -617,9 +618,9 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
   AggregationStateHashTableBase *agg_hash_table =
       group_by_hashtable_pool_->getHashTable();
 
-  agg_hash_table->upsertValueAccessor(argument_ids_,
-                                      group_by_key_ids_,
-                                      accessor_mux);
+  agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+                                                  group_by_key_ids_,
+                                                  accessor_mux);
   group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }
 
@@ -674,9 +675,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
   CollisionFreeVectorTable *hash_table =
       static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
 
-  // TODO
   const std::size_t max_length =
-      hash_table->getNumTuplesInPartition(partition_id);
+      hash_table->getNumTuplesInFinalizationPartition(partition_id);
   ColumnVectorsValueAccessor complete_result;
 
   DCHECK_EQ(1u, group_by_types_.size());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 783d7bc..27d7eb9 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -160,6 +160,29 @@ class AggregationOperationState {
       const CatalogDatabaseLite &database);
 
   /**
+   * @brief Get the number of partitions to be used for initializing the
+   *        aggregation.
+   *
+   * @return The number of partitions to be used for initializing the aggregation.
+   **/
+  std::size_t getNumInitializationPartitions() const;
+
+  /**
+   * @brief Get the number of partitions to be used for finalizing the
+   *        aggregation.
+   *
+   * @return The number of partitions to be used for finalizing the aggregation.
+   **/
+  std::size_t getNumFinalizationPartitions() const;
+
+  /**
+   * @brief Initialize the specified partition of this aggregation.
+   *
+   * @param partition_id ID of the partition to be initialized.
+   */
+  void initialize(const std::size_t partition_id);
+
+  /**
    * @brief Compute aggregates on the tuples of the given storage block,
    *        updating the running state maintained by this
    *        AggregationOperationState.
@@ -183,17 +206,8 @@ class AggregationOperationState {
   void finalizeAggregate(const std::size_t partition_id,
                          InsertDestination *output_destination);
 
-  /**
-   * @brief Get the number of partitions to be used for the aggregation.
-   *        For non-partitioned aggregations, we return 1.
-   **/
-  std::size_t getNumPartitions() const;
-
-  std::size_t getNumInitializationPartitions() const;
-
-  void initializeState(const std::size_t partition_id);
-
  private:
+  // Check whether partitioned aggregation can be applied.
   bool checkAggregatePartitioned(
       const std::size_t estimated_num_groups,
       const std::vector<bool> &is_distinct,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index 841db25..f2bc23b 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -39,23 +39,22 @@
 namespace quickstep {
 
 CollisionFreeVectorTable::CollisionFreeVectorTable(
-    const std::vector<const Type *> &key_types,
+    const Type *key_type,
     const std::size_t num_entries,
     const std::vector<AggregationHandle *> &handles,
     StorageManager *storage_manager)
-    : key_type_(key_types.front()),
+    : key_type_(key_type),
       num_entries_(num_entries),
       num_handles_(handles.size()),
       handles_(handles),
-      num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
+      num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
       storage_manager_(storage_manager) {
-  CHECK_EQ(1u, key_types.size());
   DCHECK_GT(num_entries, 0u);
 
-  std::map<std::string, std::size_t> memory_offsets;
   std::size_t required_memory = 0;
+  const std::size_t existence_map_offset = 0;
+  std::vector<std::size_t> state_offsets;
 
-  memory_offsets.emplace("existence_map", required_memory);
   required_memory += CacheLineAlignedBytes(
       BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
 
@@ -89,8 +88,7 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
         LOG(FATAL) << "Not implemented";
     }
 
-    memory_offsets.emplace(std::string("state") + std::to_string(i),
-                           required_memory);
+    state_offsets.emplace_back(required_memory);
     required_memory += CacheLineAlignedBytes(state_size * num_entries);
   }
 
@@ -102,19 +100,18 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
 
   void *memory_start = blob_->getMemoryMutable();
   existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
-      reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
+      reinterpret_cast<char *>(memory_start) + existence_map_offset,
       num_entries,
       false /* initialize */));
 
   for (std::size_t i = 0; i < num_handles_; ++i) {
+    // Columnwise layout.
     vec_tables_.emplace_back(
-        reinterpret_cast<char *>(memory_start) +
-            memory_offsets.at(std::string("state") + std::to_string(i)));
+        reinterpret_cast<char *>(memory_start) + state_offsets.at(i));
   }
 
   memory_size_ = required_memory;
-  num_init_partitions_ =
-      std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
+  num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
 }
 
 CollisionFreeVectorTable::~CollisionFreeVectorTable() {
@@ -126,7 +123,7 @@ CollisionFreeVectorTable::~CollisionFreeVectorTable() {
 void CollisionFreeVectorTable::destroyPayload() {
 }
 
-bool CollisionFreeVectorTable::upsertValueAccessor(
+bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey(
     const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
     const std::vector<MultiSourceAttributeId> &key_ids,
     const ValueAccessorMultiplexer &accessor_mux) {
@@ -183,6 +180,7 @@ bool CollisionFreeVectorTable::upsertValueAccessor(
       is_argument_nullable = argument_type->isNullable();
     }
 
+    // Dispatch to specialized implementations to achieve maximum performance.
     InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
         base_accessor,
         [&](auto *accessor) -> void {  // NOLINT(build/c++11)
@@ -260,7 +258,7 @@ void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
 }
 
 void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
-                                             std::size_t handle_id,
+                                             const std::size_t handle_id,
                                              NativeColumnVector *output_cv) const {
   const std::size_t start_position =
       calculatePartitionStartPosition(partition_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index cbe3b81..102b696 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -57,8 +57,17 @@ class StorageMnager;
 
 class CollisionFreeVectorTable : public AggregationStateHashTableBase {
  public:
+  /**
+   * @brief Constructor.
+   *
+   * @param key_type The group-by key type.
+   * @param num_entries The estimated number of entries this table will hold.
+   * @param handles The aggregation handles.
+   * @param storage_manager The StorageManager to use (a StorageBlob will be
+   *        allocated to hold this table's contents).
+   **/
   CollisionFreeVectorTable(
-      const std::vector<const Type *> &key_types,
+      const Type *key_type,
       const std::size_t num_entries,
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager);
@@ -67,15 +76,30 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
 
   void destroyPayload() override;
 
+  /**
+   * @brief Get the number of partitions to be used for initializing the table.
+   *
+   * @return The number of partitions to be used for initializing the table.
+   */
   inline std::size_t getNumInitializationPartitions() const {
     return num_init_partitions_;
   }
 
+  /**
+   * @brief Get the number of partitions to be used for finalizing the aggregation.
+   *
+   * @return The number of partitions to be used for finalizing the aggregation.
+   */
   inline std::size_t getNumFinalizationPartitions() const {
     return num_finalize_partitions_;
   }
 
-  inline std::size_t getNumTuplesInPartition(
+  /**
+   * @brief Get the exact number of tuples in the specified finalization partition.
+   *
+   * @return The exact number of tuples in the specified finalization partition.
+   */
+  inline std::size_t getNumTuplesInFinalizationPartition(
       const std::size_t partition_id) const {
     const std::size_t start_position =
         calculatePartitionStartPosition(partition_id);
@@ -84,6 +108,11 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
     return existence_map_->onesCountInRange(start_position, end_position);
   }
 
+  /**
+   * @brief Initialize the specified partition of this aggregation table.
+   *
+   * @param partition_id ID of the partition to be initialized.
+   */
   inline void initialize(const std::size_t partition_id) {
     const std::size_t memory_segment_size =
         (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
@@ -93,16 +122,46 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
                 std::min(memory_segment_size, memory_size_ - memory_start));
   }
 
-  bool upsertValueAccessor(
+  /**
+   * @brief Use aggregation handles to update (multiple) aggregation states in
+   *        this vector table, with group-by keys and arguments drawn from the
+   *        given ValueAccessors.
+   *
+   * @param argument_ids The multi-source attribute IDs of each argument
+   *        component to be read from \p accessor_mux.
+   * @param key_ids The multi-source attribute IDs of each group-by key
+   *        component to be read from \p accessor_mux.
+   * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+   *        ValueAccessors which will be used to access keys. beginIteration()
+   *        should be called on the accessors before calling this method.
+   * @return Always return true.
+   **/
+  bool upsertValueAccessorCompositeKey(
       const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
       const std::vector<MultiSourceAttributeId> &key_ids,
       const ValueAccessorMultiplexer &accessor_mux) override;
 
+  /**
+   * @brief Copy the keys from this table to a NativeColumnVector, for the
+   *        specified partition.
+   *
+   * @param partition_id ID of the partition to copy keys from.
+   * @param output_cv The NativeColumnVector to copy keys to.
+   */
   void finalizeKey(const std::size_t partition_id,
                    NativeColumnVector *output_cv) const;
 
+
+  /**
+   * @brief Finalize the aggregation states to a NativeColumnVector, for the
+   *        specified partition and aggregation handle.
+   *
+   * @param partition_id ID of the partition to finalize.
+   * @param handle_id ID of the aggregation handle to finalize.
+   * @param output_cv The NativeColumnVector to write finalized values to.
+   */
   void finalizeState(const std::size_t partition_id,
-                     std::size_t handle_id,
+                     const std::size_t handle_id,
                      NativeColumnVector *output_cv) const;
 
  private:
@@ -110,6 +169,28 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
     return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
   }
 
+  inline static std::size_t CalculateNumInitializationPartitions(
+      const std::size_t memory_size) {
+    // Set initialization memory block size as 4MB.
+    constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
+
+    // At least 1 partition, at most 80 partitions.
+    // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the
+    // hardcoded 80.
+    return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL));
+  }
+
+  inline static std::size_t CalculateNumFinalizationPartitions(
+      const std::size_t num_entries) {
+    // Set finalization segment size as 4096 entries.
+    constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+    // At least 1 partition, at most 80 partitions.
+    // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the
+    // hardcoded 80.
+    return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
+  }
+
   inline std::size_t calculatePartitionLength() const {
     const std::size_t partition_length =
         (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
@@ -214,79 +295,29 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
   template <typename KeyT>
   inline void finalizeKeyInternal(const std::size_t start_position,
                                   const std::size_t end_position,
-                                  NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
-    }
-  }
+                                  NativeColumnVector *output_cv) const;
 
   template <typename ...ArgTypes>
   inline void finalizeStateDispatchHelper(const AggregationID agg_id,
                                           const Type *argument_type,
                                           const void *vec_table,
-                                          ArgTypes &&...args) const {
-    switch (agg_id) {
-       case AggregationID::kCount:
-         finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
-                            std::forward<ArgTypes>(args)...);
-         return;
-       case AggregationID::kSum:
-         finalizeStateSumHelper(argument_type,
-                                vec_table,
-                                std::forward<ArgTypes>(args)...);
-         return;
-       default:
-         LOG(FATAL) << "Not supported";
-    }
-  }
+                                          ArgTypes &&...args) const;
 
   template <typename ...ArgTypes>
   inline void finalizeStateSumHelper(const Type *argument_type,
                                      const void *vec_table,
-                                     ArgTypes &&...args) const {
-    DCHECK(argument_type != nullptr);
-
-    switch (argument_type->getTypeID()) {
-      case TypeID::kInt:    // Fall through
-      case TypeID::kLong:
-        finalizeStateSum<std::int64_t>(
-            static_cast<const std::atomic<std::int64_t> *>(vec_table),
-            std::forward<ArgTypes>(args)...);
-        return;
-      case TypeID::kFloat:  // Fall through
-      case TypeID::kDouble:
-        finalizeStateSum<double>(
-            static_cast<const std::atomic<double> *>(vec_table),
-            std::forward<ArgTypes>(args)...);
-        return;
-      default:
-        LOG(FATAL) << "Not supported";
-    }
-  }
+                                     ArgTypes &&...args) const;
 
   inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
                                  const std::size_t start_position,
                                  const std::size_t end_position,
-                                 NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
-          vec_table[loc].load(std::memory_order_relaxed);
-    }
-  }
+                                 NativeColumnVector *output_cv) const;
 
   template <typename ResultT, typename StateT>
   inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
                                const std::size_t start_position,
                                const std::size_t end_position,
-                               NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
-          vec_table[loc].load(std::memory_order_relaxed);
-    }
-  }
+                               NativeColumnVector *output_cv) const;
 
   const Type *key_type_;
   const std::size_t num_entries_;
@@ -616,6 +647,88 @@ inline void CollisionFreeVectorTable
   }
 }
 
+template <typename KeyT>
+inline void CollisionFreeVectorTable
+    ::finalizeKeyInternal(const std::size_t start_position,
+                          const std::size_t end_position,
+                          NativeColumnVector *output_cv) const {
+  std::size_t loc = start_position - 1;
+  while ((loc = existence_map_->nextOne(loc)) < end_position) {
+    *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+  }
+}
+
+template <typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+    ::finalizeStateDispatchHelper(const AggregationID agg_id,
+                                  const Type *argument_type,
+                                  const void *vec_table,
+                                  ArgTypes &&...args) const {
+  switch (agg_id) {
+     case AggregationID::kCount:
+       finalizeStateCount(static_cast<const std::atomic<std::size_t> 
*>(vec_table),
+                          std::forward<ArgTypes>(args)...);
+       return;
+     case AggregationID::kSum:
+       finalizeStateSumHelper(argument_type,
+                              vec_table,
+                              std::forward<ArgTypes>(args)...);
+       return;
+     default:
+       LOG(FATAL) << "Not supported";
+  }
+}
+
+template <typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+    ::finalizeStateSumHelper(const Type *argument_type,
+                             const void *vec_table,
+                             ArgTypes &&...args) const {
+  DCHECK(argument_type != nullptr);
+
+  switch (argument_type->getTypeID()) {
+    case TypeID::kInt:    // Fall through
+    case TypeID::kLong:
+      finalizeStateSum<std::int64_t>(
+          static_cast<const std::atomic<std::int64_t> *>(vec_table),
+          std::forward<ArgTypes>(args)...);
+      return;
+    case TypeID::kFloat:  // Fall through
+    case TypeID::kDouble:
+      finalizeStateSum<double>(
+          static_cast<const std::atomic<double> *>(vec_table),
+          std::forward<ArgTypes>(args)...);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+inline void CollisionFreeVectorTable
+    ::finalizeStateCount(const std::atomic<std::size_t> *vec_table,
+                         const std::size_t start_position,
+                         const std::size_t end_position,
+                         NativeColumnVector *output_cv) const {
+  std::size_t loc = start_position - 1;
+  while ((loc = existence_map_->nextOne(loc)) < end_position) {
+    *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
+        vec_table[loc].load(std::memory_order_relaxed);
+  }
+}
+
+template <typename ResultT, typename StateT>
+inline void CollisionFreeVectorTable
+    ::finalizeStateSum(const std::atomic<StateT> *vec_table,
+                       const std::size_t start_position,
+                       const std::size_t end_position,
+                       NativeColumnVector *output_cv) const {
+  std::size_t loc = start_position - 1;
+  while ((loc = existence_map_->nextOne(loc)) < end_position) {
+    *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
+        vec_table[loc].load(std::memory_order_relaxed);
+  }
+}
+
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 064c7cb..b4b6918 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -78,12 +78,6 @@ class HashTableBase {
  public:
   virtual ~HashTableBase() {}
 
-  /**
-   * @brief Destroy the payload stored in the hash table.
-   **/
-  virtual void destroyPayload() {
-  }
-
  protected:
   HashTableBase() {}
 
@@ -111,8 +105,10 @@ class AggregationStateHashTableBase {
    *
    * Optionally, we can also remove the AggregationStateHashTableBase
    * specialization from this file.
+   *
+   * TODO(jianqiao): Refactor the interface design for aggregation hash tables.
    **/
-  virtual bool upsertValueAccessor(
+  virtual bool upsertValueAccessorCompositeKey(
       const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
       const std::vector<MultiSourceAttributeId> &key_attr_ids,
       const ValueAccessorMultiplexer &accessor_mux) = 0;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index b88bf87..9686429 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -334,10 +334,28 @@ typedef HashTableFactory<TupleReference, true, false, false, true>
     JoinHashTableFactory;
 
 /**
- * @brief TODO
- */
+ * @brief Factory class that makes it easier to instantiate aggregation state
+ *        hash tables.
+ **/
 class AggregationStateHashTableFactory {
  public:
+  /**
+   * @brief Create a new aggregation state hash table, with the type selected by
+   *        hash_table_type. Other parameters are forwarded to the hash table's
+   *        constructor.
+   *
+   * @param hash_table_type The specific hash table implementation that should
+   *        be used.
+   * @param key_types A vector of one or more types (>1 indicates a composite
+   *        key). Forwarded as-is to the hash table's constructor.
+   * @param num_entries The estimated number of entries the hash table will
+   *        hold. Forwarded as-is to the hash table's constructor.
+   * @param handles The aggregation handles. Forwarded as-is to the hash
+   *        table's constructor.
+   * @param storage_manager The StorageManager to use (a StorageBlob will be
+   *        allocated to hold the hash table's contents). Forwarded as-is to the
+   *        hash table's constructor.
+   * @return A new aggregation state hash table.
+   **/
+
   static AggregationStateHashTableBase* CreateResizable(
       const HashTableImplType hash_table_type,
       const std::vector<const Type*> &key_types,
@@ -349,11 +367,12 @@ class AggregationStateHashTableFactory {
         return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);
       case HashTableImplType::kCollisionFreeVector:
+        DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
-            key_types, num_entries, handles, storage_manager);
+            key_types.front(), num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in "
-                   << "AggregationStateHashTableFactory::createResizable()\n";
+                   << "AggregationStateHashTableFactory::createResizable()";
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index 06e7687..2875aa9 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -203,7 +203,7 @@ void PackedPayloadHashTable::destroyPayload() {
   }
 }
 
-bool PackedPayloadHashTable::upsertValueAccessor(
+bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
     const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
     const std::vector<MultiSourceAttributeId> &key_attr_ids,
     const ValueAccessorMultiplexer &accessor_mux) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 5e1b177..c5d5369 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -58,8 +58,30 @@ namespace quickstep {
  *  @{
  */
 
+/**
+ * @brief Aggregation hash table implementation in which the payload can be just
+ *        a bunch of bytes. This implementation is suitable for aggregation with
+ *        multiple aggregation handles (e.g. SUM, MAX, MIN, etc.).
+ *
+ * At present the hash table uses separate chaining to resolve collisions, i.e.
+ * keys/values are stored in a separate region of memory from the base hash
+ * table slot array. Every bucket has a "next" pointer so that entries that
+ * collide (i.e. map to the same base slot) form chains of pointers with each
+ * other.
+ **/
 class PackedPayloadHashTable : public AggregationStateHashTableBase {
  public:
+  /**
+   * @brief Constructor.
+   *
+   * @param key_types A vector of one or more types (>1 indicates a composite
+   *        key).
+   * @param num_entries The estimated number of entries this hash table will
+   *        hold.
+   * @param handles The aggregation handles.
+   * @param storage_manager The StorageManager to use (a StorageBlob will be
+   *        allocated to hold this hash table's contents).
+   **/
   PackedPayloadHashTable(
       const std::vector<const Type *> &key_types,
       const std::size_t num_entries,
@@ -68,47 +90,228 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
 
   ~PackedPayloadHashTable() override;
 
+  /**
+   * @brief Erase all entries in this hash table.
+   *
+   * @warning This method is not guaranteed to be threadsafe.
+   **/
   void clear();
 
   void destroyPayload() override;
 
-  bool upsertValueAccessor(
+  /**
+   * @brief Use aggregation handles to update (multiple) aggregation states in
+   *        this hash table, with group-by keys and arguments drawn from the
+   *        given ValueAccessors. New states are first inserted if not already
+   *        present.
+   *
+   * @note This method is threadsafe with regard to other calls to
+   *       upsertCompositeKey() and upsertValueAccessorCompositeKey().
+   *
+   * @param argument_ids The multi-source attribute IDs of each argument
+   *        component to be read from \p accessor_mux.
+   * @param key_ids The multi-source attribute IDs of each group-by key
+   *        component to be read from \p accessor_mux.
+   * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+   *        ValueAccessors which will be used to access keys and arguments.
+   *        beginIteration() should be called on the accessors before calling
+   *        this method.
+   * @return True on success, false if upsert failed because there was not
+   *         enough space to insert new entries for all the keys in accessor
+   *         (note that some entries may still have been upserted, and
+   *         accessors' iterations will be left on the first tuple which could
+   *         not be inserted).
+   **/
+  bool upsertValueAccessorCompositeKey(
       const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
       const std::vector<MultiSourceAttributeId> &key_ids,
       const ValueAccessorMultiplexer &accessor_mux) override;
 
+  /**
+   * @return The ID of the StorageBlob used to store this hash table.
+   **/
   inline block_id getBlobId() const {
     return blob_->getID();
   }
 
+  /**
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call).
+   *          Concurrent calls to getSingleCompositeKey(), forEach(), and
+   *          forEachCompositeKey() are safe.
+   *
+   * @return The number of entries in this HashTable.
+   **/
   inline std::size_t numEntries() const {
     return header_->buckets_allocated.load(std::memory_order_relaxed);
   }
 
+  /**
+   * @brief Use aggregation handles to merge the given aggregation states into
+   *        the aggregation states mapped to the given key. New states are first
+   *        inserted if not already present.
+   *
+   * @warning The key must not be null.
+   * @note This method is threadsafe with regard to other calls to
+   *       upsertCompositeKey() and upsertValueAccessorCompositeKey().
+   *
+   * @param key The key.
+   * @param source_state The source aggregation states to be merged into this
+   *        hash table.
+   * @return True on success, false if upsert failed because there was not
+   *         enough space to insert a new entry in this hash table.
+   **/
   inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
                                  const std::uint8_t *source_state);
 
+  /**
+   * @brief Apply a functor to an aggregation state mapped to the given key.
+   *        A new state is first inserted if one is not already present.
+   *
+   * @warning The key must not be null.
+   * @note This method is threadsafe with regard to other calls to
+   *       upsertCompositeKey() and upsertValueAccessorCompositeKey().
+   *
+   * @param key The key.
+   * @param functor A pointer to a functor, which should provide a call
+   *        operator which takes an aggregation state (of type std::uint8_t *)
+   *        as an argument.
+   * @param index The index of the target aggregation state among those states
+   *        mapped to \p key.
+   * @return True on success, false if upsert failed because there was not
+   *         enough space to insert a new entry in this hash table.
+   **/
   template <typename FunctorT>
   inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
                                  FunctorT *functor,
-                                 int index);
+                                 const std::size_t index);
 
+  /**
+   * @brief Lookup a composite key against this hash table to find a matching
+   *        entry.
+   *
+   * @warning The key must not be null.
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param key The key to look up.
+   * @return The value of a matched entry if a matching key is found.
+   *         Otherwise, return NULL.
+   **/
   inline const std::uint8_t* getSingleCompositeKey(
       const std::vector<TypedValue> &key) const;
 
+  /**
+   * @brief Lookup a composite key against this hash table to find a matching
+   *        entry. Then return the aggregation state component with the
+   *        specified index.
+   *
+   * @warning The key must not be null.
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param key The key to look up.
+   * @param index The index of the target aggregation state among those states
+   *        mapped to \p key.
+   * @return The aggregation state of the specified index if a matching key is
+   *         found. Otherwise, return NULL.
+   **/
   inline const std::uint8_t* getSingleCompositeKey(
       const std::vector<TypedValue> &key,
-      const int index) const;
+      const std::size_t index) const;
 
+  /**
+   * @brief Apply a functor to each (key, value) pair in this hash table.
+   *
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param functor A pointer to a functor, which should provide a call operator
+   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+   *        The call operator will be invoked once on each key, value pair in
+   *        this hash table.
+   * @return The number of key-value pairs visited.
+   **/
   template <typename FunctorT>
   inline std::size_t forEach(FunctorT *functor) const;
 
+  /**
+   * @brief Apply a functor to each (key, aggregation state) pair in this hash
+   *        table, where the aggregation state is retrieved from the value
+   *        that maps to the corresponding key with the specified index.
+   *
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param functor A pointer to a functor, which should provide a call operator
+   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+   *        The call operator will be invoked once on each (key, aggregation state)
+   *        pair in this hash table.
+   * @param index The index of the target aggregation state among those states
+   *        mapped to each key.
+   * @return The number of key-value pairs visited.
+   **/
   template <typename FunctorT>
   inline std::size_t forEach(FunctorT *functor, const int index) const;
 
+  /**
+   * @brief Apply a functor to each key, value pair in this hash table.
+   *        Composite key version.
+   *
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param functor A pointer to a functor, which should provide a call operator
+   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+   *        The call operator will be invoked once on each key, value pair in
+   *        this hash table.
+   * @return The number of key-value pairs visited.
+   **/
   template <typename FunctorT>
   inline std::size_t forEachCompositeKey(FunctorT *functor) const;
 
+  /**
+   * @brief Apply a functor to each (key, aggregation state) pair in this hash
+   *        table, where the aggregation state is retrieved from the value
+   *        that maps to the corresponding key with the specified index.
+   *        Composite key version.
+   *
+   * @warning This method assumes that no concurrent calls to
+   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param functor A pointer to a functor, which should provide a call operator
+   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+   *        The call operator will be invoked once on each (key, aggregation state)
+   *        pair in this hash table.
+   * @param index The index of the target aggregation state among those states
+   *        mapped to each key.
+   * @return The number of key-value pairs visited.
+   **/
   template <typename FunctorT>
   inline std::size_t forEachCompositeKey(FunctorT *functor,
                                          const std::size_t index) const;
@@ -495,7 +698,7 @@ inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
 
 inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
     const std::vector<TypedValue> &key,
-    const int index) const {
+    const std::size_t index) const {
   DEBUG_ASSERT(this->key_types_.size() == key.size());
 
   const std::size_t hash_code = this->hashCompositeKey(key);
@@ -549,7 +752,7 @@ template <typename FunctorT>
 inline bool PackedPayloadHashTable::upsertCompositeKey(
     const std::vector<TypedValue> &key,
     FunctorT *functor,
-    int index) {
+    const std::size_t index) {
   const std::size_t variable_size =
       calculateVariableLengthCompositeKeyCopySize(key);
   for (;;) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/ValueAccessorMultiplexer.hpp
----------------------------------------------------------------------
diff --git a/storage/ValueAccessorMultiplexer.hpp b/storage/ValueAccessorMultiplexer.hpp
index 0ed6175..94773a2 100644
--- a/storage/ValueAccessorMultiplexer.hpp
+++ b/storage/ValueAccessorMultiplexer.hpp
@@ -39,6 +39,10 @@ enum class ValueAccessorSource {
   kInvalid
 };
 
+/**
+ * @brief A data structure for representing attribute IDs that may refer to
+ *        attributes in any one of multiple ValueAccessors.
+ */
 struct MultiSourceAttributeId {
   MultiSourceAttributeId(const ValueAccessorSource in_source,
                          const attribute_id in_attr_id)
@@ -53,21 +57,59 @@ struct MultiSourceAttributeId {
   const attribute_id attr_id;
 };
 
+/**
+ * @brief A class that encapsulates multiple ValueAccessors and provides helper
+ *        methods for accessing the ValueAccessors with MultiSourceAttributeId.
+ *
+ * This class is in its initial form and serves only a small set of essential
+ * functionality for the purpose of aggregation copy elision. That is, given a
+ * storage block to be aggregated on, we may have aggregations on a storage
+ * attribute (e.g. SUM(x)) or on a non-trivial expression (e.g. SUM(x * y)).
+ * In the former case, copy elision is applicable: the attribute is accessed
+ * directly from the storage block. In the latter case, we have to create a
+ * temporary data structure (i.e. ColumnVectorsValueAccessor) that stores the
+ * intermediate results. Thus, we refer to the ValueAccessor created directly
+ * from the storage block as the BASE accessor and the intermediate result
+ * ColumnVectorsValueAccessor as the DERIVED accessor. And we utilize this class
+ * (ValueAccessorMultiplexer) to pass both accessors around to enable copy elision.
+ *
+ * This class (together with ValueAccessorSource and MultiSourceAttributeId)
+ * may be rewritten or extended later to more generally support copy elisions
+ * in various scenarios.
+ */
 class ValueAccessorMultiplexer {
  public:
+  /**
+   * @brief Constructor.
+   *
+   * @param base_accessor The base accessor.
+   * @param derived_accessor The derived accessor.
+   */
   ValueAccessorMultiplexer(ValueAccessor *base_accessor,
                            ValueAccessor *derived_accessor)
       : base_accessor_(base_accessor),
         derived_accessor_(derived_accessor) {}
 
+  /**
+   * @return The base accessor.
+   */
   inline ValueAccessor* getBaseAccessor() const {
     return base_accessor_;
   }
 
+  /**
+   * @return The derived accessor.
+   */
   inline ValueAccessor* getDerivedAccessor() const {
     return derived_accessor_;
   }
 
+  /**
+   * @brief Get the value accessor that corresponds to the specified source.
+   *
+   * @param source The value accessor source.
+   * @return The value accessor that corresponds to \p source.
+   */
   inline ValueAccessor* getValueAccessorBySource(
       const ValueAccessorSource &source) const {
     switch (source) {

Reply via email to