http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/AttachBloomFilters.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp new file mode 100644 index 0000000..03a42a0 --- /dev/null +++ b/query_optimizer/rules/AttachBloomFilters.cpp @@ -0,0 +1,308 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin–Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/
+
+#include "query_optimizer/rules/AttachBloomFilters.hpp"
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+namespace {
+
+// Heuristic threshold: probe-side bloom filters are only attached in front of
+// an input whose estimated cardinality exceeds this many tuples.
+constexpr std::size_t kMinProbeCardinalityForBloomFilter = 10000000u;
+
+}  // namespace
+
+/**
+ * Runs three passes over the plan: a bottom-up producer pass, a top-down
+ * consumer pass that decides attachments, and a bottom-up rebuild pass.
+ */
+P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          std::static_pointer_cast<const P::TopLevelPlan>(input)->shared_subplans()));
+
+  // Drop state left over from a previous application of this rule so that
+  // filters computed for an earlier plan cannot leak into this one.
+  producers_.clear();
+  consumers_.clear();
+  attaches_.clear();
+
+  // Pass 1 (post-order): collect the bloom filters available at each node.
+  visitProducer(input, 0);
+  // Pass 2 (pre-order): push filters to consuming nodes and choose attachments.
+  visitConsumer(input);
+  // Pass 3 (post-order): rebuild the plan with the chosen attachments.
+  return visitAndAttach(input);
+}
+
+/**
+ * Records in producers_[node] the bloom filters that are available at node.
+ *
+ * @param node The plan node to visit (children are visited first).
+ * @param depth Depth of node in the plan tree; the root is visited with 0.
+ */
+void AttachBloomFilters::visitProducer(const P::PhysicalPtr &node, const int depth) {
+  for (const P::PhysicalPtr &child : node->children()) {
+    visitProducer(child, depth + 1);
+  }
+
+  std::vector<BloomFilterInfo> bloom_filters;
+
+  // A hash join whose build side is estimated to be selective contributes one
+  // filter per build-side join attribute. The filter's source is this hash
+  // join, and it is recorded on the build child.
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    const P::PhysicalPtr &build_node = hash_join->right();
+    const double selectivity = cost_model_->estimateSelectivity(build_node);
+    if (selectivity < 1.0) {
+      auto &build_node_info = producers_[build_node];
+      for (const auto &attr : hash_join->right_join_attributes()) {
+        build_node_info.emplace_back(node, attr, depth, selectivity, false);
+      }
+    }
+  }
+
+  const std::vector<E::AttributeReferencePtr> output_attributes(
+      node->getOutputAttributes());
+  std::unordered_set<E::ExprId> output_attribute_ids;
+  for (const auto &attr : output_attributes) {
+    output_attribute_ids.emplace(attr->id());
+  }
+
+  // Aggregates, selections and hash joins inherit their children's filters.
+  std::vector<const BloomFilterInfo*> candidates;
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kHashJoin: {
+      for (const P::PhysicalPtr &child : node->children()) {
+        for (const BloomFilterInfo &info : producers_[child]) {
+          candidates.emplace_back(&info);
+        }
+      }
+      break;
+    }
+    default:
+      break;
+  }
+
+  // An inherited filter is kept only while its attribute is still visible in
+  // this node's output.
+  for (const BloomFilterInfo *info : candidates) {
+    if (output_attribute_ids.find(info->attribute->id()) != output_attribute_ids.end()) {
+      bloom_filters.emplace_back(
+          info->source, info->attribute, info->depth, info->selectivity, false);
+    }
+  }
+
+  producers_.emplace(node, std::move(bloom_filters));
+}
+
+/**
+ * Propagates consumable bloom filters from node to its children and decides
+ * which filters are built/probed where (recorded in attaches_).
+ *
+ * @param node The plan node to visit (children are visited afterwards).
+ */
+void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
+  // Filters from the parent: forward to each child every filter whose
+  // attribute that child still outputs. consumers_ is a std::map, so the
+  // reference below stays valid while entries for the children are inserted.
+  const auto &parent_bloom_filters = consumers_[node];
+  if (!parent_bloom_filters.empty()) {
+    for (const auto &child : node->children()) {
+      std::unordered_set<E::ExprId> child_output_attribute_ids;
+      for (const auto &attr : child->getOutputAttributes()) {
+        child_output_attribute_ids.emplace(attr->id());
+      }
+
+      std::vector<BloomFilterInfo> child_bloom_filters;
+      for (const auto &info : parent_bloom_filters) {
+        if (child_output_attribute_ids.find(info.attribute->id())
+                != child_output_attribute_ids.end()) {
+          child_bloom_filters.emplace_back(info.source,
+                                           info.attribute,
+                                           info.depth,
+                                           info.selectivity,
+                                           false,
+                                           info.source_attribute);
+        }
+      }
+      consumers_.emplace(child, std::move(child_bloom_filters));
+    }
+  }
+
+  // Filters from the build side to the probe side of an inner or left-semi
+  // hash join: a filter on a build-side join attribute is re-targeted to the
+  // paired probe-side join attribute.
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    if (hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
+        hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin) {
+      const P::PhysicalPtr &producer_child = hash_join->right();
+      const P::PhysicalPtr &consumer_child = hash_join->left();
+      std::unordered_map<E::ExprId, E::AttributeReferencePtr> join_attribute_pairs;
+      for (std::size_t i = 0; i < hash_join->left_join_attributes().size(); ++i) {
+        const E::AttributeReferencePtr probe_join_attribute =
+            hash_join->left_join_attributes()[i];
+        const E::AttributeReferencePtr build_join_attribute =
+            hash_join->right_join_attributes()[i];
+        join_attribute_pairs.emplace(build_join_attribute->id(),
+                                     probe_join_attribute);
+      }
+
+      auto &consumer_bloom_filters = consumers_[consumer_child];
+      for (const auto &info : producers_[producer_child]) {
+        const auto pair_it = join_attribute_pairs.find(info.attribute->id());
+        if (pair_it != join_attribute_pairs.end()) {
+          consumer_bloom_filters.emplace_back(info.source,
+                                              pair_it->second,
+                                              info.depth,
+                                              info.selectivity,
+                                              true,
+                                              info.attribute);
+        }
+      }
+    }
+  }
+
+  // Probe-side filters are attached only to hash joins (on their probe input)
+  // and aggregates (on their input).
+  P::PhysicalPtr consumer_child = nullptr;
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+  } else if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+  }
+
+  if (consumer_child != nullptr) {
+    const auto &consumer_bloom_filters = consumers_[consumer_child];
+    // The emptiness test comes first so the cost model is only consulted when
+    // there is something to attach.
+    if (!consumer_bloom_filters.empty() &&
+        cost_model_->estimateCardinality(consumer_child) >
+            kMinProbeCardinalityForBloomFilter) {
+      // Keep the best candidate per probe attribute. Keying by ExprId instead
+      // of by AttributeReferencePtr (pointer address) dedupes references to
+      // the same attribute and makes the attachment order deterministic.
+      std::map<E::ExprId, const BloomFilterInfo*> best_filters;
+      for (const BloomFilterInfo &info : consumer_bloom_filters) {
+        auto it = best_filters.find(info.attribute->id());
+        if (it == best_filters.end()) {
+          best_filters.emplace(info.attribute->id(), &info);
+        } else if (BloomFilterInfo::isBetterThan(&info, it->second)) {
+          it->second = &info;
+        }
+      }
+
+      // attaches_ is a std::map, so probe_attaches stays valid while
+      // getBloomFilterConfig() inserts entries for the build sources.
+      auto &probe_attaches = getBloomFilterConfig(node);
+      for (const auto &id_and_filter : best_filters) {
+        const BloomFilterInfo &best = *id_and_filter.second;
+        auto &build_attaches = getBloomFilterConfig(best.source);
+        build_attaches.addBuildSideBloomFilter(best.source_attribute);
+        probe_attaches.addProbeSideBloomFilter(
+            best.attribute,
+            best.source_attribute,
+            best.source);
+      }
+    }
+  }
+
+  for (const auto &child : node->children()) {
+    visitConsumer(child);
+  }
+}
+
+/**
+ * Rebuilds the subtree rooted at node, attaching the BloomFilterConfig chosen
+ * for hash joins and aggregates.
+ *
+ * @param node The plan node to rebuild.
+ * @return node itself if nothing changed beneath or at it, else a new node.
+ */
+P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &node) {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed = false;
+  for (const auto &child : node->children()) {
+    P::PhysicalPtr new_child = visitAndAttach(child);
+    if (new_child != child) {
+      has_changed = true;
+    }
+    new_children.emplace_back(std::move(new_child));
+  }
+
+  const auto attach_it = attaches_.find(node);
+  if (attach_it != attaches_.end()) {
+    if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+      const P::HashJoinPtr hash_join =
+          std::static_pointer_cast<const P::HashJoin>(node);
+      return P::HashJoin::Create(
+          new_children[0],
+          new_children[1],
+          hash_join->left_join_attributes(),
+          hash_join->right_join_attributes(),
+          hash_join->residual_predicate(),
+          hash_join->project_expressions(),
+          hash_join->join_type(),
+          attach_it->second);
+    }
+
+    if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+      // Rebuild on the rewritten input: reusing aggregate->input() would drop
+      // every filter attached inside the aggregate's subtree. Assumes, like
+      // the HashJoin branch above, that children() lists the input first.
+      return P::Aggregate::Create(
+          new_children[0],
+          aggregate->grouping_expressions(),
+          aggregate->aggregate_expressions(),
+          aggregate->filter_predicate(),
+          attach_it->second);
+    }
+  }
+
+  if (has_changed) {
+    return node->copyWithNewChildren(new_children);
+  }
+  return node;
+}
+
+/**
+ * Returns the BloomFilterConfig for node, constructing it from node on first
+ * use. Uses a single lookup and does not require BloomFilterConfig to be
+ * default-constructible.
+ */
+P::BloomFilterConfig& AttachBloomFilters::getBloomFilterConfig(const physical::PhysicalPtr &node) {
+  auto it = attaches_.find(node);
+  if (it == attaches_.end()) {
+    it = attaches_.emplace(node, node).first;
+  }
+  return it->second;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/AttachBloomFilters.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/AttachBloomFilters.hpp b/query_optimizer/rules/AttachBloomFilters.hpp new file mode 100644 index 0000000..e4437f7 --- /dev/null +++ b/query_optimizer/rules/AttachBloomFilters.hpp @@ -0,0 +1,118 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin–Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ * @{
+ */
+
+/**
+ * @brief Rule that attaches bloom filters to a physical plan.
+ *
+ * A hash join whose build side is estimated to be selective yields one bloom
+ * filter per build-side join attribute. Filters are inherited by ancestor
+ * aggregates, selections and hash joins while the filtered attribute is still
+ * in their output. They are also carried from the build side to the probe side
+ * of inner and left-semi hash joins. Finally they are attached as probe-side
+ * filters to hash joins and aggregates whose input is estimated to be large.
+ */
+class AttachBloomFilters : public Rule<physical::Physical> {
+ public:
+  AttachBloomFilters() {}
+
+  ~AttachBloomFilters() override {}
+
+  std::string getName() const override {
+    return "AttachBloomFilters";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  /**
+   * @brief A bloom filter candidate as seen from a particular plan node.
+   */
+  struct BloomFilterInfo {
+    BloomFilterInfo(const physical::PhysicalPtr &source_in,
+                    const expressions::AttributeReferencePtr &attribute_in,
+                    const int depth_in,
+                    const double selectivity_in,
+                    const bool from_sibling_in,
+                    const expressions::AttributeReferencePtr &source_attribute_in = nullptr)
+        : source(source_in),
+          attribute(attribute_in),
+          depth(depth_in),
+          selectivity(selectivity_in),
+          from_sibling(from_sibling_in),
+          source_attribute(source_attribute_in == nullptr ? attribute_in
+                                                          : source_attribute_in) {}
+
+    /**
+     * @brief Whether filter a should be preferred over filter b.
+     *
+     * A lower estimated selectivity wins. On a tie, the filter whose source
+     * is deeper in the plan wins.
+     */
+    static bool isBetterThan(const BloomFilterInfo *a,
+                             const BloomFilterInfo *b) {
+      if (a->selectivity != b->selectivity) {
+        return a->selectivity < b->selectivity;
+      }
+      return a->depth > b->depth;
+    }
+
+    // Plan node whose build side produces the filter.
+    physical::PhysicalPtr source;
+    // Attribute the filter applies to at the node that holds this info.
+    expressions::AttributeReferencePtr attribute;
+    // Depth of the source node in the plan tree (the root has depth 0).
+    int depth;
+    // Estimated selectivity of the source's build side.
+    double selectivity;
+    // True if the filter was carried from the build side to the probe side
+    // of a hash join.
+    bool from_sibling;
+    // Attribute the filter is built on; defaults to attribute when not given.
+    expressions::AttributeReferencePtr source_attribute;
+  };
+
+  // Post-order pass: records in producers_ the filters available at each node.
+  void visitProducer(const physical::PhysicalPtr &node, const int depth);
+
+  // Pre-order pass: records in consumers_ the filters applicable to each node
+  // and decides in attaches_ where filters are built and probed.
+  void visitConsumer(const physical::PhysicalPtr &node);
+
+  // Rebuilds the plan with the configurations collected in attaches_.
+  physical::PhysicalPtr visitAndAttach(const physical::PhysicalPtr &node);
+
+  // Returns the configuration for node, creating one if none exists yet.
+  physical::BloomFilterConfig &getBloomFilterConfig(const physical::PhysicalPtr &node);
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  // Ordered maps: references to their values stay valid across insertions.
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> producers_;
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> consumers_;
+  std::map<physical::PhysicalPtr, physical::BloomFilterConfig> attaches_;
+
+  DISALLOW_COPY_AND_ASSIGN(AttachBloomFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_ */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt index 04a9814..6b248f4 100644 --- a/query_optimizer/rules/CMakeLists.txt +++ b/query_optimizer/rules/CMakeLists.txt @@ -18,6 +18,7 @@ add_subdirectory(tests) # Declare micro-libs: +add_library(quickstep_queryoptimizer_rules_AttachBloomFilters AttachBloomFilters.cpp AttachBloomFilters.hpp) add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp) add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp) add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp) @@ -36,6 +37,22 @@ add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp # Link dependencies: +target_link_libraries(quickstep_queryoptimizer_rules_AttachBloomFilters + glog + quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel + quickstep_queryoptimizer_expressions_AttributeReference + quickstep_queryoptimizer_expressions_ExprId + quickstep_queryoptimizer_expressions_NamedExpression + quickstep_queryoptimizer_expressions_PatternMatcher + quickstep_queryoptimizer_expressions_Predicate + quickstep_queryoptimizer_physical_Aggregate + quickstep_queryoptimizer_physical_HashJoin + quickstep_queryoptimizer_physical_PatternMatcher + quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_physical_PhysicalType + quickstep_queryoptimizer_physical_TopLevelPlan + quickstep_queryoptimizer_rules_Rule + quickstep_utility_Macros) target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule glog quickstep_queryoptimizer_rules_Rule @@ -127,6 +144,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti quickstep_queryoptimizer_physical_PhysicalType quickstep_queryoptimizer_physical_TopLevelPlan quickstep_queryoptimizer_rules_Rule + quickstep_utility_DisjointTreeForest quickstep_utility_Macros) target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild quickstep_queryoptimizer_costmodel_SimpleCostModel @@ -187,6 +205,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression # Module all-in-one library: add_library(quickstep_queryoptimizer_rules ../../empty_src.cpp OptimizerRulesModule.hpp) target_link_libraries(quickstep_queryoptimizer_rules + quickstep_queryoptimizer_rules_AttachBloomFilters quickstep_queryoptimizer_rules_BottomUpRule quickstep_queryoptimizer_rules_CollapseProject quickstep_queryoptimizer_rules_GenerateJoins
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp index 9770606..cfbb5d1 100644 --- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp +++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp @@ -31,6 +31,7 @@ #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" #include "query_optimizer/physical/TopLevelPlan.hpp" +#include "utility/DisjointTreeForest.hpp" #include "glog/logging.h" @@ -72,6 +73,9 @@ P::PhysicalPtr StarSchemaHashJoinOrderOptimization::applyInternal(const P::Physi JoinGroupInfo *join_group = nullptr; if (parent_join_group == nullptr || !is_valid_cascading_hash_join) { new_join_group.reset(new JoinGroupInfo()); + for (const auto &attr : input->getReferencedAttributes()) { + new_join_group->referenced_attributes.emplace(attr->id()); + } join_group = new_join_group.get(); } else { join_group = parent_join_group; @@ -144,7 +148,10 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan( i, tables[i], cost_model_->estimateCardinality(tables[i]), - cost_model_->estimateSelectivity(tables[i])); + cost_model_->estimateSelectivity(tables[i]), + CountSharedAttributes(join_group.referenced_attributes, + tables[i]->getOutputAttributes()), + tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate); } // Auxiliary mapping info. 
@@ -161,9 +168,19 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan( } } - // Create a join graph where tables are vertices, and add an edge between vertices - // t1 and t2 for each join predicate t1.x = t2.y - std::vector<std::unordered_set<std::size_t>> join_graph(table_info_storage.size()); + std::set<TableInfo*> remaining_tables; + for (auto &table_info : table_info_storage) { + remaining_tables.emplace(&table_info); + } + + DisjointTreeForest<E::ExprId> join_attribute_forest; + for (const auto &attr_id_pair : join_group.join_attribute_pairs) { + join_attribute_forest.makeSet(attr_id_pair.first); + join_attribute_forest.makeSet(attr_id_pair.second); + join_attribute_forest.merge(attr_id_pair.first, attr_id_pair.second); + } + + std::map<std::size_t, std::map<std::size_t, E::ExprId>> join_attribute_groups; for (const auto &attr_id_pair : join_group.join_attribute_pairs) { DCHECK(attribute_id_to_table_info_index_map.find(attr_id_pair.first) != attribute_id_to_table_info_index_map.end()); @@ -176,128 +193,169 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan( attribute_id_to_table_info_index_map[attr_id_pair.second]; DCHECK_NE(first_table_idx, second_table_idx); - table_info_storage[first_table_idx].join_attribute_pairs.emplace( - attr_id_pair.first, attr_id_pair.second); - table_info_storage[second_table_idx].join_attribute_pairs.emplace( - attr_id_pair.second, attr_id_pair.first); - - join_graph[first_table_idx].emplace(second_table_idx); - join_graph[second_table_idx].emplace(first_table_idx); - } - - std::set<TableInfo*, TableInfoPtrLessComparator> table_info_ordered_by_priority; - for (std::size_t i = 0; i < table_info_storage.size(); ++i) { - table_info_ordered_by_priority.emplace(&table_info_storage[i]); + DCHECK_EQ(join_attribute_forest.find(attr_id_pair.first), + join_attribute_forest.find(attr_id_pair.second)); + const std::size_t attr_group_id = join_attribute_forest.find(attr_id_pair.first); + auto 
&attr_group = join_attribute_groups[attr_group_id]; + attr_group.emplace(first_table_idx, attr_id_pair.first); + attr_group.emplace(second_table_idx, attr_id_pair.second); } - // Contruct hash join tree. while (true) { - TableInfo *first_table_info = *table_info_ordered_by_priority.begin(); - table_info_ordered_by_priority.erase( - table_info_ordered_by_priority.begin()); - const std::size_t first_table_info_id = first_table_info->table_info_id; - - TableInfo *second_table_info = nullptr; - std::set<TableInfo*, TableInfoPtrLessComparator>::iterator second_table_info_it; - for (auto candidate_table_info_it = table_info_ordered_by_priority.begin(); - candidate_table_info_it != table_info_ordered_by_priority.end(); - ++candidate_table_info_it) { - TableInfo *candidate_table_info = *candidate_table_info_it; - const std::size_t candidate_table_info_id = candidate_table_info->table_info_id; - - if (join_graph[first_table_info_id].find(candidate_table_info_id) - == join_graph[first_table_info_id].end() && - join_graph[candidate_table_info_id].find(first_table_info_id) - == join_graph[candidate_table_info_id].end()) { - continue; - } else if (second_table_info == nullptr) { - second_table_info = candidate_table_info; - second_table_info_it = candidate_table_info_it; - } - - bool is_likely_many_to_many_join = false; - for (const auto join_attr_pair : first_table_info->join_attribute_pairs) { - if (candidate_table_info->joined_attribute_set.find(join_attr_pair.second) - != candidate_table_info->joined_attribute_set.end()) { - is_likely_many_to_many_join = true; - break; - } - } - for (const auto join_attr_pair : candidate_table_info->join_attribute_pairs) { - if (first_table_info->joined_attribute_set.find(join_attr_pair.second) - != first_table_info->joined_attribute_set.end()) { - is_likely_many_to_many_join = true; - break; + // TODO(jianqiao): design better data structure to improve efficiency here. 
+ std::unique_ptr<JoinPair> best_join = nullptr; + for (TableInfo *probe_table_info : remaining_tables) { + for (TableInfo *build_table_info : remaining_tables) { + if (probe_table_info != build_table_info) { + std::vector<E::AttributeReferencePtr> build_attrs; + const std::size_t probe_table_id = probe_table_info->table_info_id; + const std::size_t build_table_id = build_table_info->table_info_id; + for (const auto &attr_group_pair : join_attribute_groups) { + const auto &attr_group = attr_group_pair.second; + auto probe_it = attr_group.find(probe_table_id); + auto build_it = attr_group.find(build_table_id); + if (probe_it != attr_group.end() && build_it != attr_group.end()) { + build_attrs.emplace_back( + attribute_id_to_reference_map.at(build_it->second)); + } + } + if (!build_attrs.empty() + && build_table_info->table->impliesUniqueAttributes(build_attrs)) { + std::unique_ptr<JoinPair> new_join( + new JoinPair(probe_table_info, build_table_info)); + if (best_join == nullptr || new_join->isBetterThan(*best_join)) { +// if (best_join != nullptr) { +// std::cerr << "(" << best_join->probe->estimated_selectivity +// << ", " << best_join->probe->estimated_cardinality << ")" +// << " -- " +// << "(" << best_join->build->estimated_selectivity +// << ", " << best_join->build->estimated_cardinality << ")" +// << "\n"; +// std::cerr << "REPLACED WITH\n"; +// } +// std::cerr << "(" << new_join->probe->estimated_selectivity +// << ", " << new_join->probe->estimated_cardinality << ")" +// << " -- " +// << "(" << new_join->build->estimated_selectivity +// << ", " << new_join->build->estimated_cardinality << ")" +// << "\n****\n"; + best_join.reset(new_join.release()); + } + } } } - if (!is_likely_many_to_many_join) { - second_table_info = candidate_table_info; - second_table_info_it = candidate_table_info_it; - break; - } } - DCHECK(second_table_info != nullptr); - table_info_ordered_by_priority.erase(second_table_info_it); - const P::PhysicalPtr &left_child = 
first_table_info->table; - const P::PhysicalPtr &right_child = second_table_info->table; + TableInfo *selected_probe_table_info = nullptr; + TableInfo *selected_build_table_info = nullptr; + + if (best_join != nullptr) { + selected_probe_table_info = best_join->probe; + selected_build_table_info = best_join->build; + } + + // TODO(jianqiao): Handle the case when there is no primary key-foreign key information available. + CHECK(selected_probe_table_info != nullptr); + CHECK(selected_build_table_info != nullptr); + +// std::cerr << selected_probe_table_info->estimated_selectivity +// << " -- " +// << selected_build_table_info->estimated_selectivity +// << "\n"; + +// std::cerr << selected_probe_table_info->estimated_num_output_attributes +// << " -- " +// << selected_build_table_info->estimated_num_output_attributes +// << "\n"; + + remaining_tables.erase(selected_probe_table_info); + remaining_tables.erase(selected_build_table_info); + + const P::PhysicalPtr &probe_child = selected_probe_table_info->table; + const P::PhysicalPtr &build_child = selected_build_table_info->table; std::vector<E::NamedExpressionPtr> output_attributes; - for (const E::AttributeReferencePtr &left_attr : left_child->getOutputAttributes()) { - output_attributes.emplace_back(left_attr); + for (const E::AttributeReferencePtr &probe_attr : probe_child->getOutputAttributes()) { + output_attributes.emplace_back(probe_attr); } - for (const E::AttributeReferencePtr &right_attr : right_child->getOutputAttributes()) { - output_attributes.emplace_back(right_attr); + for (const E::AttributeReferencePtr &build_attr : build_child->getOutputAttributes()) { + output_attributes.emplace_back(build_attr); } - std::vector<E::AttributeReferencePtr> left_join_attributes; - std::vector<E::AttributeReferencePtr> right_join_attributes; - std::unordered_set<expressions::ExprId> new_joined_attribute_set; - for (const auto &join_attr_pair : first_table_info->join_attribute_pairs) { - if 
(second_table_info->join_attribute_pairs.find(join_attr_pair.second) - != second_table_info->join_attribute_pairs.end()) { - left_join_attributes.emplace_back( - attribute_id_to_reference_map[join_attr_pair.first]); - right_join_attributes.emplace_back( - attribute_id_to_reference_map[join_attr_pair.second]); - - new_joined_attribute_set.emplace(join_attr_pair.first); - new_joined_attribute_set.emplace(join_attr_pair.second); + std::vector<E::AttributeReferencePtr> probe_attributes; + std::vector<E::AttributeReferencePtr> build_attributes; + const std::size_t probe_table_id = selected_probe_table_info->table_info_id; + const std::size_t build_table_id = selected_build_table_info->table_info_id; + for (const auto &attr_group_pair : join_attribute_groups) { + const auto &attr_group = attr_group_pair.second; + auto probe_it = attr_group.find(probe_table_id); + auto build_it = attr_group.find(build_table_id); + if (probe_it != attr_group.end() && build_it != attr_group.end()) { + probe_attributes.emplace_back( + attribute_id_to_reference_map.at(probe_it->second)); + build_attributes.emplace_back( + attribute_id_to_reference_map.at(build_it->second)); } } - DCHECK_GE(left_join_attributes.size(), static_cast<std::size_t>(1)); - if (table_info_ordered_by_priority.size() > 0) { + if (remaining_tables.size() > 0) { P::PhysicalPtr output = - P::HashJoin::Create(left_child, - right_child, - left_join_attributes, - right_join_attributes, + P::HashJoin::Create(probe_child, + build_child, + probe_attributes, + build_attributes, nullptr, output_attributes, P::HashJoin::JoinType::kInnerJoin); - second_table_info->table = output; +// P::PhysicalPtr output; +// if (selected_build_table_info->estimated_num_output_attributes >= 4 && +// selected_probe_table_info->estimated_num_output_attributes < 4) { +// output = P::HashJoin::Create(build_child, +// probe_child, +// build_attributes, +// probe_attributes, +// nullptr, +// output_attributes, +// P::HashJoin::JoinType::kInnerJoin); +// 
} else { +// output = P::HashJoin::Create(probe_child, +// build_child, +// probe_attributes, +// build_attributes, +// nullptr, +// output_attributes, +// P::HashJoin::JoinType::kInnerJoin); +// } + + selected_probe_table_info->table = output; // TODO(jianqiao): Cache the estimated cardinality for each plan in cost // model to avoid duplicated estimation. - second_table_info->estimated_cardinality = cost_model_->estimateCardinality(output); - - second_table_info->join_attribute_pairs.insert(first_table_info->join_attribute_pairs.begin(), - first_table_info->join_attribute_pairs.end()); - second_table_info->joined_attribute_set.insert(first_table_info->joined_attribute_set.begin(), - first_table_info->joined_attribute_set.end()); - second_table_info->joined_attribute_set.insert(new_joined_attribute_set.begin(), - new_joined_attribute_set.end()); - table_info_ordered_by_priority.emplace(second_table_info); - - join_graph[second_table_info->table_info_id].insert(join_graph[first_table_info_id].begin(), - join_graph[first_table_info_id].end()); - + selected_probe_table_info->estimated_cardinality = cost_model_->estimateCardinality(output); + selected_probe_table_info->estimated_selectivity = cost_model_->estimateSelectivity(output); + + selected_probe_table_info->estimated_num_output_attributes = + CountSharedAttributes(join_group.referenced_attributes, + output->getOutputAttributes()); + selected_probe_table_info->is_aggregation = false; + + remaining_tables.emplace(selected_probe_table_info); + + // Update join attribute groups. 
+ for (auto &attr_group_pair : join_attribute_groups) { + auto &attr_group = attr_group_pair.second; + auto build_it = attr_group.find(build_table_id); + if (build_it != attr_group.end()) { + const E::ExprId attr_id = build_it->second; + attr_group.erase(build_it); + attr_group.emplace(probe_table_id, attr_id); + } + } } else { - return P::HashJoin::Create(left_child, - right_child, - left_join_attributes, - right_join_attributes, + return P::HashJoin::Create(probe_child, + build_child, + probe_attributes, + build_attributes, residual_predicate, project_expressions, P::HashJoin::JoinType::kInnerJoin); @@ -305,5 +363,18 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan( } } +std::size_t StarSchemaHashJoinOrderOptimization::CountSharedAttributes( + const std::unordered_set<expressions::ExprId> &attr_set1, + const std::vector<expressions::AttributeReferencePtr> &attr_set2) { + std::size_t cnt = 0; + for (const auto &attr : attr_set2) { + if (attr_set1.find(attr->id()) != attr_set1.end()) { + ++cnt; + } + } + return cnt; +} + + } // namespace optimizer } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp index deddffd..33d95a5 100644 --- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp +++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp @@ -62,6 +62,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> { * @brief A group of tables to form a hash join tree. 
*/ struct JoinGroupInfo { + std::unordered_set<expressions::ExprId> referenced_attributes; std::vector<physical::PhysicalPtr> tables; std::vector<std::pair<expressions::ExprId, expressions::ExprId>> join_attribute_pairs; }; @@ -70,49 +71,84 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> { * @brief Auxiliary information of a table for the optimizer. */ struct TableInfo { - TableInfo(const std::size_t in_table_info_id, - const physical::PhysicalPtr &in_table, - const std::size_t in_estimated_cardinality, - const double in_estimated_selectivity) - : table_info_id(in_table_info_id), - table(in_table), - estimated_cardinality(in_estimated_cardinality), - estimated_selectivity(in_estimated_selectivity) { + TableInfo(const std::size_t table_info_id_in, + const physical::PhysicalPtr &table_in, + const std::size_t estimated_cardinality_in, + const double estimated_selectivity_in, + const std::size_t estimated_num_output_attributes_in, + const bool is_aggregation_in) + : table_info_id(table_info_id_in), + table(table_in), + estimated_cardinality(estimated_cardinality_in), + estimated_selectivity(estimated_selectivity_in), + estimated_num_output_attributes(estimated_num_output_attributes_in), + is_aggregation(is_aggregation_in) { } const std::size_t table_info_id; physical::PhysicalPtr table; std::size_t estimated_cardinality; double estimated_selectivity; - std::unordered_multimap<expressions::ExprId, expressions::ExprId> join_attribute_pairs; - std::unordered_set<expressions::ExprId> joined_attribute_set; + std::size_t estimated_num_output_attributes; + bool is_aggregation; }; - /** - * @brief Comparator that compares the join priorities between two tables. 
- */ - struct TableInfoPtrLessComparator { - inline bool operator() (const TableInfo *lhs, const TableInfo *rhs) { - bool swapped = false; - if (lhs->estimated_cardinality > rhs->estimated_cardinality) { - std::swap(lhs, rhs); - swapped = true; + struct JoinPair { + JoinPair(TableInfo *probe_in, TableInfo *build_in) + : probe(probe_in), build(build_in) { + } + + inline bool isBetterThan(const JoinPair &rhs) const { + const auto &lhs = *this; + const bool lhs_has_large_output = + lhs.build->estimated_num_output_attributes + + lhs.probe->estimated_num_output_attributes > 5; + const bool rhs_has_large_output = + rhs.build->estimated_num_output_attributes + + rhs.probe->estimated_num_output_attributes > 5; + if (lhs_has_large_output || rhs_has_large_output) { + if (lhs_has_large_output != rhs_has_large_output) { + return rhs_has_large_output; + } + double lhs_selectivity = + lhs.build->estimated_selectivity * lhs.probe->estimated_selectivity; + double rhs_selectivity = + rhs.build->estimated_selectivity * rhs.probe->estimated_selectivity; + if (lhs_selectivity != rhs_selectivity) { + return lhs_selectivity < rhs_selectivity; + } } - if (lhs->estimated_selectivity < rhs->estimated_selectivity) { - return !swapped; - } else if (lhs->estimated_cardinality < 1000u && - rhs->estimated_cardinality > 10000u && - lhs->estimated_selectivity < rhs->estimated_selectivity * 1.5) { - return !swapped; - } else if (lhs->estimated_selectivity > rhs->estimated_selectivity) { - return swapped; - } else if (lhs->estimated_cardinality != rhs->estimated_cardinality) { - return !swapped; + const bool lhs_has_small_build = + !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100; + const bool rhs_has_small_build = + !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100; + if (lhs_has_small_build != rhs_has_small_build) { + return lhs_has_small_build; + } + + if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) { + return lhs.probe->is_aggregation; + } + + if 
(lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) { + return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality; + } + if (lhs.build->estimated_selectivity != rhs.build->estimated_selectivity) { + return lhs.build->estimated_selectivity < rhs.build->estimated_selectivity; + } + if (lhs.build->estimated_cardinality != rhs.build->estimated_cardinality) { + return lhs.build->estimated_cardinality < rhs.build->estimated_cardinality; + } + if (lhs.probe->table != rhs.probe->table) { + return lhs.probe->table < rhs.probe->table; } else { - return swapped ^ (lhs->table < rhs->table); + return lhs.build->table < rhs.build->table; } } + + TableInfo *probe; + TableInfo *build; }; physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input, @@ -123,6 +159,10 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> { const expressions::PredicatePtr &residual_predicate, const std::vector<expressions::NamedExpressionPtr> &project_expressions); + static std::size_t CountSharedAttributes( + const std::unordered_set<expressions::ExprId> &attr_set1, + const std::vector<expressions::AttributeReferencePtr> &attr_set2); + std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_; DISALLOW_COPY_AND_ASSIGN(StarSchemaHashJoinOrderOptimization); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp index 815c13e..ac0adea 100644 --- a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp +++ b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp @@ -70,7 +70,8 @@ class ExecutionHeuristicsTest : public ::testing::Test { probe_relation, std::move(build_attribute_ids), std::move(probe_attribute_ids), - join_hash_table_id); + join_hash_table_id, + 
build_relation->estimateTupleCardinality()); } QueryPlan::DAGNodeIndex createDummyBuildHashOperator(QueryPlan *query_plan, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/relational_operators/HashJoinOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp index 667df1e..16c0d82 100644 --- a/relational_operators/HashJoinOperator.cpp +++ b/relational_operators/HashJoinOperator.cpp @@ -59,6 +59,11 @@ using std::vector; namespace quickstep { +DEFINE_int64(bloom_adapter_batch_size, 64, + "Number of tuples to probe in bulk in Bloom filter adapter."); +DEFINE_bool(adapt_bloom_filters, true, + "Whether to adaptively adjust the ordering of bloom filters."); + namespace { // Functor passed to HashTable::getAllFromValueAccessor() to collect matching @@ -75,6 +80,11 @@ class MapBasedJoinedTupleCollector { joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition()); } + inline void operator()(const tuple_id probe_tid, + const TupleReference &build_tref) { + joined_tuples_[build_tref.block].emplace_back(build_tref.tuple, probe_tid); + } + // Get a mutable pointer to the collected map of joined tuple ID pairs. 
The // key is inner block_id, values are vectors of joined tuple ID pairs with // tuple ID from the inner block on the left and the outer block on the http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index 235bfe4..cf680f6 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -307,8 +307,9 @@ class HashInnerJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), + StorageManager *storage_manager, + const int op_index = -1) + : WorkOrder(query_id, op_index), build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), @@ -354,8 +355,9 @@ class HashInnerJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), + StorageManager *storage_manager, + const int op_index = -1) + : WorkOrder(query_id, op_index), build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), @@ -435,8 +437,9 @@ class HashSemiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), + StorageManager *storage_manager, + const int op_index = -1) + : WorkOrder(query_id, op_index), build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), @@ -482,8 +485,9 @@ 
class HashSemiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), + StorageManager *storage_manager, + const int op_index = -1) + : WorkOrder(query_id, op_index), build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), @@ -559,8 +563,9 @@ class HashAntiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), + StorageManager *storage_manager, + const int op_index = -1) + : WorkOrder(query_id, op_index), build_relation_(build_relation), probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/relational_operators/WorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp index df195cc..4eb6b3a 100644 --- a/relational_operators/WorkOrder.hpp +++ b/relational_operators/WorkOrder.hpp @@ -299,16 +299,23 @@ class WorkOrder { return query_id_; } + inline const int getOperatorIndex() const { + return op_index_; + } + protected: /** * @brief Constructor. * * @param query_id The ID of the query to which this WorkOrder belongs. **/ - explicit WorkOrder(const std::size_t query_id) - : query_id_(query_id) {} + explicit WorkOrder(const std::size_t query_id, + const int op_index = -1) + : query_id_(query_id), + op_index_(op_index) {} const std::size_t query_id_; + const int op_index_; // A vector of preferred NUMA node IDs where this workorder should be executed. // These node IDs typically indicate the NUMA node IDs of the input(s) of the // workorder. 
Derived classes should ensure that there are no duplicate entries http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 4878cf1..668164c 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -46,10 +46,13 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorUtil.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "types/containers/Tuple.hpp" +#include "utility/BloomFilterAdapter.hpp" #include "glog/logging.h" @@ -57,6 +60,8 @@ using std::unique_ptr; namespace quickstep { +DECLARE_int64(bloom_adapter_batch_size); + AggregationOperationState::AggregationOperationState( const CatalogRelationSchema &input_relation, const std::vector<const AggregateFunction*> &aggregate_functions, @@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState( std::vector<bool> &&is_distinct, std::vector<std::unique_ptr<const Scalar>> &&group_by, const Predicate *predicate, + std::vector<const BloomFilter *> &&bloom_filters, + std::vector<attribute_id> &&bloom_filter_attribute_ids, const std::size_t estimated_num_entries, const HashTableImplType hash_table_impl_type, const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, StorageManager *storage_manager) : input_relation_(input_relation), predicate_(predicate), + bloom_filters_(std::move(bloom_filters)), + bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)), group_by_list_(std::move(group_by)), arguments_(std::move(arguments)), is_distinct_(std::move(is_distinct)), @@ -183,7 +192,8 @@ 
AggregationOperationState::AggregationOperationState( AggregationOperationState* AggregationOperationState::ReconstructFromProto( const serialization::AggregationOperationState &proto, const CatalogDatabaseLite &database, - StorageManager *storage_manager) { + StorageManager *storage_manager, + const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) { DCHECK(ProtoIsValid(proto, database)); // Rebuild contructor arguments from their representation in 'proto'. @@ -232,12 +242,24 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( database)); } + std::vector<const BloomFilter*> bloom_filter_vector; + std::vector<attribute_id> bloom_filter_attribute_ids; + for (int i = 0; i < proto.bloom_filters_size(); ++i) { + // Add the pointer to the probe bloom filter within the list of probe bloom filters to use. + const auto bloom_filter_proto = proto.bloom_filters(i); + bloom_filter_vector.emplace_back( + bloom_filters[bloom_filter_proto.bloom_filter_id()].get()); + bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id()); + } + return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()), aggregate_functions, std::move(arguments), std::move(is_distinct), std::move(group_by_expressions), predicate.release(), + std::move(bloom_filter_vector), + std::move(bloom_filter_attribute_ids), proto.estimated_num_entries(), HashTableImplTypeFromProto(proto.hash_table_impl_type()), distinctify_hash_table_impl_types, @@ -340,6 +362,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b // tuples so that it can be reused across multiple aggregates (i.e. we only // pay the cost of evaluating the predicate once). 
std::unique_ptr<TupleIdSequence> reuse_matches; + if (predicate_) { + reuse_matches.reset(block->getMatchesForPredicate(predicate_.get())); + } + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { @@ -358,7 +384,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b arguments_[agg_idx], local_arguments_as_attributes, {}, /* group_by */ - predicate_.get(), distinctify_hashtables_[agg_idx].get(), &reuse_matches, nullptr /* reuse_group_by_vectors */); @@ -369,7 +394,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b block->aggregate(*handles_[agg_idx], arguments_[agg_idx], local_arguments_as_attributes, - predicate_.get(), &reuse_matches)); } } @@ -391,6 +415,72 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // GROUP BY expressions once). std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; + if (predicate_) { + reuse_matches.reset(block->getMatchesForPredicate(predicate_.get())); + } + + if (bloom_filters_.size() > 0) { + const std::size_t num_tuples = block->getNumTuples(); +// std::cerr << "Before: " << num_tuples << " -- " +// << (reuse_matches ? 
reuse_matches->numTuples() : num_tuples) +// << "\n"; + std::unique_ptr<ValueAccessor> accessor; + if (reuse_matches) { + accessor.reset( + block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get())); + } else { + accessor.reset( + block->getTupleStorageSubBlock().createValueAccessor()); + } + InvokeOnAnyValueAccessor( + accessor.get(), + [&](auto *accessor) -> void { // NOLINT(build/c++11) + std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples)); + + std::vector<std::size_t> attr_size_vector; + attr_size_vector.reserve(bloom_filter_attribute_ids_.size()); + for (const auto &attr : bloom_filter_attribute_ids_) { + auto val_and_size = + accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr); + attr_size_vector.emplace_back(val_and_size.second); + } + + std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter; + bloom_filter_adapter.reset(new BloomFilterAdapter( + bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector)); + + std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size; + std::uint32_t num_tuples_left = accessor->getNumTuples(); + std::vector<tuple_id> batch(num_tuples_left); + + do { + std::uint32_t batch_size = + batch_size_try < num_tuples_left ? 
batch_size_try : num_tuples_left; + for (std::size_t i = 0; i < batch_size; ++i) { + accessor->next(); + batch.push_back(accessor->getCurrentPosition()); + } + + std::size_t num_hits = + bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size); + for (std::size_t t = 0; t < num_hits; ++t){ + filtered->set(batch[t], true); + } + + batch.clear(); + num_tuples_left -= batch_size; + batch_size_try = batch_size * 2; + } while (num_tuples_left > 0); + + if (reuse_matches) { + reuse_matches->intersectWith(*filtered); + } else { + reuse_matches.reset(filtered.release()); + } + }); +// std::cerr << "After: " << reuse_matches->numTuples() << "\n"; + } + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { @@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo arguments_[agg_idx], nullptr, /* arguments_as_attributes */ group_by_list_, - predicate_.get(), distinctify_hashtables_[agg_idx].get(), &reuse_matches, &reuse_group_by_vectors); @@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo block->aggregateGroupBy(*handles_[agg_idx], arguments_[agg_idx], group_by_list_, - predicate_.get(), agg_hash_table, &reuse_matches, &reuse_group_by_vectors); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 0199749..5db7325 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -33,6 +33,7 @@ #include "storage/HashTableBase.hpp" #include "storage/HashTablePool.hpp" #include "storage/StorageBlockInfo.hpp" +#include "utility/BloomFilter.hpp" #include "utility/Macros.hpp" namespace quickstep { @@ -108,6 +109,8 @@ class AggregationOperationState { std::vector<bool> &&is_distinct, 
std::vector<std::unique_ptr<const Scalar>> &&group_by, const Predicate *predicate, + std::vector<const BloomFilter *> &&bloom_filters, + std::vector<attribute_id> &&bloom_filter_attribute_ids, const std::size_t estimated_num_entries, const HashTableImplType hash_table_impl_type, const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, @@ -131,7 +134,8 @@ class AggregationOperationState { static AggregationOperationState* ReconstructFromProto( const serialization::AggregationOperationState &proto, const CatalogDatabaseLite &database, - StorageManager *storage_manager); + StorageManager *storage_manager, + const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters); /** * @brief Check whether a serialization::AggregationOperationState is @@ -181,6 +185,10 @@ class AggregationOperationState { // filter predicate (if any), and the list of GROUP BY expressions (if any). const CatalogRelationSchema &input_relation_; std::unique_ptr<const Predicate> predicate_; + + std::vector<const BloomFilter*> bloom_filters_; + std::vector<attribute_id> bloom_filter_attribute_ids_; + std::vector<std::unique_ptr<const Scalar>> group_by_list_; // Each individual aggregate in this operation has an AggregationHandle and http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/AggregationOperationState.proto ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto index bf78e3a..165148e 100644 --- a/storage/AggregationOperationState.proto +++ b/storage/AggregationOperationState.proto @@ -42,4 +42,10 @@ message AggregationOperationState { // Each DISTINCT aggregation has its distinctify hash table impl type. 
repeated HashTableImplType distinctify_hash_table_impl_types = 7; + + message BloomFilter { + required uint32 bloom_filter_id = 1; + required uint32 attr_id = 2; + } + repeated BloomFilter bloom_filters = 8; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/BasicColumnStoreValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/storage/BasicColumnStoreValueAccessor.hpp b/storage/BasicColumnStoreValueAccessor.hpp index 759e187..7907fd5 100644 --- a/storage/BasicColumnStoreValueAccessor.hpp +++ b/storage/BasicColumnStoreValueAccessor.hpp @@ -18,6 +18,8 @@ #ifndef QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_ #define QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_ +#include <cstddef> +#include <utility> #include <vector> #include "catalog/CatalogRelationSchema.hpp" @@ -43,7 +45,8 @@ class BasicColumnStoreValueAccessorHelper { : relation_(relation), num_tuples_(num_tuples), column_stripes_(column_stripes), - column_null_bitmaps_(column_null_bitmaps) { + column_null_bitmaps_(column_null_bitmaps), + attr_max_lengths_(relation.getMaximumAttributeByteLengths()) { } inline tuple_id numPackedTuples() const { @@ -61,9 +64,23 @@ class BasicColumnStoreValueAccessorHelper { return nullptr; } - // TODO(chasseur): Consider cacheing the byte lengths of attributes. 
- return static_cast<const char*>(column_stripes_[attr]) - + (tuple * relation_.getAttributeById(attr)->getType().maximumByteLength()); + return static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_max_lengths_[attr]); + } + + template <bool check_null> + inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple, + const attribute_id attr) const { + DEBUG_ASSERT(tuple < num_tuples_); + DEBUG_ASSERT(relation_.hasAttributeWithId(attr)); + if (check_null + && (!column_null_bitmaps_.elementIsNull(attr)) + && column_null_bitmaps_[attr].getBit(tuple)) { + return std::make_pair(nullptr, 0); + } + + const std::size_t attr_length = attr_max_lengths_[attr]; + return std::make_pair(static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_length), + attr_length); } inline TypedValue getAttributeValueTyped(const tuple_id tuple, @@ -80,6 +97,7 @@ class BasicColumnStoreValueAccessorHelper { const tuple_id num_tuples_; const std::vector<void*> &column_stripes_; const PtrVector<BitVector<false>, true> &column_null_bitmaps_; + const std::vector<std::size_t> &attr_max_lengths_; DISALLOW_COPY_AND_ASSIGN(BasicColumnStoreValueAccessorHelper); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/BloomFilterIndexSubBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/BloomFilterIndexSubBlock.cpp b/storage/BloomFilterIndexSubBlock.cpp index e806217..a40f69f 100644 --- a/storage/BloomFilterIndexSubBlock.cpp +++ b/storage/BloomFilterIndexSubBlock.cpp @@ -55,7 +55,6 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t sub_block_memory_size), is_initialized_(false), is_consistent_(false), - random_seed_(kBloomFilterSeed), bit_array_size_in_bytes_(description.GetExtension( BloomFilterIndexSubBlockDescription::bloom_filter_size)) { CHECK(DescriptionIsValid(relation_, description_)) @@ -74,8 +73,7 @@ 
BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes); // Initialize the bloom_filter_ data structure to operate on bit_array. - bloom_filter_.reset(new BloomFilter(random_seed_, - salt_count, + bloom_filter_.reset(new BloomFilter(salt_count, bit_array_size_in_bytes_, bit_array_.get(), is_bloom_filter_initialized)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/BloomFilterIndexSubBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/BloomFilterIndexSubBlock.hpp b/storage/BloomFilterIndexSubBlock.hpp index 4925673..8c81156 100644 --- a/storage/BloomFilterIndexSubBlock.hpp +++ b/storage/BloomFilterIndexSubBlock.hpp @@ -65,11 +65,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock { kSelectivityNone }; - /** - * @brief A random seed to initialize the bloom filter hash functions. 
- **/ - static const std::uint64_t kBloomFilterSeed = 0xA5A5A5A55A5A5A5AULL; - BloomFilterIndexSubBlock(const TupleStorageSubBlock &tuple_store, const IndexSubBlockDescription &description, const bool new_block, @@ -179,7 +174,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock { private: bool is_initialized_; bool is_consistent_; - const std::uint64_t random_seed_; const std::uint64_t bit_array_size_in_bytes_; std::vector<attribute_id> indexed_attribute_ids_; std::unique_ptr<unsigned char> bit_array_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 582effd..777a888 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -677,6 +677,8 @@ target_link_libraries(quickstep_storage_HashTable quickstep_types_Type quickstep_types_TypedValue quickstep_utility_BloomFilter + quickstep_utility_BloomFilterAdapter + quickstep_utility_EventProfiler quickstep_utility_HashPair quickstep_utility_Macros) target_link_libraries(quickstep_storage_HashTableBase http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/CompressedColumnStoreValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/storage/CompressedColumnStoreValueAccessor.hpp b/storage/CompressedColumnStoreValueAccessor.hpp index 64eb315..984dea3 100644 --- a/storage/CompressedColumnStoreValueAccessor.hpp +++ b/storage/CompressedColumnStoreValueAccessor.hpp @@ -52,6 +52,7 @@ class CompressedColumnStoreValueAccessorHelper { const PtrVector<BitVector<false>, true> &uncompressed_column_null_bitmaps) : relation_(relation), num_tuples_(num_tuples), + attr_max_lengths_(relation.getMaximumAttributeByteLengths()), compression_info_(compression_info), dictionary_coded_attributes_(dictionary_coded_attributes), truncated_attributes_(truncated_attributes), @@ -84,6 
+85,26 @@ class CompressedColumnStoreValueAccessorHelper { } } + template <bool check_null> + inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple, + const attribute_id attr) const { + if (dictionary_coded_attributes_[attr]) { + return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>( + getCode(tuple, attr)); + } else if (truncated_attributes_[attr]) { + if (truncated_attribute_is_int_[attr]) { + int_buffer_ = getCode(tuple, attr); + return std::make_pair(&int_buffer_, sizeof(int_buffer_)); + } else { + long_buffer_ = getCode(tuple, attr); + return std::make_pair(&long_buffer_, sizeof(long_buffer_)); + } + } else { + return std::make_pair(getAttributePtr<check_null>(tuple, attr), + attr_max_lengths_[attr]); + } + } + inline TypedValue getAttributeValueTyped(const tuple_id tuple, const attribute_id attr) const { if (dictionary_coded_attributes_[attr]) { @@ -138,6 +159,7 @@ class CompressedColumnStoreValueAccessorHelper { const CatalogRelationSchema &relation_; const tuple_id num_tuples_; + const std::vector<std::size_t> &attr_max_lengths_; const CompressedBlockInfo &compression_info_; const std::vector<bool> &dictionary_coded_attributes_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/CompressedPackedRowStoreValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/storage/CompressedPackedRowStoreValueAccessor.hpp b/storage/CompressedPackedRowStoreValueAccessor.hpp index 024b0ec..7058aec 100644 --- a/storage/CompressedPackedRowStoreValueAccessor.hpp +++ b/storage/CompressedPackedRowStoreValueAccessor.hpp @@ -58,6 +58,7 @@ class CompressedPackedRowStoreValueAccessorHelper { num_tuples_(num_tuples), tuple_length_bytes_(tuple_length_bytes), attribute_offsets_(attribute_offsets), + attr_max_lengths_(relation.getMaximumAttributeByteLengths()), compression_info_(compression_info), 
dictionary_coded_attributes_(dictionary_coded_attributes), truncated_attributes_(truncated_attributes), @@ -92,6 +93,26 @@ class CompressedPackedRowStoreValueAccessorHelper { } } + template <bool check_null> + inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple, + const attribute_id attr) const { + if (dictionary_coded_attributes_[attr]) { + return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>( + getCode(tuple, attr)); + } else if (truncated_attributes_[attr]) { + if (truncated_attribute_is_int_[attr]) { + int_buffer_ = getCode(tuple, attr); + return std::make_pair(&int_buffer_, sizeof(int_buffer_)); + } else { + long_buffer_ = getCode(tuple, attr); + return std::make_pair(&long_buffer_, sizeof(long_buffer_)); + } + } else { + return std::make_pair(getAttributePtr<check_null>(tuple, attr), + attr_max_lengths_[attr]); + } + } + inline TypedValue getAttributeValueTyped(const tuple_id tuple, const attribute_id attr) const { if (dictionary_coded_attributes_[attr]) { @@ -150,6 +171,7 @@ class CompressedPackedRowStoreValueAccessorHelper { const tuple_id num_tuples_; const std::size_t tuple_length_bytes_; const std::vector<std::size_t> &attribute_offsets_; + const std::vector<std::size_t> &attr_max_lengths_; const CompressedBlockInfo &compression_info_; const std::vector<bool> &dictionary_coded_attributes_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/HashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp index be31fd9..6e3dc96 100644 --- a/storage/HashTable.hpp +++ b/storage/HashTable.hpp @@ -23,6 +23,7 @@ #include <atomic> #include <cstddef> #include <cstdlib> +#include <memory> #include <type_traits> #include <vector> @@ -39,11 +40,14 @@ #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "utility/BloomFilter.hpp" +#include "utility/BloomFilterAdapter.hpp" 
#include "utility/HashPair.hpp" #include "utility/Macros.hpp" namespace quickstep { +DECLARE_int64(bloom_adapter_batch_size); + /** \addtogroup Storage * @{ */ @@ -1016,8 +1020,12 @@ class HashTable : public HashTableBase<resizable, * * @param bloom_filter The pointer to the bloom filter. **/ - inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) { - build_bloom_filter_ = bloom_filter; + inline void addBuildSideBloomFilter(BloomFilter *bloom_filter) { + build_bloom_filters_.emplace_back(bloom_filter); + } + + inline void addBuildSideAttributeId(const attribute_id build_attribute_id) { + build_attribute_ids_.push_back(build_attribute_id); } /** @@ -1042,8 +1050,8 @@ class HashTable : public HashTableBase<resizable, * @param probe_attribute_ids The vector of attribute ids to use for probing * the bloom filter. **/ - inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) { - probe_attribute_ids_.push_back(probe_attribute_ids); + inline void addProbeSideAttributeId(const attribute_id probe_attribute_id) { + probe_attribute_ids_.push_back(probe_attribute_id); } protected: @@ -1329,9 +1337,10 @@ class HashTable : public HashTableBase<resizable, // Data structures used for bloom filter optimized semi-joins. 
bool has_build_side_bloom_filter_ = false; bool has_probe_side_bloom_filter_ = false; - BloomFilter *build_bloom_filter_; + std::vector<BloomFilter *> build_bloom_filters_; + std::vector<attribute_id> build_attribute_ids_; std::vector<const BloomFilter*> probe_bloom_filters_; - std::vector<std::vector<attribute_id>> probe_attribute_ids_; + std::vector<attribute_id> probe_attribute_ids_; DISALLOW_COPY_AND_ASSIGN(HashTable); }; @@ -1477,12 +1486,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al &prealloc_state); } } - std::unique_ptr<BloomFilter> thread_local_bloom_filter; + if (has_build_side_bloom_filter_) { - thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(), - build_bloom_filter_->getNumberOfHashes(), - build_bloom_filter_->getBitArraySize())); + for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) { + auto *build_bloom_filter = build_bloom_filters_[i]; + std::unique_ptr<BloomFilter> thread_local_bloom_filter( + new BloomFilter(build_bloom_filter->getNumberOfHashes(), + build_bloom_filter->getBitArraySize())); + const auto &build_attr = build_attribute_ids_[i]; + const std::size_t attr_size = + accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second; + while (accessor->next()) { + thread_local_bloom_filter->insertUnSafe( + static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)), + attr_size); + } + build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get()); + accessor->beginIteration(); + } } + if (resizable) { while (result == HashTablePutResult::kOutOfSpace) { { @@ -1498,11 +1521,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al variable_size, (*functor)(*accessor), using_prealloc ? &prealloc_state : nullptr); - // Insert into bloom filter, if enabled. 
- if (has_build_side_bloom_filter_) { - thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()), - key.getDataSize()); - } if (result == HashTablePutResult::kDuplicateKey) { DEBUG_ASSERT(!using_prealloc); return result; @@ -1528,20 +1546,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al variable_size, (*functor)(*accessor), using_prealloc ? &prealloc_state : nullptr); - // Insert into bloom filter, if enabled. - if (has_build_side_bloom_filter_) { - thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()), - key.getDataSize()); - } if (result != HashTablePutResult::kOK) { return result; } } } - // Update the build side bloom filter with thread local copy, if available. - if (has_build_side_bloom_filter_) { - build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get()); - } return HashTablePutResult::kOK; }); @@ -1607,6 +1616,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al &prealloc_state); } } + + if (has_build_side_bloom_filter_) { + for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) { + auto *build_bloom_filter = build_bloom_filters_[i]; + std::unique_ptr<BloomFilter> thread_local_bloom_filter( + new BloomFilter(build_bloom_filter->getNumberOfHashes(), + build_bloom_filter->getBitArraySize())); + const auto &build_attr = build_attribute_ids_[i]; + const std::size_t attr_size = + accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second; + while (accessor->next()) { + thread_local_bloom_filter->insertUnSafe( + static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)), + attr_size); + } + build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get()); + accessor->beginIteration(); + } + } + if (resizable) { while (result == HashTablePutResult::kOutOfSpace) { { @@ -2229,6 +2258,7 @@ inline std::size_t HashTable<ValueT, resizable, serializable, 
force_key_copy, al } } + template <typename ValueT, bool resizable, bool serializable, @@ -2246,42 +2276,85 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_ InvokeOnAnyValueAccessor( accessor, [&](auto *accessor) -> void { // NOLINT(build/c++11) - while (accessor->next()) { - // Probe any bloom filters, if enabled. - if (has_probe_side_bloom_filter_) { - DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size()); - // Check if the key is contained in the BloomFilters or not. - bool bloom_miss = false; - for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) { - const BloomFilter *bloom_filter = probe_bloom_filters_[i]; - for (const attribute_id &attr_id : probe_attribute_ids_[i]) { - TypedValue bloom_key = accessor->getTypedValue(attr_id); - if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()), - bloom_key.getDataSize())) { - bloom_miss = true; + std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter; + if (has_probe_side_bloom_filter_) { + // Find (and cache) the size of each attribute in the probe lists. + // NOTE(nav): This code uses the accessor to get the size, + // and hence only works if there's at least one tuple. + std::vector<std::size_t> attr_size_vector; + attr_size_vector.reserve(probe_attribute_ids_.size()); + for (const auto &probe_attr : probe_attribute_ids_) { + auto val_and_size = + accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr); +// std::cerr << "BF attr size = " << val_and_size.second << "\n"; + attr_size_vector.emplace_back(val_and_size.second); + } + + bloom_filter_adapter.reset(new BloomFilterAdapter( + probe_bloom_filters_, probe_attribute_ids_, attr_size_vector)); + + // We want to have large batch sizes for cache efficiency while probeing, + // but small batch sizes to ensure that the adaptation logic kicks in + // (and does early). 
We use exponentially increasing batch sizes to + // achieve a balance between the two. + // + // We also keep track of num_tuples_left in the block, to ensure that + // we don't reserve an unnecessarily large vector. + std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size; + std::uint32_t num_tuples_left = accessor->getNumTuples(); + std::vector<tuple_id> batch(num_tuples_left); + + do { + std::uint32_t batch_size = + batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left; + for (std::size_t i = 0; i < batch_size; ++i) { + accessor->next(); + batch.push_back(accessor->getCurrentPosition()); + } + + std::size_t num_hits = + bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size); + + for (std::size_t t = 0; t < num_hits; ++t){ + tuple_id probe_tid = batch[t]; + TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid); + if (check_for_null_keys && key.isNull()) { + continue; + } + const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral() + : key.getHash(); + const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash) + : true_hash; + std::size_t entry_num = 0; + const ValueT *value; + while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) { + (*functor)(probe_tid, *value); + if (!allow_duplicate_keys) break; - } } } - if (bloom_miss) { - continue; // On a bloom filter miss, probing the hash table can be skipped. - } - } + batch.clear(); + num_tuples_left -= batch_size; + batch_size_try = batch_size * 2; + } while (!accessor->iterationFinished()); + } - TypedValue key = accessor->getTypedValue(key_attr_id); - if (check_for_null_keys && key.isNull()) { - continue; - } - const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral() - : key.getHash(); - const std::size_t adjusted_hash = adjust_hashes_template ? 
this->AdjustHash(true_hash) - : true_hash; - std::size_t entry_num = 0; - const ValueT *value; - while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) { - (*functor)(*accessor, *value); - if (!allow_duplicate_keys) { - break; + else { // no Bloom filters to probe + while(accessor->next()) { + TypedValue key = accessor->getTypedValue(key_attr_id); + if (check_for_null_keys && key.isNull()) { + continue; + } + const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral() + : key.getHash(); + const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash) + : true_hash; + std::size_t entry_num = 0; + const ValueT *value; + while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) { + (*functor)(*accessor, *value); + if (!allow_duplicate_keys) + break; } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/HashTable.proto ---------------------------------------------------------------------- diff --git a/storage/HashTable.proto b/storage/HashTable.proto index 7f00f29..6eabf60 100644 --- a/storage/HashTable.proto +++ b/storage/HashTable.proto @@ -34,10 +34,10 @@ message HashTable { required HashTableImplType hash_table_impl_type = 1; repeated Type key_types = 2; required uint64 estimated_num_entries = 3; - repeated uint32 build_side_bloom_filter_id = 4; - message ProbeSideBloomFilter { - required uint32 probe_side_bloom_filter_id = 1; - repeated uint32 probe_side_attr_ids = 2; + message BloomFilter { + required uint32 bloom_filter_id = 1; + required uint32 attr_id = 2; } - repeated ProbeSideBloomFilter probe_side_bloom_filters = 6; + repeated BloomFilter probe_side_bloom_filters = 4; + repeated BloomFilter build_side_bloom_filters = 5; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/HashTableFactory.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableFactory.hpp 
b/storage/HashTableFactory.hpp index 34baaeb..fbb3d41 100644 --- a/storage/HashTableFactory.hpp +++ b/storage/HashTableFactory.hpp @@ -318,9 +318,15 @@ class HashTableFactory { // individual implementations of the hash table constructors. // Check if there are any build side bloom filter defined on the hash table. - if (proto.build_side_bloom_filter_id_size() > 0) { + if (proto.build_side_bloom_filters_size() > 0) { hash_table->enableBuildSideBloomFilter(); - hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get()); + for (int j = 0; j < proto.build_side_bloom_filters_size(); ++j) { + const auto build_side_bloom_filter = proto.build_side_bloom_filters(j); + hash_table->addBuildSideBloomFilter( + bloom_filters[build_side_bloom_filter.bloom_filter_id()].get()); + + hash_table->addBuildSideAttributeId(build_side_bloom_filter.attr_id()); + } } // Check if there are any probe side bloom filters defined on the hash table. @@ -330,15 +336,10 @@ class HashTableFactory { for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) { // Add the pointer to the probe bloom filter within the list of probe bloom filters to use. const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j); - hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get()); - - // Add the attribute ids corresponding to this probe bloom filter. - std::vector<attribute_id> probe_attribute_ids; - for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) { - const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k); - probe_attribute_ids.push_back(probe_attribute_id); - } - hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids)); + hash_table->addProbeSideBloomFilter( + bloom_filters[probe_side_bloom_filter.bloom_filter_id()].get()); + + hash_table->addProbeSideAttributeId(probe_side_bloom_filter.attr_id()); } }
