Add backend support for LIPFilters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/96ef3507 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/96ef3507 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/96ef3507 Branch: refs/heads/lip-refactor-backend Commit: 96ef3507128563473c083364f7e14a7e414bbe44 Parents: 9c32ea4 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Wed Sep 7 13:20:43 2016 -0500 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Fri Oct 21 23:40:32 2016 -0500 ---------------------------------------------------------------------- expressions/scalar/ScalarAttribute.cpp | 2 +- relational_operators/AggregationOperator.cpp | 12 ++- relational_operators/AggregationOperator.hpp | 10 +- relational_operators/BuildHashOperator.cpp | 17 +++- relational_operators/BuildHashOperator.hpp | 18 +++- relational_operators/CMakeLists.txt | 10 ++ relational_operators/HashJoinOperator.cpp | 78 +++++++++++--- relational_operators/HashJoinOperator.hpp | 49 ++++++--- relational_operators/SelectOperator.cpp | 76 ++++++++++---- relational_operators/SelectOperator.hpp | 16 ++- storage/AggregationOperationState.cpp | 51 ++++++---- storage/AggregationOperationState.hpp | 9 +- storage/CMakeLists.txt | 5 +- storage/StorageBlock.cpp | 118 +++++++++------------- storage/StorageBlock.hpp | 82 ++++++--------- utility/lip_filter/CMakeLists.txt | 4 + utility/lip_filter/LIPFilterBuilder.hpp | 3 - utility/lip_filter/LIPFilterUtil.hpp | 79 +++++++++++++++ 18 files changed, 434 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/expressions/scalar/ScalarAttribute.cpp ---------------------------------------------------------------------- diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp index b29286b..cc42084 100644 
--- a/expressions/scalar/ScalarAttribute.cpp +++ b/expressions/scalar/ScalarAttribute.cpp @@ -168,7 +168,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin( ValueAccessor *accessor = using_left_relation ? left_accessor : right_accessor; - return InvokeOnValueAccessorNotAdapter( + return InvokeOnAnyValueAccessor( accessor, [&joined_tuple_ids, &attr_id, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/AggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp index 056e76d..963012c 100644 --- a/relational_operators/AggregationOperator.cpp +++ b/relational_operators/AggregationOperator.cpp @@ -27,6 +27,8 @@ #include "relational_operators/WorkOrder.pb.h" #include "storage/AggregationOperationState.hpp" #include "storage/StorageBlockInfo.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" #include "tmb/id_typedefs.h" @@ -45,7 +47,8 @@ bool AggregationOperator::getAllWorkOrders( new AggregationWorkOrder( query_id_, input_block_id, - query_context->getAggregationState(aggr_state_index_)), + query_context->getAggregationState(aggr_state_index_), + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context)), op_index_); } started_ = true; @@ -57,7 +60,8 @@ bool AggregationOperator::getAllWorkOrders( new AggregationWorkOrder( query_id_, input_relation_block_ids_[num_workorders_generated_], - query_context->getAggregationState(aggr_state_index_)), + query_context->getAggregationState(aggr_state_index_), + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context)), op_index_); ++num_workorders_generated_; } @@ -86,6 +90,7 @@ bool AggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *contai } serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_id block) { + // 
TODO(jianqiao): LIPFilter serialization::WorkOrder *proto = new serialization::WorkOrder; proto->set_work_order_type(serialization::AGGREGATION); proto->set_query_id(query_id_); @@ -96,9 +101,8 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_ return proto; } - void AggregationWorkOrder::execute() { - state_->aggregateBlock(input_block_id_); + state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get()); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/AggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp index 31c1da4..b5ed977 100644 --- a/relational_operators/AggregationOperator.hpp +++ b/relational_operators/AggregationOperator.hpp @@ -30,6 +30,7 @@ #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -137,13 +138,16 @@ class AggregationWorkOrder : public WorkOrder { * @param query_id The ID of this query. * @param input_block_id The block id. * @param state The AggregationState to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ AggregationWorkOrder(const std::size_t query_id, const block_id input_block_id, - AggregationOperationState *state) + AggregationOperationState *state, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), input_block_id_(input_block_id), - state_(DCHECK_NOTNULL(state)) {} + state_(DCHECK_NOTNULL(state)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~AggregationWorkOrder() override {} @@ -153,6 +157,8 @@ class AggregationWorkOrder : public WorkOrder { const block_id input_block_id_; AggregationOperationState *state_; + std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(AggregationWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/BuildHashOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp index 465621c..aafbb46 100644 --- a/relational_operators/BuildHashOperator.cpp +++ b/relational_operators/BuildHashOperator.cpp @@ -34,6 +34,8 @@ #include "storage/TupleReference.hpp" #include "storage/TupleStorageSubBlock.hpp" #include "storage/ValueAccessor.hpp" +#include "utility/lip_filter/LIPFilterBuilder.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" #include "glog/logging.h" @@ -79,7 +81,8 @@ bool BuildHashOperator::getAllWorkOrders( any_join_key_attributes_nullable_, input_block_id, hash_table, - storage_manager), + storage_manager, + MakeLIPFilterBuilder(lip_deployment_index_, query_context)), op_index_); } started_ = true; @@ -95,7 +98,8 @@ bool BuildHashOperator::getAllWorkOrders( any_join_key_attributes_nullable_, input_relation_block_ids_[num_workorders_generated_], hash_table, - storage_manager), + storage_manager, + MakeLIPFilterBuilder(lip_deployment_index_, query_context)), op_index_); ++num_workorders_generated_; } @@ -136,17 +140,24 @@ serialization::WorkOrder* 
BuildHashOperator::createWorkOrderProto(const block_id any_join_key_attributes_nullable_); proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_); proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block); + // TODO(jianqiao): update lip_filter related stuff return proto; } - void BuildHashWorkOrder::execute() { BlockReference block( storage_manager_->getBlock(build_block_id_, input_relation_)); TupleReferenceGenerator generator(build_block_id_); std::unique_ptr<ValueAccessor> accessor(block->getTupleStorageSubBlock().createValueAccessor()); + + // Build LIPFilters if enabled. + if (lip_filter_builder_ != nullptr) { + lip_filter_builder_->insertValueAccessor(accessor.get()); + accessor->beginIterationVirtual(); + } + HashTablePutResult result; if (join_key_attributes_.size() == 1) { result = hash_table_->putValueAccessor(accessor.get(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/BuildHashOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp index 4a80a8a..0f96ef2 100644 --- a/relational_operators/BuildHashOperator.hpp +++ b/relational_operators/BuildHashOperator.hpp @@ -20,6 +20,7 @@ #ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_ #define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_ +#include <memory> #include <string> #include <utility> #include <vector> @@ -31,6 +32,7 @@ #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterBuilder.hpp" #include "glog/logging.h" @@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder { * @param build_block_id The block id. * @param hash_table The JoinHashTable to use. * @param storage_manager The StorageManager to use. 
+ * @param lip_filter_builder The attached LIP filter builder. **/ BuildHashWorkOrder(const std::size_t query_id, const CatalogRelationSchema &input_relation, @@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder { const bool any_join_key_attributes_nullable, const block_id build_block_id, JoinHashTable *hash_table, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterBuilder *lip_filter_builder = nullptr) : WorkOrder(query_id), input_relation_(input_relation), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), build_block_id_(build_block_id), hash_table_(DCHECK_NOTNULL(hash_table)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_builder_(lip_filter_builder) {} /** * @brief Constructor for the distributed version. @@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder { * @param build_block_id The block id. * @param hash_table The JoinHashTable to use. * @param storage_manager The StorageManager to use. + * @param lip_filter_builder The attached LIP filter builder. 
**/ BuildHashWorkOrder(const std::size_t query_id, const CatalogRelationSchema &input_relation, @@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder { const bool any_join_key_attributes_nullable, const block_id build_block_id, JoinHashTable *hash_table, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterBuilder *lip_filter_builder = nullptr) : WorkOrder(query_id), input_relation_(input_relation), join_key_attributes_(std::move(join_key_attributes)), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), build_block_id_(build_block_id), hash_table_(DCHECK_NOTNULL(hash_table)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_builder_(lip_filter_builder) {} ~BuildHashWorkOrder() override {} @@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder { JoinHashTable *hash_table_; StorageManager *storage_manager_; + std::unique_ptr<LIPFilterBuilder> lip_filter_builder_; + DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index a9645b4..8c96d87 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -92,6 +92,8 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator quickstep_storage_AggregationOperationState quickstep_storage_StorageBlockInfo quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterAdaptiveProber + quickstep_utility_lipfilter_LIPFilterUtil tmb) target_link_libraries(quickstep_relationaloperators_BuildHashOperator glog @@ -111,6 +113,8 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator quickstep_storage_TupleStorageSubBlock quickstep_storage_ValueAccessor 
quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterBuilder + quickstep_utility_lipfilter_LIPFilterUtil tmb) target_link_libraries(quickstep_relationaloperators_CreateIndexOperator glog @@ -223,6 +227,8 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterAdaptiveProber + quickstep_utility_lipfilter_LIPFilterUtil tmb) target_link_libraries(quickstep_relationaloperators_InsertOperator glog @@ -322,7 +328,11 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager + quickstep_storage_TupleIdSequence + quickstep_storage_ValueAccessor quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterAdaptiveProber + quickstep_utility_lipfilter_LIPFilterUtil tmb) if(QUICKSTEP_HAVE_LIBNUMA) target_link_libraries(quickstep_relationaloperators_SelectOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/HashJoinOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp index 779c0fe..1ac13d3 100644 --- a/relational_operators/HashJoinOperator.cpp +++ b/relational_operators/HashJoinOperator.cpp @@ -48,6 +48,8 @@ #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" #include "gflags/gflags.h" #include "glog/logging.h" @@ -95,8 +97,8 @@ class MapBasedJoinedTupleCollector { class SemiAntiJoinTupleCollector { public: - explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) { - 
filter_.reset(tuple_store.getExistenceMap()); + explicit SemiAntiJoinTupleCollector(TupleIdSequence *existence_map) { + filter_ = existence_map; } template <typename ValueAccessorT> @@ -104,12 +106,8 @@ class SemiAntiJoinTupleCollector { filter_->set(accessor.getCurrentPosition(), false); } - const TupleIdSequence* filter() const { - return filter_.get(); - } - private: - std::unique_ptr<TupleIdSequence> filter_; + TupleIdSequence *filter_; }; class OuterJoinTupleCollector { @@ -203,7 +201,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders( selection, hash_table, output_destination, - storage_manager), + storage_manager, + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context)), op_index_); } started_ = true; @@ -223,7 +222,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders( selection, hash_table, output_destination, - storage_manager), + storage_manager, + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context)), op_index_); ++num_workorders_generated_; } // end while @@ -295,6 +295,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders( } bool HashJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + // TODO(jianqiao): LIPFilter switch (join_type_) { case JoinType::kInnerJoin: return getAllNonOuterJoinWorkOrderProtos(container, serialization::HashJoinWorkOrder::HASH_INNER_JOIN); @@ -423,6 +424,18 @@ void HashInnerJoinWorkOrder::execute() { const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); + + // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled. 
+ std::unique_ptr<TupleIdSequence> existence_map; + std::unique_ptr<ValueAccessor> base_accessor; + if (lip_filter_adaptive_prober_ != nullptr) { + base_accessor.reset(probe_accessor.release()); + existence_map.reset( + lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get())); + probe_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); + } + MapBasedJoinedTupleCollector collector; if (join_key_attributes_.size() == 1) { hash_table_.getAllFromValueAccessor( @@ -529,6 +542,17 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() { std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); + // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled. + std::unique_ptr<TupleIdSequence> existence_map; + std::unique_ptr<ValueAccessor> base_accessor; + if (lip_filter_adaptive_prober_ != nullptr) { + base_accessor.reset(probe_accessor.release()); + existence_map.reset( + lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get())); + probe_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); + } + // We collect all the matching probe relation tuples, as there's a residual // preidcate that needs to be applied after collecting these matches. MapBasedJoinedTupleCollector collector; @@ -548,7 +572,6 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() { // Get a filter for tuples in the given probe block. 
TupleIdSequence filter(probe_store.getMaxTupleID() + 1); - filter.setRange(0, filter.length(), false); for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>> &build_block_entry : *collector.getJoinedTuples()) { @@ -609,7 +632,23 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() { const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); - SemiAntiJoinTupleCollector collector(probe_store); + + // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled. + std::unique_ptr<TupleIdSequence> existence_map; + std::unique_ptr<ValueAccessor> base_accessor; + if (lip_filter_adaptive_prober_ != nullptr) { + base_accessor.reset(probe_accessor.release()); + existence_map.reset( + lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get())); + probe_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); + } + + if (existence_map == nullptr) { + existence_map.reset(probe_store.getExistenceMap()); + } + + SemiAntiJoinTupleCollector collector(existence_map.get()); // We collect all the probe relation tuples which have at least one matching // tuple in the build relation. 
As a performance optimization, the hash table // just looks for the existence of the probing key in the hash table and sets @@ -636,8 +675,15 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() { probe_block->getIndices(), probe_block->getIndicesConsistent()); - std::unique_ptr<ValueAccessor> probe_accessor_with_filter( - probe_store.createValueAccessor(collector.filter())); + std::unique_ptr<ValueAccessor> probe_accessor_with_filter; + if (base_accessor != nullptr) { + probe_accessor_with_filter.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); + } else { + probe_accessor_with_filter.reset( + probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); + } + ColumnVectorsValueAccessor temp_result; for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); selection_it != selection_.end(); ++selection_it) { @@ -656,7 +702,9 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() { const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); - SemiAntiJoinTupleCollector collector(probe_store); + std::unique_ptr<TupleIdSequence> existence_map(probe_store.getExistenceMap()); + + SemiAntiJoinTupleCollector collector(existence_map.get()); // We probe the hash table to find the keys which have an entry in the // hash table. 
if (join_key_attributes_.size() == 1) { @@ -680,7 +728,7 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() { probe_block->getIndicesConsistent()); std::unique_ptr<ValueAccessor> probe_accessor_with_filter( - probe_store.createValueAccessor(collector.filter())); + probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); ColumnVectorsValueAccessor temp_result; for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); selection_it != selection_.end(); ++selection_it) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index fa393b6..566a367 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -35,6 +35,7 @@ #include "storage/HashTable.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -295,6 +296,7 @@ class HashInnerJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashInnerJoinWorkOrder( const std::size_t query_id, @@ -307,7 +309,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -318,7 +321,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -342,6 +346,7 @@ class HashInnerJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashInnerJoinWorkOrder( const std::size_t query_id, @@ -354,7 +359,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -365,7 +371,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashInnerJoinWorkOrder() override {} @@ -392,6 +399,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder); }; @@ -423,6 +432,7 @@ class HashSemiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashSemiJoinWorkOrder( const std::size_t query_id, @@ -435,7 +445,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -446,7 +457,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -470,6 +482,7 @@ class HashSemiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashSemiJoinWorkOrder( const std::size_t query_id, @@ -482,7 +495,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -493,7 +507,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashSemiJoinWorkOrder() override {} @@ -516,6 +531,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder); }; @@ -547,6 +564,7 @@ class HashAntiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashAntiJoinWorkOrder( const std::size_t query_id, @@ -559,7 +577,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -570,7 +589,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -594,6 +614,7 @@ class HashAntiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashAntiJoinWorkOrder( const std::size_t query_id, @@ -606,7 +627,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -617,7 +639,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashAntiJoinWorkOrder() override {} @@ -646,6 +669,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/SelectOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp index d56326e..e453069 100644 --- a/relational_operators/SelectOperator.cpp +++ b/relational_operators/SelectOperator.cpp @@ -30,6 +30,10 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/ValueAccessor.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" #include "glog/logging.h" @@ -40,22 +44,26 @@ namespace quickstep { class Predicate; void SelectOperator::addWorkOrders(WorkOrdersContainer 
*container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector<std::unique_ptr<const Scalar>> *selection, InsertDestination *output_destination) { if (input_relation_is_stored_) { for (const block_id input_block_id : input_relation_block_ids_) { - container->addNormalWorkOrder(new SelectWorkOrder(query_id_, - input_relation_, - input_block_id, - predicate, - simple_projection_, - simple_selection_, - selection, - output_destination, - storage_manager), - op_index_); + container->addNormalWorkOrder( + new SelectWorkOrder( + query_id_, + input_relation_, + input_block_id, + predicate, + simple_projection_, + simple_selection_, + selection, + output_destination, + storage_manager, + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context)), + op_index_); } } else { while (num_workorders_generated_ < input_relation_block_ids_.size()) { @@ -69,7 +77,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container, simple_selection_, selection, output_destination, - storage_manager), + storage_manager, + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context)), op_index_); ++num_workorders_generated_; } @@ -78,6 +87,7 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container, #ifdef QUICKSTEP_HAVE_LIBNUMA void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector<std::unique_ptr<const Scalar>> *selection, @@ -99,6 +109,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container, selection, output_destination, storage_manager, + MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context), placement_scheme_->getNUMANodeForBlock(input_block_id)), op_index_); } @@ -120,6 +131,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container, selection, output_destination, storage_manager, + 
MakeLIPFilterAdaptiveProber(lip_deployment_index_, query_context), placement_scheme_->getNUMANodeForBlock(block_in_partition)), op_index_); ++num_workorders_generated_in_partition_[part_id]; @@ -151,11 +163,21 @@ bool SelectOperator::getAllWorkOrders( if (input_relation_.hasPartitionScheme()) { #ifdef QUICKSTEP_HAVE_LIBNUMA if (input_relation_.hasNUMAPlacementScheme()) { - addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination); + addPartitionAwareWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } #endif } else { - addWorkOrders(container, storage_manager, predicate, selection, output_destination); + addWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } started_ = true; } @@ -164,11 +186,21 @@ bool SelectOperator::getAllWorkOrders( if (input_relation_.hasPartitionScheme()) { #ifdef QUICKSTEP_HAVE_LIBNUMA if (input_relation_.hasNUMAPlacementScheme()) { - addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination); + addPartitionAwareWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } #endif } else { - addWorkOrders(container, storage_manager, predicate, selection, output_destination); + addWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } return done_feeding_input_relation_; } @@ -219,14 +251,24 @@ void SelectWorkOrder::execute() { BlockReference block( storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0])); + std::unique_ptr<TupleIdSequence> lip_filter_matches; + if (lip_filter_adaptive_prober_ != nullptr) { + std::unique_ptr<ValueAccessor> accessor( + block->getTupleStorageSubBlock().createValueAccessor()); + lip_filter_matches.reset( + lip_filter_adaptive_prober_->filterValueAccessor(accessor.get())); + } + if 
(simple_projection_) { block->selectSimple(simple_selection_, predicate_, - output_destination_); + output_destination_, + lip_filter_matches.get()); } else { block->select(*DCHECK_NOTNULL(selection_), predicate_, - output_destination_); + output_destination_, + lip_filter_matches.get()); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/relational_operators/SelectOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp index 0f5c712..2ace458 100644 --- a/relational_operators/SelectOperator.hpp +++ b/relational_operators/SelectOperator.hpp @@ -38,6 +38,7 @@ #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -49,6 +50,7 @@ namespace quickstep { class CatalogRelationSchema; class InsertDestination; +class LIPFilterDeployment; class Predicate; class Scalar; class StorageManager; @@ -247,12 +249,14 @@ class SelectOperator : public RelationalOperator { } void addWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector<std::unique_ptr<const Scalar>> *selection, InsertDestination *output_destination); void addPartitionAwareWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector<std::unique_ptr<const Scalar>> *selection, @@ -318,6 +322,7 @@ class SelectWorkOrder : public WorkOrder { * @param output_destination The InsertDestination to insert the selection * results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ SelectWorkOrder(const std::size_t query_id, const CatalogRelationSchema &input_relation, @@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> *selection, InsertDestination *output_destination, StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr, const numa_node_id numa_node = 0) : WorkOrder(query_id), input_relation_(input_relation), @@ -337,7 +343,8 @@ class SelectWorkOrder : public WorkOrder { simple_selection_(simple_selection), selection_(selection), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) { + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) { preferred_numa_nodes_.push_back(numa_node); } @@ -360,6 +367,7 @@ class SelectWorkOrder : public WorkOrder { * @param output_destination The InsertDestination to insert the selection * results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ SelectWorkOrder(const std::size_t query_id, const CatalogRelationSchema &input_relation, @@ -370,6 +378,7 @@ class SelectWorkOrder : public WorkOrder { const std::vector<std::unique_ptr<const Scalar>> *selection, InsertDestination *output_destination, StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr, const numa_node_id numa_node = 0) : WorkOrder(query_id), input_relation_(input_relation), @@ -379,7 +388,8 @@ class SelectWorkOrder : public WorkOrder { simple_selection_(std::move(simple_selection)), selection_(selection), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) { + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) { preferred_numa_nodes_.push_back(numa_node); } @@ -407,6 +417,8 @@ class SelectWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(SelectWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 249026d..f89fd7a 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -46,10 +46,13 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/ValueAccessor.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "types/containers/Tuple.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -331,11 +334,12 @@ bool 
AggregationOperationState::ProtoIsValid( return true; } -void AggregationOperationState::aggregateBlock(const block_id input_block) { +void AggregationOperationState::aggregateBlock(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { if (group_by_list_.empty()) { aggregateBlockSingleState(input_block); } else { - aggregateBlockHashTable(input_block); + aggregateBlockHashTable(input_block, lip_filter_adaptive_prober); } } @@ -367,10 +371,13 @@ void AggregationOperationState::aggregateBlockSingleState( BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); - // If there is a filter predicate, 'reuse_matches' holds the set of matching - // tuples so that it can be reused across multiple aggregates (i.e. we only - // pay the cost of evaluating the predicate once). - std::unique_ptr<TupleIdSequence> reuse_matches; + std::unique_ptr<TupleIdSequence> matches; + if (predicate_ != nullptr) { + std::unique_ptr<ValueAccessor> accessor( + block->getTupleStorageSubBlock().createValueAccessor()); + matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get())); + } + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { const std::vector<attribute_id> *local_arguments_as_attributes = nullptr; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -387,9 +394,8 @@ void AggregationOperationState::aggregateBlockSingleState( arguments_[agg_idx], local_arguments_as_attributes, {}, /* group_by */ - predicate_.get(), + matches.get(), distinctify_hashtables_[agg_idx].get(), - &reuse_matches, nullptr /* reuse_group_by_vectors */); local_state.emplace_back(nullptr); } else { @@ -397,8 +403,7 @@ void AggregationOperationState::aggregateBlockSingleState( local_state.emplace_back(block->aggregate(*handles_[agg_idx], arguments_[agg_idx], local_arguments_as_attributes, - predicate_.get(), - &reuse_matches)); + matches.get())); } } @@ -407,14 +412,24 @@ void 
AggregationOperationState::aggregateBlockSingleState( } void AggregationOperationState::aggregateBlockHashTable( - const block_id input_block) { + const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); - // If there is a filter predicate, 'reuse_matches' holds the set of matching - // tuples so that it can be reused across multiple aggregates (i.e. we only - // pay the cost of evaluating the predicate once). - std::unique_ptr<TupleIdSequence> reuse_matches; + // Apply LIPFilters first, and then the predicate, to generate a TupleIdSequence + // as the existence map for the tuples. + std::unique_ptr<TupleIdSequence> matches; + if (lip_filter_adaptive_prober != nullptr || predicate_ != nullptr) { + std::unique_ptr<ValueAccessor> accessor( + block->getTupleStorageSubBlock().createValueAccessor()); + if (lip_filter_adaptive_prober != nullptr) { + matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get())); + } + if (predicate_ != nullptr) { + matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get())); + } + } // This holds values of all the GROUP BY attributes so that the can be reused // across multiple aggregates (i.e. 
we only pay the cost of evaluatin the @@ -431,9 +446,8 @@ void AggregationOperationState::aggregateBlockHashTable( arguments_[agg_idx], nullptr, /* arguments_as_attributes */ group_by_list_, - predicate_.get(), + matches.get(), distinctify_hashtables_[agg_idx].get(), - &reuse_matches, &reuse_group_by_vectors); } } @@ -447,9 +461,8 @@ void AggregationOperationState::aggregateBlockHashTable( DCHECK(agg_hash_table != nullptr); block->aggregateGroupBy(arguments_, group_by_list_, - predicate_.get(), + matches.get(), agg_hash_table, - &reuse_matches, &reuse_group_by_vectors); group_by_hashtable_pool_->returnHashTable(agg_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 3b0f286..f3332a7 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -41,6 +41,7 @@ class AggregateFunction; class CatalogDatabaseLite; class CatalogRelationSchema; class InsertDestination; +class LIPFilterAdaptiveProber; class StorageManager; /** \addtogroup Storage @@ -155,8 +156,11 @@ class AggregationOperationState { * * @param input_block The block ID of the storage block where the aggreates * are going to be computed. + * @param lip_filter_adaptive_prober The LIPFilter prober for pre-filtering + * the block. **/ - void aggregateBlock(const block_id input_block); + void aggregateBlock(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober); /** * @brief Generate the final results for the aggregates managed by this @@ -185,7 +189,8 @@ class AggregationOperationState { // Aggregate on input block. 
void aggregateBlockSingleState(const block_id input_block); - void aggregateBlockHashTable(const block_id input_block); + void aggregateBlockHashTable(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober); void finalizeSingleState(InsertDestination *output_destination); void finalizeHashTable(InsertDestination *output_destination); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 325a7cb..0e32cc1 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -292,11 +292,14 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager + quickstep_storage_TupleIdSequence + quickstep_storage_ValueAccessor quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor quickstep_types_containers_Tuple - quickstep_utility_Macros) + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterAdaptiveProber) target_link_libraries(quickstep_storage_AggregationOperationState_proto quickstep_expressions_Expressions_proto quickstep_expressions_aggregation_AggregateFunction_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index ec5990f..57e4136 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp @@ -341,20 +341,24 @@ void StorageBlock::sample(const bool is_block_sample, void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection, const Predicate *predicate, - InsertDestinationInterface *destination) const { + InsertDestinationInterface *destination, + const TupleIdSequence 
*filter) const { ColumnVectorsValueAccessor temp_result; { SubBlocksReference sub_blocks_ref(*tuple_store_, indices_, indices_consistent_); + std::unique_ptr<ValueAccessor> base_accessor( + tuple_store_->createValueAccessor(filter)); + std::unique_ptr<TupleIdSequence> matches; std::unique_ptr<ValueAccessor> accessor; - if (predicate == nullptr) { - accessor.reset(tuple_store_->createValueAccessor()); - } else { + if (predicate != nullptr) { matches.reset(getMatchesForPredicate(predicate)); - accessor.reset(tuple_store_->createValueAccessor(matches.get())); + accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + } else { + accessor.reset(base_accessor.release()); } for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin(); @@ -371,14 +375,18 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection, void StorageBlock::selectSimple(const std::vector<attribute_id> &selection, const Predicate *predicate, - InsertDestinationInterface *destination) const { - std::unique_ptr<ValueAccessor> accessor; + InsertDestinationInterface *destination, + const TupleIdSequence *filter) const { + std::unique_ptr<ValueAccessor> base_accessor( + tuple_store_->createValueAccessor(filter)); + std::unique_ptr<TupleIdSequence> matches; - if (predicate == nullptr) { - accessor.reset(tuple_store_->createValueAccessor()); - } else { + std::unique_ptr<ValueAccessor> accessor; + if (predicate != nullptr) { matches.reset(getMatchesForPredicate(predicate)); - accessor.reset(tuple_store_->createValueAccessor(matches.get())); + accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + } else { + accessor.reset(base_accessor.release()); } destination->bulkInsertTuplesWithRemappedAttributes(selection, @@ -389,37 +397,28 @@ AggregationState* StorageBlock::aggregate( const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> 
*arguments_as_attributes, - const Predicate *predicate, - std::unique_ptr<TupleIdSequence> *reuse_matches) const { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this same - // block. - if (predicate && !*reuse_matches) { - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - + const TupleIdSequence *filter) const { #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all the arguments to this aggregate are plain relation attributes, // aggregate directly on a ValueAccessor from this block to avoid a copy. if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) { DCHECK_EQ(arguments.size(), arguments_as_attributes->size()) << "Mismatch between number of arguments and number of attribute_ids"; - return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get()); + return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter); } // TODO(shoban): We may want to optimize for ScalarLiteral here. #endif // Call aggregateHelperColumnVector() to materialize each argument as a // ColumnVector, then aggregate over those. - return aggregateHelperColumnVector(handle, arguments, reuse_matches->get()); + return aggregateHelperColumnVector(handle, arguments, filter); } void StorageBlock::aggregateGroupBy( const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, + const TupleIdSequence *filter, AggregationStateHashTableBase *hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { DCHECK_GT(group_by.size(), 0u) << "Called aggregateGroupBy() with zero GROUP BY expressions"; @@ -438,23 +437,7 @@ void StorageBlock::aggregateGroupBy( // this aggregate, as well as the GROUP BY expression values. 
ColumnVectorsValueAccessor temp_result; { - std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - - // Create a filtered ValueAccessor that only iterates over predicate - // matches. - accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); - } else { - // Create a ValueAccessor that iterates over all tuples in this block - accessor.reset(tuple_store_->createValueAccessor()); - } - + std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter)); attribute_id attr_id = 0; // First, put GROUP BY keys into 'temp_result'. @@ -503,9 +486,8 @@ void StorageBlock::aggregateDistinct( const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, + const TupleIdSequence *filter, AggregationStateHashTableBase *distinctify_hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { DCHECK_GT(arguments.size(), 0u) << "Called aggregateDistinct() with zero argument expressions"; @@ -517,22 +499,7 @@ void StorageBlock::aggregateDistinct( // this aggregate, as well as the GROUP BY expression values. ColumnVectorsValueAccessor temp_result; { - std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - - // Create a filtered ValueAccessor that only iterates over predicate - // matches. 
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); - } else { - // Create a ValueAccessor that iterates over all tuples in this block - accessor.reset(tuple_store_->createValueAccessor()); - } + std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter)); #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all the arguments to this aggregate are plain relation attributes, @@ -1246,23 +1213,36 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) { return all_indices_consistent_; } -TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const { +TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate, + const TupleIdSequence *filter) const { if (predicate == nullptr) { - return tuple_store_->getExistenceMap(); + TupleIdSequence *matched = tuple_store_->getExistenceMap(); + if (filter != nullptr) { + matched->intersectWith(*filter); + } + return matched; } std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor()); - std::unique_ptr<TupleIdSequence> existence_map; - if (!tuple_store_->isPacked()) { - existence_map.reset(tuple_store_->getExistenceMap()); - } SubBlocksReference sub_blocks_ref(*tuple_store_, indices_, indices_consistent_); - return predicate->getAllMatches(value_accessor.get(), - &sub_blocks_ref, - nullptr, - existence_map.get()); + + if (!tuple_store_->isPacked()) { + std::unique_ptr<TupleIdSequence> existence_map(tuple_store_->getExistenceMap()); + if (filter != nullptr) { + existence_map->intersectWith(*filter); + } + return predicate->getAllMatches(value_accessor.get(), + &sub_blocks_ref, + nullptr, + existence_map.get()); + } else { + return predicate->getAllMatches(value_accessor.get(), + &sub_blocks_ref, + nullptr, + filter); + } } std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/storage/StorageBlock.hpp 
---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index bab5bab..61a35fe 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -313,6 +313,17 @@ class StorageBlock : public StorageBlockBase { ValueAccessor *accessor); /** + * @brief Get the IDs of tuples in this StorageBlock which match a given Predicate. + * + * @param predicate The predicate to match. + * @param filter If non-NULL, then only tuple IDs which are set in the + * filter will be checked (all others will be assumed to be false). + * @return A TupleIdSequence which contains matching tuple IDs for predicate. + **/ + TupleIdSequence* getMatchesForPredicate(const Predicate *predicate, + const TupleIdSequence *filter = nullptr) const; + + /** * @brief Perform a random sampling of data on the StorageBlock. The number * of records sampled is determined by the sample percentage in case of * tuple sample. For block sample all the records in a block are taken. @@ -340,6 +351,8 @@ class StorageBlock : public StorageBlockBase { * should be matched. * @param destination Where to insert the tuples resulting from the SELECT * query. + * @param filter If non-NULL, then only tuple IDs which are set in the + * filter will be checked (all others will be assumed to be false). * @exception TupleTooLargeForBlock A tuple produced by this selection was * too large to insert into an empty block provided by * destination. Selection may be partially complete (with some @@ -349,17 +362,20 @@ class StorageBlock : public StorageBlockBase { **/ void select(const std::vector<std::unique_ptr<const Scalar>> &selection, const Predicate *predicate, - InsertDestinationInterface *destination) const; + InsertDestinationInterface *destination, + const TupleIdSequence *filter) const; /** * @brief Perform a simple SELECT query on this StorageBlock which only * projects attributes and does not evaluate expressions. 
* - * @param destination Where to insert the tuples resulting from the SELECT - * query. * @param selection The attributes to project. * @param predicate A predicate for selection. NULL indicates that all tuples * should be matched. + * @param destination Where to insert the tuples resulting from the SELECT + * query. + * @param filter If non-NULL, then only tuple IDs which are set in the + * filter will be checked (all others will be assumed to be false). * @exception TupleTooLargeForBlock A tuple produced by this selection was * too large to insert into an empty block provided by * destination. Selection may be partially complete (with some @@ -372,7 +388,8 @@ class StorageBlock : public StorageBlockBase { **/ void selectSimple(const std::vector<attribute_id> &selection, const Predicate *predicate, - InsertDestinationInterface *destination) const; + InsertDestinationInterface *destination, + const TupleIdSequence *filter) const; /** * @brief Perform non GROUP BY aggregation on the tuples in the this storage @@ -384,23 +401,8 @@ class StorageBlock : public StorageBlockBase { * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id * for each of the elements in arguments, and is used to elide a copy. * Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE. - * @param predicate A predicate for selection. nullptr indicates that all - * tuples should be aggregated on. - * @param reuse_matches This parameter is used to store and reuse tuple-id - * sequence of matches pre-computed in an earlier invocations to - * aggregate(). \c reuse_matches is never \c nullptr for ease of use. - * Current invocation of aggregate() will reuse TupleIdSequence if - * passed, otherwise compute a TupleIdSequence based on \c predicate - * and store in \c reuse_matches. We use std::unique_ptr for each of - * use, since the caller will not have to selective free. 
- * - * For example, see this relevant pseudo-C++ code: - * \code - * std::unique_ptr<TupleIdSequence> matches; - * for each aggregate { - * block.aggregate(..., &matches); - * } - * \endcode + * @param filter If non-NULL, then only tuple IDs which are set in the + * filter will be checked (all others will be assumed to be false). * * @return Aggregated state for this block in the form of an * AggregationState. AggregationHandle::mergeStates() can be called @@ -412,8 +414,7 @@ class StorageBlock : public StorageBlockBase { const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, - const Predicate *predicate, - std::unique_ptr<TupleIdSequence> *reuse_matches) const; + const TupleIdSequence *filter) const; /** * @brief Perform GROUP BY aggregation on the tuples in the this storage @@ -423,18 +424,10 @@ class StorageBlock : public StorageBlockBase { * @param group_by The list of GROUP BY attributes/expressions. The tuples in * this storage block are grouped by these attributes before * aggregation. - * @param predicate A predicate for selection. nullptr indicates that all - * tuples should be aggregated on. + * @param filter If non-NULL, then only tuple IDs which are set in the + * filter will be checked (all others will be assumed to be false). * @param hash_table Hash table to store aggregation state mapped based on * GROUP BY value list (defined by \c group_by). - * @param reuse_matches This parameter is used to store and reuse tuple-id - * sequence of matches pre-computed in an earlier invocations of - * aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of - * use. Current invocation of aggregateGroupBy() will reuse - * TupleIdSequence if passed, otherwise computes a TupleIdSequence based - * on \c predicate and stores in \c reuse_matches. We use - * std::unique_ptr for each of use, since the caller will not have to - * selective free. 
* @param reuse_group_by_vectors This parameter is used to store and reuse * GROUP BY attribute vectors pre-computed in an earlier invocation of * aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr @@ -444,10 +437,9 @@ class StorageBlock : public StorageBlockBase { * * For sample usage of aggregateGroupBy, see this relevant pseudo-C++ code: * \code - * std::unique_ptr<TupleIdSequence> matches; * std::vector<std::unique_ptr<ColumnVector>> group_by_vectors; * for each aggregate { - * block.aggregateGroupBy(..., &matches, &group_by_vectors); + * block.aggregateGroupBy(..., &group_by_vectors); * } * \endcode **/ @@ -461,9 +453,8 @@ class StorageBlock : public StorageBlockBase { void aggregateGroupBy( const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, + const TupleIdSequence *filter, AggregationStateHashTableBase *hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; /** @@ -481,19 +472,11 @@ class StorageBlock : public StorageBlockBase { * for each of the elements in arguments, and is used to elide a copy. * Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE. * @param group_by The list of GROUP BY attributes/expressions. - * @param predicate A predicate for selection. \c nullptr indicates that all - * tuples should be aggregated on. + * @param filter If non-NULL, then only tuple IDs which are set in the + * filter will be checked (all others will be assumed to be false). * @param distinctify_hash_table Hash table to store the arguments and GROUP * BY expressions together as hash table key and a bool constant \c true * as hash table value. (So the hash table actually serves as a hash set.) 
- * @param reuse_matches This parameter is used to store and reuse tuple-id - * sequence of matches pre-computed in an earlier invocations of - * aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of - * use. Current invocation of aggregateGroupBy() will reuse - * TupleIdSequence if passed, otherwise computes a TupleIdSequence based - * on \c predicate and stores in \c reuse_matches. We use - * std::unique_ptr for each of use, since the caller will not have to - * selective free. * @param reuse_group_by_vectors This parameter is used to store and reuse * GROUP BY attribute vectors pre-computed in an earlier invocation of * aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr @@ -505,9 +488,8 @@ class StorageBlock : public StorageBlockBase { const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, + const TupleIdSequence *filter, AggregationStateHashTableBase *distinctify_hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; /** @@ -627,8 +609,6 @@ class StorageBlock : public StorageBlockBase { // StorageBlock's header. 
bool rebuildIndexes(bool short_circuit); - TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const; - std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues( const ValueAccessor &accessor, const tuple_id tuple, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/utility/lip_filter/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt index b7224d2..23b3763 100644 --- a/utility/lip_filter/CMakeLists.txt +++ b/utility/lip_filter/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src. add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp) add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp) add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp) +add_library(quickstep_utility_lipfilter_LIPFilterUtil ../../empty_src.cpp LIPFilterUtil.hpp) add_library(quickstep_utility_lipfilter_LIPFilter_proto ${utility_lipfilter_LIPFilter_proto_srcs}) add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp) @@ -58,6 +59,9 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory quickstep_utility_lipfilter_LIPFilter_proto quickstep_utility_lipfilter_SingleIdentityHashFilter quickstep_utility_Macros) +target_link_libraries(quickstep_utility_lipfilter_LIPFilterUtil + quickstep_queryexecution_QueryContext + quickstep_utility_lipfilter_LIPFilterDeployment) target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto ${PROTOBUF_LIBRARY} quickstep_types_Type_proto) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/utility/lip_filter/LIPFilterBuilder.hpp ---------------------------------------------------------------------- diff 
--git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp index deb8f66..aa84a06 100644 --- a/utility/lip_filter/LIPFilterBuilder.hpp +++ b/utility/lip_filter/LIPFilterBuilder.hpp @@ -39,9 +39,6 @@ class ValueAccessor; * @{ */ -class LIPFilterBuilder; -typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr; - /** * @brief Helper class for building LIPFilters from a relation (i.e. ValueAccessor). */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96ef3507/utility/lip_filter/LIPFilterUtil.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterUtil.hpp b/utility/lip_filter/LIPFilterUtil.hpp new file mode 100644 index 0000000..ab17163 --- /dev/null +++ b/utility/lip_filter/LIPFilterUtil.hpp @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_ +#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_ + +#include "query_execution/QueryContext.hpp" +#include "utility/lip_filter/LIPFilterDeployment.hpp" + +namespace quickstep { + +class LIPFilterBuilder; +class LIPFilterAdaptiveProber; + +/** \addtogroup Utility + * @{ + */ + +/** + * @brief Create a LIPFilterBuilder for the given LIPFilterDeployment in QueryContext. + * @note NOTE(review): ownership of the returned object is not visible in this header; presumably the caller owns it - confirm. + * @param lip_deployment_index The id of the LIPFilterDeployment in QueryContext. + * @param query_context The QueryContext. Dereferenced without a null check unless \p lip_deployment_index is invalid. + * @return A LIPFilterBuilder object, or nullptr if \p lip_deployment_index is QueryContext::kInvalidLIPDeploymentId. + */ +inline LIPFilterBuilder* MakeLIPFilterBuilder( + const QueryContext::lip_deployment_id lip_deployment_index, + const QueryContext *query_context) { + if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId) { + return nullptr; + } else { + const LIPFilterDeployment *lip_filter_deployment = + query_context->getLIPDeployment(lip_deployment_index); + return lip_filter_deployment->createLIPFilterBuilder(); + } +} + +/** + * @brief Create a LIPFilterAdaptiveProber for the given LIPFilterDeployment + * in QueryContext. + * @note NOTE(review): ownership of the returned object is not visible in this header; presumably the caller owns it - confirm. + * @param lip_deployment_index The id of the LIPFilterDeployment in QueryContext. + * @param query_context The QueryContext. Dereferenced without a null check unless \p lip_deployment_index is invalid. + * @return A LIPFilterAdaptiveProber object, or nullptr if \p lip_deployment_index + * is QueryContext::kInvalidLIPDeploymentId. + */ +inline LIPFilterAdaptiveProber* MakeLIPFilterAdaptiveProber( + const QueryContext::lip_deployment_id lip_deployment_index, + const QueryContext *query_context) { + if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId) { + return nullptr; + } else { + const LIPFilterDeployment *lip_filter_deployment = + query_context->getLIPDeployment(lip_deployment_index); + return lip_filter_deployment->createLIPFilterAdaptiveProber(); + } +} + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_