http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/539cb0a2/storage/FastSeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastSeparateChainingHashTable.hpp 
b/storage/FastSeparateChainingHashTable.hpp
new file mode 100644
index 0000000..64c4979
--- /dev/null
+++ b/storage/FastSeparateChainingHashTable.hpp
@@ -0,0 +1,1761 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A hash table implementation which uses separate chaining for buckets.
+ **/
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+class FastSeparateChainingHashTable : public FastHashTable<resizable,
+                                                   serializable,
+                                                   force_key_copy,
+                                                   allow_duplicate_keys> {
+ public:
+  FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+                            const std::size_t num_entries,
+                            const std::vector<std::size_t> &payload_sizes,
+                            const std::vector<AggregationHandle *> &handles,
+                            StorageManager *storage_manager);
+
+  FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+                            void *hash_table_memory,
+                            const std::size_t hash_table_memory_size,
+                            const bool new_hash_table,
+                            const bool hash_table_memory_zeroed);
+
+  // Delegating constructors for single scalar keys.
+  FastSeparateChainingHashTable(const Type &key_type,
+                            const std::size_t num_entries,
+                            StorageManager *storage_manager)
+      : FastSeparateChainingHashTable(std::vector<const Type*>(1, &key_type),
+                                  num_entries,
+                                  storage_manager) {
+  }
+
+  FastSeparateChainingHashTable(const Type &key_type,
+                            void *hash_table_memory,
+                            const std::size_t hash_table_memory_size,
+                            const bool new_hash_table,
+                            const bool hash_table_memory_zeroed)
+      : FastSeparateChainingHashTable(std::vector<const Type*>(1, &key_type),
+                                  hash_table_memory,
+                                  hash_table_memory_size,
+                                  new_hash_table,
+                                  hash_table_memory_zeroed) {
+  }
+
+  ~FastSeparateChainingHashTable() override {
+    DestroyValues(buckets_,
+                  header_->buckets_allocated.load(std::memory_order_relaxed),
+                  bucket_size_);
+    std::free(init_payload_);
+  }
+
+  void clear() override;
+
+  std::size_t numEntries() const override {
+    return header_->buckets_allocated.load(std::memory_order_relaxed);
+  }
+
+  const uint8_t* getSingle(const TypedValue &key) const override;
+  const uint8_t* getSingleCompositeKey(const std::vector<TypedValue> &key) const override;
+  const uint8_t* getSingleCompositeKey(const std::vector<TypedValue> &key, int index) const override;
+
+  void getAll(const TypedValue &key,
+              std::vector<const uint8_t*> *values) const override;
+  void getAllCompositeKey(const std::vector<TypedValue> &key,
+                          std::vector<const uint8_t*> *values) const override;
+
+ protected:
+  HashTablePutResult putInternal(const TypedValue &key,
+                                 const std::size_t variable_key_size,
+                                 const uint8_t &value,
+                                 HashTablePreallocationState *prealloc_state) override;
+  HashTablePutResult putCompositeKeyInternal(const std::vector<TypedValue> &key,
+                                             const std::size_t variable_key_size,
+                                             const uint8_t &value,
+                                             HashTablePreallocationState *prealloc_state) override;
+  HashTablePutResult putCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+                                             const std::size_t variable_key_size,
+                                             const std::uint8_t *init_value_ptr,
+                                             HashTablePreallocationState *prealloc_state) override;
+
+  uint8_t* upsertInternal(const TypedValue &key,
+                         const std::size_t variable_key_size,
+                         const uint8_t &initial_value) override;
+  uint8_t* upsertInternalFast(const TypedValue &key,
+                         const std::uint8_t *init_value_ptr,
+                         const std::size_t variable_key_size) override;
+
+  uint8_t* upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
+                                     const std::size_t variable_key_size,
+                                     const uint8_t &initial_value) override;
+
+  uint8_t* upsertCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+                                     const std::uint8_t *init_value_ptr,
+                                     const std::size_t variable_key_size) override;
+
+  bool getNextEntry(TypedValue *key,
+                    const uint8_t **value,
+                    std::size_t *entry_num) const override;
+  bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+                                const uint8_t **value,
+                                std::size_t *entry_num) const override;
+
+  bool getNextEntryForKey(const TypedValue &key,
+                          const std::size_t hash_code,
+                          const uint8_t **value,
+                          std::size_t *entry_num) const override;
+  bool getNextEntryForCompositeKey(const std::vector<TypedValue> &key,
+                                   const std::size_t hash_code,
+                                   const uint8_t **value,
+                                   std::size_t *entry_num) const override;
+
+  bool hasKey(const TypedValue &key) const override;
+  bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
+  void resize(const std::size_t extra_buckets,
+              const std::size_t extra_variable_storage,
+              const std::size_t retry_num = 0) override;
+
+  bool preallocateForBulkInsert(const std::size_t total_entries,
+                                const std::size_t total_variable_key_size,
+                                HashTablePreallocationState *prealloc_state) override;
+
+  size_t get_buckets_allocated() const override { return header_->buckets_allocated; }
+
+ private:
+  struct Header {
+    std::size_t num_slots;
+    std::size_t num_buckets;
+    alignas(kCacheLineBytes)
+        std::atomic<std::size_t> buckets_allocated;
+    alignas(kCacheLineBytes)
+        std::atomic<std::size_t> variable_length_bytes_allocated;
+  };
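// Illustrative note (not from the patch): the alignas(kCacheLineBytes) on the two
// atomic counters above keeps bucket allocation and variable-length key allocation
// from false-sharing a cache line. A minimal standalone check of that layout,
// assuming a 64-byte cache line:
//
//   #include <atomic>
//   #include <cstddef>
//
//   struct CountersOnSeparateLines {
//     alignas(64) std::atomic<std::size_t> buckets_allocated;
//     alignas(64) std::atomic<std::size_t> variable_length_bytes_allocated;
//   };
//   static_assert(sizeof(CountersOnSeparateLines) == 128,
//                 "each counter occupies its own cache line");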
+
+  std::uint8_t *init_payload_;
+  std::size_t kBucketAlignment;
+
+  // Value's offset in a bucket is the first alignof(ValueT) boundary after the
+  // next pointer and hash code.
+  std::size_t kValueOffset;
+
+  // Round bucket size up to a multiple of kBucketAlignment.
+  constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+    return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) / kBucketAlignment) + 1)
+           * kBucketAlignment;
+  }
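// Illustrative worked example (not from the patch): with kValueOffset +
// total_payload_size_ + fixed_key_size = 53 bytes and kBucketAlignment = 8, the
// formula above yields ((52 / 8) + 1) * 8 = 56, i.e. the next multiple of 8.
// These byte counts are assumptions chosen only for illustration.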
+  // If ValueT is not trivially destructible, invoke its destructor for all
+  // values held in the specified buckets (including those in "empty" buckets
+  // that were default constructed). If ValueT is trivially destructible, this
+  // is a no-op.
+  void DestroyValues(void *buckets,
+                            const std::size_t num_buckets,
+                            const std::size_t bucket_size);
+
+  // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+  // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+  // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
+  // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
+  // and a hash collision is found (caller should then check whether there is a
+  // genuine key collision or the hash collision is spurious). Returns false
+  // and sets '*bucket' to NULL if there are no more empty buckets in the hash
+  // table. If 'variable_key_allocation_required' is nonzero, this method will
+  // attempt to allocate storage for a variable-length key BEFORE allocating a
+  // bucket, so that no bucket number below 'header_->num_buckets' is ever
+  // deallocated after being allocated.
+  inline bool locateBucketForInsertion(const std::size_t hash_code,
+                                       const std::size_t variable_key_allocation_required,
+                                       void **bucket,
+                                       std::atomic<std::size_t> **pending_chain_ptr,
+                                       std::size_t *pending_chain_ptr_finish_value,
+                                       HashTablePreallocationState *prealloc_state);
+
+  // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
+  // found by locateBucketForInsertion(). Assumes that storage for a
+  // variable-length key copy (if any) was already allocated by a successful
+  // call to allocateVariableLengthKeyStorage().
+  inline void writeScalarKeyToBucket(const TypedValue &key,
+                                     const std::size_t hash_code,
+                                     void *bucket,
+                                     HashTablePreallocationState *prealloc_state);
+
+  // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
+  // found by locateBucketForInsertion(). Assumes that storage for
+  // variable-length key copies (if any) was already allocated by a successful
+  // call to allocateVariableLengthKeyStorage().
+  inline void writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
+                                        const std::size_t hash_code,
+                                        void *bucket,
+                                        HashTablePreallocationState *prealloc_state);
+
+  // Determine whether it is actually necessary to resize this hash table.
+  // Checks that there is at least one unallocated bucket, and that there is
+  // at least 'extra_variable_storage' bytes of variable-length storage free.
+  bool isFull(const std::size_t extra_variable_storage) const;
+
+  // Helper object to manage key storage.
+  HashTableKeyManager<serializable, force_key_copy> key_manager_;
+
+  // In-memory structure is as follows:
+  //   - SeparateChainingHashTable::Header
+  //   - Array of slots, interpreted as follows:
+  //       - 0 = Points to nothing (empty)
+  //       - SIZE_T_MAX = Pending (some thread is starting a chain from this
+  //         slot and will overwrite it soon)
+  //       - Anything else = The number of the first bucket in the chain for
+  //         this slot PLUS ONE (i.e. subtract one to get the actual bucket
+  //         number).
+  //   - Array of buckets, each of which is:
+  //       - atomic size_t "next" pointer, interpreted the same as slots above.
+  //       - size_t hash value
+  //       - possibly some unused bytes as needed so that ValueT's alignment
+  //         requirement is met
+  //       - ValueT value slot
+  //       - fixed-length key storage (which may include pointers to external
+  //         memory or offsets of variable length keys stored within this hash
+  //         table)
+  //       - possibly some additional unused bytes so that bucket size is a
+  //         multiple of both alignof(std::atomic<std::size_t>) and
+  //         alignof(ValueT)
+  //   - Variable-length key storage region (referenced by offsets stored in
+  //     fixed-length keys).
+  Header *header_;
+
+  std::atomic<std::size_t> *slots_;
+  void *buckets_;
+  const std::size_t bucket_size_;
+
+  DISALLOW_COPY_AND_ASSIGN(FastSeparateChainingHashTable);
+};
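// Illustrative sketch (not from the patch): how a chain reference stored in the slot
// array or in a bucket's "next" field is decoded, following the layout comment in the
// class above. The helper name and its parameters are hypothetical.
#include <cstddef>

// Returns the address of the bucket a reference points at, or nullptr for an empty
// (0) reference. A reference of SIZE_T_MAX ("pending") must be handled by the caller.
inline const char* DecodeBucketReference(const std::size_t reference,
                                         const char *buckets,
                                         const std::size_t bucket_size) {
  // Stored references are bucket numbers PLUS ONE, so subtract one before indexing.
  return (reference == 0) ? nullptr : buckets + (reference - 1) * bucket_size;
}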
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// Implementations of template class methods follow.
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+                                const std::size_t num_entries,
+                                const std::vector<std::size_t> &payload_sizes,
+                                const std::vector<AggregationHandle *> &handles,
+                                StorageManager *storage_manager)
+        : FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>(
+              key_types,
+              num_entries,
+              handles,
+              payload_sizes,
+              storage_manager,
+              false,
+              false,
+              true),
+          kBucketAlignment(alignof(std::atomic<std::size_t>)),
+          kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+          key_manager_(this->key_types_, kValueOffset + this->total_payload_size_),
+          bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+  init_payload_ = static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
+  for (auto handle : handles) {
+    handle->initPayload(init_payload_);
+  }
+  // Bucket size always rounds up to the alignment requirement of the atomic
+  // size_t "next" pointer at the front or a ValueT, whichever is larger.
+  //
+  // Give base HashTable information about what key components are stored
+  // inline from 'key_manager_'.
+  this->setKeyInline(key_manager_.getKeyInline());
+
+  // Pick out a prime number of slots and calculate storage requirements.
+  std::size_t num_slots_tmp = get_next_prime_number(num_entries * kHashTableLoadFactor);
+  std::size_t required_memory = sizeof(Header)
+                                + num_slots_tmp * sizeof(std::atomic<std::size_t>)
+                                + (num_slots_tmp / kHashTableLoadFactor)
+                                    * (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+  std::size_t num_storage_slots = this->storage_manager_->SlotsNeededForBytes(required_memory);
+  if (num_storage_slots == 0) {
+    FATAL_ERROR("Storage requirement for SeparateChainingHashTable "
+                "exceeds maximum allocation size.");
+  }
+
+  // Get a StorageBlob to hold the hash table.
+  const block_id blob_id = this->storage_manager_->createBlob(num_storage_slots);
+  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+  void *aligned_memory_start = this->blob_->getMemoryMutable();
+  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory)
+          == nullptr) {
+    // With current values from StorageConstants.hpp, this should be
+    // impossible. A blob is at least 1 MB, while a Header has alignment
+    // requirement of just kCacheLineBytes (64 bytes).
+    FATAL_ERROR("StorageBlob used to hold resizable "
+                "SeparateChainingHashTable is too small to meet alignment "
+                "requirements of SeparateChainingHashTable::Header.");
+  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+    // This should also be impossible, since the StorageManager allocates slots
+    // aligned to kCacheLineBytes.
+    DEV_WARNING("StorageBlob memory adjusted by "
+                << (num_storage_slots * kSlotSizeBytes - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "SeparateChainingHashTable::Header.");
+  }
+
+  // Locate the header.
+  header_ = static_cast<Header*>(aligned_memory_start);
+  aligned_memory_start = static_cast<char*>(aligned_memory_start) + sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  // Recompute the number of slots & buckets using the actual available memory.
+  // Most likely, we got some extra free bucket space due to "rounding up" to
+  // the storage blob's size. It's also possible (though very unlikely) that we
+  // will wind up with fewer buckets than we initially wanted because of screwy
+  // alignment requirements for ValueT.
+  std::size_t num_buckets_tmp
+      = available_memory / (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>)
+                            + bucket_size_
+                            + key_manager_.getEstimatedVariableKeySize());
+  num_slots_tmp = get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+  DEBUG_ASSERT(num_slots_tmp > 0);
+  DEBUG_ASSERT(num_buckets_tmp > 0);
+
+  // Locate the slot array.
+  slots_ = static_cast<std::atomic<std::size_t>*>(aligned_memory_start);
+  aligned_memory_start = static_cast<char*>(aligned_memory_start)
+                         + sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+  // Locate the buckets.
+  buckets_ = aligned_memory_start;
+  // Extra-paranoid: If ValueT has an alignment requirement greater than that
+  // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+  // array.
+  if (align(kBucketAlignment,
+            bucket_size_,
+            buckets_,
+            available_memory)
+          == nullptr) {
+    FATAL_ERROR("StorageBlob used to hold resizable "
+                "SeparateChainingHashTable is too small to meet "
+                "alignment requirements of buckets.");
+  } else if (buckets_ != aligned_memory_start) {
+    DEV_WARNING("Bucket array start position adjusted to meet alignment "
+                "requirement for SeparateChainingHashTable's value type.");
+    if (num_buckets_tmp * bucket_size_ > available_memory) {
+      --num_buckets_tmp;
+    }
+  }
+
+  // Fill in the header.
+  header_->num_slots = num_slots_tmp;
+  header_->num_buckets = num_buckets_tmp;
+  header_->buckets_allocated.store(0, std::memory_order_relaxed);
+  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  available_memory -= bucket_size_ * (header_->num_buckets);
+
+  // Locate variable-length key storage region, and give it all the remaining
+  // bytes in the blob.
+  key_manager_.setVariableLengthStorageInfo(
+      static_cast<char*>(buckets_) + header_->num_buckets * bucket_size_,
+      available_memory,
+      &(header_->variable_length_bytes_allocated));
+}
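// Illustrative sketch (not from the patch): the storage estimate the constructor above
// computes before asking the StorageManager for a blob. The function and parameter
// names are hypothetical; the real code uses kHashTableLoadFactor and rounds the slot
// count to a prime with get_next_prime_number().
#include <atomic>
#include <cstddef>

std::size_t EstimateRequiredBytes(const std::size_t num_entries,
                                  const std::size_t load_factor,
                                  const std::size_t bucket_size,
                                  const std::size_t estimated_variable_key_size,
                                  const std::size_t header_size) {
  // Roughly 'load_factor' slots per expected entry, and one bucket per 'load_factor'
  // slots (so about one bucket per expected entry).
  const std::size_t num_slots = num_entries * load_factor;
  const std::size_t num_buckets = num_slots / load_factor;
  return header_size
         + num_slots * sizeof(std::atomic<std::size_t>)
         + num_buckets * (bucket_size + estimated_variable_key_size);
}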
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+                                void *hash_table_memory,
+                                const std::size_t hash_table_memory_size,
+                                const bool new_hash_table,
+                                const bool hash_table_memory_zeroed)
+        : FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>(
+              key_types,
+              hash_table_memory,
+              hash_table_memory_size,
+              new_hash_table,
+              hash_table_memory_zeroed,
+              false,
+              false,
+              true),
+          kBucketAlignment(alignof(std::atomic<std::size_t>) < alignof(uint8_t) ? alignof(uint8_t)
+                                                             : alignof(std::atomic<std::size_t>)),
+          kValueOffset((((sizeof(std::atomic<std::size_t>) + sizeof(std::size_t) - 1) /
+                             alignof(uint8_t)) + 1) * alignof(uint8_t)),
+          key_manager_(this->key_types_, kValueOffset + sizeof(uint8_t)),
+          bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+  // Bucket size always rounds up to the alignment requirement of the atomic
+  // size_t "next" pointer at the front or a ValueT, whichever is larger.
+  //
+  // Make sure that the larger of the two alignment requirements also satisfies
+  // the smaller.
+  static_assert(alignof(std::atomic<std::size_t>) < alignof(uint8_t)
+                    ? alignof(uint8_t) % alignof(std::atomic<std::size_t>) == 0
+                    : alignof(std::atomic<std::size_t>) % alignof(uint8_t) == 0,
+                "Alignment requirement of std::atomic<std::size_t> does not "
+                "evenly divide with alignment requirement of ValueT.");
+
+  // Give base HashTable information about what key components are stored
+  // inline from 'key_manager_'.
+  this->setKeyInline(key_manager_.getKeyInline());
+
+  // FIXME(chasseur): If we are reconstituting a HashTable using a block of
+  // memory whose start was aligned differently than the memory block that was
+  // originally used (modulo alignof(Header)), we could wind up with all of our
+  // data structures misaligned. If memory is inside a
+  // StorageBlock/StorageBlob, this will never occur, since the StorageManager
+  // always allocates slots aligned to kCacheLineBytes. Similarly, this isn't
+  // a problem for memory inside any other allocation aligned to at least
+  // alignof(Header) == kCacheLineBytes.
+
+  void *aligned_memory_start = this->hash_table_memory_;
+  std::size_t available_memory = this->hash_table_memory_size_;
+
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory)
+          == nullptr) {
+    FATAL_ERROR("Attempted to create a non-resizable "
+                << "SeparateChainingHashTable with "
+                << available_memory << " bytes of memory at "
+                << aligned_memory_start << " which either can not fit a "
+                << "SeparateChainingHashTable::Header or meet its alignment "
+                << "requirement.");
+  } else if (aligned_memory_start != this->hash_table_memory_) {
+    // In general, we could get memory of any alignment, although at least
+    // cache-line aligned would be nice.
+    DEV_WARNING("StorageBlob memory adjusted by "
+                << (this->hash_table_memory_size_ - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "SeparateChainingHashTable::Header.");
+  }
+
+  header_ = static_cast<Header*>(aligned_memory_start);
+  aligned_memory_start = static_cast<char*>(aligned_memory_start) + sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  if (new_hash_table) {
+    std::size_t estimated_bucket_capacity
+        = available_memory / (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>)
+                              + bucket_size_
+                              + key_manager_.getEstimatedVariableKeySize());
+    std::size_t num_slots = get_previous_prime_number(estimated_bucket_capacity * kHashTableLoadFactor);
+
+    // Fill in the header.
+    header_->num_slots = num_slots;
+    header_->num_buckets = num_slots / kHashTableLoadFactor;
+    header_->buckets_allocated.store(0, std::memory_order_relaxed);
+    header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  }
+
+  // Locate the slot array.
+  slots_ = static_cast<std::atomic<std::size_t>*>(aligned_memory_start);
+  aligned_memory_start = static_cast<char*>(aligned_memory_start)
+                         + sizeof(std::atomic<std::size_t>) * header_->num_slots;
+  available_memory -= sizeof(std::atomic<std::size_t>) * header_->num_slots;
+
+  if (new_hash_table && !hash_table_memory_zeroed) {
+    std::memset(slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+  }
+
+  // Locate the buckets.
+  buckets_ = aligned_memory_start;
+  // Extra-paranoid: sizeof(Header) should almost certainly be a multiple of
+  // kBucketAlignment, unless ValueT has some members with seriously big
+  // (> kCacheLineBytes) alignment requirements specified using alignas().
+  if (align(kBucketAlignment,
+            bucket_size_,
+            buckets_,
+            available_memory)
+          == nullptr) {
+    FATAL_ERROR("Attempted to create a non-resizable "
+                << "SeparateChainingHashTable with "
+                << this->hash_table_memory_size_ << " bytes of memory at "
+                << this->hash_table_memory_ << ", which can hold an aligned "
+                << "SeparateChainingHashTable::Header but does not have "
+                << "enough remaining space for even a single hash bucket.");
+  } else if (buckets_ != aligned_memory_start) {
+    DEV_WARNING("Bucket array start position adjusted to meet alignment "
+                "requirement for SeparateChainingHashTable's value type.");
+    if (header_->num_buckets * bucket_size_ > available_memory) {
+      DEBUG_ASSERT(new_hash_table);
+      --(header_->num_buckets);
+    }
+  }
+  available_memory -= bucket_size_ * header_->num_buckets;
+
+  // Make sure "next" pointers in buckets are zeroed-out.
+  if (new_hash_table && !hash_table_memory_zeroed) {
+    std::memset(buckets_, 0x0, header_->num_buckets * bucket_size_);
+  }
+
+  // Locate variable-length key storage region.
+  key_manager_.setVariableLengthStorageInfo(
+      static_cast<char*>(buckets_) + header_->num_buckets * bucket_size_,
+      available_memory,
+      &(header_->variable_length_bytes_allocated));
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::clear() {
+  const std::size_t used_buckets = header_->buckets_allocated.load(std::memory_order_relaxed);
+  // Destroy existing values, if necessary.
+  DestroyValues(buckets_,
+                used_buckets,
+                bucket_size_);
+
+  // Zero-out slot array.
+  std::memset(slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+  // Zero-out used buckets.
+  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+  header_->buckets_allocated.store(0, std::memory_order_relaxed);
+  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+const uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getSingle(const TypedValue &key) const {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  const std::size_t hash_code = key.getHash();
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+const uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getSingleCompositeKey(const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+const uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getSingleCompositeKey(const std::vector<TypedValue> &key, int index) const {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const uint8_t*>(bucket + kValueOffset) + this->payload_offsets_[index];
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getAll(const TypedValue &key, std::vector<const uint8_t*> *values) const {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  const std::size_t hash_code = key.getHash();
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      values->push_back(reinterpret_cast<const uint8_t*>(bucket + kValueOffset));
+      if (!allow_duplicate_keys) {
+        return;
+      }
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getAllCompositeKey(const std::vector<TypedValue> &key, std::vector<const uint8_t*> *values) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      values->push_back(reinterpret_cast<const uint8_t*>(bucket + kValueOffset));
+      if (!allow_duplicate_keys) {
+        return;
+      }
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+HashTablePutResult
+    FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+        ::putInternal(const TypedValue &key,
+                      const std::size_t variable_key_size,
+                      const uint8_t &value,
+                      HashTablePreallocationState *prealloc_state) {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  if (prealloc_state == nullptr) {
+    // Early check for a free bucket.
+    if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+      return HashTablePutResult::kOutOfSpace;
+    }
+
+    // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+    // one copy of the same variable-length key.
+    if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+      // Ran out of variable-length key storage space.
+      return HashTablePutResult::kOutOfSpace;
+    }
+  }
+
+  const std::size_t hash_code = key.getHash();
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 0,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 prealloc_state)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets. Deallocate any variable space that we were unable
+      // to use.
+      DEBUG_ASSERT(prealloc_state == nullptr);
+      key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+      return HashTablePutResult::kOutOfSpace;
+    } else {
+      // Hash collision found, and duplicates aren't allowed.
+      DEBUG_ASSERT(!allow_duplicate_keys);
+      DEBUG_ASSERT(prealloc_state == nullptr);
+      if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+        // Duplicate key. Deallocate any variable storage space and return.
+        key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+        return HashTablePutResult::kDuplicateKey;
+      }
+    }
+  }
+
+  // Write the key and hash.
+  writeScalarKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+  // Store the value by using placement new with ValueT's copy constructor.
+  new(static_cast<char*>(bucket) + kValueOffset) uint8_t(value);
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // We're all done.
+  return HashTablePutResult::kOK;
+}
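// Illustrative sketch (not from the patch): the slot-claiming handshake that
// locateBucketForInsertion() is documented to perform. A slot value of 0 means empty,
// SIZE_T_MAX means a chain start is pending, and any other value is the first bucket
// number plus one. The names below are made up for illustration only.
#include <atomic>
#include <cstddef>
#include <limits>

constexpr std::size_t kPendingChain = std::numeric_limits<std::size_t>::max();

// Try to start a new chain from 'slot'. On success the claiming thread later publishes
// 'bucket_num + 1' into the slot with a release store, much as putInternal() does
// through 'pending_chain_ptr' above.
inline bool ClaimEmptySlot(std::atomic<std::size_t> *slot) {
  std::size_t expected = 0;
  return slot->compare_exchange_strong(expected, kPendingChain,
                                       std::memory_order_acq_rel);
}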
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+HashTablePutResult
+    FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+        ::putCompositeKeyInternal(const std::vector<TypedValue> &key,
+                                  const std::size_t variable_key_size,
+                                  const uint8_t &value,
+                                  HashTablePreallocationState *prealloc_state) {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  if (prealloc_state == nullptr) {
+    // Early check for a free bucket.
+    if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+      return HashTablePutResult::kOutOfSpace;
+    }
+
+    // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+    // one copy of the same variable-length key.
+    if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+      // Ran out of variable-length key storage space.
+      return HashTablePutResult::kOutOfSpace;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 0,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 prealloc_state)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets. Deallocate any variable space that we were unable
+      // to use.
+      DEBUG_ASSERT(prealloc_state == nullptr);
+      key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+      return HashTablePutResult::kOutOfSpace;
+    } else {
+      // Hash collision found, and duplicates aren't allowed.
+      DEBUG_ASSERT(!allow_duplicate_keys);
+      DEBUG_ASSERT(prealloc_state == nullptr);
+      if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+        // Duplicate key. Deallocate any variable storage space and return.
+        key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+        return HashTablePutResult::kDuplicateKey;
+      }
+    }
+  }
+
+  // Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+  // Store the value by using placement new with ValueT's copy constructor.
+  new(static_cast<char*>(bucket) + kValueOffset) uint8_t(value);
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // We're all done.
+  return HashTablePutResult::kOK;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+HashTablePutResult
+    FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+        ::putCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+                                  const std::size_t variable_key_size,
+                                  const uint8_t *init_value_ptr,
+                                  HashTablePreallocationState *prealloc_state) {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  if (prealloc_state == nullptr) {
+    // Early check for a free bucket.
+    if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+      return HashTablePutResult::kOutOfSpace;
+    }
+
+    // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+    // one copy of the same variable-length key.
+    if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+      // Ran out of variable-length key storage space.
+      return HashTablePutResult::kOutOfSpace;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 0,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 prealloc_state)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets. Deallocate any variable space that we were unable
+      // to use.
+      DEBUG_ASSERT(prealloc_state == nullptr);
+      key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+      return HashTablePutResult::kOutOfSpace;
+    } else {
+      // Hash collision found, and duplicates aren't allowed.
+      DEBUG_ASSERT(!allow_duplicate_keys);
+      DEBUG_ASSERT(prealloc_state == nullptr);
+      if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+        // Duplicate key. Deallocate any variable storage space and return.
+        key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+        return HashTablePutResult::kDuplicateKey;
+      }
+    }
+  }
+
+  // Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+  // Copy the supplied initial payload into the value slot of the new bucket.
+  uint8_t *value = static_cast<uint8_t*>(bucket) + kValueOffset;
+  std::memcpy(value, init_value_ptr, this->total_payload_size_);
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // We're all done.
+  return HashTablePutResult::kOK;
+}
+
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::upsertInternal(const TypedValue &key,
+                     const std::size_t variable_key_size,
+                     const uint8_t &initial_value) {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size)
+        && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = key.getHash();
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 nullptr)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+    }
+  }
+
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeScalarKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the supplied 'initial_value' into place.
+  uint8_t *value = new(static_cast<char*>(bucket) + kValueOffset) uint8_t(initial_value);
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
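// Illustrative usage sketch (not from the patch): the upsert contract. The returned
// pointer refers either to the freshly initialized value or to the value already
// stored for 'key'; nullptr means the table is out of buckets or variable-key space
// and the caller is expected to resize and retry. 'table', 'key' and 'initial_value'
// are assumed to exist in a caller inside the HashTable hierarchy.
//
//   uint8_t *state = table.upsertInternal(key, /* variable_key_size = */ 0,
//                                         initial_value);
//   if (state == nullptr) {
//     // Out of space: grow the table, then retry the upsert.
//   }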
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::upsertInternalFast(const TypedValue &key,
+                     const std::uint8_t *init_value_ptr,
+                     const std::size_t variable_key_size) {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size)
+        && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = key.getHash();
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 nullptr)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+    }
+  }
+
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeScalarKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the initial payload into place: use the caller-supplied 'init_value_ptr'
+  // if one was given, otherwise the precomputed 'init_payload_'.
+  uint8_t *value = static_cast<uint8_t*>(bucket) + kValueOffset;
+  if (init_value_ptr == nullptr) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  } else {
+    std::memcpy(value, init_value_ptr, this->total_payload_size_);
+  }
+
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
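// Illustrative sketch (not from the patch): why 'init_payload_' exists. The constructor
// builds one fully initialized payload image up front (calloc plus each
// AggregationHandle::initPayload call), and every new bucket receives a plain memcpy of
// that image rather than re-running per-handle initialization on each insert. The names
// below are hypothetical.
#include <cstddef>
#include <cstdint>
#include <cstring>

// Copy a prebuilt initial payload into the value slot of a freshly claimed bucket.
inline void InitializeBucketPayload(void *bucket,
                                    const std::size_t value_offset,
                                    const std::uint8_t *blank_payload,
                                    const std::size_t payload_size) {
  std::memcpy(static_cast<std::uint8_t*>(bucket) + value_offset,
              blank_payload, payload_size);
}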
+
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
+                                 const std::size_t variable_key_size,
+                                 const uint8_t &initial_value) {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size)
+        && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 nullptr)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+    }
+  }
+
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the supplied 'initial_value' into place.
+  uint8_t *value = new(static_cast<char*>(bucket) + kValueOffset) uint8_t(initial_value);
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::upsertCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+                                 const std::uint8_t *init_value_ptr,
+                                 const std::size_t variable_key_size) {
+  DEBUG_ASSERT(!allow_duplicate_keys);
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size)
+        && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value,
+                                 nullptr)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+    }
+  }
+
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the initial payload into place: use the caller-supplied 'init_value_ptr'
+  // if one was given, otherwise the precomputed 'init_payload_'.
+  uint8_t *value = static_cast<uint8_t*>(bucket) + kValueOffset;
+  if (init_value_ptr == nullptr) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  } else {
+    std::memcpy(value, init_value_ptr, this->total_payload_size_);
+  }
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getNextEntry(TypedValue *key, const uint8_t **value, std::size_t *entry_num) const {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    const char *bucket = static_cast<const char*>(buckets_) + (*entry_num) * bucket_size_;
+    *key = key_manager_.getKeyComponentTyped(bucket, 0);
+    *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+    ++(*entry_num);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getNextEntryCompositeKey(std::vector<TypedValue> *key,
+                               const uint8_t **value,
+                               std::size_t *entry_num) const {
+  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    const char *bucket = static_cast<const char*>(buckets_) + (*entry_num) * bucket_size_;
+    for (std::vector<const Type*>::size_type key_idx = 0;
+         key_idx < this->key_types_.size();
+         ++key_idx) {
+      key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+    }
+    *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+    ++(*entry_num);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getNextEntryForKey(const TypedValue &key,
+                         const std::size_t hash_code,
+                         const uint8_t **value,
+                         std::size_t *entry_num) const {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  
DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  if (*entry_num == 0) {
+    *entry_num = slots_[hash_code % 
header_->num_slots].load(std::memory_order_relaxed);
+  } else if (*entry_num == std::numeric_limits<std::size_t>::max()) {
+    return false;
+  }
+
+  while (*entry_num != 0) {
+    DEBUG_ASSERT(*entry_num != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (*entry_num - 1) 
* bucket_size_;
+    *entry_num = reinterpret_cast<const 
std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && 
key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+      if (*entry_num == 0) {
+        // If this is the last bucket in the chain, prevent the next call from
+        // starting over again.
+        *entry_num = std::numeric_limits<std::size_t>::max();
+      }
+      return true;
+    }
+  }
+
+  // Reached the end of the chain.
+  return false;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::getNextEntryForCompositeKey(const std::vector<TypedValue> &key,
+                                  const std::size_t hash_code,
+                                  const uint8_t **value,
+                                  std::size_t *entry_num) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  if (*entry_num == 0) {
+    *entry_num = slots_[hash_code % 
header_->num_slots].load(std::memory_order_relaxed);
+  } else if (*entry_num == std::numeric_limits<std::size_t>::max()) {
+    return false;
+  }
+
+  while (*entry_num != 0) {
+    DEBUG_ASSERT(*entry_num != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (*entry_num - 1) 
* bucket_size_;
+    *entry_num = reinterpret_cast<const 
std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && 
key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+      if (*entry_num == 0) {
+        // If this is the last bucket in the chain, prevent the next call from
+        // starting over again.
+        *entry_num = std::numeric_limits<std::size_t>::max();
+      }
+      return true;
+    }
+  }
+
+  // Reached the end of the chain.
+  return false;
+}
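Usage note: the per-key probe functions use *entry_num differently from the scans above: 0 means "start from the slot for this hash", any other value is the next link in the chain, and std::numeric_limits<std::size_t>::max() records that the previous call already returned the last link. A hedged sketch of probing all duplicates for one composite key; the table object and the way the caller obtains the hash are assumptions for illustration:

const std::size_t hash_code = /* hash of probe_key, computed the same way the
                                 table hashes composite keys */ 0;
const uint8_t *value = nullptr;
std::size_t entry_num = 0;  // 0 = start a fresh probe for this key.
while (table.getNextEntryForCompositeKey(probe_key, hash_code, &value, &entry_num)) {
  // Each iteration yields one bucket whose stored hash and key both match.
}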
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::hasKey(const TypedValue &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  
DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  const std::size_t hash_code = key.getHash();
+  std::size_t bucket_ref = slots_[hash_code % 
header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) 
* bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && 
key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Found a match.
+      return true;
+    }
+    bucket_ref = reinterpret_cast<const 
std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+  return false;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::hasCompositeKey(const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref = slots_[hash_code % 
header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) 
* bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && 
key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found a match.
+      return true;
+    }
+    bucket_ref = reinterpret_cast<const 
std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+  return false;
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::resize(const std::size_t extra_buckets,
+             const std::size_t extra_variable_storage,
+             const std::size_t retry_num) {
+  DEBUG_ASSERT(resizable);
+
+  // A retry should never be necessary with this implementation of HashTable.
+  // Separate chaining ensures that any resized hash table with more buckets
+  // than the original table will be able to hold more entries than the
+  // original.
+  DEBUG_ASSERT(retry_num == 0);
+
+  SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+  // Recheck whether the hash table is still full. Note that multiple threads
+  // might wait to rebuild this hash table simultaneously. Only the first one
+  // should do the rebuild.
+  if (!isFull(extra_variable_storage)) {
+    return;
+  }
+
+  // Approximately double the number of buckets and slots.
+  //
+  // TODO(chasseur): It may be worth it to more than double the number of
+  // buckets here so that we can maintain a good, sparse fill factor for a
+  // longer time as more values are inserted. Such behavior should take into
+  // account kHashTableLoadFactor.
+  std::size_t resized_num_slots = get_next_prime_number(
+      (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+  std::size_t variable_storage_required
+      = (resized_num_slots / kHashTableLoadFactor) * 
key_manager_.getEstimatedVariableKeySize();
+  const std::size_t original_variable_storage_used
+      = 
header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+  // If this resize was triggered by a too-large variable-length key, bump up
+  // the variable-length storage requirement.
+  if ((extra_variable_storage > 0)
+      && (extra_variable_storage + original_variable_storage_used
+          > key_manager_.getVariableLengthKeyStorageSize())) {
+    variable_storage_required += extra_variable_storage;
+  }
+
+  const std::size_t resized_memory_required
+      = sizeof(Header)
+        + resized_num_slots * sizeof(std::atomic<std::size_t>)
+        + (resized_num_slots / kHashTableLoadFactor) * bucket_size_
+        + variable_storage_required;
+  const std::size_t resized_storage_slots
+      = this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+  if (resized_storage_slots == 0) {
+    FATAL_ERROR("Storage requirement for resized SeparateChainingHashTable "
+                "exceeds maximum allocation size.");
+  }
+
+  // Get a new StorageBlob to hold the resized hash table.
+  const block_id resized_blob_id = 
this->storage_manager_->createBlob(resized_storage_slots);
+  MutableBlobReference resized_blob = 
this->storage_manager_->getBlobMutable(resized_blob_id);
+
+  // Locate data structures inside the new StorageBlob.
+  void *aligned_memory_start = resized_blob->getMemoryMutable();
+  std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory)
+          == nullptr) {
+    // Should be impossible, as noted in constructor.
+    FATAL_ERROR("StorageBlob used to hold resized SeparateChainingHashTable "
+                "is too small to meet alignment requirements of "
+                "SeparateChainingHashTable::Header.");
+  } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+    // Again, should be impossible.
+    DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+                << "memory adjusted by "
+                << (resized_storage_slots * kSlotSizeBytes - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "SeparateChainingHashTable::Header.");
+  }
+
+  Header *resized_header = static_cast<Header*>(aligned_memory_start);
+  aligned_memory_start = static_cast<char*>(aligned_memory_start) + 
sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  // As in constructor, recompute the number of slots and buckets using the
+  // actual available memory.
+  std::size_t resized_num_buckets
+      = (available_memory - extra_variable_storage)
+        / (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>)
+           + bucket_size_
+           + key_manager_.getEstimatedVariableKeySize());
+  resized_num_slots = get_previous_prime_number(resized_num_buckets * 
kHashTableLoadFactor);
+  resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
+
+  // Locate slot array.
+  std::atomic<std::size_t> *resized_slots = 
static_cast<std::atomic<std::size_t>*>(aligned_memory_start);
+  aligned_memory_start = static_cast<char*>(aligned_memory_start)
+                         + sizeof(std::atomic<std::size_t>) * 
resized_num_slots;
+  available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+  // As in constructor, we will be extra paranoid and use align() to locate the
+  // start of the array of buckets, as well.
+  void *resized_buckets = aligned_memory_start;
+  if (align(kBucketAlignment,
+            bucket_size_,
+            resized_buckets,
+            available_memory)
+          == nullptr) {
+    FATAL_ERROR("StorageBlob used to hold resized SeparateChainingHashTable "
+                "is too small to meet alignment requirements of buckets.");
+  } else if (resized_buckets != aligned_memory_start) {
+    DEV_WARNING("Bucket array start position adjusted to meet alignment "
+                "requirement for SeparateChainingHashTable's value type.");
+    if (resized_num_buckets * bucket_size_ + variable_storage_required > 
available_memory) {
+      --resized_num_buckets;
+    }
+  }
+  aligned_memory_start = static_cast<char*>(aligned_memory_start)
+                         + resized_num_buckets * bucket_size_;
+  available_memory -= resized_num_buckets * bucket_size_;
+
+  void *resized_variable_length_key_storage = aligned_memory_start;
+  const std::size_t resized_variable_length_key_storage_size = 
available_memory;
+
+  const std::size_t original_buckets_used = 
header_->buckets_allocated.load(std::memory_order_relaxed);
+
+  // Initialize the header.
+  resized_header->num_slots = resized_num_slots;
+  resized_header->num_buckets = resized_num_buckets;
+  resized_header->buckets_allocated.store(original_buckets_used, 
std::memory_order_relaxed);
+  resized_header->variable_length_bytes_allocated.store(
+      original_variable_storage_used,
+      std::memory_order_relaxed);
+
+  // Bulk-copy buckets. This is safe because:
+  //     1. The "next" pointers will be adjusted when rebuilding chains below.
+  //     2. The hash codes will stay the same.
+  //     3. For key components:
+  //       a. Inline keys will stay exactly the same.
+  //       b. Offsets into variable-length storage will remain valid, because
+  //          we also do a byte-for-byte copy of variable-length storage below.
+  //       c. Absolute external pointers will still point to the same address.
+  //       d. Relative pointers are not used with resizable hash tables.
+  //     4. If values are not trivially copyable, then we invoke ValueT's copy
+  //        or move constructor with placement new.
+  std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+  // TODO(chasseur): std::is_trivially_copyable is not yet implemented in
+  // GCC 4.8.3, so we assume we need to invoke ValueT's copy or move
+  // constructor, even though the plain memcpy above could suffice for many
+  // possible ValueTs.
+  void *current_value_original = static_cast<char*>(buckets_) + kValueOffset;
+  void *current_value_resized = static_cast<char*>(resized_buckets) + 
kValueOffset;
+  for (std::size_t bucket_num = 0; bucket_num < original_buckets_used; 
++bucket_num) {
+    // Use a move constructor if available to avoid a deep-copy, since resizes
+    // always succeed.
+    new (current_value_resized) 
uint8_t(std::move(*static_cast<uint8_t*>(current_value_original)));
+    current_value_original = static_cast<char*>(current_value_original) + 
bucket_size_;
+    current_value_resized = static_cast<char*>(current_value_resized) + 
bucket_size_;
+  }
+
+  // Copy over variable-length key components, if any.
+  if (original_variable_storage_used > 0) {
+    DEBUG_ASSERT(original_variable_storage_used
+                 == key_manager_.getNextVariableLengthKeyOffset());
+    DEBUG_ASSERT(original_variable_storage_used <= 
resized_variable_length_key_storage_size);
+    std::memcpy(resized_variable_length_key_storage,
+                key_manager_.getVariableLengthKeyStorage(),
+                original_variable_storage_used);
+  }
+
+  // Destroy values in the original hash table, if necessary.
+  DestroyValues(buckets_,
+                original_buckets_used,
+                bucket_size_);
+
+  // Make resized structures active.
+  std::swap(this->blob_, resized_blob);
+  header_ = resized_header;
+  slots_ = resized_slots;
+  buckets_ = resized_buckets;
+  key_manager_.setVariableLengthStorageInfo(
+      resized_variable_length_key_storage,
+      resized_variable_length_key_storage_size,
+      &(resized_header->variable_length_bytes_allocated));
+
+  // Drop the old blob.
+  const block_id old_blob_id = resized_blob->getID();
+  resized_blob.release();
+  this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+  // Rebuild chains.
+  void *current_bucket = buckets_;
+  for (std::size_t bucket_num = 0; bucket_num < original_buckets_used; 
++bucket_num) {
+    std::atomic<std::size_t> *next_ptr
+        = static_cast<std::atomic<std::size_t>*>(current_bucket);
+    const std::size_t hash_code = *reinterpret_cast<const std::size_t*>(
+        static_cast<const char*>(current_bucket) + 
sizeof(std::atomic<std::size_t>));
+
+    const std::size_t slot_number = hash_code % header_->num_slots;
+    std::size_t slot_ptr_value = 0;
+    if (slots_[slot_number].compare_exchange_strong(slot_ptr_value,
+                                                    bucket_num + 1,
+                                                    
std::memory_order_relaxed)) {
+      // This bucket is the first in the chain for this block, so reset its
+      // next pointer to 0.
+      next_ptr->store(0, std::memory_order_relaxed);
+    } else {
+      // A chain already exists starting from this slot, so put this bucket at
+      // the head.
+      next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+      slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+    }
+    current_bucket = static_cast<char*>(current_bucket) + bucket_size_;
+  }
+}
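Editorial note: the chain-rebuild loop at the end of resize() pushes each copied bucket onto the head of its slot's chain: a successful compare-exchange from 0 makes the bucket the first link, otherwise the old head becomes the bucket's next pointer. A self-contained sketch of that idea with simplified names; relaxed ordering suffices here because the rebuild runs under the exclusive resize lock:

#include <atomic>
#include <cstddef>
#include <vector>

// 'bucket_ref' is the one-based index of the bucket being re-linked.
void rebuild_chain_head(std::vector<std::atomic<std::size_t>> &slots,
                        std::atomic<std::size_t> &bucket_next_ptr,
                        const std::size_t slot_number,
                        const std::size_t bucket_ref) {
  std::size_t old_head = 0;
  if (slots[slot_number].compare_exchange_strong(old_head, bucket_ref,
                                                 std::memory_order_relaxed)) {
    bucket_next_ptr.store(0, std::memory_order_relaxed);  // First link in the chain.
  } else {
    bucket_next_ptr.store(old_head, std::memory_order_relaxed);  // Link the old head behind us.
    slots[slot_number].store(bucket_ref, std::memory_order_relaxed);
  }
}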
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::preallocateForBulkInsert(const std::size_t total_entries,
+                               const std::size_t total_variable_key_size,
+                               HashTablePreallocationState *prealloc_state) {
+  DEBUG_ASSERT(allow_duplicate_keys);
+  if (!key_manager_.allocateVariableLengthKeyStorage(total_variable_key_size)) 
{
+    return false;
+  }
+
+  // We use load then compare-exchange here instead of simply fetch-add,
+  // because if multiple threads are simultaneously trying to allocate more
+  // than one bucket and exceed 'header_->num_buckets', their respective
+  // rollbacks might happen in such an order that some bucket ranges get
+  // skipped, while others might get double-allocated later.
+  std::size_t original_buckets_allocated = 
header_->buckets_allocated.load(std::memory_order_relaxed);
+  std::size_t buckets_post_allocation = original_buckets_allocated + 
total_entries;
+  while ((buckets_post_allocation <= header_->num_buckets)
+         && 
!header_->buckets_allocated.compare_exchange_weak(original_buckets_allocated,
+                                                              
buckets_post_allocation,
+                                                              
std::memory_order_relaxed)) {
+    buckets_post_allocation = original_buckets_allocated + total_entries;
+  }
+
+  if (buckets_post_allocation > header_->num_buckets) {
+    key_manager_.deallocateVariableLengthKeyStorage(total_variable_key_size);
+    return false;
+  }
+
+  prealloc_state->bucket_position = original_buckets_allocated;
+  if (total_variable_key_size != 0) {
+    prealloc_state->variable_length_key_position
+        = 
key_manager_.incrementNextVariableLengthKeyOffset(total_variable_key_size);
+  }
+  return true;
+}
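Editorial note: the reservation loop above is a bounded fetch-add: load the current count, try to bump it by the requested number of buckets with compare_exchange_weak, and retry with the refreshed value on failure, never committing a count past the capacity. A standalone sketch of the same pattern, simplified and not the class's actual API:

#include <atomic>
#include <cstddef>
#include <limits>

// Returns the first reserved bucket index, or SIZE_MAX if the request
// does not fit within 'capacity'.
std::size_t reserve_buckets(std::atomic<std::size_t> &allocated,
                            const std::size_t capacity,
                            const std::size_t n) {
  std::size_t current = allocated.load(std::memory_order_relaxed);
  std::size_t desired = current + n;
  while (desired <= capacity &&
         !allocated.compare_exchange_weak(current, desired,
                                          std::memory_order_relaxed)) {
    desired = current + n;  // 'current' was refreshed by the failed CAS.
  }
  return (desired <= capacity) ? current
                               : std::numeric_limits<std::size_t>::max();
}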
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::DestroyValues(void *hash_buckets,
+                    const std::size_t num_buckets,
+                    const std::size_t bucket_size) {
+  if (!std::is_trivially_destructible<uint8_t>::value) {
+    void *value_ptr = static_cast<char*>(hash_buckets) + kValueOffset;
+    for (std::size_t bucket_num = 0;
+         bucket_num < num_buckets;
+         ++bucket_num) {
+      static_cast<uint8_t*>(value_ptr)->~uint8_t();
+      value_ptr = static_cast<char*>(value_ptr) + bucket_size;
+    }
+  }
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+inline bool FastSeparateChainingHashTable<resizable, serializable, 
force_key_copy, allow_duplicate_keys>
+    ::locateBucketForInsertion(const std::size_t hash_code,
+                               const std::size_t 
variable_key_allocation_required,
+                               void **bucket,
+                               std::atomic<std::size_t> **pending_chain_ptr,
+                               std::size_t *pending_chain_ptr_finish_value,
+                               HashTablePreallocationState *prealloc_state) {
+  DEBUG_ASSERT((prealloc_state == nullptr) || allow_duplicate_keys);
+  if (*bucket == nullptr) {
+    *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+  } else {
+    *pending_chain_ptr = static_cast<std::atomic<std::size_t>*>(*bucket);
+  }
+  for (;;) {
+    std::size_t existing_chain_ptr = 0;
+    if ((*pending_chain_ptr)->compare_exchange_strong(existing_chain_ptr,
+                                                      
std::numeric_limits<std::size_t>::max(),
+                                                      
std::memory_order_acq_rel)) {
+      // Got to the end of the chain. Allocate a new bucket.
+
+      // First, allocate variable-length key storage, if needed (i.e. if this
+      // is an upsert and we didn't allocate up-front).
+      if ((prealloc_state == nullptr)
+          && 
!key_manager_.allocateVariableLengthKeyStorage(variable_key_allocation_required))
 {
+        // Ran out of variable-length storage.
+        (*pending_chain_ptr)->store(0, std::memory_order_release);
+        *bucket = nullptr;
+        return false;
+      }
+
+      const std::size_t allocated_bucket_num
+          = (prealloc_state == nullptr)
+              ? header_->buckets_allocated.fetch_add(1, 
std::memory_order_relaxed)
+              : (prealloc_state->bucket_position)++;
+      if (allocated_bucket_num >= header_->num_buckets) {
+        // Ran out of buckets.
+        DEBUG_ASSERT(prealloc_state == nullptr);
+        header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+        (*pending_chain_ptr)->store(0, std::memory_order_release);
+        *bucket = nullptr;
+        return false;
+      } else {
+        *bucket = static_cast<char*>(buckets_) + allocated_bucket_num * 
bucket_size_;
+        *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+        return true;
+      }
+    }
+    // Spin until the real "next" pointer is available.
+    while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+      existing_chain_ptr = 
(*pending_chain_ptr)->load(std::memory_order_acquire);
+    }
+    if (existing_chain_ptr == 0) {
+      // Other thread had to roll back, so try again.
+      continue;
+    }
+    // Chase the next pointer.
+    *bucket = static_cast<char*>(buckets_) + (existing_chain_ptr - 1) * 
bucket_size_;
+    *pending_chain_ptr = static_cast<std::atomic<std::size_t>*>(*bucket);
+    if (!allow_duplicate_keys) {
+      const std::size_t hash_in_bucket
+          = *reinterpret_cast<const std::size_t*>(static_cast<const 
char*>(*bucket)
+                                                  + 
sizeof(std::atomic<std::size_t>));
+      if (hash_in_bucket == hash_code) {
+        return false;
+      }
+    }
+  }
+}
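Editorial note: on the losing side of the compare-exchange above, an inserter spins until the winner either publishes a real one-based bucket index or rolls back to 0 (out of buckets or variable-length storage). A minimal sketch of that wait, with simplified names:

#include <atomic>
#include <cstddef>
#include <limits>

// Returns 0 if the other inserter rolled back (caller retries the claim),
// otherwise the one-based index of the newly linked bucket.
std::size_t wait_for_chain_pointer(const std::atomic<std::size_t> &chain_ptr) {
  std::size_t value = chain_ptr.load(std::memory_order_acquire);
  while (value == std::numeric_limits<std::size_t>::max()) {
    value = chain_ptr.load(std::memory_order_acquire);  // Spin until published.
  }
  return value;
}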
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+inline void FastSeparateChainingHashTable<resizable, serializable, 
force_key_copy, allow_duplicate_keys>
+    ::writeScalarKeyToBucket(const TypedValue &key,
+                             const std::size_t hash_code,
+                             void *bucket,
+                             HashTablePreallocationState *prealloc_state) {
+  *reinterpret_cast<std::size_t*>(static_cast<char*>(bucket) + 
sizeof(std::atomic<std::size_t>))
+      = hash_code;
+  key_manager_.writeKeyComponentToBucket(key, 0, bucket, prealloc_state);
+}
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+inline void FastSeparateChainingHashTable<resizable, serializable, 
force_key_copy, allow_duplicate_keys>
+    ::writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
+                                const std::size_t hash_code,
+                                void *bucket,
+                                HashTablePreallocationState *prealloc_state) {
+  DEBUG_ASSERT(key.size() == this->key_types_.size());
+  *reinterpret_cast<std::size_t*>(static_cast<char*>(bucket) + 
sizeof(std::atomic<std::size_t>))
+      = hash_code;
+  for (std::size_t idx = 0;
+       idx < this->key_types_.size();
+       ++idx) {
+    key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, 
prealloc_state);
+  }
+}
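Editorial note: both key writers above store the hash code immediately after the atomic next pointer; the remaining offsets (key components and the payload at kValueOffset) are computed by the constructor and the key manager. A rough picture of the bucket layout as these functions assume it, conceptual only, since exact offsets depend on the key types and payload sizes:

// offset 0                                 : std::atomic<std::size_t> next-in-chain pointer
// offset sizeof(std::atomic<std::size_t>)  : std::size_t hash code
// remaining bytes                          : fixed-size key components and the
//                                            aggregation payload, the latter
//                                            starting at kValueOffset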
+
+template <bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, 
allow_duplicate_keys>
+    ::isFull(const std::size_t extra_variable_storage) const {
+  if (header_->buckets_allocated.load(std::memory_order_relaxed) >= 
header_->num_buckets) {
+    // All buckets are allocated.
+    return true;
+  }
+
+  if (extra_variable_storage > 0) {
+    if (extra_variable_storage
+            + 
header_->variable_length_bytes_allocated.load(std::memory_order_relaxed)
+        > key_manager_.getVariableLengthKeyStorageSize()) {
+      // Not enough variable-length key storage space.
+      return true;
+    }
+  }
+
+  return false;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/539cb0a2/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index f1594e3..7eadae9 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -66,7 +66,7 @@ class HashTableBase {
  public:
   virtual ~HashTableBase() {
   }
-
+  virtual std::size_t get_buckets_allocated() const { return 0; }
  protected:
   HashTableBase() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/539cb0a2/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 53fe514..17578de 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -27,6 +27,8 @@
 
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
 #include "threading/SpinMutex.hpp"
 #include "utility/Macros.hpp"
 #include "utility/StringUtil.hpp"
@@ -81,6 +83,19 @@ class HashTablePool {
         agg_handle_(DCHECK_NOTNULL(agg_handle)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
+  HashTablePool(const std::size_t estimated_num_entries,
+                const HashTableImplType hash_table_impl_type,
+                const std::vector<const Type *> &group_by_types,
+                const std::vector<std::size_t> &payload_sizes,
+                const std::vector<AggregationHandle *> &handles,
+                StorageManager *storage_manager)
+      : 
estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        payload_sizes_(payload_sizes),
+        handles_(handles),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
   /**
    * @brief Check out a hash table for insertion.
    *
@@ -100,6 +115,20 @@ class HashTablePool {
     return createNewHashTable();
   }
 
+  AggregationStateHashTableBase* getHashTableFast() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!hash_tables_.empty()) {
+        std::unique_ptr<AggregationStateHashTableBase> ret_hash_table(
+            std::move(hash_tables_.back()));
+        hash_tables_.pop_back();
+        DCHECK(ret_hash_table != nullptr);
+        return ret_hash_table.release();
+      }
+    }
+    return createNewHashTableFast();
+  }
+
   /**
    * @brief Return a previously checked out hash table.
    *
@@ -134,6 +163,16 @@ class HashTablePool {
                                                storage_manager_);
   }
 
+  AggregationStateHashTableBase* createNewHashTableFast() {
+    return AggregationStateFastHashTableFactory::CreateResizable(
+                hash_table_impl_type_,
+                group_by_types_,
+                estimated_num_entries_,
+                payload_sizes_,
+                handles_,
+                storage_manager_);
+  }
+
   inline std::size_t reduceEstimatedCardinality(
       const std::size_t original_estimate) const {
     if (original_estimate < kEstimateReductionFactor) {
@@ -153,7 +192,10 @@ class HashTablePool {
 
   const std::vector<const Type *> group_by_types_;
 
+  std::vector<std::size_t> payload_sizes_;
+
   AggregationHandle *agg_handle_;
+  const std::vector<AggregationHandle *> handles_;
   StorageManager *storage_manager_;
 
   SpinMutex mutex_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/539cb0a2/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 21aa12c..50732fd 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -38,6 +38,7 @@
 #include "storage/CompressedPackedRowStoreTupleStorageSubBlock.hpp"
 #include "storage/CountedReference.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/IndexSubBlock.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/PackedRowStoreTupleStorageSubBlock.hpp"
@@ -494,6 +495,92 @@ void StorageBlock::aggregateGroupBy(
                                              hash_table);
 }
 
+
+void StorageBlock::aggregateGroupByFast(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const Predicate *predicate,
+    AggregationStateHashTableBase *hash_table,
+    std::unique_ptr<TupleIdSequence> *reuse_matches,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+  DCHECK_GT(group_by.size(), 0u)
+      << "Called aggregateGroupByFast() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+
+  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+  std::vector<attribute_id> arg_ids;
+  std::vector<std::vector<attribute_id>> argument_ids;
+
+  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+  std::vector<attribute_id> key_ids;
+
+  // An intermediate ValueAccessor that stores the materialized 'arguments' for
+  // this aggregate, as well as the GROUP BY expression values.
+  ColumnVectorsValueAccessor temp_result;
+  {
+    std::unique_ptr<ValueAccessor> accessor;
+    if (predicate) {
+      if (!*reuse_matches) {
+        // If there is a filter predicate that hasn't already been evaluated,
+        // evaluate it now and save the results for other aggregates on this
+        // same block.
+        reuse_matches->reset(getMatchesForPredicate(predicate));
+      }
+
+      // Create a filtered ValueAccessor that only iterates over predicate
+      // matches.
+      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+    } else {
+      // Create a ValueAccessor that iterates over all tuples in this block
+      accessor.reset(tuple_store_->createValueAccessor());
+    }
+
+    attribute_id attr_id = 0;
+
+    // First, put GROUP BY keys into 'temp_result'.
+    if (reuse_group_by_vectors->empty()) {
+      // Compute GROUP BY values from group_by Scalars, and store them in
+      // reuse_group_by_vectors for reuse by other aggregates on this same
+      // block.
+      reuse_group_by_vectors->reserve(group_by.size());
+      for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+        reuse_group_by_vectors->emplace_back(
+            group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+        temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+        key_ids.push_back(attr_id++);
+      }
+    } else {
+      // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+      DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+          << "Wrong number of reuse_group_by_vectors";
+      for (const std::unique_ptr<ColumnVector> &reuse_cv : 
*reuse_group_by_vectors) {
+        temp_result.addColumn(reuse_cv.get(), false);
+        key_ids.push_back(attr_id++);
+      }
+    }
+
+    // Compute argument vectors and add them to 'temp_result'.
+    for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+      arg_ids.clear();
+      for (const std::unique_ptr<const Scalar> &args : argument) {
+        temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+        arg_ids.push_back(attr_id++);
+      }
+      argument_ids.push_back(arg_ids);
+    }
+  }
+
+  static_cast<AggregationStateFastHashTable 
*>(hash_table)->upsertValueAccessorCompositeKeyFast(
+      argument_ids,
+      &temp_result,
+      key_ids,
+      true);
+}
+
+
 void StorageBlock::aggregateDistinct(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
@@ -582,7 +669,6 @@ void StorageBlock::aggregateDistinct(
       &temp_result, key_ids, distinctify_hash_table);
 }
 
-
 // TODO(chasseur): Vectorization for updates.
 StorageBlock::UpdateResult StorageBlock::update(
     const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments,

