Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/239#discussion_r112884381
  
    --- Diff: storage/ThreadPrivateCompactKeyHashTable.cpp ---
    @@ -0,0 +1,422 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <type_traits>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "expressions/aggregation/AggregationID.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "types/Type.hpp"
    +#include "types/TypeID.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/ScopedBuffer.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +namespace {
    +
    +#define CASE_KEY_SIZE(value) \
    +  case value: return functor(std::integral_constant<std::size_t, value>())
    +
    +template <typename FunctorT>
    +auto InvokeOnKeySize(const std::size_t key_size, const FunctorT &functor) {
    +  switch (key_size) {
    +    CASE_KEY_SIZE(1);
    +    CASE_KEY_SIZE(2);
    +    CASE_KEY_SIZE(3);
    +    CASE_KEY_SIZE(4);
    +    CASE_KEY_SIZE(5);
    +    CASE_KEY_SIZE(6);
    +    CASE_KEY_SIZE(7);
    +    CASE_KEY_SIZE(8);
    +    default:
    +      break;
    +  }
    +  LOG(FATAL) << "Unexpected key size: " << key_size;
    +}
    +
    +#undef CASE_KEY_SIZE
    +
    +}  // namespace
    +
    +constexpr std::size_t ThreadPrivateCompactKeyHashTable::kKeyCodeSize;
    +
    +ThreadPrivateCompactKeyHashTable::ThreadPrivateCompactKeyHashTable(
    +    const std::vector<const Type*> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle*> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      handles_(handles),
    +      total_state_size_(0),
    +      num_buckets_(0),
    +      buckets_allocated_(0),
    +      storage_manager_(storage_manager) {
    +  // Cache key sizes.
    +  for (const Type *key_type : key_types) {
    +    DCHECK(!key_type->isVariableLength());
    +    DCHECK(!key_type->isNullable());
    +    key_sizes_.emplace_back(key_type->maximumByteLength());
    +  }
    +
    +  for (const AggregationHandle *handle : handles) {
    +    const std::vector<const Type*> arg_types = handle->getArgumentTypes();
    +    DCHECK_LE(arg_types.size(), 1u);
    +    DCHECK(arg_types.empty() || !arg_types.front()->isNullable());
    +
    +    // Figure out state size.
    +    std::size_t state_size = 0;
    +    switch (handle->getAggregationID()) {
    +      case AggregationID::kCount: {
    +        state_size = sizeof(std::int64_t);
    +        break;
    +      }
    +      case AggregationID::kSum: {
    +        DCHECK_EQ(1u, arg_types.size());
    +        switch (arg_types.front()->getTypeID()) {
    +          case TypeID::kInt:  // Fall through
    +          case TypeID::kLong:
    +            state_size = sizeof(std::int64_t);
    +            break;
    +          case TypeID::kFloat:  // Fall through
    +          case TypeID::kDouble:
    +            state_size = sizeof(double);
    +            break;
    +          default:
    +            LOG(FATAL) << "Unexpected argument type";
    +        }
    +        break;
    +      }
    +      default:
    +        LOG(FATAL) << "Unexpected AggregationID";
    +    }
    +    state_sizes_.emplace_back(state_size);
    +    total_state_size_ += state_size;
    +  }
    +
    +  // Calculate required memory size for keys and states.
    +  const std::size_t required_memory =
    +      num_entries * (kKeyCodeSize + total_state_size_);
    +  const std::size_t num_storage_slots =
    +      storage_manager_->SlotsNeededForBytes(required_memory);
    +
    +  // Use storage manager to allocate memory.
    +  const block_id blob_id = storage_manager->createBlob(num_storage_slots);
    +  blob_ = storage_manager->getBlobMutable(blob_id);
    +
    +  num_buckets_ = blob_->size() / (kKeyCodeSize + total_state_size_);
    +  void *memory = blob_->getMemoryMutable();
    +
    +  // Calculate the memory locations of state vectors.
    +  keys_ = static_cast<KeyCode*>(memory);
    +  char *state_memory = static_cast<char*>(memory) + num_buckets_ * 
kKeyCodeSize;
    +  std::memset(state_memory, 0, num_buckets_ * total_state_size_);
    +
    +  for (std::size_t i = 0; i < state_sizes_.size(); ++i) {
    +    state_vecs_.emplace_back(state_memory);
    +    state_memory += num_buckets_ * state_sizes_[i];
    +  }
    +}
    +
    +ThreadPrivateCompactKeyHashTable::~ThreadPrivateCompactKeyHashTable() {
    +  // Release the blob.
    +  if (blob_.valid()) {
    +    const block_id blob_id = blob_->getID();
    +    blob_.release();
    +    storage_manager_->deleteBlockOrBlobFile(blob_id);
    +  }
    +}
    +
    +void ThreadPrivateCompactKeyHashTable::resize() {
    +  DCHECK_EQ(buckets_allocated_, num_buckets_);
    +
    +  const std::size_t resized_memory_size =
    +      num_buckets_ * 2 * (kKeyCodeSize + total_state_size_);
    +  const std::size_t resized_num_slots =
    +      storage_manager_->SlotsNeededForBytes(resized_memory_size);
    +
    +  const block_id resized_blob_id =
    +      storage_manager_->createBlob(resized_num_slots);
    +  MutableBlobReference resized_blob =
    +      storage_manager_->getBlobMutable(resized_blob_id);
    +
    +  const std::size_t resized_num_buckets =
    +      resized_blob->size() / (kKeyCodeSize + total_state_size_);
    +  void *resized_memory = resized_blob->getMemoryMutable();
    +
    +  KeyCode *resized_keys = static_cast<KeyCode*>(resized_memory);
    +  std::memcpy(resized_keys, keys_, buckets_allocated_ * kKeyCodeSize);
    +  keys_ = resized_keys;
    +
    +  char *resized_state_memory =
    +      static_cast<char*>(resized_memory) + resized_num_buckets * 
kKeyCodeSize;
    +  for (std::size_t i = 0; i < state_sizes_.size(); ++i) {
    +    const std::size_t vec_size = buckets_allocated_ * state_sizes_[i];
    +    const std::size_t resized_vec_size = resized_num_buckets * 
state_sizes_[i];
    +
    +    std::memcpy(resized_state_memory, state_vecs_[i], vec_size);
    +    std::memset(resized_state_memory + vec_size,
    +                0,
    +                resized_vec_size - vec_size);
    +
    +    state_vecs_[i] = resized_state_memory;
    +    resized_state_memory += resized_vec_size;
    +  }
    +
    +  std::swap(blob_, resized_blob);
    +  num_buckets_ = resized_num_buckets;
    +
    +  const block_id blob_id_to_delete = resized_blob->getID();
    +  resized_blob.release();
    +  storage_manager_->deleteBlockOrBlobFile(blob_id_to_delete);
    +}
    +
    +bool ThreadPrivateCompactKeyHashTable::upsertValueAccessorCompositeKey(
    +    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    +    const std::vector<MultiSourceAttributeId> &key_attr_ids,
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
    +
    +  DCHECK(base_accessor != nullptr);
    +  const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
    +
    +  ScopedBuffer buffer(num_tuples * kKeyCodeSize);
    +  KeyCode *key_codes = static_cast<KeyCode*>(buffer.get());
    +  std::size_t key_code_offset = 0;
    +  for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
    +    const auto &key_attr_id = key_attr_ids[i];
    +    ValueAccessor *accessor =
    +        key_attr_id.source == ValueAccessorSource::kBase
    +            ? base_accessor
    +            : derived_accessor;
    +    DCHECK(accessor != nullptr);
    +
    +    // Pack the key component into the 64-bit code (with proper offset).
    +    InvokeOnKeySize(
    +        key_sizes_[i],
    +        [&](auto key_size) -> void {  // NOLINT(build/c++11)
    +      ConstructKeyCode<decltype(key_size)::value>(
    +          key_code_offset, key_attr_id.attr_id, accessor, key_codes);
    +    });
    +    key_code_offset += key_sizes_[i];
    +  }
    +
    +  std::vector<BucketIndex> bucket_indices;
    +  bucket_indices.reserve(num_tuples);
    +  for (std::size_t i = 0; i < num_tuples; ++i) {
    +    const std::size_t code = key_codes[i];
    +    const auto index_it = index_.find(code);
    +    if (index_it == index_.end()) {
    +      if (buckets_allocated_ >= num_buckets_) {
    +        resize();
    +      }
    +      index_.emplace(code, buckets_allocated_);
    +      bucket_indices.emplace_back(buckets_allocated_);
    +      keys_[buckets_allocated_] = code;
    +      ++buckets_allocated_;
    +    } else {
    +      bucket_indices.emplace_back(index_it->second);
    +    }
    +  }
    +
    +  // Dispatch on AggregationID and argument type.
    +  // TODO(jianqiao): refactor type system and aggregation facilities to 
eliminate
    +  // this type of ad-hoc switch statements.
    +  for (std::size_t i = 0; i < handles_.size(); ++i) {
    +    const AggregationHandle *handle = handles_[i];
    +    switch (handle->getAggregationID()) {
    +      case AggregationID::kCount: {
    +        upsertValueAccessorCount(bucket_indices, state_vecs_[i]);
    +        break;
    +      }
    +      case AggregationID::kSum: {
    +        DCHECK_EQ(1u, argument_ids[i].size());
    +        const auto &argument_id = argument_ids[i].front();
    +        ValueAccessor *accessor =
    +            argument_id.source == ValueAccessorSource::kBase
    +                ? base_accessor
    +                : derived_accessor;
    +        DCHECK(accessor != nullptr);
    +
    +        DCHECK_EQ(1u, handle->getArgumentTypes().size());
    +        const Type *argument_type = handle->getArgumentTypes().front();
    +        switch (argument_type->getTypeID()) {
    +          case kInt: {
    +            upsertValueAccessorSum<int, std::int64_t>(
    +                bucket_indices, argument_id.attr_id, accessor, 
state_vecs_[i]);
    +            break;
    +          }
    +          case kLong: {
    +            upsertValueAccessorSum<std::int64_t, std::int64_t>(
    +                bucket_indices, argument_id.attr_id, accessor, 
state_vecs_[i]);
    +            break;
    +          }
    +          case kFloat: {
    +            upsertValueAccessorSum<float, double>(
    +                bucket_indices, argument_id.attr_id, accessor, 
state_vecs_[i]);
    +            break;
    +          }
    +          case kDouble: {
    +            upsertValueAccessorSum<double, double>(
    +                bucket_indices, argument_id.attr_id, accessor, 
state_vecs_[i]);
    +            break;
    +          }
    +          default:
    +            LOG(FATAL) << "Unexpected argument type";
    +        }
    +        break;
    +      }
    +      default:
    +        LOG(FATAL) << "Unexpected AggregationID";
    +    }
    +  }
    +
    +  return true;
    +}
    +
    +void ThreadPrivateCompactKeyHashTable::mergeFrom(
    +    const ThreadPrivateCompactKeyHashTable &source) {
    +  // First merge keys and generate location mappings. That is, source hash
    +  // table's bucket *i* should be merged into destination hash table's 
bucket
    +  // *dst_bucket_indices[i]*.
    +  std::vector<BucketIndex> dst_bucket_indices;
    --- End diff --
    
    Suggest to change to `std::vector<BucketIndex> 
dst_bucket_indices(source.buckets_allocated_);`, and use 
`dst_bucket_indices[i]` to assign new values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to