Repository: incubator-quickstep Updated Branches: refs/heads/master 5ffdaaf9f -> 4ba819c5b
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/InjectJoinFilters.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/InjectJoinFilters.cpp b/query_optimizer/rules/InjectJoinFilters.cpp new file mode 100644 index 0000000..0fcd06b --- /dev/null +++ b/query_optimizer/rules/InjectJoinFilters.cpp @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/
+
+#include "query_optimizer/rules/InjectJoinFilters.hpp"
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/rules/PruneColumns.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+namespace {
+
+// Returns max_value - min_value as an unsigned 64-bit integer. The subtraction
+// is done in unsigned arithmetic because, for kLong attributes, the difference
+// can exceed INT64_MAX and signed overflow is undefined behavior. The result
+// is exact whenever min_value <= max_value.
+std::uint64_t ComputeValueRange(const std::int64_t min_value,
+                                const std::int64_t max_value) {
+  return static_cast<std::uint64_t>(max_value) -
+         static_cast<std::uint64_t>(min_value);
+}
+
+}  // namespace
+
+// Entry point of the rule: runs the five steps below over a TopLevelPlan and,
+// if any LIPFilter was produced, returns a copy of the plan that carries the
+// resulting LIPFilterConfiguration.
+P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          top_level_plan->shared_subplans()));
+  lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+
+  // Step 1. Transform applicable HashJoin nodes to FilterJoin nodes.
+  P::PhysicalPtr output = transformHashJoinToFilters(input);
+
+  // Step 2. Push down FilterJoin nodes to be evaluated early.
+  output = pushDownFilters(output);
+
+  // Step 3. Add Selection nodes for attaching the LIPFilters, if necessary.
+  output = addFilterAnchors(output, false);
+
+  // Step 4. Because of the pushdown of FilterJoin nodes, there are optimization
+  // opportunities for projecting columns early.
+  output = PruneColumns().apply(output);
+
+  // Step 5. For each FilterJoin node, attach its corresponding LIPFilter to
+  // proper nodes.
+  concretizeAsLIPFilters(output, nullptr);
+
+  if (!lip_filter_configuration_->getBuildInfoMap().empty() ||
+      !lip_filter_configuration_->getProbeInfoMap().empty()) {
+    output = std::static_pointer_cast<const P::TopLevelPlan>(output)
+        ->copyWithLIPFilterConfiguration(
+              P::LIPFilterConfigurationPtr(lip_filter_configuration_.release()));
+  }
+
+  return output;
+}
+
+// Returns true if hash_join can be replaced by a FilterJoin: no residual
+// predicate, exactly one join attribute, only probe-side output attributes,
+// a supported join type, and exact integer min/max statistics on the build
+// attribute whose value range does not exceed kMaxFilterSize.
+bool InjectJoinFilters::isTransformable(
+    const physical::HashJoinPtr &hash_join) const {
+  // Conditions for replacing a HashJoin with a FilterJoin:
+
+  // No residual predicate.
+  if (hash_join->residual_predicate() != nullptr) {
+    return false;
+  }
+  // Single attribute equi-join. Checking for exactly one attribute (rather
+  // than "at most one") also guards the front() calls below.
+  if (hash_join->right_join_attributes().size() != 1u) {
+    return false;
+  }
+  // All the output attributes must be from the probe side.
+  if (!E::SubsetOfExpressions(hash_join->getOutputAttributes(),
+                              hash_join->left()->getOutputAttributes())) {
+    return false;
+  }
+  switch (hash_join->join_type()) {
+    case P::HashJoin::JoinType::kInnerJoin: {
+      // In the case of inner join, the build side join attributes must be unique.
+      if (!cost_model_->impliesUniqueAttributes(hash_join->right(),
+                                                hash_join->right_join_attributes())) {
+        return false;
+      }
+      break;
+    }
+    case P::HashJoin::JoinType::kLeftSemiJoin:  // Fall through
+    case P::HashJoin::JoinType::kLeftAntiJoin:
+      break;
+    default:
+      return false;
+  }
+
+  // The build side join attribute has integer type and its values are exactly
+  // within a reasonable range.
+  std::int64_t min_cpp_value = 0;
+  std::int64_t max_cpp_value = 0;
+  const bool has_exact_min_max_stats =
+      findExactMinMaxValuesForAttributeHelper(hash_join->right(),
+                                              hash_join->right_join_attributes().front(),
+                                              &min_cpp_value,
+                                              &max_cpp_value);
+  if (!has_exact_min_max_stats) {
+    return false;
+  }
+
+  // TODO(jianqiao): implement SimpleHashSetExactFilter to relax this requirement.
+  // Note that 1G bits = 128MB.
+  // Inconsistent statistics (min > max) are rejected explicitly: the unsigned
+  // range would wrap around and could otherwise look small.
+  DCHECK_LE(min_cpp_value, max_cpp_value);
+  if (max_cpp_value < min_cpp_value ||
+      ComputeValueRange(min_cpp_value, max_cpp_value) >
+          static_cast<std::uint64_t>(kMaxFilterSize)) {
+    return false;
+  }
+
+  return true;
+}
+
+// Bottom-up rewrite: every HashJoin accepted by isTransformable() becomes a
+// FilterJoin. A Selection directly on the build side is absorbed into the
+// FilterJoin as its build-side filter predicate when the build join attributes
+// are available from the Selection's input.
+P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters(
+    const P::PhysicalPtr &input) const {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed_children = false;
+  for (const P::PhysicalPtr &child : input->children()) {
+    const P::PhysicalPtr new_child = transformHashJoinToFilters(child);
+    if (child != new_child) {
+      has_changed_children = true;
+    }
+    new_children.push_back(new_child);
+  }
+
+  P::HashJoinPtr hash_join;
+  if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join) &&
+      isTransformable(hash_join)) {
+    const bool is_anti_join =
+        hash_join->join_type() == P::HashJoin::JoinType::kLeftAntiJoin;
+
+    DCHECK_EQ(2u, new_children.size());
+    P::PhysicalPtr build_child = new_children[1];
+    E::PredicatePtr build_side_filter_predicate = nullptr;
+    P::SelectionPtr selection;
+    if (P::SomeSelection::MatchesWithConditionalCast(build_child, &selection) &&
+        E::SubsetOfExpressions(hash_join->right_join_attributes(),
+                               selection->input()->getOutputAttributes())) {
+      build_child = selection->input();
+      build_side_filter_predicate = selection->filter_predicate();
+    }
+
+    return P::FilterJoin::Create(new_children[0],
+                                 build_child,
+                                 hash_join->left_join_attributes(),
+                                 hash_join->right_join_attributes(),
+                                 hash_join->project_expressions(),
+                                 build_side_filter_predicate,
+                                 is_anti_join);
+  }
+
+  if (has_changed_children) {
+    return input->copyWithNewChildren(new_children);
+  } else {
+    return input;
+  }
+}
+
+// Bottom-up: after processing all children, moves each FilterJoin as far as
+// possible toward the leaves of its probe side (see pushDownFiltersInternal).
+physical::PhysicalPtr InjectJoinFilters::pushDownFilters(
+    const physical::PhysicalPtr &input) const {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed_children = false;
+  for (const P::PhysicalPtr &child : input->children()) {
+    const P::PhysicalPtr new_child = pushDownFilters(child);
+    if (child != new_child) {
+      has_changed_children = true;
+    }
+    new_children.push_back(new_child);
+  }
+
+  P::FilterJoinPtr filter_join;
+  if (P::SomeFilterJoin::MatchesWithConditionalCast(input, &filter_join)) {
+    DCHECK_EQ(2u, new_children.size());
+    return pushDownFiltersInternal(
+        new_children[0], new_children[1], filter_join);
+  }
+
+  if (has_changed_children) {
+    return input->copyWithNewChildren(new_children);
+  } else {
+    return input;
+  }
+}
+
+// Re-roots filter_join (with the given, possibly rewritten, children) below
+// probe_child's first child while that child still produces all the probe
+// attributes. Returns the rewritten subtree.
+physical::PhysicalPtr InjectJoinFilters::pushDownFiltersInternal(
+    const physical::PhysicalPtr &probe_child,
+    const physical::PhysicalPtr &build_child,
+    const physical::FilterJoinPtr &filter_join) const {
+  switch (probe_child->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:  // Fall through
+    case P::PhysicalType::kHashJoin:
+    case P::PhysicalType::kSample:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kSort:
+    case P::PhysicalType::kWindowAggregate: {
+      // NOTE(review): filtering below a Sort that carries a LIMIT, or below a
+      // Sample/WindowAggregate, may change which rows those nodes produce;
+      // confirm these node types are always safe to push through.
+      DCHECK_GE(probe_child->getNumChildren(), 1u);
+      const P::PhysicalPtr child = probe_child->children()[0];
+      if (E::SubsetOfExpressions(filter_join->probe_attributes(),
+                                 child->getOutputAttributes())) {
+        const P::PhysicalPtr new_child =
+            pushDownFiltersInternal(child, build_child, filter_join);
+        if (new_child != child) {
+          std::vector<P::PhysicalPtr> new_children = probe_child->children();
+          new_children[0] = new_child;
+          return probe_child->copyWithNewChildren(new_children);
+        }
+      }
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (probe_child != filter_join->left()) {
+    // TODO(jianqiao): may need to update probe_attributes.
+    return P::FilterJoin::Create(probe_child,
+                                 build_child,
+                                 filter_join->probe_attributes(),
+                                 filter_join->build_attributes(),
+                                 E::ToNamedExpressions(probe_child->getOutputAttributes()),
+                                 filter_join->build_side_filter_predicate(),
+                                 filter_join->is_anti_join());
+  }
+  if (build_child != filter_join->right()) {
+    // Only the build side was rewritten (e.g. filters pushed down inside it).
+    // Keep that rewrite instead of returning the stale original node.
+    return filter_join->copyWithNewChildren({probe_child, build_child});
+  }
+  return filter_join;
+}
+
+
+// Inserts a Selection above every FilterJoin that has no ancestor able to
+// anchor its LIPFilter. ancestor_can_anchor_filter tells whether the nearest
+// ancestor (through a chain of FilterJoins) can host the probe-side filter.
+physical::PhysicalPtr InjectJoinFilters::addFilterAnchors(
+    const physical::PhysicalPtr &input,
+    const bool ancestor_can_anchor_filter) const {
+  std::vector<P::PhysicalPtr> new_children;
+
+  switch (input->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(input);
+      new_children.emplace_back(
+          addFilterAnchors(aggregate->input(), true));
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection =
+          std::static_pointer_cast<const P::Selection>(input);
+      new_children.emplace_back(
+          addFilterAnchors(selection->input(), true));
+      break;
+    }
+    // NOTE(jianqiao): Some of the SSB/TPCH queries slow down significantly if
+    // we attach converted filters to parent HashJoin nodes. E.g. one HashJoin +
+    // one attached LIPFilter is slower than the original two HashJoins. This is
+    // due to some implementation issues with the current HashJoinOperator. So
+    // currently we disable the anchoring of filters to HashJoin nodes. That is,
+    // in the case that a FilterJoin's parent node (or ancestor node, if there
+    // is a chain of FilterJoins) is a HashJoin, we create an extra Selection
+    // before the parent HashJoin as anchoring node to attach the filters. This
+    // guarantees non-degrading performance.
+    /*
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(input);
+      new_children.emplace_back(
+          addFilterAnchors(hash_join->left(), true));
+      new_children.emplace_back(
+          addFilterAnchors(hash_join->right(), false));
+      break;
+    }
+    */
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      new_children.emplace_back(
+          addFilterAnchors(filter_join->left(), true));
+      new_children.emplace_back(
+          addFilterAnchors(filter_join->right(), true));
+      break;
+    }
+    default: {
+      for (const P::PhysicalPtr &child : input->children()) {
+        new_children.emplace_back(addFilterAnchors(child, false));
+      }
+    }
+  }
+
+  DCHECK_EQ(new_children.size(), input->children().size());
+  const P::PhysicalPtr output_with_new_children =
+      new_children == input->children()
+          ? input
+          : input->copyWithNewChildren(new_children);
+
+  if (input->getPhysicalType() == P::PhysicalType::kFilterJoin &&
+      !ancestor_can_anchor_filter) {
+    const P::FilterJoinPtr filter_join =
+        std::static_pointer_cast<const P::FilterJoin>(output_with_new_children);
+    return P::Selection::Create(filter_join,
+                                filter_join->project_expressions(),
+                                nullptr);
+  } else {
+    return output_with_new_children;
+  }
+}
+
+// Records in lip_filter_configuration_ one BitVectorExactFilter per FilterJoin:
+// built by the FilterJoin itself and probed at anchor_node, the nearest
+// ancestor able to host the filter.
+void InjectJoinFilters::concretizeAsLIPFilters(
+    const P::PhysicalPtr &input,
+    const P::PhysicalPtr &anchor_node) const {
+  switch (input->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(input);
+      concretizeAsLIPFilters(aggregate->input(), aggregate);
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection =
+          std::static_pointer_cast<const P::Selection>(input);
+      concretizeAsLIPFilters(selection->input(), selection);
+      break;
+    }
+    // Currently we disable the attachment of filters to HashJoin nodes. See the
+    // comments in InjectJoinFilters::addFilterAnchors().
+    /*
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(input);
+      concretizeAsLIPFilters(hash_join->left(), hash_join);
+      concretizeAsLIPFilters(hash_join->right(), nullptr);
+      break;
+    }
+    */
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      DCHECK_EQ(1u, filter_join->build_attributes().size());
+      const E::AttributeReferencePtr &build_attr =
+          filter_join->build_attributes().front();
+
+      std::int64_t min_cpp_value = 0;
+      std::int64_t max_cpp_value = 0;
+      const bool has_exact_min_max_stats =
+          findExactMinMaxValuesForAttributeHelper(filter_join,
+                                                  build_attr,
+                                                  &min_cpp_value,
+                                                  &max_cpp_value);
+      // CHECK rather than DCHECK: in release builds a failed lookup would
+      // otherwise build the filter from bounds that were never computed.
+      CHECK(has_exact_min_max_stats);
+      DCHECK_LE(min_cpp_value, max_cpp_value);
+      // Compare converted prvalues: avoids signed overflow in max - min and
+      // avoids binding glog's const-reference parameters to the in-class
+      // static constexpr member.
+      DCHECK_LE(ComputeValueRange(min_cpp_value, max_cpp_value),
+                static_cast<std::uint64_t>(kMaxFilterSize));
+      CHECK(anchor_node != nullptr);
+
+      lip_filter_configuration_->addBuildInfo(
+          P::BitVectorExactFilterBuildInfo::Create(build_attr,
+                                                   min_cpp_value,
+                                                   max_cpp_value,
+                                                   filter_join->is_anti_join()),
+          filter_join);
+      lip_filter_configuration_->addProbeInfo(
+          P::LIPFilterProbeInfo::Create(filter_join->probe_attributes().front(),
+                                        build_attr,
+                                        filter_join),
+          anchor_node);
+
+      concretizeAsLIPFilters(filter_join->left(), anchor_node);
+      concretizeAsLIPFilters(filter_join->right(), filter_join);
+      break;
+    }
+    default: {
+      for (const P::PhysicalPtr &child : input->children()) {
+        concretizeAsLIPFilters(child, nullptr);
+      }
+    }
+  }
+}
+
+// Looks up exact, non-NULL min/max statistics of an INT or LONG attribute in
+// physical_plan. On success stores them widened to int64 and returns true;
+// otherwise returns false and leaves the outputs untouched.
+bool InjectJoinFilters::findExactMinMaxValuesForAttributeHelper(
+    const physical::PhysicalPtr &physical_plan,
+    const expressions::AttributeReferencePtr &attribute,
+    std::int64_t *min_cpp_value,
+    std::int64_t *max_cpp_value) const {
+  bool min_value_is_exact = false;
+  bool max_value_is_exact = false;
+
+  const TypedValue min_value =
+      cost_model_->findMinValueStat(physical_plan, attribute, &min_value_is_exact);
+  const TypedValue max_value =
+      cost_model_->findMaxValueStat(physical_plan, attribute, &max_value_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_is_exact) || (!max_value_is_exact)) {
+    return false;
+  }
+
+  switch (attribute->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      *min_cpp_value = min_value.getLiteral<int>();
+      *max_cpp_value = max_value.getLiteral<int>();
+      return true;
+    }
+    case TypeID::kLong: {
+      *min_cpp_value = min_value.getLiteral<std::int64_t>();
+      *max_cpp_value = max_value.getLiteral<std::int64_t>();
+      return true;
+    }
+    default:
+      return false;
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/InjectJoinFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.hpp b/query_optimizer/rules/InjectJoinFilters.hpp
new file mode 100644
index 0000000..c5250b3
--- /dev/null
+++ b/query_optimizer/rules/InjectJoinFilters.hpp
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to transform HashJoin nodes into
+ *        FilterJoin nodes.
+ *
+ * This is an optimization that strength-reduces HashJoins to FilterJoins
+ * (implemented as LIPFilters attached to some anchoring operators where the
+ * filters get applied). Briefly speaking, the idea is that in the case that
+ * (1) the join attribute has consecutive integer values bounded in a reasonably
+ * small range AND (2) the output attributes are all from the probe-side table,
+ * we can eliminate the HashJoin by building a BitVector on the build-side
+ * attribute and using the BitVector to filter the probe-side table.
+ */
+class InjectJoinFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  InjectJoinFilters() {}
+
+  ~InjectJoinFilters() override {}
+
+  // The rule's name matches its class name.
+  std::string getName() const override {
+    return "InjectJoinFilters";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  // Check whether a HashJoin can be transformed into a FilterJoin.
+  bool isTransformable(const physical::HashJoinPtr &hash_join) const;
+
+  // Transform applicable HashJoin nodes into FilterJoin nodes.
+  physical::PhysicalPtr transformHashJoinToFilters(
+      const physical::PhysicalPtr &input) const;
+
+  // Push down FilterJoin nodes to be evaluated early.
+  physical::PhysicalPtr pushDownFilters(const physical::PhysicalPtr &input) const;
+
+  // Add Selection node, if necessary, for anchoring the LIP filters built by
+  // FilterJoin nodes.
+  physical::PhysicalPtr addFilterAnchors(const physical::PhysicalPtr &input,
+                                         const bool ancestor_can_anchor_filter) const;
+
+  // Setup lip_filter_configuration_ with the transformed plan tree.
+  void concretizeAsLIPFilters(const physical::PhysicalPtr &input,
+                              const physical::PhysicalPtr &anchor_node) const;
+
+  // Helper of pushDownFilters(): moves filter_join (with the given probe and
+  // build children) below probe_child where its probe attributes allow it.
+  physical::PhysicalPtr pushDownFiltersInternal(
+      const physical::PhysicalPtr &probe_child,
+      const physical::PhysicalPtr &build_child,
+      const physical::FilterJoinPtr &filter_join) const;
+
+  // Retrieves exact min/max statistics of an INT/LONG attribute in
+  // physical_plan as int64 values. Returns false if they are unavailable.
+  bool findExactMinMaxValuesForAttributeHelper(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      std::int64_t *min_cpp_value,
+      std::int64_t *max_cpp_value) const;
+
+  // Both are (re)created by every apply() call.
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+  std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;
+
+  // TODO(jianqiao): Add this threshold as a gflag.
+  // Largest value range (max - min) of a build-side join attribute accepted
+  // for a BitVector filter. The filter is a bit vector indexed by value, so
+  // 1e9 bits is roughly 125MB (about 119MiB).
+  static constexpr std::int64_t kMaxFilterSize = 1000000000L;
+
+  DISALLOW_COPY_AND_ASSIGN(InjectJoinFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/tests/OptimizerTextTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/OptimizerTextTest.cpp b/query_optimizer/tests/OptimizerTextTest.cpp
index e17f5c4..07accf6 100644
--- a/query_optimizer/tests/OptimizerTextTest.cpp
+++ b/query_optimizer/tests/OptimizerTextTest.cpp
@@ -34,6 +34,7 @@ namespace optimizer {
 DECLARE_bool(reorder_columns);
 DECLARE_bool(reorder_hash_joins);
 DECLARE_bool(use_lip_filters);
+DECLARE_bool(use_filter_joins);
 }
 }
 
@@ -64,6 +65,7 @@ int main(int argc, char** argv) {
   quickstep::optimizer::FLAGS_reorder_columns = false;
   quickstep::optimizer::FLAGS_reorder_hash_joins = false;
   quickstep::optimizer::FLAGS_use_lip_filters = false;
+  quickstep::optimizer::FLAGS_use_filter_joins = false;
 
   ::testing::InitGoogleTest(&argc, argv);
   int success = RUN_ALL_TESTS();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
new file mode 100644
index 0000000..f7c09cd
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildLIPFilterOperator.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool BuildLIPFilterOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  DCHECK(query_context != nullptr);
+
+  const Predicate *build_side_predicate =
+      query_context->getPredicate(build_side_predicate_index_);
+
+  // Enqueues one BuildLIPFilterWorkOrder for the given input block. Each work
+  // order gets its own prober/builder instances from the QueryContext.
+  const auto enqueue_work_order = [&](const block_id input_block) {
+    container->addNormalWorkOrder(
+        new BuildLIPFilterWorkOrder(
+            query_id_,
+            input_relation_,
+            input_block,
+            build_side_predicate,
+            storage_manager,
+            CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+            CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+        op_index_);
+  };
+
+  if (!input_relation_is_stored_) {
+    // Blocks arrive through feedInputBlock(): handle the ones not yet
+    // processed and defer completion to done_feeding_input_relation_.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      enqueue_work_order(input_relation_block_ids_[num_workorders_generated_]);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: the block list was captured up front, so emit it only once.
+  if (!started_) {
+    for (const block_id input_block : input_relation_block_ids_) {
+      enqueue_work_order(input_block);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (!input_relation_is_stored_) {
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  if (!started_) {
+    for (const block_id input_block : input_relation_block_ids_) {
+      container->addWorkOrderProto(createWorkOrderProto(input_block), op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) {
+  // Caller takes ownership of the returned proto.
+  serialization::WorkOrder *work_order_proto = new serialization::WorkOrder;
+  work_order_proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
+  work_order_proto->set_query_id(query_id_);
+
+  work_order_proto->SetExtension(serialization::BuildLIPFilterWorkOrder::relation_id,
+                                 input_relation_.getID());
+  work_order_proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id,
+                                 block);
+  work_order_proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index,
+                                 build_side_predicate_index_);
+  work_order_proto->SetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index,
+                                 lip_deployment_index_);
+
+  return work_order_proto;
+}
+
+void BuildLIPFilterWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+
+  // Tuples that satisfy the build-side predicate; null means "every tuple".
+  std::unique_ptr<TupleIdSequence> predicate_matches(
+      build_side_predicate_ == nullptr
+          ? nullptr
+          : block->getMatchesForPredicate(build_side_predicate_));
+
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor(predicate_matches.get()));
+
+  if (lip_filter_adaptive_prober_ == nullptr) {
+    lip_filter_builder_->insertValueAccessor(accessor.get());
+    return;
+  }
+
+  // LIP filters attached for probing filter the *input* relation; they are
+  // distinct from the target LIP filters this work order builds.
+  std::unique_ptr<TupleIdSequence> lip_matches(
+      lip_filter_adaptive_prober_->filterValueAccessor(accessor.get()));
+  std::unique_ptr<ValueAccessor> filtered_accessor(
+      accessor->createSharedTupleIdSequenceAdapterVirtual(*lip_matches));
+
+  lip_filter_builder_->insertValueAccessor(filtered_accessor.get());
+}
+
+}  // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
new file mode 100644
index 0000000..5192b40
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class Predicate;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which builds LIPFilters on one relation.
+ **/
+class BuildLIPFilterOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @note Information about the LIPFilters is not passed explicitly to this
+   *       constructor; it is attached later via
+   *       RelationalOperator::deployLIPFilters().
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param input_relation The relation to build LIP filters on.
+   * @param build_side_predicate_index The index of the predicate in QueryContext
+   *        where the predicate is to be applied to the input relation before
+   *        building the LIP filters (or kInvalidPredicateId if no predicate is
+   *        to be applied).
+   * @param input_relation_is_stored Whether input_relation is a stored relation
+   *        that is fully available to the operator before it starts
+   *        generating workorders.
+   **/
+  BuildLIPFilterOperator(const std::size_t query_id,
+                         const CatalogRelation &input_relation,
+                         const QueryContext::predicate_id build_side_predicate_index,
+                         const bool input_relation_is_stored)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_side_predicate_index_(build_side_predicate_index),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                           : std::vector<block_id>()),
+        num_workorders_generated_(0),
+        started_(false) {}
+
+  ~BuildLIPFilterOperator() override {}
+
+  /**
+   * @return The input relation to build LIP filters on.
+   */
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  std::string getName() const override {
+    return "BuildLIPFilterOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  /**
+   * @brief Records a newly available block of the input relation. A work
+   *        order for it is generated on the next getAllWorkOrders() or
+   *        getAllWorkOrderProtos() call.
+   **/
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param block The block id used in the Work Order.
+   * @return A newly allocated proto; the caller takes ownership.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const QueryContext::predicate_id build_side_predicate_index_;
+  const bool input_relation_is_stored_;
+
+  // Blocks of the input relation: a snapshot taken at construction when the
+  // relation is stored, otherwise the blocks fed so far.
+  std::vector<block_id> input_relation_block_ids_;
+  // Number of leading entries of input_relation_block_ids_ that already have
+  // work orders (used for non-stored input).
+  std::vector<block_id>::size_type num_workorders_generated_;
+
+  // Whether work orders for a stored input relation have been generated.
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildLIPFilterOperator.
+ **/
+class BuildLIPFilterWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param input_relation The relation to build LIP filters on.
+   * @param build_block_id The block id.
+   * @param build_side_predicate The predicate to be applied to filter the input
+   *        relation before building the LIP filters (or nullptr if no predicate
+   *        is to be applied).
+   * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober (may be
+   *        nullptr). Ownership is taken by this WorkOrder.
+   * @param lip_filter_builder The attached LIP filter builder (must not be
+   *        nullptr). Ownership is taken by this WorkOrder.
+   **/
+  BuildLIPFilterWorkOrder(const std::size_t query_id,
+                          const CatalogRelationSchema &input_relation,
+                          const block_id build_block_id,
+                          const Predicate *build_side_predicate,
+                          StorageManager *storage_manager,
+                          LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
+                          LIPFilterBuilder *lip_filter_builder)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_side_predicate_(build_side_predicate),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober),
+        lip_filter_builder_(DCHECK_NOTNULL(lip_filter_builder)) {}
+
+  ~BuildLIPFilterWorkOrder() override {}
+
+  /**
+   * @return The input relation to build LIP filters on.
+   */
+  const CatalogRelationSchema& input_relation() const {
+    return input_relation_;
+  }
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const Predicate *build_side_predicate_;
+
+  StorageManager *storage_manager_;
+
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+  std::unique_ptr<LIPFilterBuilder> lip_filter_builder_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c8447f3..c18dc77 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -34,6 +34,7 @@ set_gflags_lib_name ()
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
+add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
 add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp)
 add_library(quickstep_relationaloperators_DestroyAggregationStateOperator
@@ -113,6 +114,27 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       quickstep_utility_lipfilter_LIPFilterBuilder
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_TupleStorageSubBlock
+                      quickstep_storage_ValueAccessor
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_utility_lipfilter_LIPFilterUtil
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -483,6 +505,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_queryexecution_QueryContext
                       quickstep_relationaloperators_AggregationOperator
                       quickstep_relationaloperators_BuildHashOperator
+                      quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_DeleteOperator
                       quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_DestroyHashOperator
@@ -515,6 +538,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
                       quickstep_relationaloperators_BuildHashOperator
+                      quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto
b/relational_operators/WorkOrder.proto index f8d9246..76753d2 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -24,25 +24,26 @@ import "relational_operators/SortMergeRunOperator.proto"; enum WorkOrderType { AGGREGATION = 1; BUILD_HASH = 2; - CREATE_INDEX = 3; // Placeholder. - CREATE_TABLE = 4; // Placeholder. - DELETE = 5; - DESTROY_HASH = 6; - DROP_TABLE = 7; - FINALIZE_AGGREGATION = 8; - HASH_JOIN = 9; - INSERT = 10; - NESTED_LOOP_JOIN = 11; - SAMPLE = 12; - SAVE_BLOCKS = 13; - SELECT = 14; - SORT_MERGE_RUN = 15; - SORT_RUN_GENERATION = 16; - TABLE_GENERATOR = 17; - TEXT_SCAN = 18; - UPDATE = 19; - WINDOW_AGGREGATION = 20; - DESTROY_AGGREGATION_STATE = 21; + BUILD_LIP_FILTER = 3; + CREATE_INDEX = 4; // Placeholder. + CREATE_TABLE = 5; // Placeholder. + DELETE = 6; + DESTROY_HASH = 7; + DROP_TABLE = 8; + FINALIZE_AGGREGATION = 9; + HASH_JOIN = 10; + INSERT = 11; + NESTED_LOOP_JOIN = 12; + SAMPLE = 13; + SAVE_BLOCKS = 14; + SELECT = 15; + SORT_MERGE_RUN = 16; + SORT_RUN_GENERATION = 17; + TABLE_GENERATOR = 18; + TEXT_SCAN = 19; + UPDATE = 20; + WINDOW_AGGREGATION = 21; + DESTROY_AGGREGATION_STATE = 22; } message WorkOrder { @@ -77,6 +78,16 @@ message BuildHashWorkOrder { } } +message BuildLIPFilterWorkOrder { + extend WorkOrder { + // All required. + optional int32 relation_id = 48; + optional fixed64 build_block_id = 49; + optional int32 build_side_predicate_index = 50; + optional int32 lip_deployment_index = 51; + } +} + message DeleteWorkOrder { extend WorkOrder { // All required. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index a6cba02..5e8d03d 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -30,6 +30,7 @@ #include "query_execution/QueryContext.hpp" #include "relational_operators/AggregationOperator.hpp" #include "relational_operators/BuildHashOperator.hpp" +#include "relational_operators/BuildLIPFilterOperator.hpp" #include "relational_operators/DeleteOperator.hpp" #include "relational_operators/DestroyAggregationStateOperator.hpp" #include "relational_operators/DestroyHashOperator.hpp" @@ -90,6 +91,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder CreateLIPFilterAdaptiveProberHelper( proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context)); } + case serialization::BUILD_LIP_FILTER: { + LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index; + + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index); + + return new BuildLIPFilterWorkOrder( + proto.query_id(), + catalog_database->getRelationSchemaById( + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id)), + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id), + query_context->getPredicate( + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index)), + storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context), + CreateLIPFilterBuilderHelper(lip_deployment_index, query_context)); + } case serialization::BUILD_HASH: { LOG(INFO) << "Creating BuildHashWorkOrder in Shiftboss " << shiftboss_index; 
vector<attribute_id> join_key_attributes; @@ -541,6 +559,33 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index), proto.GetExtension(serialization::BuildHashWorkOrder::partition_id)); } + case serialization::BUILD_LIP_FILTER: { + if (!proto.HasExtension(serialization::BuildLIPFilterWorkOrder::relation_id)) { + return false; + } + + const relation_id rel_id = + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id); + if (!catalog_database.hasRelationWithId(rel_id)) { + return false; + } + + if (!proto.HasExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index)) { + return false; + } else { + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index); + if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId && + !query_context.isValidLIPDeploymentId(lip_deployment_index)) { + return false; + } + } + + return proto.HasExtension(serialization::BuildLIPFilterWorkOrder::build_block_id) && + proto.HasExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index) && + query_context.isValidPredicate( + proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index)); + } case serialization::DELETE: { return proto.HasExtension(serialization::DeleteWorkOrder::relation_id) && catalog_database.hasRelationWithId( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt index 8571149..aeff388 100644 --- a/utility/CMakeLists.txt +++ b/utility/CMakeLists.txt @@ -270,6 +270,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel quickstep_queryoptimizer_expressions_AttributeReference 
quickstep_queryoptimizer_expressions_ExprId + quickstep_queryoptimizer_physical_FilterJoin quickstep_queryoptimizer_physical_HashJoin quickstep_queryoptimizer_physical_LIPFilterConfiguration quickstep_queryoptimizer_physical_Physical http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/PlanVisualizer.cpp ---------------------------------------------------------------------- diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp index df7a20c..f8bf6f8 100644 --- a/utility/PlanVisualizer.cpp +++ b/utility/PlanVisualizer.cpp @@ -32,6 +32,7 @@ #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/expressions/ExprId.hpp" +#include "query_optimizer/physical/FilterJoin.hpp" #include "query_optimizer/physical/HashJoin.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" @@ -58,6 +59,8 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) { color_map_["TableReference"] = "skyblue"; color_map_["Selection"] = "#90EE90"; + color_map_["FilterJoin"] = "pink"; + color_map_["FilterJoin(Anti)"] = "pink"; color_map_["HashJoin"] = "red"; color_map_["HashLeftOuterJoin"] = "orange"; color_map_["HashLeftSemiJoin"] = "orange"; @@ -126,7 +129,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { edge_info.dst_node_id = node_id; edge_info.dashed = false; - if (input->getPhysicalType() == P::PhysicalType::kHashJoin && + if ((input->getPhysicalType() == P::PhysicalType::kHashJoin || + input->getPhysicalType() == P::PhysicalType::kFilterJoin) && child == input->children()[1]) { edge_info.dashed = true; } @@ -165,6 +169,20 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { } break; } + case P::PhysicalType::kFilterJoin: { + const P::FilterJoinPtr filter_join = + std::static_pointer_cast<const P::FilterJoin>(input); + node_info.labels.emplace_back(input->getName()); 
+ + const auto &probe_attributes = filter_join->probe_attributes(); + const auto &build_attributes = filter_join->build_attributes(); + for (std::size_t i = 0; i < probe_attributes.size(); ++i) { + node_info.labels.emplace_back( + probe_attributes[i]->attribute_alias() + " = " + + build_attributes[i]->attribute_alias()); + } + break; + } default: { node_info.labels.emplace_back(input->getName()); break; @@ -177,7 +195,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { if (build_it != build_filters.end()) { for (const auto &build_info : build_it->second) { node_info.labels.emplace_back( - std::string("[LIP build] ") + build_info.build_attribute->attribute_alias()); + std::string("[LIP build] ") + build_info->build_attribute()->attribute_alias()); } } const auto &probe_filters = lip_filter_conf_->getProbeInfoMap(); @@ -185,7 +203,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { if (probe_it != probe_filters.end()) { for (const auto &probe_info : probe_it->second) { node_info.labels.emplace_back( - std::string("[LIP probe] ") + probe_info.probe_attribute->attribute_alias()); + std::string("[LIP probe] ") + probe_info->probe_attribute()->attribute_alias()); } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/BitVectorExactFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp new file mode 100644 index 0000000..6ad0567 --- /dev/null +++ b/utility/lip_filter/BitVectorExactFilter.hpp @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_ +#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_ + +#include <atomic> +#include <cstdint> +#include <cstring> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageConstants.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/Type.hpp" +#include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilter.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Utility + * @{ + */ + +/** + * @brief A LIP filter that tests the EXACT memberships of elements, i.e. there + * will be neither false positives nor false negatives. The implementation + * is to simply reinterpret_cast a value's byte stream into the specified + * CppType as the underlying bit vector's index. Therefore, to use this + * filter, the corresponding LIP attribute's values must be bounded in a + * reasonably small integer range. + */ +template <typename CppType, bool is_anti_filter> +class BitVectorExactFilter : public LIPFilter { + public: + /** + * @brief Constructor. + * + * @param min_value The minimum possible value for this filter to set. + * @param max_value The maximum possible value for this filter to set. 
+ */ + explicit BitVectorExactFilter(const std::int64_t min_value, + const std::int64_t max_value) + : LIPFilter(LIPFilterType::kBitVectorExactFilter), + min_value_(static_cast<CppType>(min_value)), + max_value_(static_cast<CppType>(max_value)), + bit_array_(GetByteSize(max_value - min_value + 1)) { + DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_)); + DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_)); + DCHECK_GE(max_value_, min_value_); + + std::memset(bit_array_.data(), + 0x0, + sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1)); + } + + void insertValueAccessor(ValueAccessor *accessor, + const attribute_id attr_id, + const Type *attr_type) override { + InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + if (attr_type->isNullable()) { + this->insertValueAccessorInternal<true>(accessor, attr_id); + } else { + this->insertValueAccessorInternal<false>(accessor, attr_id); + } + }); + } + + std::size_t filterBatch(ValueAccessor *accessor, + const attribute_id attr_id, + const bool is_attr_nullable, + std::vector<tuple_id> *batch, + const std::size_t batch_size) const override { + DCHECK(batch != nullptr); + DCHECK_LE(batch_size, batch->size()); + + return InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> std::size_t { // NOLINT(build/c++11) + if (is_attr_nullable) { + return this->filterBatchInternal<true>(accessor, attr_id, batch, batch_size); + } else { + return this->filterBatchInternal<false>(accessor, attr_id, batch, batch_size); + } + }); + } + + private: + /** + * @brief Round up bit_size to multiples of 8. + */ + inline static std::size_t GetByteSize(const std::size_t bit_size) { + return (bit_size + 7u) / 8u; + } + + /** + * @brief Iterate through the accessor and hash values into the internal bit + * array. 
+ */ + template <bool is_attr_nullable, typename ValueAccessorT> + inline void insertValueAccessorInternal(ValueAccessorT *accessor, + const attribute_id attr_id) { + accessor->beginIteration(); + while (accessor->next()) { + const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id); + if (!is_attr_nullable || value != nullptr) { + insert(value); + } + } + } + + /** + * @brief Filter the given batch of tuples from a ValueAccessor. Write the + * tuple ids which survive in the filtering back to \p batch. + */ + template <bool is_attr_nullable, typename ValueAccessorT> + inline std::size_t filterBatchInternal(const ValueAccessorT *accessor, + const attribute_id attr_id, + std::vector<tuple_id> *batch, + const std::size_t batch_size) const { + std::size_t out_size = 0; + for (std::size_t i = 0; i < batch_size; ++i) { + const tuple_id tid = batch->at(i); + const void *value = + accessor->template getUntypedValueAtAbsolutePosition(attr_id, tid); + if (is_attr_nullable && value == nullptr) { + continue; + } + if (contains(value)) { + batch->at(out_size) = tid; + ++out_size; + } + } + return out_size; + } + + /** + * @brief Inserts a given value into the exact filter. + */ + inline void insert(const void *key_begin) { + const CppType value = *reinterpret_cast<const CppType *>(key_begin); + DCHECK_GE(value, min_value_); + DCHECK_LE(value, max_value_); + // Widen before subtracting: a CppType difference can overflow (e.g. int8_t over [-128, 127]). + const std::uint64_t loc = static_cast<std::uint64_t>(value) - static_cast<std::uint64_t>(min_value_); + bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed); + } + + /** + * @brief Test membership of a given value in the exact filter.
+ */ + inline bool contains(const void *key_begin) const { + const CppType value = *reinterpret_cast<const CppType *>(key_begin); + if (value < min_value_ || value > max_value_) { + return is_anti_filter; + } + // Widen before subtracting: a CppType difference can overflow (e.g. int8_t over [-128, 127]). + const std::uint64_t loc = static_cast<std::uint64_t>(value) - static_cast<std::uint64_t>(min_value_); + const bool is_bit_set = + (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0; + + if (is_anti_filter) { + return !is_bit_set; + } else { + return is_bit_set; + } + } + + const CppType min_value_; + const CppType max_value_; + alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_; + + DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt index 23b3763..edd0d24 100644 --- a/utility/lip_filter/CMakeLists.txt +++ b/utility/lip_filter/CMakeLists.txt @@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs LIPFilter.proto) # Declare micro-libs: +add_library(quickstep_utility_lipfilter_BitVectorExactFilter ../../empty_src.cpp BitVectorExactFilter.hpp) add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp) add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp) add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp) @@ -31,6 +32,15 @@ add_library(quickstep_utility_lipfilter_LIPFilter_proto add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp) # Link dependencies: 
+target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter + quickstep_catalog_CatalogTypedefs +
quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_types_Type + quickstep_utility_lipfilter_LIPFilter + quickstep_utility_Macros) target_link_libraries(quickstep_utility_lipfilter_LIPFilter quickstep_catalog_CatalogTypedefs quickstep_storage_StorageBlockInfo @@ -56,6 +66,7 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment quickstep_utility_lipfilter_LIPFilterBuilder quickstep_utility_lipfilter_LIPFilter_proto) target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory + quickstep_utility_lipfilter_BitVectorExactFilter quickstep_utility_lipfilter_LIPFilter_proto quickstep_utility_lipfilter_SingleIdentityHashFilter quickstep_utility_Macros) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp index 682d69f..ba38264 100644 --- a/utility/lip_filter/LIPFilter.hpp +++ b/utility/lip_filter/LIPFilter.hpp @@ -37,8 +37,8 @@ class ValueAccessor; */ enum class LIPFilterType { + kBitVectorExactFilter, kBloomFilter, - kExactFilter, kSingleIdentityHashFilter }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilter.proto ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto index def13dd..45843f3 100644 --- a/utility/lip_filter/LIPFilter.proto +++ b/utility/lip_filter/LIPFilter.proto @@ -22,8 +22,8 @@ package quickstep.serialization; import "types/Type.proto"; enum LIPFilterType { - BLOOM_FILTER = 1; - EXACT_FILTER = 2; + BIT_VECTOR_EXACT_FILTER = 1; + BLOOM_FILTER = 2; SINGLE_IDENTITY_HASH_FILTER = 3; } @@ -33,17 +33,22 @@ message LIPFilter { extensions 16 to max; } -message 
SingleIdentityHashFilter { +message BitVectorExactFilter { extend LIPFilter { // All required - optional uint64 filter_cardinality = 16; - optional uint64 attribute_size = 17; + optional int64 min_value = 16; + optional int64 max_value = 17; + optional uint64 attribute_size = 18; + optional bool is_anti_filter = 19; } } -enum LIPFilterActionType { - BUILD = 1; - PROBE = 2; +message SingleIdentityHashFilter { + extend LIPFilter { + // All required + optional uint64 filter_cardinality = 24; + optional uint64 attribute_size = 25; + } } message LIPFilterDeployment { @@ -53,6 +58,6 @@ message LIPFilterDeployment { required Type attribute_type = 3; } - required LIPFilterActionType action_type = 1; - repeated Entry entries = 2; + repeated Entry build_entries = 1; + repeated Entry probe_entries = 2; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterDeployment.cpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp index cd4d90f..5721496 100644 --- a/utility/lip_filter/LIPFilterDeployment.cpp +++ b/utility/lip_filter/LIPFilterDeployment.cpp @@ -28,45 +28,49 @@ #include "utility/lip_filter/LIPFilterBuilder.hpp" #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" -#include "glog/logging.h" - namespace quickstep { LIPFilterDeployment::LIPFilterDeployment( const serialization::LIPFilterDeployment &proto, const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) { - switch (proto.action_type()) { - case serialization::LIPFilterActionType::BUILD: - action_type_ = LIPFilterActionType::kBuild; - break; - case serialization::LIPFilterActionType::PROBE: - action_type_ = LIPFilterActionType::kProbe; - break; - default: - LOG(FATAL) << "Unsupported LIPFilterActionType: " - << serialization::LIPFilterActionType_Name(proto.action_type()); + if (proto.build_entries_size() > 0) { + build_.reset(new 
LIPFilterDeploymentInfo()); + for (int i = 0; i < proto.build_entries_size(); ++i) { + const auto &entry_proto = proto.build_entries(i); + build_->lip_filters_.emplace_back( + lip_filters.at(entry_proto.lip_filter_id()).get()); + build_->attr_ids_.emplace_back(entry_proto.attribute_id()); + build_->attr_types_.emplace_back( + &TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + } } - for (int i = 0; i < proto.entries_size(); ++i) { - const auto &entry_proto = proto.entries(i); - lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get()); - attr_ids_.emplace_back(entry_proto.attribute_id()); - attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + if (proto.probe_entries_size() > 0) { + probe_.reset(new LIPFilterDeploymentInfo()); + for (int i = 0; i < proto.probe_entries_size(); ++i) { + const auto &entry_proto = proto.probe_entries(i); + probe_->lip_filters_.emplace_back( + lip_filters.at(entry_proto.lip_filter_id()).get()); + probe_->attr_ids_.emplace_back(entry_proto.attribute_id()); + probe_->attr_types_.emplace_back( + &TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + } } } bool LIPFilterDeployment::ProtoIsValid( const serialization::LIPFilterDeployment &proto) { - if (proto.action_type() != serialization::LIPFilterActionType::BUILD && - proto.action_type() != serialization::LIPFilterActionType::PROBE) { - LOG(FATAL) << "Unsupported LIPFilterActionType: " - << serialization::LIPFilterActionType_Name(proto.action_type()); - } - if (proto.entries_size() == 0) { + if (proto.build_entries_size() == 0 && proto.probe_entries_size() == 0) { return false; } - for (int i = 0; i < proto.entries_size(); ++i) { - const auto &entry_proto = proto.entries(i); + for (int i = 0; i < proto.build_entries_size(); ++i) { + const auto &entry_proto = proto.build_entries(i); + if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) { + return false; + } + } + for (int i = 0; i < 
proto.probe_entries_size(); ++i) { + const auto &entry_proto = proto.probe_entries(i); if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) { return false; } @@ -75,13 +79,23 @@ bool LIPFilterDeployment::ProtoIsValid( } LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const { - DCHECK(action_type_ == LIPFilterActionType::kBuild); - return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_); + if (build_ == nullptr) { + return nullptr; + } + + return new LIPFilterBuilder(build_->lip_filters_, + build_->attr_ids_, + build_->attr_types_); } LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const { - DCHECK(action_type_ == LIPFilterActionType::kProbe); - return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_); + if (probe_ == nullptr) { + return nullptr; + } + + return new LIPFilterAdaptiveProber(probe_->lip_filters_, + probe_->attr_ids_, + probe_->attr_types_); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterDeployment.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp index 9b37f88..ab1259b 100644 --- a/utility/lip_filter/LIPFilterDeployment.hpp +++ b/utility/lip_filter/LIPFilterDeployment.hpp @@ -39,11 +39,6 @@ class Type; * @{ */ -enum class LIPFilterActionType { - kBuild = 0, - kProbe -}; - /** * @brief Helper class for organizing a group of LIPFilters in the backend. * Each LIPFilterDeployment object is attached to a RelationalOperator. @@ -69,16 +64,6 @@ class LIPFilterDeployment { static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto); /** - * @brief Get the action type for this group of LIPFilters (i.e. whether - * to build or probe the filters). - * - * @return The action type. 
- */ - LIPFilterActionType getActionType() const { - return action_type_; - } - - /** * @brief Create a LIPFilterBuilder for this group of LIPFilters. * * @return A new LIPFilterBuilder object for this group of LIPFilters. @@ -95,11 +80,14 @@ class LIPFilterDeployment { LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const; private: - LIPFilterActionType action_type_; - - std::vector<LIPFilter *> lip_filters_; - std::vector<attribute_id> attr_ids_; - std::vector<const Type *> attr_types_; + struct LIPFilterDeploymentInfo { + std::vector<LIPFilter *> lip_filters_; + std::vector<attribute_id> attr_ids_; + std::vector<const Type *> attr_types_; + }; + + std::unique_ptr<LIPFilterDeploymentInfo> build_; + std::unique_ptr<LIPFilterDeploymentInfo> probe_; DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterFactory.cpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp index ebc4a0e..f69d8b0 100644 --- a/utility/lip_filter/LIPFilterFactory.cpp +++ b/utility/lip_filter/LIPFilterFactory.cpp @@ -23,6 +23,7 @@ #include <cstdint> #include "utility/lip_filter/LIPFilter.pb.h" +#include "utility/lip_filter/BitVectorExactFilter.hpp" #include "utility/lip_filter/SingleIdentityHashFilter.hpp" #include "glog/logging.h" @@ -31,6 +32,46 @@ namespace quickstep { LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) { switch (proto.lip_filter_type()) { + case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: { + const std::size_t attr_size = + proto.GetExtension(serialization::BitVectorExactFilter::attribute_size); + const std::int64_t min_value = + proto.GetExtension(serialization::BitVectorExactFilter::min_value); + const std::int64_t max_value = + proto.GetExtension(serialization::BitVectorExactFilter::max_value); + const bool 
is_anti_filter = + proto.GetExtension(serialization::BitVectorExactFilter::is_anti_filter); + + switch (attr_size) { + case 1: + if (is_anti_filter) { + return new BitVectorExactFilter<std::int8_t, true>(min_value, max_value); + } else { + return new BitVectorExactFilter<std::int8_t, false>(min_value, max_value); + } + case 2: + if (is_anti_filter) { + return new BitVectorExactFilter<std::int16_t, true>(min_value, max_value); + } else { + return new BitVectorExactFilter<std::int16_t, false>(min_value, max_value); + } + case 4: + if (is_anti_filter) { + return new BitVectorExactFilter<std::int32_t, true>(min_value, max_value); + } else { + return new BitVectorExactFilter<std::int32_t, false>(min_value, max_value); + } + case 8: + if (is_anti_filter) { + return new BitVectorExactFilter<std::int64_t, true>(min_value, max_value); + } else { + return new BitVectorExactFilter<std::int64_t, false>(min_value, max_value); + } + default: + LOG(FATAL) << "Invalid attribute size for BitVectorExactFilter: " + << attr_size; + } + } case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: { const std::size_t attr_size = proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size); @@ -57,6 +98,15 @@ LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) { switch (proto.lip_filter_type()) { + case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: { + const std::size_t attr_size = + proto.GetExtension(serialization::BitVectorExactFilter::attribute_size); + const std::int64_t min_value = + proto.GetExtension(serialization::BitVectorExactFilter::min_value); + const std::int64_t max_value = + proto.GetExtension(serialization::BitVectorExactFilter::max_value); + return (attr_size != 0 && max_value >= min_value); + } case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: { const std::size_t attr_size = 
proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);