Adds backend support for hash semi/anti joins. (#164) * Added implementations for HashSemiJoin and HashAntiJoin operators.
* Added component in ExecutionGenerator to convert semi/anti join nodes into relational operators. * Add 'this' pointer in anonymous function for gcc build Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a39ad965 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a39ad965 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a39ad965 Branch: refs/heads/master Commit: a39ad9654f10fbe67a90cb7b65dc30dc797f55b3 Parents: 914f2d8 Author: Jianqiao Zhu <[email protected]> Authored: Thu Apr 14 16:28:24 2016 -0500 Committer: Jignesh Patel <[email protected]> Committed: Thu Apr 14 16:28:24 2016 -0500 ---------------------------------------------------------------------- query_optimizer/ExecutionGenerator.cpp | 42 +- .../tests/execution_generator/Select.test | 43 +++ relational_operators/CMakeLists.txt | 2 + relational_operators/HashJoinOperator.cpp | 360 +++++++++++++++-- relational_operators/HashJoinOperator.hpp | 350 ++++++++++++++--- relational_operators/WorkOrder.proto | 58 ++- relational_operators/WorkOrderFactory.cpp | 211 ++++++++-- storage/HashTable.hpp | 387 +++++++++++++++++++ storage/LinearOpenAddressingHashTable.hpp | 72 ++++ storage/SeparateChainingHashTable.hpp | 54 +++ .../SimpleScalarSeparateChainingHashTable.hpp | 37 ++ storage/StorageBlock.hpp | 20 + 12 files changed, 1506 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index cf90be7..aa6b0dc 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -23,6 +23,7 @@ #include <cstddef> #include <memory> #include 
<string> +#include <type_traits> #include <unordered_map> #include <utility> #include <vector> @@ -570,16 +571,19 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { key_types.push_back(&left_attribute_type); } - // Choose the smaller table as the inner build table, - // and the other one as the outer probe table. std::size_t probe_cardinality = cost_model_->estimateCardinality(probe_physical); std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical); - if (probe_cardinality < build_cardinality) { - // Switch the probe and build physical nodes. - std::swap(probe_physical, build_physical); - std::swap(probe_cardinality, build_cardinality); - std::swap(probe_attribute_ids, build_attribute_ids); - std::swap(any_probe_attributes_nullable, any_build_attributes_nullable); + // For inner join, we may swap the probe table and the build table. + if (physical_plan->join_type() == P::HashJoin::JoinType::kInnerJoin) { + // Choose the smaller table as the inner build table, + // and the other one as the outer probe table. + if (probe_cardinality < build_cardinality) { + // Switch the probe and build physical nodes. + std::swap(probe_physical, build_physical); + std::swap(probe_cardinality, build_cardinality); + std::swap(probe_attribute_ids, build_attribute_ids); + std::swap(any_probe_attributes_nullable, any_build_attributes_nullable); + } } // Convert the residual predicate proto. 
@@ -647,6 +651,25 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { &output_relation, insert_destination_proto); + // Get JoinType + HashJoinOperator::JoinType join_type; + switch (physical_plan->join_type()) { + case P::HashJoin::JoinType::kInnerJoin: + join_type = HashJoinOperator::JoinType::kInnerJoin; + break; + case P::HashJoin::JoinType::kLeftSemiJoin: + join_type = HashJoinOperator::JoinType::kLeftSemiJoin; + break; + case P::HashJoin::JoinType::kLeftAntiJoin: + join_type = HashJoinOperator::JoinType::kLeftAntiJoin; + break; + default: + LOG(FATAL) << "Invalid physical::HashJoin::JoinType: " + << static_cast<typename std::underlying_type<P::HashJoin::JoinType>::type>( + physical_plan->join_type()); + } + + // Create hash join operator const QueryPlan::DAGNodeIndex join_operator_index = execution_plan_->addRelationalOperator( new HashJoinOperator( @@ -659,7 +682,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { insert_destination_index, join_hash_table_index, residual_predicate_index, - project_expressions_group_index)); + project_expressions_group_index, + join_type)); insert_destination_proto->set_relational_op_index(join_operator_index); const QueryPlan::DAGNodeIndex destroy_operator_index = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/query_optimizer/tests/execution_generator/Select.test ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test index e1614cb..9bfa27c 100644 --- a/query_optimizer/tests/execution_generator/Select.test +++ b/query_optimizer/tests/execution_generator/Select.test @@ -721,6 +721,49 @@ WHERE CASE WHEN i < j THEN i +-----------+-----------+ == +SELECT i AS odd +FROM generate_series(0, 10, 1) AS gs1(i) +WHERE + NOT EXISTS ( + SELECT * + FROM generate_series(0, 10, 2) AS gs2(even) + WHERE i = even + ); 
+-- ++-----------+ +|odd | ++-----------+ +| 1| +| 3| +| 5| +| 7| +| 9| ++-----------+ +== + +SELECT i +FROM generate_series(0, 100, 3) AS gs1(i) +WHERE + EXISTS ( + SELECT * + FROM generate_series(0, 100, 5) AS gs2(i) + WHERE gs1.i = gs2.i + ) + AND NOT EXISTS ( + SELECT * + FROM generate_series(0, 100, 10) AS gs3(i) + WHERE gs1.i = gs3.i + ) + AND (i < 40 OR i > 60); +-- ++-----------+ +|i | ++-----------+ +| 15| +| 75| ++-----------+ +== + # TODO(team): Support uncorrelated queries. # SELECT COUNT(*) # FROM test http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 17a9a6f..b02bc6b 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -176,11 +176,13 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager + quickstep_storage_SubBlocksReference quickstep_storage_TupleReference quickstep_storage_TupleStorageSubBlock quickstep_storage_ValueAccessor quickstep_types_containers_ColumnVectorsValueAccessor quickstep_utility_Macros + quickstep_utility_PtrList tmb) target_link_libraries(quickstep_relationaloperators_InsertOperator glog http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/HashJoinOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp index f7bbf38..e0076e3 100644 --- a/relational_operators/HashJoinOperator.cpp +++ b/relational_operators/HashJoinOperator.cpp @@ -35,6 +35,7 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include 
"storage/SubBlocksReference.hpp" #include "storage/TupleReference.hpp" #include "storage/TupleStorageSubBlock.hpp" #include "storage/ValueAccessor.hpp" @@ -85,7 +86,7 @@ class MapBasedJoinedTupleCollector { // Consolidation is a no-op for this version, but we provide this trivial // call so that MapBasedJoinedTupleCollector and // VectorBasedJoinedTupleCollector have the same interface and can both be - // used in the templated HashJoinWorkOrder::executeWithCollectorType() method. + // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method. inline void consolidate() const { } @@ -183,6 +184,25 @@ class VectorBasedJoinedTupleCollector { consolidated_joined_tuples_; }; +class SemiAntiJoinTupleCollector { + public: + explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) { + filter_.reset(tuple_store.getExistenceMap()); + } + + template <typename ValueAccessorT> + inline void operator()(const ValueAccessorT &accessor) { + filter_->set(accessor.getCurrentPosition(), false); + } + + const TupleIdSequence* filter() const { + return filter_.get(); + } + + private: + std::unique_ptr<TupleIdSequence> filter_; +}; + } // namespace bool HashJoinOperator::getAllWorkOrders( @@ -191,31 +211,53 @@ bool HashJoinOperator::getAllWorkOrders( StorageManager *storage_manager, const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) { + switch (join_type_) { + case JoinType::kInnerJoin: + return getAllNonOuterJoinWorkOrders<HashInnerJoinWorkOrder>( + container, query_context, storage_manager); + case JoinType::kLeftSemiJoin: + return getAllNonOuterJoinWorkOrders<HashSemiJoinWorkOrder>( + container, query_context, storage_manager); + case JoinType::kLeftAntiJoin: + return getAllNonOuterJoinWorkOrders<HashAntiJoinWorkOrder>( + container, query_context, storage_manager); + default: + LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrders()"; + } +} + +template <class JoinWorkOrderClass> +bool 
HashJoinOperator::getAllNonOuterJoinWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager) { // We wait until the building of global hash table is complete. if (blocking_dependencies_met_) { DCHECK(query_context != nullptr); - const Predicate *residual_predicate = query_context->getPredicate(residual_predicate_index_); + const Predicate *residual_predicate = + query_context->getPredicate(residual_predicate_index_); const vector<unique_ptr<const Scalar>> &selection = query_context->getScalarGroup(selection_index_); InsertDestination *output_destination = query_context->getInsertDestination(output_destination_index_); - JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_); + const JoinHashTable &hash_table = + *(query_context->getJoinHashTable(hash_table_index_)); if (probe_relation_is_stored_) { if (!started_) { for (const block_id probe_block_id : probe_relation_block_ids_) { container->addNormalWorkOrder( - new HashJoinWorkOrder(build_relation_, - probe_relation_, - join_key_attributes_, - any_join_key_attributes_nullable_, - probe_block_id, - residual_predicate, - selection, - output_destination, - hash_table, - storage_manager), + new JoinWorkOrderClass(build_relation_, + probe_relation_, + join_key_attributes_, + any_join_key_attributes_nullable_, + probe_block_id, + residual_predicate, + selection, + hash_table, + output_destination, + storage_manager), op_index_); } started_ = true; @@ -224,27 +266,26 @@ bool HashJoinOperator::getAllWorkOrders( } else { while (num_workorders_generated_ < probe_relation_block_ids_.size()) { container->addNormalWorkOrder( - new HashJoinWorkOrder( - build_relation_, - probe_relation_, - join_key_attributes_, - any_join_key_attributes_nullable_, - probe_relation_block_ids_[num_workorders_generated_], - residual_predicate, - selection, - output_destination, - hash_table, - storage_manager), + new JoinWorkOrderClass(build_relation_, + 
probe_relation_, + join_key_attributes_, + any_join_key_attributes_nullable_, + probe_relation_block_ids_[num_workorders_generated_], + residual_predicate, + selection, + hash_table, + output_destination, + storage_manager), op_index_); ++num_workorders_generated_; } // end while return done_feeding_input_relation_; - } // end else (input_relation_is_stored is false) - } // end if (blocking_dependencies_met) + } // end else (probe_relation_is_stored_) + } // end if (blocking_dependencies_met_) return false; } -void HashJoinWorkOrder::execute() { +void HashInnerJoinWorkOrder::execute() { if (FLAGS_vector_based_joined_tuple_collector) { executeWithCollectorType<VectorBasedJoinedTupleCollector>(); } else { @@ -253,7 +294,7 @@ void HashJoinWorkOrder::execute() { } template <typename CollectorT> -void HashJoinWorkOrder::executeWithCollectorType() { +void HashInnerJoinWorkOrder::executeWithCollectorType() { BlockReference probe_block( storage_manager_->getBlock(block_id_, probe_relation_)); const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); @@ -261,13 +302,13 @@ void HashJoinWorkOrder::executeWithCollectorType() { std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); CollectorT collector; if (join_key_attributes_.size() == 1) { - hash_table_->getAllFromValueAccessor( + hash_table_.getAllFromValueAccessor( probe_accessor.get(), join_key_attributes_.front(), any_join_key_attributes_nullable_, &collector); } else { - hash_table_->getAllFromValueAccessorCompositeKey( + hash_table_.getAllFromValueAccessorCompositeKey( probe_accessor.get(), join_key_attributes_, any_join_key_attributes_nullable_, @@ -348,4 +389,263 @@ void HashJoinWorkOrder::executeWithCollectorType() { } } +void HashSemiJoinWorkOrder::execute() { + if (residual_predicate_ == nullptr) { + executeWithoutResidualPredicate(); + } else { + executeWithResidualPredicate(); + } +} + +void HashSemiJoinWorkOrder::executeWithResidualPredicate() { + const 
relation_id build_relation_id = build_relation_.getID(); + const relation_id probe_relation_id = probe_relation_.getID(); + + BlockReference probe_block = storage_manager_->getBlock(block_id_, + probe_relation_); + const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); + + std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); + + // TODO(harshad) - Make this function work with both types of collectors. + + // We collect all the matching probe relation tuples, as there's a residual + // preidcate that needs to be applied after collecting these matches. + MapBasedJoinedTupleCollector collector; + if (join_key_attributes_.size() == 1) { + hash_table_.getAllFromValueAccessor( + probe_accessor.get(), + join_key_attributes_.front(), + any_join_key_attributes_nullable_, + &collector); + } else { + hash_table_.getAllFromValueAccessorCompositeKey( + probe_accessor.get(), + join_key_attributes_, + any_join_key_attributes_nullable_, + &collector); + } + + // Get a filter for tuples in the given probe block. + TupleIdSequence filter(probe_store.getMaxTupleID() + 1); + filter.setRange(0, filter.length(), false); + for (const std::pair<const block_id, + std::vector<std::pair<tuple_id, tuple_id>>> + &build_block_entry : *collector.getJoinedTuples()) { + // First element of the pair build_block_entry is the build block ID + // 2nd element of the pair is a vector of pairs, in each of which - + // 1st element is a matching tuple ID from the inner (build) relation. + // 2nd element is a matching tuple ID from the outer (probe) relation. + + // Get the block from the build relation for this pair of matched tuples. 
+ BlockReference build_block = + storage_manager_->getBlock(build_block_entry.first, build_relation_); + const TupleStorageSubBlock &build_store = + build_block->getTupleStorageSubBlock(); + std::unique_ptr<ValueAccessor> build_accessor( + build_store.createValueAccessor()); + for (const std::pair<tuple_id, tuple_id> &hash_match + : build_block_entry.second) { + // For each pair, 1st element is a tuple ID from the build relation in the + // given build block, 2nd element is a tuple ID from the probe relation. + if (filter.get(hash_match.second)) { + // We have already found matches for this tuple that belongs to the + // probe side, skip it. + continue; + } + if (residual_predicate_->matchesForJoinedTuples(*build_accessor, + build_relation_id, + hash_match.first, + *probe_accessor, + probe_relation_id, + hash_match.second)) { + filter.set(hash_match.second, true); + } + } + } + + SubBlocksReference sub_blocks_ref(probe_store, + probe_block->getIndices(), + probe_block->getIndicesConsistent()); + + std::unique_ptr<ValueAccessor> probe_accessor_with_filter( + probe_store.createValueAccessor(&filter)); + ColumnVectorsValueAccessor temp_result; + for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); + selection_it != selection_.end(); + ++selection_it) { + temp_result.addColumn((*selection_it)->getAllValues( + probe_accessor_with_filter.get(), &sub_blocks_ref)); + } + + output_destination_->bulkInsertTuples(&temp_result); +} + +void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() { + DCHECK(residual_predicate_ == nullptr); + + BlockReference probe_block = storage_manager_->getBlock(block_id_, + probe_relation_); + const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); + + std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); + SemiAntiJoinTupleCollector collector(probe_store); + // We collect all the probe relation tuples which have at least one matching + // tuple in the 
build relation. As a performance optimization, the hash table + // just looks for the existence of the probing key in the hash table and sets + // the bit for the probing key in the collector. The optimization works + // because there is no residual predicate in this case, unlike + // executeWithResidualPredicate(). + if (join_key_attributes_.size() == 1u) { + // Call the collector to set the bit to 0 for every key without a match. + hash_table_.runOverKeysFromValueAccessorIfMatchNotFound( + probe_accessor.get(), + join_key_attributes_.front(), + any_join_key_attributes_nullable_, + &collector); + } else { + // Call the collector to set the bit to 0 for every key without a match. + hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey( + probe_accessor.get(), + join_key_attributes_, + any_join_key_attributes_nullable_, + &collector); + } + + SubBlocksReference sub_blocks_ref(probe_store, + probe_block->getIndices(), + probe_block->getIndicesConsistent()); + + std::unique_ptr<ValueAccessor> probe_accessor_with_filter( + probe_store.createValueAccessor(collector.filter())); + ColumnVectorsValueAccessor temp_result; + for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); + selection_it != selection_.end(); ++selection_it) { + temp_result.addColumn((*selection_it)->getAllValues( + probe_accessor_with_filter.get(), &sub_blocks_ref)); + } + + output_destination_->bulkInsertTuples(&temp_result); +} + +void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() { + DCHECK(residual_predicate_ == nullptr); + + BlockReference probe_block = storage_manager_->getBlock(block_id_, + probe_relation_); + const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); + + std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); + SemiAntiJoinTupleCollector collector(probe_store); + // We probe the hash table to find the keys which have an entry in the + // hash table. 
+ if (join_key_attributes_.size() == 1) { + // Call the collector to set the bit to 0 for every key with a match. + hash_table_.runOverKeysFromValueAccessorIfMatchFound( + probe_accessor.get(), + join_key_attributes_.front(), + any_join_key_attributes_nullable_, + &collector); + } else { + // Call the collector to set the bit to 0 for every key with a match. + hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey( + probe_accessor.get(), + join_key_attributes_, + any_join_key_attributes_nullable_, + &collector); + } + + SubBlocksReference sub_blocks_ref(probe_store, + probe_block->getIndices(), + probe_block->getIndicesConsistent()); + + std::unique_ptr<ValueAccessor> probe_accessor_with_filter( + probe_store.createValueAccessor(collector.filter())); + ColumnVectorsValueAccessor temp_result; + for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); + selection_it != selection_.end(); ++selection_it) { + temp_result.addColumn((*selection_it)->getAllValues( + probe_accessor_with_filter.get(), &sub_blocks_ref)); + } + + output_destination_->bulkInsertTuples(&temp_result); +} + +void HashAntiJoinWorkOrder::executeWithResidualPredicate() { + const relation_id build_relation_id = build_relation_.getID(); + const relation_id probe_relation_id = probe_relation_.getID(); + + BlockReference probe_block = storage_manager_->getBlock(block_id_, + probe_relation_); + const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); + + std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); + // TODO(harshad) - Make the following code work with both types of collectors. + MapBasedJoinedTupleCollector collector; + // We probe the hash table and get all the matches. Unlike + // executeWithoutResidualPredicate(), we have to collect all the matching + // tuples, because after this step we still have to evalute the residual + // predicate. 
+ if (join_key_attributes_.size() == 1) { + hash_table_.getAllFromValueAccessor( + probe_accessor.get(), + join_key_attributes_.front(), + any_join_key_attributes_nullable_, + &collector); + } else { + hash_table_.getAllFromValueAccessorCompositeKey( + probe_accessor.get(), + join_key_attributes_, + any_join_key_attributes_nullable_, + &collector); + } + + // Create a filter for all the tuples from the given probe block. + std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap()); + for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>> + &build_block_entry : *collector.getJoinedTuples()) { + // First element of the pair build_block_entry is the build block ID + // 2nd element of the pair is a vector of pairs, in each of which - + // 1st element is a matching tuple ID from the inner (build) relation. + // 2nd element is a matching tuple ID from the outer (probe) relation. + BlockReference build_block = storage_manager_->getBlock(build_block_entry.first, + build_relation_); + const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock(); + std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor()); + for (const std::pair<tuple_id, tuple_id> &hash_match + : build_block_entry.second) { + if (!filter->get(hash_match.second)) { + // We have already seen this tuple, skip it. + continue; + } + if (residual_predicate_->matchesForJoinedTuples(*build_accessor, + build_relation_id, + hash_match.first, + *probe_accessor, + probe_relation_id, + hash_match.second)) { + // Note that the filter marks a match as false, as needed by the anti + // join definition. 
+ filter->set(hash_match.second, false); + } + } + } + + SubBlocksReference sub_blocks_ref(probe_store, + probe_block->getIndices(), + probe_block->getIndicesConsistent()); + + std::unique_ptr<ValueAccessor> probe_accessor_with_filter( + probe_store.createValueAccessor(filter.get())); + ColumnVectorsValueAccessor temp_result; + for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); + selection_it != selection_.end(); + ++selection_it) { + temp_result.addColumn((*selection_it)->getAllValues(probe_accessor_with_filter.get(), + &sub_blocks_ref)); + } + + output_destination_->bulkInsertTuples(&temp_result); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index a00e590..c22f435 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -31,6 +31,7 @@ #include "storage/HashTable.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/PtrList.hpp" #include "glog/logging.h" @@ -52,11 +53,17 @@ class WorkOrdersContainer; */ /** - * @brief An operator which performs a hash-join on two - * relations. + * @brief An operator which performs a hash-join, including inner-join, + * semi-join, anti-join and outer-join on two relations. **/ class HashJoinOperator : public RelationalOperator { public: + enum class JoinType { + kInnerJoin = 0, + kLeftSemiJoin, + kLeftAntiJoin + }; + /** * @brief Constructor. * @@ -98,6 +105,7 @@ class HashJoinOperator : public RelationalOperator { * corresponding to the attributes of the relation referred by * output_relation_id. Each Scalar is evaluated for the joined tuples, * and the resulting value is inserted into the join result. 
+ * @param join_type The type of join corresponding to this operator. **/ HashJoinOperator(const CatalogRelation &build_relation, const CatalogRelation &probe_relation, @@ -108,21 +116,24 @@ class HashJoinOperator : public RelationalOperator { const QueryContext::insert_destination_id output_destination_index, const QueryContext::join_hash_table_id hash_table_index, const QueryContext::predicate_id residual_predicate_index, - const QueryContext::scalar_group_id selection_index) - : build_relation_(build_relation), - probe_relation_(probe_relation), - probe_relation_is_stored_(probe_relation_is_stored), - join_key_attributes_(join_key_attributes), - any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - output_relation_(output_relation), - output_destination_index_(output_destination_index), - hash_table_index_(hash_table_index), - residual_predicate_index_(residual_predicate_index), - selection_index_(selection_index), - probe_relation_block_ids_(probe_relation_is_stored ? probe_relation.getBlocksSnapshot() - : std::vector<block_id>()), - num_workorders_generated_(0), - started_(false) {} + const QueryContext::scalar_group_id selection_index, + const JoinType join_type = JoinType::kInnerJoin) + : build_relation_(build_relation), + probe_relation_(probe_relation), + probe_relation_is_stored_(probe_relation_is_stored), + join_key_attributes_(join_key_attributes), + any_join_key_attributes_nullable_(any_join_key_attributes_nullable), + output_relation_(output_relation), + output_destination_index_(output_destination_index), + hash_table_index_(hash_table_index), + residual_predicate_index_(residual_predicate_index), + selection_index_(selection_index), + join_type_(join_type), + probe_relation_block_ids_(probe_relation_is_stored + ? 
probe_relation.getBlocksSnapshot() + : std::vector<block_id>()), + num_workorders_generated_(0), + started_(false) {} ~HashJoinOperator() override {} @@ -164,6 +175,11 @@ class HashJoinOperator : public RelationalOperator { } private: + template <class JoinWorkOrderClass> + bool getAllNonOuterJoinWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager); + const CatalogRelation &build_relation_; const CatalogRelation &probe_relation_; const bool probe_relation_is_stored_; @@ -174,6 +190,7 @@ class HashJoinOperator : public RelationalOperator { const QueryContext::join_hash_table_id hash_table_index_; const QueryContext::predicate_id residual_predicate_index_; const QueryContext::scalar_group_id selection_index_; + const JoinType join_type_; std::vector<block_id> probe_relation_block_ids_; std::size_t num_workorders_generated_; @@ -184,9 +201,9 @@ class HashJoinOperator : public RelationalOperator { }; /** - * @brief A WorkOrder produced by HashJoinOperator. + * @brief An inner join WorkOrder produced by HashJoinOperator. **/ -class HashJoinWorkOrder : public WorkOrder { +class HashInnerJoinWorkOrder : public WorkOrder { public: /** * @brief Constructor. @@ -206,20 +223,20 @@ class HashJoinWorkOrder : public WorkOrder { * @param selection A list of Scalars corresponding to the relation attributes * in \c output_destination. Each Scalar is evaluated for the joined * tuples, and the resulting value is inserted into the join result. - * @param output_destination The InsertDestination to insert the join results. * @param hash_table The JoinHashTable to use. + * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. 
**/ - HashJoinWorkOrder(const CatalogRelationSchema &build_relation, - const CatalogRelationSchema &probe_relation, - const std::vector<attribute_id> &join_key_attributes, - const bool any_join_key_attributes_nullable, - const block_id lookup_block_id, - const Predicate *residual_predicate, - const std::vector<std::unique_ptr<const Scalar>> &selection, - InsertDestination *output_destination, - JoinHashTable *hash_table, - StorageManager *storage_manager) + HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation, + const CatalogRelationSchema &probe_relation, + const std::vector<attribute_id> &join_key_attributes, + const bool any_join_key_attributes_nullable, + const block_id lookup_block_id, + const Predicate *residual_predicate, + const std::vector<std::unique_ptr<const Scalar>> &selection, + const JoinHashTable &hash_table, + InsertDestination *output_destination, + StorageManager *storage_manager) : build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), @@ -227,8 +244,8 @@ class HashJoinWorkOrder : public WorkOrder { block_id_(lookup_block_id), residual_predicate_(residual_predicate), selection_(selection), + hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - hash_table_(DCHECK_NOTNULL(hash_table)), storage_manager_(DCHECK_NOTNULL(storage_manager)) {} /** @@ -249,20 +266,20 @@ class HashJoinWorkOrder : public WorkOrder { * @param selection A list of Scalars corresponding to the relation attributes * in \c output_destination. Each Scalar is evaluated for the joined * tuples, and the resulting value is inserted into the join result. - * @param output_destination The InsertDestination to insert the join results. * @param hash_table The JoinHashTable to use. + * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. 
**/ - HashJoinWorkOrder(const CatalogRelationSchema &build_relation, - const CatalogRelationSchema &probe_relation, - std::vector<attribute_id> &&join_key_attributes, - const bool any_join_key_attributes_nullable, - const block_id lookup_block_id, - const Predicate *residual_predicate, - const std::vector<std::unique_ptr<const Scalar>> &selection, - InsertDestination *output_destination, - JoinHashTable *hash_table, - StorageManager *storage_manager) + HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation, + const CatalogRelationSchema &probe_relation, + std::vector<attribute_id> &&join_key_attributes, + const bool any_join_key_attributes_nullable, + const block_id lookup_block_id, + const Predicate *residual_predicate, + const std::vector<std::unique_ptr<const Scalar>> &selection, + const JoinHashTable &hash_table, + InsertDestination *output_destination, + StorageManager *storage_manager) : build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), @@ -270,11 +287,11 @@ class HashJoinWorkOrder : public WorkOrder { block_id_(lookup_block_id), residual_predicate_(residual_predicate), selection_(selection), + hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - hash_table_(DCHECK_NOTNULL(hash_table)), storage_manager_(DCHECK_NOTNULL(storage_manager)) {} - ~HashJoinWorkOrder() override {} + ~HashInnerJoinWorkOrder() override {} /** * @exception TupleTooLargeForBlock A tuple produced by this join was too @@ -290,18 +307,257 @@ class HashJoinWorkOrder : public WorkOrder { template <typename CollectorT> void executeWithCollectorType(); - const CatalogRelationSchema &build_relation_, &probe_relation_; + const CatalogRelationSchema &build_relation_; + const CatalogRelationSchema &probe_relation_; + const std::vector<attribute_id> join_key_attributes_; + const bool any_join_key_attributes_nullable_; + const block_id block_id_; + const Predicate *residual_predicate_; + 
const std::vector<std::unique_ptr<const Scalar>> &selection_; + const JoinHashTable &hash_table_; + + InsertDestination *output_destination_; + StorageManager *storage_manager_; + + DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder); +}; + +/** + * @brief A left semi-join WorkOrder produced by the HashJoinOperator to execute + * EXISTS() clause. + **/ +class HashSemiJoinWorkOrder : public WorkOrder { + public: + /** + * @brief Constructor. + * + * @param build_relation The relation that the hash table was originally built + * on (i.e. the inner relation in the join). + * @param probe_relation The relation to probe the hash table with (i.e. the + * outer relation in the join). + * @param join_key_attributes The IDs of equijoin attributes in \c + * probe_relation. + * @param any_join_key_attributes_nullable If any attribute is nullable. + * @param lookup_block_id The block id of the probe_relation. + * @param residual_predicate If non-null, apply as an additional filter to + * pairs of tuples that match the hash-join (i.e. key equality) + * predicate. Effectively, this makes the join predicate the + * conjunction of the key-equality predicate and residual_predicate. + * @param selection A list of Scalars corresponding to the relation attributes + * in \c output_destination. Each Scalar is evaluated for the joined + * tuples, and the resulting value is inserted into the join result. + * @param hash_table The JoinHashTable to use. + * @param output_destination The InsertDestination to insert the join results. + * @param storage_manager The StorageManager to use. 
+ **/ + HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation, + const CatalogRelationSchema &probe_relation, + const std::vector<attribute_id> &join_key_attributes, + const bool any_join_key_attributes_nullable, + const block_id lookup_block_id, + const Predicate *residual_predicate, + const std::vector<std::unique_ptr<const Scalar>> &selection, + const JoinHashTable &hash_table, + InsertDestination *output_destination, + StorageManager *storage_manager) + : build_relation_(build_relation), + probe_relation_(probe_relation), + join_key_attributes_(join_key_attributes), + any_join_key_attributes_nullable_(any_join_key_attributes_nullable), + block_id_(lookup_block_id), + residual_predicate_(residual_predicate), + selection_(selection), + hash_table_(hash_table), + output_destination_(DCHECK_NOTNULL(output_destination)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + /** + * @brief Constructor for the distributed version. + * + * @param build_relation The relation that the hash table was originally built + * on (i.e. the inner relation in the join). + * @param probe_relation The relation to probe the hash table with (i.e. the + * outer relation in the join). + * @param join_key_attributes The IDs of equijoin attributes in \c + * probe_relation. + * @param any_join_key_attributes_nullable If any attribute is nullable. + * @param lookup_block_id The block id of the probe_relation. + * @param residual_predicate If non-null, apply as an additional filter to + * pairs of tuples that match the hash-join (i.e. key equality) + * predicate. Effectively, this makes the join predicate the + * conjunction of the key-equality predicate and residual_predicate. + * @param selection A list of Scalars corresponding to the relation attributes + * in \c output_destination. Each Scalar is evaluated for the joined + * tuples, and the resulting value is inserted into the join result. + * @param hash_table The JoinHashTable to use. 
+ * @param output_destination The InsertDestination to insert the join results. + * @param storage_manager The StorageManager to use. + **/ + HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation, + const CatalogRelationSchema &probe_relation, + std::vector<attribute_id> &&join_key_attributes, + const bool any_join_key_attributes_nullable, + const block_id lookup_block_id, + const Predicate *residual_predicate, + const std::vector<std::unique_ptr<const Scalar>> &selection, + const JoinHashTable &hash_table, + InsertDestination *output_destination, + StorageManager *storage_manager) + : build_relation_(build_relation), + probe_relation_(probe_relation), + join_key_attributes_(std::move(join_key_attributes)), + any_join_key_attributes_nullable_(any_join_key_attributes_nullable), + block_id_(lookup_block_id), + residual_predicate_(residual_predicate), + selection_(selection), + hash_table_(hash_table), + output_destination_(DCHECK_NOTNULL(output_destination)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + ~HashSemiJoinWorkOrder() override {} + + void execute() override; + + private: + void executeWithoutResidualPredicate(); + + void executeWithResidualPredicate(); + + const CatalogRelationSchema &build_relation_; + const CatalogRelationSchema &probe_relation_; + const std::vector<attribute_id> join_key_attributes_; + const bool any_join_key_attributes_nullable_; + const block_id block_id_; + const Predicate *residual_predicate_; + const std::vector<std::unique_ptr<const Scalar>> &selection_; + const JoinHashTable &hash_table_; + + InsertDestination *output_destination_; + StorageManager *storage_manager_; + + DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder); +}; + +/** + * @brief A left anti-join WorkOrder produced by the HashJoinOperator to execute + * NOT EXISTS() clause. + **/ +class HashAntiJoinWorkOrder : public WorkOrder { + public: + /** + * @brief Constructor. 
+ * + * @param build_relation The relation that the hash table was originally built + * on (i.e. the inner relation in the join). + * @param probe_relation The relation to probe the hash table with (i.e. the + * outer relation in the join). + * @param join_key_attributes The IDs of equijoin attributes in \c + * probe_relation. + * @param any_join_key_attributes_nullable If any attribute is nullable. + * @param lookup_block_id The block id of the probe_relation. + * @param residual_predicate If non-null, apply as an additional filter to + * pairs of tuples that match the hash-join (i.e. key equality) + * predicate. Effectively, this makes the join predicate the + * conjunction of the key-equality predicate and residual_predicate. + * @param selection A list of Scalars corresponding to the relation attributes + * in \c output_destination. Each Scalar is evaluated for the joined + * tuples, and the resulting value is inserted into the join result. + * @param hash_table The JoinHashTable to use. + * @param output_destination The InsertDestination to insert the join results. + * @param storage_manager The StorageManager to use. 
+ **/ + HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation, + const CatalogRelationSchema &probe_relation, + const std::vector<attribute_id> &join_key_attributes, + const bool any_join_key_attributes_nullable, + const block_id lookup_block_id, + const Predicate *residual_predicate, + const std::vector<std::unique_ptr<const Scalar>> &selection, + const JoinHashTable &hash_table, + InsertDestination *output_destination, + StorageManager *storage_manager) + : build_relation_(build_relation), + probe_relation_(probe_relation), + join_key_attributes_(join_key_attributes), + any_join_key_attributes_nullable_(any_join_key_attributes_nullable), + block_id_(lookup_block_id), + residual_predicate_(residual_predicate), + selection_(selection), + hash_table_(hash_table), + output_destination_(DCHECK_NOTNULL(output_destination)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + /** + * @brief Constructor for the distributed version. + * + * @param build_relation The relation that the hash table was originally built + * on (i.e. the inner relation in the join). + * @param probe_relation The relation to probe the hash table with (i.e. the + * outer relation in the join). + * @param join_key_attributes The IDs of equijoin attributes in \c + * probe_relation. + * @param any_join_key_attributes_nullable If any attribute is nullable. + * @param lookup_block_id The block id of the probe_relation. + * @param residual_predicate If non-null, apply as an additional filter to + * pairs of tuples that match the hash-join (i.e. key equality) + * predicate. Effectively, this makes the join predicate the + * conjunction of the key-equality predicate and residual_predicate. + * @param selection A list of Scalars corresponding to the relation attributes + * in \c output_destination. Each Scalar is evaluated for the joined + * tuples, and the resulting value is inserted into the join result. + * @param hash_table The JoinHashTable to use. 
+ * @param output_destination The InsertDestination to insert the join results. + * @param storage_manager The StorageManager to use. + **/ + HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation, + const CatalogRelationSchema &probe_relation, + std::vector<attribute_id> &&join_key_attributes, + const bool any_join_key_attributes_nullable, + const block_id lookup_block_id, + const Predicate *residual_predicate, + const std::vector<std::unique_ptr<const Scalar>> &selection, + const JoinHashTable &hash_table, + InsertDestination *output_destination, + StorageManager *storage_manager) + : build_relation_(build_relation), + probe_relation_(probe_relation), + join_key_attributes_(join_key_attributes), + any_join_key_attributes_nullable_(any_join_key_attributes_nullable), + block_id_(lookup_block_id), + residual_predicate_(residual_predicate), + selection_(selection), + hash_table_(hash_table), + output_destination_(DCHECK_NOTNULL(output_destination)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + ~HashAntiJoinWorkOrder() override {} + + void execute() override { + if (residual_predicate_ == nullptr) { + executeWithoutResidualPredicate(); + } else { + executeWithResidualPredicate(); + } + } + + private: + void executeWithoutResidualPredicate(); + + void executeWithResidualPredicate(); + + const CatalogRelationSchema &build_relation_; + const CatalogRelationSchema &probe_relation_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; const block_id block_id_; const Predicate *residual_predicate_; const std::vector<std::unique_ptr<const Scalar>> &selection_; + const JoinHashTable &hash_table_; InsertDestination *output_destination_; - JoinHashTable *hash_table_; StorageManager *storage_manager_; - DISALLOW_COPY_AND_ASSIGN(HashJoinWorkOrder); + DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder); }; /** @} */ 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index e0ec19d..1a0bcd1 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -29,18 +29,20 @@ enum WorkOrderType { DESTROY_HASH = 6; DROP_TABLE = 7; FINALIZE_AGGREGATION = 8; - HASH_JOIN = 9; - INSERT = 10; - NESTED_LOOP_JOIN = 11; - SAMPLE = 12; - SAVE_BLOCKS = 13; - SELECT = 14; - SORT_MERGE_RUN = 15; - SORT_RUN_GENERATION = 16; - TABLE_GENERATOR = 17; - TEXT_SCAN = 18; - TEXT_SPLIT = 19; - UPDATE = 20; + HASH_INNER_JOIN = 9; + HASH_SEMI_JOIN = 10; + HASH_ANTI_JOIN = 11; + INSERT = 12; + NESTED_LOOP_JOIN = 13; + SAMPLE = 14; + SAVE_BLOCKS = 15; + SELECT = 16; + SORT_MERGE_RUN = 17; + SORT_RUN_GENERATION = 18; + TABLE_GENERATOR = 19; + TEXT_SCAN = 20; + TEXT_SPLIT = 21; + UPDATE = 22; } message WorkOrder { @@ -104,7 +106,7 @@ message FinalizeAggregationWorkOrder { } } -message HashJoinWorkOrder { +message HashInnerJoinWorkOrder { extend WorkOrder { // All required. optional int32 build_relation_id = 160; @@ -119,6 +121,36 @@ message HashJoinWorkOrder { } } +message HashAntiJoinWorkOrder { + extend WorkOrder { + // All required. + optional int32 build_relation_id = 350; + optional int32 probe_relation_id = 351; + repeated int32 join_key_attributes = 352; + optional bool any_join_key_attributes_nullable = 353; + optional int32 insert_destination_index = 354; + optional uint32 join_hash_table_index = 355; + optional int32 residual_predicate_index = 356; + optional int32 selection_index = 357; + optional fixed64 block_id = 358; + } +} + +message HashSemiJoinWorkOrder { + extend WorkOrder { + // All required.
+ optional int32 build_relation_id = 360; + optional int32 probe_relation_id = 361; + repeated int32 join_key_attributes = 362; + optional bool any_join_key_attributes_nullable = 363; + optional int32 insert_destination_index = 364; + optional uint32 join_hash_table_index = 365; + optional int32 residual_predicate_index = 366; + optional int32 selection_index = 367; + optional fixed64 block_id = 368; + } +} + message InsertWorkOrder { extend WorkOrder { // All required. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 4713681..92c1140 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -135,30 +135,88 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder query_context->getInsertDestination( proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index))); } - case serialization::HASH_JOIN: { - LOG(INFO) << "Creating HashJoinWorkOrder"; + case serialization::HASH_INNER_JOIN: { + LOG(INFO) << "Creating HashInnerJoinWorkOrder"; vector<attribute_id> join_key_attributes; - for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) { + const int join_key_attributes_size = + proto.ExtensionSize(serialization::HashInnerJoinWorkOrder::join_key_attributes); + for (int i = 0; i < join_key_attributes_size; ++i) { join_key_attributes.push_back( - proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i)); + proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_key_attributes, i)); } - return new HashJoinWorkOrder( + return new HashInnerJoinWorkOrder( catalog_database->getRelationSchemaById( -
proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id)), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::build_relation_id)), catalog_database->getRelationSchemaById( - proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id)), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id)), move(join_key_attributes), - proto.GetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable), - proto.GetExtension(serialization::HashJoinWorkOrder::block_id), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::any_join_key_attributes_nullable), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::block_id), query_context->getPredicate( - proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index)), query_context->getScalarGroup( - proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::selection_index)), + *query_context->getJoinHashTable( + proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index)), query_context->getInsertDestination( - proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)), - query_context->getJoinHashTable( - proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)), + proto.GetExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index)), + storage_manager); + } + case serialization::HASH_SEMI_JOIN: { + LOG(INFO) << "Creating HashSemiJoinWorkOrder"; + vector<attribute_id> join_key_attributes; + const int join_key_attributes_size = + proto.ExtensionSize(serialization::HashSemiJoinWorkOrder::join_key_attributes); + for (int i = 0; i < join_key_attributes_size; ++i) { + join_key_attributes.push_back( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_key_attributes, i)); + } + + return new
HashSemiJoinWorkOrder( + catalog_database->getRelationSchemaById( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::build_relation_id)), + catalog_database->getRelationSchemaById( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id)), + move(join_key_attributes), + proto.GetExtension(serialization::HashSemiJoinWorkOrder::any_join_key_attributes_nullable), + proto.GetExtension(serialization::HashSemiJoinWorkOrder::block_id), + query_context->getPredicate( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index)), + query_context->getScalarGroup( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::selection_index)), + *query_context->getJoinHashTable( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index)), + query_context->getInsertDestination( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index)), + storage_manager); + } + case serialization::HASH_ANTI_JOIN: { + LOG(INFO) << "Creating HashAntiJoinWorkOrder"; + vector<attribute_id> join_key_attributes; + const int join_key_attributes_size = + proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes); + for (int i = 0; i < join_key_attributes_size; ++i) { + join_key_attributes.push_back( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_key_attributes, i)); + } + + return new HashAntiJoinWorkOrder( + catalog_database->getRelationSchemaById( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::build_relation_id)), + catalog_database->getRelationSchemaById( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id)), + move(join_key_attributes), + proto.GetExtension(serialization::HashAntiJoinWorkOrder::any_join_key_attributes_nullable), + proto.GetExtension(serialization::HashAntiJoinWorkOrder::block_id), + query_context->getPredicate( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index)), +
query_context->getScalarGroup( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::selection_index)), + *query_context->getJoinHashTable( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index)), + query_context->getInsertDestination( + proto.GetExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index)), storage_manager); } case serialization::INSERT: { @@ -394,46 +452,137 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, query_context.isValidInsertDestinationId( proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)); } - case serialization::HASH_JOIN: { - if (!proto.HasExtension(serialization::HashJoinWorkOrder::build_relation_id) || - !proto.HasExtension(serialization::HashJoinWorkOrder::probe_relation_id)) { + case serialization::HASH_INNER_JOIN: { + if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::build_relation_id) || + !proto.HasExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id)) { + return false; + } + + const relation_id build_relation_id = + proto.GetExtension(serialization::HashInnerJoinWorkOrder::build_relation_id); + if (!catalog_database.hasRelationWithId(build_relation_id)) { + return false; + } + + const relation_id probe_relation_id = + proto.GetExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id); + if (!catalog_database.hasRelationWithId(probe_relation_id)) { + return false; + } + + const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id); + const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id); + for (int i = 0; i < proto.ExtensionSize(serialization::HashInnerJoinWorkOrder::join_key_attributes); ++i) { + const attribute_id attr_id = + proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_key_attributes, i); + if (!build_relation.hasAttributeWithId(attr_id) || +
!probe_relation.hasAttributeWithId(attr_id)) { + return false; + } + } + + return proto.HasExtension(serialization::HashInnerJoinWorkOrder::any_join_key_attributes_nullable) && + proto.HasExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index) && + query_context.isValidInsertDestinationId( + proto.GetExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index)) && + proto.HasExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index) && + query_context.isValidJoinHashTableId( + proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index)) && + proto.HasExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index) && + query_context.isValidPredicate( + proto.GetExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index)) && + proto.HasExtension(serialization::HashInnerJoinWorkOrder::selection_index) && + query_context.isValidScalarGroupId( + proto.GetExtension(serialization::HashInnerJoinWorkOrder::selection_index)) && + proto.HasExtension(serialization::HashInnerJoinWorkOrder::block_id); + } + case serialization::HASH_SEMI_JOIN: { + if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::build_relation_id) || + !proto.HasExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id)) { + return false; + } + + const relation_id build_relation_id = + proto.GetExtension(serialization::HashSemiJoinWorkOrder::build_relation_id); + if (!catalog_database.hasRelationWithId(build_relation_id)) { + return false; + } + + const relation_id probe_relation_id = + proto.GetExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id); + if (!catalog_database.hasRelationWithId(probe_relation_id)) { + return false; + } + + const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id); + const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id); + for (int i = 0; i <
proto.ExtensionSize(serialization::HashSemiJoinWorkOrder::join_key_attributes); ++i) { + const attribute_id attr_id = + proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_key_attributes, i); + if (!build_relation.hasAttributeWithId(attr_id) || + !probe_relation.hasAttributeWithId(attr_id)) { + return false; + } + } + + return proto.HasExtension(serialization::HashSemiJoinWorkOrder::any_join_key_attributes_nullable) && + proto.HasExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index) && + query_context.isValidInsertDestinationId( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index)) && + proto.HasExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index) && + query_context.isValidJoinHashTableId( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index)) && + proto.HasExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index) && + query_context.isValidPredicate( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index)) && + proto.HasExtension(serialization::HashSemiJoinWorkOrder::selection_index) && + query_context.isValidScalarGroupId( + proto.GetExtension(serialization::HashSemiJoinWorkOrder::selection_index)) && + proto.HasExtension(serialization::HashSemiJoinWorkOrder::block_id); + } + case serialization::HASH_ANTI_JOIN: { + if (!proto.HasExtension(serialization::HashAntiJoinWorkOrder::build_relation_id) || + !proto.HasExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id)) { return false; } - const relation_id build_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id); + const relation_id build_relation_id = + proto.GetExtension(serialization::HashAntiJoinWorkOrder::build_relation_id); if (!catalog_database.hasRelationWithId(build_relation_id)) { return false; } - const relation_id probe_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id); +
const relation_id probe_relation_id = + proto.GetExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id); if (!catalog_database.hasRelationWithId(probe_relation_id)) { return false; } const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id); const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id); - for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) { - const attribute_id attr_id = proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i); + for (int i = 0; i < proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes); ++i) { + const attribute_id attr_id = + proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_key_attributes, i); if (!build_relation.hasAttributeWithId(attr_id) || !probe_relation.hasAttributeWithId(attr_id)) { return false; } } - return proto.HasExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable) && - proto.HasExtension(serialization::HashJoinWorkOrder::insert_destination_index) && + return proto.HasExtension(serialization::HashAntiJoinWorkOrder::any_join_key_attributes_nullable) && + proto.HasExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index) && query_context.isValidInsertDestinationId( - proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)) && - proto.HasExtension(serialization::HashJoinWorkOrder::join_hash_table_index) && + proto.GetExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index)) && + proto.HasExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index) && query_context.isValidJoinHashTableId( - proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)) && - proto.HasExtension(serialization::HashJoinWorkOrder::residual_predicate_index) && +
proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index)) && + proto.HasExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index) && query_context.isValidPredicate( - proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)) && - proto.HasExtension(serialization::HashJoinWorkOrder::selection_index) && + proto.GetExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index)) && + proto.HasExtension(serialization::HashAntiJoinWorkOrder::selection_index) && query_context.isValidScalarGroupId( - proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) && - proto.HasExtension(serialization::HashJoinWorkOrder::block_id); + proto.GetExtension(serialization::HashAntiJoinWorkOrder::selection_index)) && + proto.HasExtension(serialization::HashAntiJoinWorkOrder::block_id); } case serialization::INSERT: { return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) && http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/storage/HashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp index ab74e2c..ef79d11 100644 --- a/storage/HashTable.hpp +++ b/storage/HashTable.hpp @@ -707,6 +707,89 @@ class HashTable : public HashTableBase<resizable, FunctorT *functor) const; /** + * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to + * the matching values and additionally call a hasMatch() function of + * the functor when the first match for a key is found. + * @warning This method assumes that no concurrent calls to put(), + * putCompositeKey(), putValueAccessor(), + * putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(), + * upsertValueAccessor(), or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced).
Concurrent calls to getSingle(), + * getSingleCompositeKey(), getAll(), getAllCompositeKey(), + * getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * @note This version is for single scalar keys. See also + * getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(). + * + * @param accessor A ValueAccessor which will be used to access keys. + * beginIteration() should be called on accessor before calling this + * method. + * @param key_attr_id The attribute ID of the keys to be read from accessor. + * @param check_for_null_keys If true, each key will be checked to see if it + * is null before looking it up (null keys are skipped). This must be + * set to true if some of the keys that will be read from accessor may + * be null. + * @param functor A pointer to a functor, which should provide two functions: + * 1) An operator that takes 2 arguments: const ValueAccessor& (or better + * yet, a templated call operator which takes a const reference to + * some subclass of ValueAccessor as its first argument) and + * const ValueT&. The operator will be invoked once for each pair of a + * key taken from accessor and matching value. + * 2) A function hasMatch that takes 1 argument: const ValueAccessor&. + * The function will be called only once for a key from accessor when + * the first match is found. + */ + template <typename FunctorT> + void getAllFromValueAccessorWithExtraWorkForFirstMatch( + ValueAccessor *accessor, + const attribute_id key_attr_id, + const bool check_for_null_keys, + FunctorT *functor) const; + + /** + * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to + * the matching values and additionally call a hasMatch() function of + * the functor when the first match for a key is found. Composite key + * version. 
+ * @warning This method assumes that no concurrent calls to put(), + * putCompositeKey(), putValueAccessor(), + * putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(), + * upsertValueAccessor(), or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as any value reference passed to + * functor may be dereferenced). Concurrent calls to getSingle(), + * getSingleCompositeKey(), getAll(), getAllCompositeKey(), + * getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param accessor A ValueAccessor which will be used to access keys. + * beginIteration() should be called on accessor before calling this + * method. + * @param key_attr_ids The attribute IDs of the key components to be read + * from accessor, one per key component and in the same order as the + * key types of this HashTable. + * @param check_for_null_keys If true, each key component will be checked to + * see if it is null before looking the key up (keys with any null + * component are skipped). This must be set to true if some of the key + * components that will be read from accessor may be null. + * @param functor A pointer to a functor, which should provide two functions: + * 1) An operator that takes 2 arguments: const ValueAccessor& (or better + * yet, a templated call operator which takes a const reference to + * some subclass of ValueAccessor as its first argument) and + * const ValueT&. The operator will be invoked once for each pair of a + * key taken from accessor and matching value. + * 2) A function recordMatch that takes 1 argument: const ValueAccessor&. + * The function will be called only once for a key from accessor when + * the first match is found, immediately before the operator is invoked + * for that first match.
+ */ + template <typename FunctorT> + void getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch( + ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const bool check_for_null_keys, + FunctorT *functor) const; + + /** * @brief Lookup (multiple) keys from a ValueAccessor and apply a functor to * the matching values. Composite key version. * @@ -746,6 +829,113 @@ class HashTable : public HashTableBase<resizable, FunctorT *functor) const; /** + * @brief Apply the functor to each key with a match in the hash table. + * + * @param accessor A ValueAccessor which will be used to access keys. + * beginIteration() should be called on accessor before calling this + * method. + * @param key_attr_id The attribute ID of the keys to be read from accessor. + * @param check_for_null_keys If true, each key will be checked to see if it + * is null before looking it up (null keys are skipped). This must be + * set to true if some of the keys that will be read from accessor may + * be null. + * @param functor A pointer to a functor which should provide an operator that + * takes 1 argument: const ValueAccessor&. The operator will be called + * only once for a key from accessor if there is a match. + */ + template <typename FunctorT> + void runOverKeysFromValueAccessorIfMatchFound(ValueAccessor *accessor, + const attribute_id key_attr_id, + const bool check_for_null_keys, + FunctorT *functor) const { + return runOverKeysFromValueAccessor<true>(accessor, + key_attr_id, + check_for_null_keys, + functor); + } + + /** + * @brief Apply the functor to each key with a match in the hash table. + * Composite key version. + * + * @param accessor A ValueAccessor which will be used to access keys. + * beginIteration() should be called on accessor before calling this + * method. + * @param key_attr_ids The attribute IDs of the key components to be read + * from accessor, one per key component. + * @param check_for_null_keys If true, each key component will be checked to + * see if it is null before looking the key up (keys with any null + * component are skipped).
This must be + * set to true if some of the key components that will be read from + * accessor may be null. + * @param functor A pointer to a functor which should provide an operator that + * takes 1 argument: const ValueAccessor&. The operator will be called + * only once for a key from accessor if there is a match. + */ + template <typename FunctorT> + void runOverKeysFromValueAccessorIfMatchFoundCompositeKey( + ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const bool check_for_null_keys, + FunctorT *functor) const { + return runOverKeysFromValueAccessorCompositeKey<true>(accessor, + key_attr_ids, + check_for_null_keys, + functor); + } + + /** + * @brief Apply the functor to each key without a match in the hash table. + * + * @param accessor A ValueAccessor which will be used to access keys. + * beginIteration() should be called on accessor before calling this + * method. + * @param key_attr_id The attribute ID of the keys to be read from accessor. + * @param check_for_null_keys If true, each key will be checked to see if it + * is null before looking it up. A null key is never looked up and is + * treated as having no match, so the functor is invoked for it. This + * must be set to true if some of the keys that will be read from + * accessor may be null. + * @param functor A pointer to a functor which should provide an operator that + * takes 1 argument: const ValueAccessor&. The operator will be called + * only once for a key from accessor if there is no match (this includes + * null keys when check_for_null_keys is true). + */ + template <typename FunctorT> + void runOverKeysFromValueAccessorIfMatchNotFound( + ValueAccessor *accessor, + const attribute_id key_attr_id, + const bool check_for_null_keys, + FunctorT *functor) const { + return runOverKeysFromValueAccessor<false>(accessor, + key_attr_id, + check_for_null_keys, + functor); + } + + /** + * @brief Apply the functor to each key without a match in the hash table. + * Composite key version. + * + * @param accessor A ValueAccessor which will be used to access keys. + * beginIteration() should be called on accessor before calling this + * method.
+ * @param key_attr_ids The attribute IDs of the key components to be read + * from accessor, one per key component. + * @param check_for_null_keys If true, each key component will be checked to + * see if it is null before looking the key up. A key with any null + * component is never looked up and is treated as having no match, so + * the functor is invoked for it. This must be set to true if some of + * the key components that will be read from accessor may be null. + * @param functor A pointer to a functor which should provide an operator that + * takes 1 argument: const ValueAccessor&. The operator will be called + * only once for a key from accessor if there is no match (this includes + * keys with a null component when check_for_null_keys is true). + */ + template <typename FunctorT> + void runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey( + ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const bool check_for_null_keys, + FunctorT *functor) const { + return runOverKeysFromValueAccessorCompositeKey<false>(accessor, + key_attr_ids, + check_for_null_keys, + functor); + } + + /** * @brief Apply a functor to each key, value pair in this hash table. * * @warning This method assumes that no concurrent calls to put(), @@ -965,6 +1155,10 @@ class HashTable : public HashTableBase<resizable, const ValueT **value, std::size_t *entry_num) const = 0; + // Return true if key exists in the hash table. hasCompositeKey() is the + // composite-key version. + virtual bool hasKey(const TypedValue &key) const = 0; + virtual bool hasCompositeKey(const std::vector<TypedValue> &key) const = 0; + // For a resizable HashTable, grow to accomodate more entries. If // 'extra_buckets' is not zero, it may serve as a "hint" to implementations // that at least the requested number of extra buckets are required when @@ -1048,6 +1242,21 @@ class HashTable : public HashTableBase<resizable, return false; } + // If run_if_match_found is true, apply the functor to each key if a match is + // found; otherwise, apply the functor if no match is found. + // When check_for_null_keys is true and run_if_match_found is false, rows + // whose key is null (or has a null component) are also passed to the + // functor.
+ template <bool run_if_match_found, typename FunctorT> + void runOverKeysFromValueAccessor(ValueAccessor *accessor, + const attribute_id key_attr_id, + const bool check_for_null_keys, + FunctorT *functor) const; + + template <bool run_if_match_found, typename FunctorT> + void runOverKeysFromValueAccessorCompositeKey( + ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const bool check_for_null_keys, + FunctorT *functor) const; + // Method containing the actual logic implementing getAllFromValueAccessor(). // Has extra template parameters that control behavior to avoid some // inner-loop branching. @@ -1678,6 +1887,184 @@ template <typename ValueT, bool force_key_copy, bool allow_duplicate_keys> template <typename FunctorT> +void HashTable<ValueT, + resizable, + serializable, + force_key_copy, + allow_duplicate_keys>:: + getAllFromValueAccessorWithExtraWorkForFirstMatch( + ValueAccessor *accessor, + const attribute_id key_attr_id, + const bool check_for_null_keys, + FunctorT *functor) const { + // For each row of accessor whose key has at least one match (rows with a + // null key are skipped when check_for_null_keys is true), call + // functor->recordMatch() once and then the functor once per matching value. + InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + while (accessor->next()) { + TypedValue key = accessor->getTypedValue(key_attr_id); + if (check_for_null_keys && key.isNull()) { + continue; + } + const std::size_t hash_code = adjust_hashes_ ?
AdjustHash(key.getHash()) + : key.getHash(); + std::size_t entry_num = 0; + const ValueT *value; + if (getNextEntryForKey(key, hash_code, &value, &entry_num)) { + functor->recordMatch(*accessor); + (*functor)(*accessor, *value); + if (!allow_duplicate_keys) { + continue; + } + while (getNextEntryForKey(key, hash_code, &value, &entry_num)) { + (*functor)(*accessor, *value); + } + } + } + }); // NOLINT(whitespace/parens) +} + +template <typename ValueT, + bool resizable, + bool serializable, + bool force_key_copy, + bool allow_duplicate_keys> +template <typename FunctorT> +void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys> + ::getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch( + ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const bool check_for_null_keys, + FunctorT *functor) const { + // Composite-key counterpart of + // getAllFromValueAccessorWithExtraWorkForFirstMatch(). + DEBUG_ASSERT(key_types_.size() == key_attr_ids.size()); + std::vector<TypedValue> key_vector; + key_vector.resize(key_attr_ids.size()); + InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + while (accessor->next()) { + bool null_key = false; + for (std::vector<attribute_id>::size_type key_idx = 0; + key_idx < key_types_.size(); + ++key_idx) { + key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]); + if (check_for_null_keys && key_vector[key_idx].isNull()) { + null_key = true; + break; + } + } + if (null_key) { + continue; + } + + const std::size_t hash_code = adjust_hashes_ ?
AdjustHash(hashCompositeKey(key_vector)) + : hashCompositeKey(key_vector); + std::size_t entry_num = 0; + const ValueT *value; + if (getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) { + functor->recordMatch(*accessor); + (*functor)(*accessor, *value); + if (!allow_duplicate_keys) { + continue; + } + while (getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) { + (*functor)(*accessor, *value); + } + } + } + }); // NOLINT(whitespace/parens) +} + +template <typename ValueT, + bool resizable, + bool serializable, + bool force_key_copy, + bool allow_duplicate_keys> +template <bool run_if_match_found, typename FunctorT> +void HashTable<ValueT, + resizable, + serializable, + force_key_copy, + allow_duplicate_keys>:: + runOverKeysFromValueAccessor(ValueAccessor *accessor, + const attribute_id key_attr_id, + const bool check_for_null_keys, + FunctorT *functor) const { + // Invoke the functor for every row of accessor whose key lookup outcome + // (match found / no match) equals run_if_match_found. + InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + while (accessor->next()) { + TypedValue key = accessor->getTypedValue(key_attr_id); + if (check_for_null_keys && key.isNull()) { + // A null key never matches, so it must not be probed: skip it + // when looking for matches, report it when looking for rows + // without a match. + if (!run_if_match_found) { + (*functor)(*accessor); + } + continue; + } + if (this->hasKey(key) == run_if_match_found) { + (*functor)(*accessor); + } + } + }); // NOLINT(whitespace/parens) +} + +template <typename ValueT, + bool resizable, + bool serializable, + bool force_key_copy, + bool allow_duplicate_keys> +template <bool run_if_match_found, typename FunctorT> +void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys> + ::runOverKeysFromValueAccessorCompositeKey(ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const bool check_for_null_keys, + FunctorT *functor) const { + // Composite-key counterpart of runOverKeysFromValueAccessor(). + DEBUG_ASSERT(key_types_.size() == key_attr_ids.size()); + std::vector<TypedValue> key_vector; + key_vector.resize(key_attr_ids.size()); + 
InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + while (accessor->next()) { + bool null_key = false; + for (std::vector<attribute_id>::size_type key_idx = 0; + key_idx < key_types_.size(); + ++key_idx) { + key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]); + if (check_for_null_keys && key_vector[key_idx].isNull()) { + null_key = true; + break; + } + } + if (null_key) { + // A key with a null component never matches. It must not be probed + // either: the loop above stopped at the null, so later entries of + // key_vector may still hold the previous row's values. + if (!run_if_match_found) { + (*functor)(*accessor); + } + continue; + } + + if (this->hasCompositeKey(key_vector) == run_if_match_found) { + (*functor)(*accessor); + } + } + }); // NOLINT(whitespace/parens) +} + +template <typename ValueT, + bool resizable, + bool serializable, + bool force_key_copy, + bool allow_duplicate_keys> +template <typename FunctorT> std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys> ::forEach(FunctorT *functor) const { std::size_t entries_visited = 0;
