Updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/91e49820 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/91e49820 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/91e49820 Branch: refs/heads/adaptive-bloom-filters Commit: 91e498202d7b19cb5c7d4d8c61218d112c446b71 Parents: 9cc47e5 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Fri Jul 29 17:47:42 2016 -0500 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Fri Jul 29 17:47:42 2016 -0500 ---------------------------------------------------------------------- query_execution/QueryContext.cpp | 11 ++- query_optimizer/ExecutionGenerator.cpp | 17 ++++ query_optimizer/ExecutionHeuristics.cpp | 36 +++++-- query_optimizer/ExecutionHeuristics.hpp | 51 +++++++--- query_optimizer/physical/Aggregate.hpp | 20 +++- query_optimizer/physical/HashJoin.hpp | 50 ---------- query_optimizer/physical/Physical.hpp | 50 ++++++++++ query_optimizer/rules/AttachBloomFilters.cpp | 76 ++++++++++----- .../StarSchemaHashJoinOrderOptimization.cpp | 4 +- .../StarSchemaHashJoinOrderOptimization.hpp | 25 +++-- storage/AggregationOperationState.cpp | 98 +++++++++++++++++++- storage/AggregationOperationState.hpp | 10 +- storage/AggregationOperationState.proto | 6 ++ storage/HashTable.hpp | 27 ++---- storage/HashTable.proto | 6 +- storage/HashTableFactory.hpp | 9 +- storage/StorageBlock.cpp | 28 +----- storage/StorageBlock.hpp | 7 +- utility/PlanVisualizer.cpp | 24 +++++ 19 files changed, 377 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_execution/QueryContext.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp index 7019b6a..fd0ed08 100644 --- a/query_execution/QueryContext.cpp +++ b/query_execution/QueryContext.cpp @@ -61,15 +61,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto, << "Attempted to create QueryContext from an invalid proto description:\n" << proto.DebugString(); + for (int i = 0; i < proto.bloom_filters_size(); ++i) { + bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i))); + } + for (int i = 0; i < proto.aggregation_states_size(); ++i) { aggregation_states_.emplace_back( AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i), database, - storage_manager)); - } - - for (int i = 0; i < proto.bloom_filters_size(); ++i) { - bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i))); + storage_manager, + bloom_filters_)); } for (int i = 0; i < proto.generator_functions_size(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index fe6b6e7..e10f991 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1344,6 +1344,16 @@ void ExecutionGenerator::convertAggregate( findRelationInfoOutputByPhysical(physical_plan->input()); aggr_state_proto->set_relation_id(input_relation_info->relation->getID()); + const P::BloomFilterConfig &bloom_filter_config = + physical_plan->bloom_filter_config(); + std::vector<attribute_id> bloom_filter_attribute_ids; + + for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) { + const CatalogAttribute *bf_catalog_attribute + = attribute_substitution_map_[bf.attribute->id()]; + bloom_filter_attribute_ids.emplace_back(bf_catalog_attribute->getID()); + } + std::vector<const Type*> group_by_types; for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) { unique_ptr<const Scalar> execution_group_by_expression; @@ -1458,6 +1468,13 @@ void ExecutionGenerator::convertAggregate( std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, output_relation); + + if (FLAGS_optimize_joins) { + execution_heuristics_->addAggregateInfo(aggregation_operator_index, + bloom_filter_config, + std::move(bloom_filter_attribute_ids), + aggr_state_index); + } } void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionHeuristics.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp index b407453..26c4378 100644 --- a/query_optimizer/ExecutionHeuristics.cpp +++ b/query_optimizer/ExecutionHeuristics.cpp @@ -65,10 +65,7 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan, bloom_filter_config.builder), std::make_pair(bloom_filter_id, info.build_operator_index_)); - auto *build_side_bloom_filter = hash_table_proto->add_build_side_bloom_filters(); - build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id); - build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]); - + hash_table_proto->add_build_side_bloom_filter_id(bloom_filter_id); std::cout << "Build " << build_side_bf.attribute->toString() << " @" << bloom_filter_config.builder << "\n"; } @@ -83,7 +80,7 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan, auto *probe_side_bloom_filter = hash_table_proto->add_probe_side_bloom_filters(); const auto &probe_side_bf = bloom_filter_config.probe_side_bloom_filters[i]; - std::cout << "Probe " << probe_side_bf.attribute->toString() + std::cout << "HashJoin probe " << probe_side_bf.attribute->toString() << " @" << probe_side_bf.builder << "\n"; const auto &build_side_info = @@ -92,13 +89,40 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan, probe_side_bf.builder)); probe_side_bloom_filter->set_bloom_filter_id(build_side_info.first); probe_side_bloom_filter->set_attr_id(info.probe_side_bloom_filter_ids_[i]); - std::cout << "Probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n"; + std::cout << "HashJoin probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n"; query_plan->addDirectDependency(info.join_operator_index_, build_side_info.second, true /* is_pipeline_breaker */); } } + + for (const auto &info : aggregates_) { + auto *aggregate_proto = + query_context_proto->mutable_aggregation_states(info.aggregate_state_id_); + const auto &bloom_filter_config = info.bloom_filter_config_; + + for (std::size_t i = 0; i < info.bloom_filter_ids_.size(); ++i) { + auto *bloom_filter = aggregate_proto->add_bloom_filters(); + const auto &bf = + bloom_filter_config.probe_side_bloom_filters[i]; + std::cout << "Aggregate probe " << bf.attribute->toString() + << " @" << bf.builder << "\n"; + + const auto &build_side_info = + bloom_filter_map.at( + std::make_pair(bf.source_attribute->id(), + bf.builder)); + bloom_filter->set_bloom_filter_id(build_side_info.first); + bloom_filter->set_attr_id(info.bloom_filter_ids_[i]); + std::cout << "Aggregate probe attr_id = " + << info.bloom_filter_ids_[i] << "\n"; + + query_plan->addDirectDependency(info.aggregate_operator_index_, + build_side_info.second, + true /* is_pipeline_breaker */); + } + } } void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionHeuristics.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp index 8af1b4a..0755124 100644 --- a/query_optimizer/ExecutionHeuristics.hpp +++ b/query_optimizer/ExecutionHeuristics.hpp @@ -93,6 +93,23 @@ class ExecutionHeuristics { const std::size_t estimated_build_relation_cardinality_; }; + struct AggregateInfo { + AggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index, + const physical::BloomFilterConfig &bloom_filter_config, + std::vector<attribute_id> &&bloom_filter_ids, + const QueryContext::aggregation_state_id aggregate_state_id) + : aggregate_operator_index_(aggregate_operator_index), + bloom_filter_config_(bloom_filter_config), + bloom_filter_ids_(bloom_filter_ids), + aggregate_state_id_(aggregate_state_id) { + } + + const QueryPlan::DAGNodeIndex aggregate_operator_index_; + const physical::BloomFilterConfig &bloom_filter_config_; + const std::vector<attribute_id> bloom_filter_ids_; + const QueryContext::aggregation_state_id aggregate_state_id_; + }; + /** * @brief Constructor. @@ -121,15 +138,25 @@ class ExecutionHeuristics { std::vector<attribute_id> &&probe_side_bloom_filter_ids, const QueryContext::join_hash_table_id join_hash_table_id, const std::size_t estimated_build_relation_cardinality) { - hash_joins_.push_back(HashJoinInfo(build_operator_index, - join_operator_index, - referenced_stored_build_relation, - referenced_stored_probe_relation, - bloom_filter_config, - std::move(build_side_bloom_filter_ids), - std::move(probe_side_bloom_filter_ids), - join_hash_table_id, - estimated_build_relation_cardinality)); + hash_joins_.emplace_back(build_operator_index, + join_operator_index, + referenced_stored_build_relation, + referenced_stored_probe_relation, + bloom_filter_config, + std::move(build_side_bloom_filter_ids), + std::move(probe_side_bloom_filter_ids), + join_hash_table_id, + estimated_build_relation_cardinality); + } + + inline void addAggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index, + const physical::BloomFilterConfig &bloom_filter_config, + std::vector<attribute_id> &&bloom_filter_ids, + const QueryContext::aggregation_state_id aggregate_state_id) { + aggregates_.emplace_back(aggregate_operator_index, + bloom_filter_config, + std::move(bloom_filter_ids), + aggregate_state_id); } /** @@ -152,13 +179,9 @@ class ExecutionHeuristics { void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto, const std::size_t cardinality); - std::size_t estimated_build_relation_cardinality() const { - return estimated_build_relation_cardinality_; - } - private: std::vector<HashJoinInfo> hash_joins_; - std::size_t estimated_build_relation_cardinality_; + std::vector<AggregateInfo> aggregates_; DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/Aggregate.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Aggregate.hpp b/query_optimizer/physical/Aggregate.hpp index e40d894..b40997c 100644 --- a/query_optimizer/physical/Aggregate.hpp +++ b/query_optimizer/physical/Aggregate.hpp @@ -101,6 +101,10 @@ class Aggregate : public Physical { bool impliesUniqueAttributes( const std::vector<expressions::AttributeReferencePtr> &attributes) const override; + const BloomFilterConfig &bloom_filter_config() const { + return bloom_filter_config_; + } + /** * @brief Creates an Aggregate physical node. * @@ -114,9 +118,14 @@ class Aggregate : public Physical { PhysicalPtr input, const std::vector<expressions::NamedExpressionPtr> &grouping_expressions, const std::vector<expressions::AliasPtr> &aggregate_expressions, - const expressions::PredicatePtr &filter_predicate) { + const expressions::PredicatePtr &filter_predicate, + const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) { return AggregatePtr( - new Aggregate(input, grouping_expressions, aggregate_expressions, filter_predicate)); + new Aggregate(input, + grouping_expressions, + aggregate_expressions, + filter_predicate, + bloom_filter_config)); } protected: @@ -133,11 +142,13 @@ class Aggregate : public Physical { PhysicalPtr input, const std::vector<expressions::NamedExpressionPtr> &grouping_expressions, const std::vector<expressions::AliasPtr> &aggregate_expressions, - const expressions::PredicatePtr &filter_predicate) + const expressions::PredicatePtr &filter_predicate, + const BloomFilterConfig &bloom_filter_config) : input_(input), grouping_expressions_(grouping_expressions), aggregate_expressions_(aggregate_expressions), - filter_predicate_(filter_predicate) { + filter_predicate_(filter_predicate), + bloom_filter_config_(bloom_filter_config) { addChild(input_); } @@ -145,6 +156,7 @@ class Aggregate : public Physical { std::vector<expressions::NamedExpressionPtr> grouping_expressions_; std::vector<expressions::AliasPtr> aggregate_expressions_; expressions::PredicatePtr filter_predicate_; + BloomFilterConfig bloom_filter_config_; DISALLOW_COPY_AND_ASSIGN(Aggregate); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/HashJoin.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp index cacb08b..104cb52 100644 --- a/query_optimizer/physical/HashJoin.hpp +++ b/query_optimizer/physical/HashJoin.hpp @@ -48,56 +48,6 @@ namespace physical { class HashJoin; typedef std::shared_ptr<const HashJoin> HashJoinPtr; -struct BloomFilterConfig { - struct BuildSide { - BuildSide(const expressions::AttributeReferencePtr &attribute_in) - : attribute(attribute_in) { - } - expressions::AttributeReferencePtr attribute; - }; - struct ProbeSide { - ProbeSide(const expressions::AttributeReferencePtr &attribute_in, - const expressions::AttributeReferencePtr &source_attribute_in, - const physical::PhysicalPtr &builder_in) - : attribute(attribute_in), - source_attribute(source_attribute_in), - builder(builder_in) { - } - expressions::AttributeReferencePtr attribute; - expressions::AttributeReferencePtr source_attribute; - PhysicalPtr builder; - }; - BloomFilterConfig() {} - BloomFilterConfig(const PhysicalPtr &builder_in) - : builder(builder_in) { - } - BloomFilterConfig(const PhysicalPtr &builder_in, - const std::vector<BuildSide> &build_side_bloom_filters_in, - const std::vector<ProbeSide> &probe_side_bloom_filters_in) - : builder(builder_in), - build_side_bloom_filters(build_side_bloom_filters_in), - probe_side_bloom_filters(probe_side_bloom_filters_in) { - } - void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) { - for (const auto &build_bf : build_side_bloom_filters) { - if (attribute_in == build_bf.attribute) { - return; - } - } - build_side_bloom_filters.emplace_back(attribute_in); - } - void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in, - const expressions::AttributeReferencePtr &source_attribute_in, - const physical::PhysicalPtr &builder_in) { - probe_side_bloom_filters.emplace_back(attribute_in, - source_attribute_in, - builder_in); - } - PhysicalPtr builder; - std::vector<BuildSide> build_side_bloom_filters; - std::vector<ProbeSide> probe_side_bloom_filters; -}; - /** * @brief Physical hash join node. */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/Physical.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp index 721b987..389cd05 100644 --- a/query_optimizer/physical/Physical.hpp +++ b/query_optimizer/physical/Physical.hpp @@ -39,6 +39,56 @@ namespace physical { class Physical; typedef std::shared_ptr<const Physical> PhysicalPtr; +struct BloomFilterConfig { + struct BuildSide { + BuildSide(const expressions::AttributeReferencePtr &attribute_in) + : attribute(attribute_in) { + } + expressions::AttributeReferencePtr attribute; + }; + struct ProbeSide { + ProbeSide(const expressions::AttributeReferencePtr &attribute_in, + const expressions::AttributeReferencePtr &source_attribute_in, + const physical::PhysicalPtr &builder_in) + : attribute(attribute_in), + source_attribute(source_attribute_in), + builder(builder_in) { + } + expressions::AttributeReferencePtr attribute; + expressions::AttributeReferencePtr source_attribute; + PhysicalPtr builder; + }; + BloomFilterConfig() {} + BloomFilterConfig(const PhysicalPtr &builder_in) + : builder(builder_in) { + } + BloomFilterConfig(const PhysicalPtr &builder_in, + const std::vector<BuildSide> &build_side_bloom_filters_in, + const std::vector<ProbeSide> &probe_side_bloom_filters_in) + : builder(builder_in), + build_side_bloom_filters(build_side_bloom_filters_in), + probe_side_bloom_filters(probe_side_bloom_filters_in) { + } + void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) { + for (const auto &build_bf : build_side_bloom_filters) { + if (attribute_in == build_bf.attribute) { + return; + } + } + build_side_bloom_filters.emplace_back(attribute_in); + } + void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in, + const expressions::AttributeReferencePtr &source_attribute_in, + const physical::PhysicalPtr &builder_in) { + probe_side_bloom_filters.emplace_back(attribute_in, + source_attribute_in, + builder_in); + } + PhysicalPtr builder; + std::vector<BuildSide> build_side_bloom_filters; + std::vector<ProbeSide> probe_side_bloom_filters; +}; + /** * @brief Base class for physical plan nodes. */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/AttachBloomFilters.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp index e3bdc36..f6602b8 100644 --- a/query_optimizer/rules/AttachBloomFilters.cpp +++ b/query_optimizer/rules/AttachBloomFilters.cpp @@ -51,24 +51,24 @@ P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) { visitProducer(input, 0); visitConsumer(input); - for (const auto &info_vec_pair : consumers_) { - std::cerr << "--------\n" - << "Node " << info_vec_pair.first->getName() - << " " << info_vec_pair.first << "\n"; - - for (const auto &info : info_vec_pair.second) { - std::cerr << info.attribute->attribute_alias(); - if (info.attribute->id() != info.source_attribute->id()) { - std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}"; - } - if (info.from_sibling) { - std::cerr << " sibling"; - } - std::cerr << " @" << info.source << "[" << info.depth << "]" - << ": " << info.selectivity << "\n"; - } - std::cerr << "********\n"; - } +// for (const auto &info_vec_pair : consumers_) { +// std::cerr << "--------\n" +// << "Node " << info_vec_pair.first->getName() +// << " " << info_vec_pair.first << "\n"; +// +// for (const auto &info : info_vec_pair.second) { +// std::cerr << info.attribute->attribute_alias(); +// if (info.attribute->id() != info.source_attribute->id()) { +// std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}"; +// } +// if (info.from_sibling) { +// std::cerr << " sibling"; +// } +// std::cerr << " @" << info.source << "[" << info.depth << "]" +// << ": " << info.selectivity << "\n"; +// } +// std::cerr << "********\n"; +// } return visitAndAttach(input); } @@ -192,9 +192,20 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) { info.attribute); } } + } + P::PhysicalPtr consumer_child = nullptr; + if (node->getPhysicalType() == P::PhysicalType::kHashJoin) { + consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left(); + } + if (node->getPhysicalType() == P::PhysicalType::kAggregate) { + consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input(); + } + + if (consumer_child != nullptr) { // Decide attaches - if (cost_model_->estimateCardinality(consumer_child) > 200000000 && + auto &consumer_bloom_filters = consumers_[consumer_child]; + if (cost_model_->estimateCardinality(consumer_child) > 10000000 && !consumer_bloom_filters.empty()) { std::map<E::AttributeReferencePtr, const BloomFilterInfo*> filters; for (const auto &info : consumer_bloom_filters) { @@ -240,10 +251,10 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n if (node->getPhysicalType() == P::PhysicalType::kHashJoin) { const auto attach_it = attaches_.find(node); if (attach_it != attaches_.end()) { - for (const auto& item : attach_it->second.probe_side_bloom_filters) { - std::cout << "Attach probe from " << item.builder - << " to " << node << "\n"; - } +// for (const auto& item : attach_it->second.probe_side_bloom_filters) { +// std::cout << "Attach probe from " << item.builder +// << " to " << node << "\n"; +// } const P::HashJoinPtr hash_join = std::static_pointer_cast<const P::HashJoin>(node); @@ -259,6 +270,25 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n } } + if (node->getPhysicalType() == P::PhysicalType::kAggregate) { + const auto attach_it = attaches_.find(node); + if (attach_it != attaches_.end()) { +// for (const auto& item : attach_it->second.probe_side_bloom_filters) { +// std::cout << "Attach probe from " << item.builder +// << " to " << node << "\n"; +// } + + const P::AggregatePtr aggregate = + std::static_pointer_cast<const P::Aggregate>(node); + return P::Aggregate::Create( + aggregate->input(), + aggregate->grouping_expressions(), + aggregate->aggregate_expressions(), + aggregate->filter_predicate(), + attach_it->second); + } + } + if (has_changed) { return node->copyWithNewChildren(new_children); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp index 42a7402..22485b2 100644 --- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp +++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp @@ -150,7 +150,8 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan( cost_model_->estimateCardinality(tables[i]), cost_model_->estimateSelectivity(tables[i]), CountSharedAttributes(join_group.referenced_attributes, - tables[i]->getOutputAttributes())); + tables[i]->getOutputAttributes()), + tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate); } // Auxiliary mapping info. @@ -316,6 +317,7 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan( selected_probe_table_info->estimated_num_output_attributes = CountSharedAttributes(join_group.referenced_attributes, output->getOutputAttributes()); + selected_probe_table_info->is_aggregation = false; remaining_tables.emplace(selected_probe_table_info); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp index a0e34ce..7a6fa81 100644 --- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp +++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp @@ -75,12 +75,14 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> { const physical::PhysicalPtr &table_in, const std::size_t estimated_cardinality_in, const double estimated_selectivity_in, - const std::size_t estimated_num_output_attributes_in) + const std::size_t estimated_num_output_attributes_in, + const bool is_aggregation_in) : table_info_id(table_info_id_in), table(table_in), estimated_cardinality(estimated_cardinality_in), estimated_selectivity(estimated_selectivity_in), - estimated_num_output_attributes(estimated_num_output_attributes_in) { + estimated_num_output_attributes(estimated_num_output_attributes_in), + is_aggregation(is_aggregation_in) { } const std::size_t table_info_id; @@ -88,6 +90,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> { std::size_t estimated_cardinality; double estimated_selectivity; std::size_t estimated_num_output_attributes; + bool is_aggregation; }; struct JoinPair { @@ -107,13 +110,17 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> { return rhs_has_large_output; } -// const bool lhs_has_small_build = -// !lhs_has_large_output && lhs.build->estimated_cardinality < 0x1000; -// const bool rhs_has_small_build = -// !rhs_has_large_output && rhs.build->estimated_cardinality < 0x1000; -// if (lhs_has_small_build != rhs_has_small_build) { -// return lhs_has_small_build; -// } + const bool lhs_has_small_build = + !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100; + const bool rhs_has_small_build = + !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100; + if (lhs_has_small_build != rhs_has_small_build) { + return lhs_has_small_build; + } + + if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) { + return lhs.probe->is_aggregation; + } if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) { return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 4878cf1..8c9e8b6 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -46,10 +46,13 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorUtil.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "types/containers/Tuple.hpp" +#include "utility/BloomFilterAdapter.hpp" #include "glog/logging.h" @@ -57,6 +60,8 @@ using std::unique_ptr; namespace quickstep { +DECLARE_int64(bloom_adapter_batch_size); + AggregationOperationState::AggregationOperationState( const CatalogRelationSchema &input_relation, const std::vector<const AggregateFunction*> &aggregate_functions, @@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState( std::vector<bool> &&is_distinct, std::vector<std::unique_ptr<const Scalar>> &&group_by, const Predicate *predicate, + std::vector<const BloomFilter *> &&bloom_filters, + std::vector<attribute_id> &&bloom_filter_attribute_ids, const std::size_t estimated_num_entries, const HashTableImplType hash_table_impl_type, const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, StorageManager *storage_manager) : input_relation_(input_relation), predicate_(predicate), + bloom_filters_(std::move(bloom_filters)), + bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)), group_by_list_(std::move(group_by)), arguments_(std::move(arguments)), is_distinct_(std::move(is_distinct)), @@ -183,7 +192,8 @@ AggregationOperationState::AggregationOperationState( AggregationOperationState* AggregationOperationState::ReconstructFromProto( const serialization::AggregationOperationState &proto, const CatalogDatabaseLite &database, - StorageManager *storage_manager) { + StorageManager *storage_manager, + const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) { DCHECK(ProtoIsValid(proto, database)); // Rebuild contructor arguments from their representation in 'proto'. @@ -232,12 +242,25 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( database)); } + std::vector<const BloomFilter*> bloom_filter_vector; + std::vector<attribute_id> bloom_filter_attribute_ids; + for (int i = 0; i < proto.bloom_filters_size(); ++i) { + std::cerr << "Add bloom filter " << i << "\n"; + // Add the pointer to the probe bloom filter within the list of probe bloom filters to use. + const auto bloom_filter_proto = proto.bloom_filters(i); + bloom_filter_vector.emplace_back( + bloom_filters[bloom_filter_proto.bloom_filter_id()].get()); + bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id()); + } + return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()), aggregate_functions, std::move(arguments), std::move(is_distinct), std::move(group_by_expressions), predicate.release(), + std::move(bloom_filter_vector), + std::move(bloom_filter_attribute_ids), proto.estimated_num_entries(), HashTableImplTypeFromProto(proto.hash_table_impl_type()), distinctify_hash_table_impl_types, @@ -340,6 +363,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b // tuples so that it can be reused across multiple aggregates (i.e. we only // pay the cost of evaluating the predicate once). std::unique_ptr<TupleIdSequence> reuse_matches; + if (predicate_) { + reuse_matches.reset(block->getMatchesForPredicate(predicate_.get())); + } + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { @@ -358,7 +385,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b arguments_[agg_idx], local_arguments_as_attributes, {}, /* group_by */ - predicate_.get(), distinctify_hashtables_[agg_idx].get(), &reuse_matches, nullptr /* reuse_group_by_vectors */); @@ -369,7 +395,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b block->aggregate(*handles_[agg_idx], arguments_[agg_idx], local_arguments_as_attributes, - predicate_.get(), &reuse_matches)); } } @@ -391,6 +416,71 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // GROUP BY expressions once). std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; + if (predicate_) { + reuse_matches.reset(block->getMatchesForPredicate(predicate_.get())); + } + + if (bloom_filters_.size() > 0) { + const std::size_t num_tuples = block->getNumTuples(); +// std::cerr << "Before: " +// << (reuse_matches ? reuse_matches->numTuples() : num_tuples) +// << "\n"; + std::unique_ptr<ValueAccessor> accessor; + if (reuse_matches) { + accessor.reset( + block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get())); + } else { + accessor.reset( + block->getTupleStorageSubBlock().createValueAccessor()); + } + InvokeOnAnyValueAccessor( + accessor.get(), + [&](auto *accessor) -> void { // NOLINT(build/c++11) + std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples)); + + std::vector<std::size_t> attr_size_vector; + attr_size_vector.reserve(bloom_filter_attribute_ids_.size()); + for (const auto &attr : bloom_filter_attribute_ids_) { + auto val_and_size = + accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr); + attr_size_vector.emplace_back(val_and_size.second); + } + + std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter; + bloom_filter_adapter.reset(new BloomFilterAdapter( + bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector)); + + std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size; + std::uint32_t num_tuples_left = accessor->getNumTuples(); + std::vector<tuple_id> batch(num_tuples_left); + + do { + std::uint32_t batch_size = + batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left; + for (std::size_t i = 0; i < batch_size; ++i) { + accessor->next(); + batch.push_back(accessor->getCurrentPosition()); + } + + std::size_t num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch); + for (std::size_t t = 0; t < num_hits; ++t){ + filtered->set(batch[t], true); + } + + batch.clear(); + num_tuples_left -= batch_size; + batch_size_try = batch_size * 2; + } while (num_tuples_left > 0); + + if (reuse_matches) { + reuse_matches->intersectWith(*filtered); + } else { + reuse_matches.reset(filtered.release()); + } + }); +// std::cerr << "After: " << reuse_matches->numTuples() << "\n"; + } + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { @@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo arguments_[agg_idx], nullptr, /* arguments_as_attributes */ group_by_list_, - predicate_.get(), distinctify_hashtables_[agg_idx].get(), &reuse_matches, &reuse_group_by_vectors); @@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo block->aggregateGroupBy(*handles_[agg_idx], arguments_[agg_idx], group_by_list_, - predicate_.get(), agg_hash_table, &reuse_matches, &reuse_group_by_vectors); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 0199749..5db7325 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -33,6 +33,7 @@ #include "storage/HashTableBase.hpp" #include "storage/HashTablePool.hpp" #include "storage/StorageBlockInfo.hpp" +#include "utility/BloomFilter.hpp" #include "utility/Macros.hpp" namespace quickstep { @@ -108,6 +109,8 @@ class AggregationOperationState { std::vector<bool> &&is_distinct, std::vector<std::unique_ptr<const Scalar>> &&group_by, const Predicate *predicate, + std::vector<const BloomFilter *> &&bloom_filters, + std::vector<attribute_id> &&bloom_filter_attribute_ids, const std::size_t estimated_num_entries, const HashTableImplType hash_table_impl_type, const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, @@ -131,7 +134,8 @@ class AggregationOperationState { static AggregationOperationState* ReconstructFromProto( const serialization::AggregationOperationState &proto, const CatalogDatabaseLite &database, - StorageManager *storage_manager); + StorageManager *storage_manager, + const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters); /** * @brief Check whether a serialization::AggregationOperationState is @@ -181,6 +185,10 @@ class AggregationOperationState { // filter predicate (if any), and the list of GROUP BY expressions (if any). const CatalogRelationSchema &input_relation_; std::unique_ptr<const Predicate> predicate_; + + std::vector<const BloomFilter*> bloom_filters_; + std::vector<attribute_id> bloom_filter_attribute_ids_; + std::vector<std::unique_ptr<const Scalar>> group_by_list_; // Each individual aggregate in this operation has an AggregationHandle and http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.proto ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto index bf78e3a..165148e 100644 --- a/storage/AggregationOperationState.proto +++ b/storage/AggregationOperationState.proto @@ -42,4 +42,10 @@ message AggregationOperationState { // Each DISTINCT aggregation has its distinctify hash table impl type. repeated HashTableImplType distinctify_hash_table_impl_types = 7; + + message BloomFilter { + required uint32 bloom_filter_id = 1; + required uint32 attr_id = 2; + } + repeated BloomFilter bloom_filters = 8; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp index 2c526c2..04c2ca8 100644 --- a/storage/HashTable.hpp +++ b/storage/HashTable.hpp @@ -41,14 +41,12 @@ #include "types/TypedValue.hpp" #include "utility/BloomFilter.hpp" #include "utility/BloomFilterAdapter.hpp" -#include "utility/EventProfiler.hpp" #include "utility/HashPair.hpp" #include "utility/Macros.hpp" namespace quickstep { DECLARE_int64(bloom_adapter_batch_size); -DECLARE_bool(adapt_bloom_filters); /** \addtogroup Storage * @{ @@ -1048,7 +1046,7 @@ class HashTable : public HashTableBase<resizable, * @param probe_attribute_ids The vector of attribute ids to use for probing * the bloom filter. **/ - inline void addProbeSideAttributeIds(const attribute_id &probe_attribute_id) { + inline void addProbeSideAttributeId(const attribute_id probe_attribute_id) { probe_attribute_ids_.push_back(probe_attribute_id); } @@ -2263,7 +2261,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_ for (const auto &probe_attr : probe_attribute_ids_) { auto val_and_size = accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr); - attr_size_vector.push_back(val_and_size.second); + attr_size_vector.emplace_back(val_and_size.second); } bloom_filter_adapter.reset(new BloomFilterAdapter( @@ -2280,30 +2278,18 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_ std::uint32_t num_tuples_left = accessor->getNumTuples(); std::vector<tuple_id> batch(num_tuples_left); - auto *container = simple_profiler.getContainer(); - auto *line = container->getEventLine(0); - do { - const std::uint32_t batch_size = + std::uint32_t batch_size = batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left; for (std::size_t i = 0; i < batch_size; ++i) { accessor->next(); batch.push_back(accessor->getCurrentPosition()); } - line->emplace_back(); - std::size_t num_hits; - if (FLAGS_adapt_bloom_filters) { - num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch); - } else { - num_hits = bloom_filter_adapter->bulkProbe<false>(accessor, batch); - } - line->back().setPayload(num_hits+0); - line->back().endEvent(); -// std::size_t num_hits = batch_size; + std::size_t num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch); - for (std::size_t i = 0; i < num_hits; ++i){ - const tuple_id probe_tid = batch[i]; + for (std::size_t t = 0; t < num_hits; ++t){ + tuple_id probe_tid = batch[t]; TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid); if (check_for_null_keys && key.isNull()) { continue; @@ -2320,6 +2306,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_ break; } } + batch.clear(); num_tuples_left -= batch_size; batch_size_try = batch_size * 2; } while (!accessor->iterationFinished()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTable.proto ---------------------------------------------------------------------- diff --git a/storage/HashTable.proto b/storage/HashTable.proto index 0cf9f5e..90bc9f7 100644 --- a/storage/HashTable.proto +++ b/storage/HashTable.proto @@ -34,10 +34,10 @@ message HashTable { required HashTableImplType hash_table_impl_type = 1; repeated Type key_types = 2; required uint64 estimated_num_entries = 3; - message BloomFilterReference { + repeated uint32 build_side_bloom_filter_id = 4; + message ProbeSideBloomFilter { required uint32 bloom_filter_id = 1; required uint32 attr_id = 2; } - repeated BloomFilterReference build_side_bloom_filters = 4; - repeated BloomFilterReference probe_side_bloom_filters = 5; + repeated ProbeSideBloomFilter probe_side_bloom_filters = 6; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTableFactory.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp index 00a09c1..df2962a 100644 --- a/storage/HashTableFactory.hpp +++ b/storage/HashTableFactory.hpp @@ -318,11 +318,9 @@ class HashTableFactory { // individual implementations of the hash table constructors. // Check if there are any build side bloom filter defined on the hash table. - if (proto.build_side_bloom_filters_size() > 0) { - CHECK_EQ(1u, proto.build_side_bloom_filters_size()); + if (proto.build_side_bloom_filter_id_size() > 0) { hash_table->enableBuildSideBloomFilter(); - hash_table->setBuildSideBloomFilter( - bloom_filters[proto.build_side_bloom_filters(0).bloom_filter_id()].get()); + hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get()); } // Check if there are any probe side bloom filters defined on the hash table. @@ -335,8 +333,7 @@ class HashTableFactory { hash_table->addProbeSideBloomFilter( bloom_filters[probe_side_bloom_filter.bloom_filter_id()].get()); - // Add the attribute ids corresponding to this probe bloom filter. - hash_table->addProbeSideAttributeIds(probe_side_bloom_filter.attr_id()); + hash_table->addProbeSideAttributeId(probe_side_bloom_filter.attr_id()); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index fdd438d..78aba7c 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp @@ -389,15 +389,7 @@ AggregationState* StorageBlock::aggregate( const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, - const Predicate *predicate, std::unique_ptr<TupleIdSequence> *reuse_matches) const { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this same - // block. - if (predicate && !*reuse_matches) { - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all the arguments to this aggregate are plain relation attributes, // aggregate directly on a ValueAccessor from this block to avoid a copy. @@ -418,7 +410,6 @@ void StorageBlock::aggregateGroupBy( const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { @@ -440,14 +431,7 @@ void StorageBlock::aggregateGroupBy( ColumnVectorsValueAccessor temp_result; { std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - + if (reuse_matches) { // Create a filtered ValueAccessor that only iterates over predicate // matches. accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); @@ -499,7 +483,6 @@ void StorageBlock::aggregateDistinct( const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *distinctify_hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { @@ -514,14 +497,7 @@ void StorageBlock::aggregateDistinct( ColumnVectorsValueAccessor temp_result; { std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - + if (reuse_matches) { // Create a filtered ValueAccessor that only iterates over predicate // matches. accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/StorageBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index 3ae3812..3217fa2 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -410,7 +410,6 @@ class StorageBlock : public StorageBlockBase { const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, - const Predicate *predicate, std::unique_ptr<TupleIdSequence> *reuse_matches) const; /** @@ -460,7 +459,6 @@ class StorageBlock : public StorageBlockBase { void aggregateGroupBy(const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> @@ -505,7 +503,6 @@ class StorageBlock : public StorageBlockBase { const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *distinctify_hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; @@ -588,6 +585,8 @@ class StorageBlock : public StorageBlockBase { **/ const std::size_t getNumTuples() const; + TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const; + private: static TupleStorageSubBlock* CreateTupleStorageSubBlock( const CatalogRelationSchema &relation, @@ -627,8 +626,6 @@ class StorageBlock : public StorageBlockBase { // StorageBlock's header. bool rebuildIndexes(bool short_circuit); - TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const; - std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues( const ValueAccessor &accessor, const tuple_id tuple, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/utility/PlanVisualizer.cpp ---------------------------------------------------------------------- diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp index 37fa790..4cc1b0f 100644 --- a/utility/PlanVisualizer.cpp +++ b/utility/PlanVisualizer.cpp @@ -29,6 +29,7 @@ #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/physical/Aggregate.hpp" #include "query_optimizer/physical/HashJoin.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" @@ -155,6 +156,29 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { node_info.labels.emplace_back("RIGHT join attrs unique"); } + const auto &bf_config = hash_join->bloom_filter_config(); + for (const auto &bf : bf_config.build_side_bloom_filters) { + node_info.labels.emplace_back( + std::string("[BF build] ") + bf.attribute->attribute_alias()); + } + for (const auto &bf : bf_config.probe_side_bloom_filters) { + node_info.labels.emplace_back( + std::string("[BF probe] ") + bf.attribute->attribute_alias()); + } + + break; + } + case P::PhysicalType::kAggregate: { + const P::AggregatePtr aggregate = + std::static_pointer_cast<const P::Aggregate>(input); + node_info.labels.emplace_back(input->getName()); + + const auto &bf_config = aggregate->bloom_filter_config(); + for (const auto &bf : bf_config.probe_side_bloom_filters) { + node_info.labels.emplace_back( + std::string("[BF probe] ") + bf.attribute->attribute_alias()); + } + break; } default: {