Added partitioned hash join support to the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/968ce3f7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/968ce3f7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/968ce3f7 Branch: refs/heads/reorder-attrs Commit: 968ce3f75969817b3dcc16c0c1c3218817cfe9b8 Parents: 7bce0b8 Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Jan 25 15:53:39 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Wed Jan 25 16:41:35 2017 -0800 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 3 + query_execution/ForemanDistributed.cpp | 7 +- query_execution/PolicyEnforcerDistributed.cpp | 2 + query_execution/PolicyEnforcerDistributed.hpp | 11 +- query_execution/QueryExecutionTypedefs.hpp | 6 ++ query_execution/QueryManagerDistributed.cpp | 11 ++ query_execution/QueryManagerDistributed.hpp | 40 ++++--- .../tests/execution_generator/CMakeLists.txt | 6 ++ relational_operators/BuildHashOperator.cpp | 9 +- relational_operators/BuildHashOperator.hpp | 18 ---- relational_operators/CMakeLists.txt | 1 + relational_operators/DestroyHashOperator.cpp | 4 +- relational_operators/DestroyHashOperator.hpp | 8 +- relational_operators/HashJoinOperator.cpp | 31 +++--- relational_operators/HashJoinOperator.hpp | 105 ++++--------------- relational_operators/WorkOrder.proto | 10 +- relational_operators/WorkOrderFactory.cpp | 10 -- 17 files changed, 115 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 0f74384..c4c4079 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ 
-156,6 +156,7 @@ if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed glog quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs quickstep_catalog_Catalog_proto quickstep_queryexecution_PolicyEnforcerBase quickstep_queryexecution_QueryContext @@ -252,7 +253,9 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase quickstep_utility_Macros) if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_QueryManagerDistributed + quickstep_catalog_CatalogTypedefs quickstep_queryexecution_QueryContext + quickstep_queryexecution_QueryContext_proto quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionState quickstep_queryexecution_QueryExecutionTypedefs http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 0fa701d..fe4c483 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -278,23 +278,28 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p size_t *shiftboss_index_for_hash_join) { const S::WorkOrder &work_order_proto = proto.work_order(); QueryContext::join_hash_table_id join_hash_table_index; + partition_id part_id; switch (work_order_proto.work_order_type()) { case S::BUILD_HASH: join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index); + part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id); break; case S::HASH_JOIN: join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index); + part_id = work_order_proto.GetExtension(S::HashJoinWorkOrder::partition_id); break; case S::DESTROY_HASH: join_hash_table_index = 
work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index); + part_id = work_order_proto.GetExtension(S::DestroyHashWorkOrder::partition_id); break; default: return false; } static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin( - proto.query_id(), join_hash_table_index, next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join); + proto.query_id(), join_hash_table_index, part_id, next_shiftboss_index_to_schedule, + shiftboss_index_for_hash_join); return true; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index c5642bc..e9f57d3 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -173,11 +173,13 @@ void PolicyEnforcerDistributed::getShiftbossIndexForAggregation( void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin( const std::size_t query_id, const QueryContext::join_hash_table_id join_hash_table_index, + const partition_id part_id, const std::size_t next_shiftboss_index_to_schedule, std::size_t *shiftboss_index) { DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); query_manager->getShiftbossIndexForHashJoin(join_hash_table_index, + part_id, next_shiftboss_index_to_schedule, shiftboss_index); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 2c00a6b..248948a 100644 --- 
a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -21,6 +21,7 @@ #include <utility> #include <vector> +#include "catalog/CatalogTypedefs.hpp" #include "query_execution/PolicyEnforcerBase.hpp" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionMessages.pb.h" @@ -124,19 +125,21 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { /** * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder. - * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index> - * will be set to <next_shiftboss_index_to_schedule>. Otherwise, - * <shiftboss_index> will be set to the index of the Shiftboss that has - * executed the first BuildHash. + * If it is the first BuildHash on <join_hash_table_index, part_id>, + * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>. + * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that + * has executed the first BuildHash. * * @param query_id The query id. * @param join_hash_table_index The Hash Table for the Join. + * @param part_id The partition ID. * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. 
**/ void getShiftbossIndexForHashJoin( const std::size_t query_id, const QueryContext::join_hash_table_id join_hash_table_index, + const partition_id part_id, const std::size_t next_shiftboss_index_to_schedule, std::size_t *shiftboss_index); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index faf2132..9f78302 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -62,6 +62,12 @@ using ClientIDMap = ThreadIDBasedMap<client_id, 'a', 'p'>; +#ifdef QUICKSTEP_DISTRIBUTED + +constexpr std::size_t kInvalidShiftbossIndex = static_cast<std::size_t>(-1); + +#endif // QUICKSTEP_DISTRIBUTED + // We sort the following message types in the order of a life cycle of a query. enum QueryExecutionMessageType : message_type_id { kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index 5c7e0d8..6ac96ab 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -23,8 +23,10 @@ #include <cstdlib> #include <memory> #include <utility> +#include <vector> #include "query_execution/QueryContext.hpp" +#include "query_execution/QueryContext.pb.h" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" @@ -45,6 +47,7 @@ using std::malloc; using std::move; using std::size_t; using std::unique_ptr; +using std::vector; namespace quickstep { 
@@ -65,6 +68,14 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle, processOperator(index, false); } } + + const serialization::QueryContext &query_context_proto = query_handle->getQueryContextProto(); + shiftboss_indexes_for_aggrs_.resize(query_context_proto.aggregation_states_size(), kInvalidShiftbossIndex); + + for (int i = 0; i < query_context_proto.join_hash_tables_size(); ++i) { + shiftboss_indexes_for_hash_joins_.push_back( + vector<size_t>(query_context_proto.join_hash_tables(i).num_partitions(), kInvalidShiftbossIndex)); + } } serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index 7a07fcb..631b15a 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -22,10 +22,12 @@ #include <cstddef> #include <memory> -#include <unordered_map> +#include <vector> +#include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryManagerBase.hpp" #include "query_execution/WorkOrderProtosContainer.hpp" #include "utility/Macros.hpp" @@ -105,13 +107,12 @@ class QueryManagerDistributed final : public QueryManagerBase { void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index, const std::size_t next_shiftboss_index_to_schedule, std::size_t *shiftboss_index) { - const auto cit = shiftboss_indexes_for_aggrs_.find(aggr_state_index); - if (cit != shiftboss_indexes_for_aggrs_.end()) { - *shiftboss_index = cit->second; - } else { - shiftboss_indexes_for_aggrs_.emplace(aggr_state_index, 
next_shiftboss_index_to_schedule); - *shiftboss_index = next_shiftboss_index_to_schedule; + DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size()); + if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex) { + shiftboss_indexes_for_aggrs_[aggr_state_index] = next_shiftboss_index_to_schedule; } + + *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index]; } /** @@ -119,19 +120,22 @@ class QueryManagerDistributed final : public QueryManagerBase { * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>. * * @param join_hash_table_index The Hash Table for the Join. + * @param part_id The partition ID. * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. **/ void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index, + const partition_id part_id, const std::size_t next_shiftboss_index_to_schedule, std::size_t *shiftboss_index) { - const auto cit = shiftboss_indexes_for_hash_joins_.find(join_hash_table_index); - if (cit != shiftboss_indexes_for_hash_joins_.end()) { - *shiftboss_index = cit->second; - } else { - shiftboss_indexes_for_hash_joins_.emplace(join_hash_table_index, next_shiftboss_index_to_schedule); - *shiftboss_index = next_shiftboss_index_to_schedule; + DCHECK_LT(join_hash_table_index, shiftboss_indexes_for_hash_joins_.size()); + DCHECK_LT(part_id, shiftboss_indexes_for_hash_joins_[join_hash_table_index].size()); + + if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex) { + shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = next_shiftboss_index_to_schedule; } + + *shiftboss_index = shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id]; } private: @@ -156,11 +160,13 @@ class QueryManagerDistributed final : public QueryManagerBase { std::unique_ptr<WorkOrderProtosContainer> 
normal_workorder_protos_container_; - // A map from an aggregation id to its scheduled Shiftboss index. - std::unordered_map<QueryContext::aggregation_state_id, std::size_t> shiftboss_indexes_for_aggrs_; + // From an aggregation id (QueryContext::aggregation_state_id) to its + // scheduled Shiftboss index. + std::vector<std::size_t> shiftboss_indexes_for_aggrs_; - // A map from a join hash table to its scheduled Shiftboss index. - std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_; + // Get the scheduled Shiftboss index given + // [QueryContext::join_hash_table_id][partition_id]. + std::vector<std::vector<std::size_t>> shiftboss_indexes_for_hash_joins_; DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/query_optimizer/tests/execution_generator/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt index 2705130..d38f4aa 100644 --- a/query_optimizer/tests/execution_generator/CMakeLists.txt +++ b/query_optimizer/tests/execution_generator/CMakeLists.txt @@ -51,6 +51,11 @@ if (ENABLE_DISTRIBUTED) "${CMAKE_CURRENT_SOURCE_DIR}/Join.test" "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test" "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/") + add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_partition + "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest" + "${CMAKE_CURRENT_SOURCE_DIR}/Partition.test" + "${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition.test" + "${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition/") add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest" "${CMAKE_CURRENT_SOURCE_DIR}/Select.test" @@ -146,6 +151,7 @@ if (ENABLE_DISTRIBUTED) file(MAKE_DIRECTORY 
${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin) + file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/BuildHashOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp index 14ec204..8f40fbb 100644 --- a/relational_operators/BuildHashOperator.cpp +++ b/relational_operators/BuildHashOperator.cpp @@ -80,7 +80,7 @@ bool BuildHashOperator::getAllWorkOrders( for (const block_id block : input_relation_block_ids_[part_id]) { container->addNormalWorkOrder( new BuildHashWorkOrder(query_id_, input_relation_, join_key_attributes_, any_join_key_attributes_nullable_, - num_partitions_, part_id, block, hash_table, storage_manager, + part_id, block, hash_table, storage_manager, CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)), op_index_); } @@ -94,9 +94,9 @@ bool BuildHashOperator::getAllWorkOrders( input_relation_block_ids_[part_id].size()) { container->addNormalWorkOrder( new BuildHashWorkOrder(query_id_, input_relation_, join_key_attributes_, any_join_key_attributes_nullable_, - num_partitions_, part_id, - input_relation_block_ids_[part_id][num_workorders_generated_[part_id]], hash_table, - storage_manager, CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)), + part_id, input_relation_block_ids_[part_id][num_workorders_generated_[part_id]], + hash_table, storage_manager, + CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)), op_index_); 
++num_workorders_generated_[part_id]; } @@ -142,7 +142,6 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id } proto->SetExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable, any_join_key_attributes_nullable_); - proto->SetExtension(serialization::BuildHashWorkOrder::num_partitions, num_partitions_); proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_); proto->SetExtension(serialization::BuildHashWorkOrder::partition_id, part_id); proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/BuildHashOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp index c9f29cb..273e3b1 100644 --- a/relational_operators/BuildHashOperator.hpp +++ b/relational_operators/BuildHashOperator.hpp @@ -173,8 +173,6 @@ class BuildHashWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in * input_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'input_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'input_relation'. * @param build_block_id The block id. * @param hash_table The JoinHashTable to use. 
@@ -185,7 +183,6 @@ class BuildHashWorkOrder : public WorkOrder { const CatalogRelationSchema &input_relation, const std::vector<attribute_id> &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id build_block_id, JoinHashTable *hash_table, @@ -195,7 +192,6 @@ class BuildHashWorkOrder : public WorkOrder { input_relation_(input_relation), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), build_block_id_(build_block_id), hash_table_(DCHECK_NOTNULL(hash_table)), @@ -210,8 +206,6 @@ class BuildHashWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in * input_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'input_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'input_relation'. * @param build_block_id The block id. * @param hash_table The JoinHashTable to use. @@ -222,7 +216,6 @@ class BuildHashWorkOrder : public WorkOrder { const CatalogRelationSchema &input_relation, std::vector<attribute_id> &&join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id build_block_id, JoinHashTable *hash_table, @@ -232,7 +225,6 @@ class BuildHashWorkOrder : public WorkOrder { input_relation_(input_relation), join_key_attributes_(std::move(join_key_attributes)), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), build_block_id_(build_block_id), hash_table_(DCHECK_NOTNULL(hash_table)), @@ -248,15 +240,6 @@ class BuildHashWorkOrder : public WorkOrder { void execute() override; /** - * @brief Get the number of partitions. - * - * @return The number of partitions. 
- */ - std::size_t num_partitions() const { - return num_partitions_; - } - - /** * @brief Get the partition id. * * @return The partition id. @@ -269,7 +252,6 @@ class BuildHashWorkOrder : public WorkOrder { const CatalogRelationSchema &input_relation_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t num_partitions_; const partition_id part_id_; const block_id build_block_id_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 66ea2d1..78da7b8 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -207,6 +207,7 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator quickstep_catalog_CatalogRelationSchema quickstep_catalog_CatalogTypedefs quickstep_catalog_PartitionScheme + quickstep_catalog_PartitionSchemeHeader quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_queryexecution_QueryContext http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/DestroyHashOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp index 4827ef5..5b84bba 100644 --- a/relational_operators/DestroyHashOperator.cpp +++ b/relational_operators/DestroyHashOperator.cpp @@ -35,7 +35,7 @@ bool DestroyHashOperator::getAllWorkOrders( const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) { if (blocking_dependencies_met_ && !work_generated_) { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { container->addNormalWorkOrder( new 
DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context), op_index_); @@ -47,7 +47,7 @@ bool DestroyHashOperator::getAllWorkOrders( bool DestroyHashOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { if (blocking_dependencies_met_ && !work_generated_) { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { serialization::WorkOrder *proto = new serialization::WorkOrder; proto->set_work_order_type(serialization::DESTROY_HASH); proto->set_query_id(query_id_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/DestroyHashOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp index 8a1fabd..b707999 100644 --- a/relational_operators/DestroyHashOperator.hpp +++ b/relational_operators/DestroyHashOperator.hpp @@ -53,14 +53,14 @@ class DestroyHashOperator : public RelationalOperator { * @brief Constructor. * * @param query_id The ID of the query to which this operator belongs. - * @param num_partitions The number of partitions. + * @param build_num_partitions The number of partitions in 'build_relation'. * @param hash_table_index The index of the JoinHashTable in QueryContext. 
**/ DestroyHashOperator(const std::size_t query_id, - const std::size_t num_partitions, + const std::size_t build_num_partitions, const QueryContext::join_hash_table_id hash_table_index) : RelationalOperator(query_id), - num_partitions_(num_partitions), + build_num_partitions_(build_num_partitions), hash_table_index_(hash_table_index), work_generated_(false) {} @@ -79,7 +79,7 @@ class DestroyHashOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; private: - const std::size_t num_partitions_; + const std::size_t build_num_partitions_; const QueryContext::join_hash_table_id hash_table_index_; bool work_generated_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/HashJoinOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp index 0062b93..fd3841f 100644 --- a/relational_operators/HashJoinOperator.cpp +++ b/relational_operators/HashJoinOperator.cpp @@ -213,15 +213,15 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders( return true; } - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) { container->addNormalWorkOrder( new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_, - any_join_key_attributes_nullable_, num_partitions_, part_id, probe_block_id, - residual_predicate, selection, hash_table, output_destination, storage_manager, + any_join_key_attributes_nullable_, part_id, probe_block_id, residual_predicate, + selection, hash_table, output_destination, storage_manager, CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, 
query_context)), op_index_); } @@ -229,14 +229,14 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders( started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) { container->addNormalWorkOrder( new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_, - any_join_key_attributes_nullable_, num_partitions_, part_id, + any_join_key_attributes_nullable_, part_id, probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]], residual_predicate, selection, hash_table, output_destination, storage_manager, CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)), @@ -269,16 +269,15 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders( return true; } - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) { container->addNormalWorkOrder( new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_, - any_join_key_attributes_nullable_, num_partitions_, part_id, probe_block_id, - selection, is_selection_on_build_, hash_table, output_destination, - storage_manager, + any_join_key_attributes_nullable_, part_id, probe_block_id, selection, + is_selection_on_build_, hash_table, output_destination, storage_manager, CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)), op_index_); } @@ -286,14 +285,14 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders( started_ = true; return true; } else { - for 
(std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) { container->addNormalWorkOrder( new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_, - any_join_key_attributes_nullable_, num_partitions_, part_id, + any_join_key_attributes_nullable_, part_id, probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]], selection, is_selection_on_build_, hash_table, output_destination, storage_manager, @@ -336,7 +335,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos( return true; } - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) { container->addWorkOrderProto( createNonOuterJoinWorkOrderProto(hash_join_type, probe_block_id, part_id), @@ -346,7 +345,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos( started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) { container->addWorkOrderProto( createNonOuterJoinWorkOrderProto(hash_join_type, @@ -376,7 +375,6 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto( } proto->SetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable, any_join_key_attributes_nullable_); - proto->SetExtension(serialization::HashJoinWorkOrder::num_partitions, num_partitions_); proto->SetExtension(serialization::HashJoinWorkOrder::insert_destination_index, 
output_destination_index_); proto->SetExtension(serialization::HashJoinWorkOrder::join_hash_table_index, hash_table_index_); proto->SetExtension(serialization::HashJoinWorkOrder::partition_id, part_id); @@ -399,7 +397,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer * return true; } - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) { container->addWorkOrderProto(createOuterJoinWorkOrderProto(probe_block_id, part_id), op_index_); } @@ -407,7 +405,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer * started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) { container->addWorkOrderProto( createOuterJoinWorkOrderProto(probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]], @@ -436,7 +434,6 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const } proto->SetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable, any_join_key_attributes_nullable_); - proto->SetExtension(serialization::HashJoinWorkOrder::num_partitions, num_partitions_); proto->SetExtension(serialization::HashJoinWorkOrder::insert_destination_index, output_destination_index_); proto->SetExtension(serialization::HashJoinWorkOrder::join_hash_table_index, hash_table_index_); proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git 
a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index e655f70..acfe3d2 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -29,6 +29,7 @@ #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" #include "catalog/PartitionScheme.hpp" +#include "catalog/PartitionSchemeHeader.hpp" #include "query_execution/QueryContext.hpp" #include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" @@ -101,8 +102,8 @@ class HashJoinOperator : public RelationalOperator { * @param join_key_attributes The IDs of equijoin attributes in * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'input_relation'. If no - * partitions, it is one. + * @param build_num_partitions The number of partitions in 'build_relation'. + * If no partitions, it is one. * @param output_relation The output relation. * @param output_destination_index The index of the InsertDestination in the * QueryContext to insert the join results. 
@@ -128,7 +129,7 @@ class HashJoinOperator : public RelationalOperator { const bool probe_relation_is_stored, const std::vector<attribute_id> &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, + const std::size_t build_num_partitions, const CatalogRelation &output_relation, const QueryContext::insert_destination_id output_destination_index, const QueryContext::join_hash_table_id hash_table_index, @@ -142,7 +143,7 @@ class HashJoinOperator : public RelationalOperator { probe_relation_is_stored_(probe_relation_is_stored), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), + build_num_partitions_(build_num_partitions), output_relation_(output_relation), output_destination_index_(output_destination_index), hash_table_index_(hash_table_index), @@ -152,8 +153,8 @@ class HashJoinOperator : public RelationalOperator { ? std::vector<bool>() : *is_selection_on_build), join_type_(join_type), - probe_relation_block_ids_(num_partitions), - num_workorders_generated_(num_partitions), + probe_relation_block_ids_(build_num_partitions), + num_workorders_generated_(build_num_partitions), started_(false) { DCHECK(join_type != JoinType::kLeftOuterJoin || (is_selection_on_build != nullptr && @@ -162,12 +163,15 @@ class HashJoinOperator : public RelationalOperator { if (probe_relation_is_stored) { if (probe_relation.hasPartitionScheme()) { const PartitionScheme &part_scheme = *probe_relation.getPartitionScheme(); - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + DCHECK_EQ(build_num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions()); + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { probe_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); } } else { - // No partition. 
- probe_relation_block_ids_[0] = probe_relation.getBlocksSnapshot(); + // Broadcast hash join if probe has no partitions. + for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + probe_relation_block_ids_[part_id] = probe_relation.getBlocksSnapshot(); + } } } } @@ -209,7 +213,14 @@ class HashJoinOperator : public RelationalOperator { const partition_id part_id) override { DCHECK_EQ(probe_relation_.getID(), input_relation_id); - probe_relation_block_ids_[part_id].push_back(input_block_id); + if (probe_relation_.hasPartitionScheme()) { + probe_relation_block_ids_[part_id].push_back(input_block_id); + } else { + // Broadcast hash join if probe has no partitions. + for (std::size_t build_part_id = 0; build_part_id < build_num_partitions_; ++build_part_id) { + probe_relation_block_ids_[build_part_id].push_back(input_block_id); + } + } } QueryContext::insert_destination_id getInsertDestinationID() const override { @@ -261,7 +272,7 @@ class HashJoinOperator : public RelationalOperator { const bool probe_relation_is_stored_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t num_partitions_; + const std::size_t build_num_partitions_; const CatalogRelation &output_relation_; const QueryContext::insert_destination_id output_destination_index_; const QueryContext::join_hash_table_id hash_table_index_; @@ -295,8 +306,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param residual_predicate If non-null, apply as an additional filter to @@ -317,7 +326,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, const std::vector<attribute_id> &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const Predicate *residual_predicate, @@ -331,7 +339,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), residual_predicate_(residual_predicate), @@ -352,8 +359,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param residual_predicate If non-null, apply as an additional filter to @@ -374,7 +379,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, std::vector<attribute_id> &&join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const Predicate *residual_predicate, @@ -388,7 +392,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), residual_predicate_(residual_predicate), @@ -411,15 +414,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { void execute() override; /** - * @brief Get the number of partitions. - * - * @return The number of partitions. - */ - std::size_t num_partitions() const { - return num_partitions_; - } - - /** * @brief Get the partition id. * * @return The partition id. @@ -433,7 +427,6 @@ class HashInnerJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t num_partitions_; const partition_id part_id_; const block_id block_id_; const Predicate *residual_predicate_; @@ -465,8 +458,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param residual_predicate If non-null, apply as an additional filter to @@ -487,7 +478,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, const std::vector<attribute_id> &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const Predicate *residual_predicate, @@ -501,7 +491,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), residual_predicate_(residual_predicate), @@ -522,8 +511,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param residual_predicate If non-null, apply as an additional filter to @@ -544,7 +531,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, std::vector<attribute_id> &&join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const Predicate *residual_predicate, @@ -558,7 +544,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), residual_predicate_(residual_predicate), @@ -573,15 +558,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { void execute() override; /** - * @brief Get the number of partitions. - * - * @return The number of partitions. - */ - std::size_t num_partitions() const { - return num_partitions_; - } - - /** * @brief Get the partition id. * * @return The partition id. @@ -599,7 +575,6 @@ class HashSemiJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t num_partitions_; const partition_id part_id_; const block_id block_id_; const Predicate *residual_predicate_; @@ -631,8 +606,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param residual_predicate If non-null, apply as an additional filter to @@ -653,7 +626,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, const std::vector<attribute_id> &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const Predicate *residual_predicate, @@ -667,7 +639,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), residual_predicate_(residual_predicate), @@ -688,8 +659,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param residual_predicate If non-null, apply as an additional filter to @@ -710,7 +679,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, std::vector<attribute_id> &&join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const Predicate *residual_predicate, @@ -724,7 +692,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), residual_predicate_(residual_predicate), @@ -745,15 +712,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { } /** - * @brief Get the number of partitions. - * - * @return The number of partitions. - */ - std::size_t num_partitions() const { - return num_partitions_; - } - - /** * @brief Get the partition id. * * @return The partition id. @@ -771,7 +729,6 @@ class HashAntiJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t num_partitions_; const partition_id part_id_; const block_id block_id_; const Predicate *residual_predicate_; @@ -802,8 +759,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param selection A list of Scalars corresponding to the relation attributes @@ -823,7 +778,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, const std::vector<attribute_id> &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const std::vector<std::unique_ptr<const Scalar>> &selection, @@ -837,7 +791,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), selection_(selection), @@ -858,8 +811,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'probe_relation'. If no - * partitions, it is one. * @param part_id The partition id of 'probe_relation'. * @param lookup_block_id The block id of the probe_relation. 
* @param selection A list of Scalars corresponding to the relation attributes @@ -878,7 +829,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation, std::vector<attribute_id> &&join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t num_partitions, const partition_id part_id, const block_id lookup_block_id, const std::vector<std::unique_ptr<const Scalar>> &selection, @@ -892,7 +842,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { probe_relation_(probe_relation), join_key_attributes_(std::move(join_key_attributes)), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - num_partitions_(num_partitions), part_id_(part_id), block_id_(lookup_block_id), selection_(selection), @@ -907,15 +856,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { void execute() override; /** - * @brief Get the number of partitions. - * - * @return The number of partitions. - */ - std::size_t num_partitions() const { - return num_partitions_; - } - - /** * @brief Get the partition id. * * @return The partition id. @@ -929,7 +869,6 @@ class HashOuterJoinWorkOrder : public WorkOrder { const CatalogRelationSchema &probe_relation_; const std::vector<attribute_id> join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t num_partitions_; const partition_id part_id_; const block_id block_id_; const std::vector<std::unique_ptr<const Scalar>> &selection_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index b914fce..f8d9246 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -63,16 +63,15 @@ message AggregationWorkOrder { } } -// Next tag: 40. +// Next tag: 39. 
message BuildHashWorkOrder { extend WorkOrder { // All required. optional int32 relation_id = 32; repeated int32 join_key_attributes = 33; optional bool any_join_key_attributes_nullable = 34; - optional uint64 num_partitions = 38; optional uint32 join_hash_table_index = 35; - optional uint64 partition_id = 39; + optional uint64 partition_id = 38; optional fixed64 block_id = 36; optional int32 lip_deployment_index = 37; } @@ -113,7 +112,7 @@ message FinalizeAggregationWorkOrder { } } -// Next tag: 174. +// Next tag: 173. message HashJoinWorkOrder { enum HashJoinWorkOrderType { HASH_ANTI_JOIN = 0; @@ -129,10 +128,9 @@ message HashJoinWorkOrder { optional int32 probe_relation_id = 162; repeated int32 join_key_attributes = 163; optional bool any_join_key_attributes_nullable = 164; - optional uint64 num_partitions = 172; optional int32 insert_destination_index = 165; optional uint32 join_hash_table_index = 166; - optional uint64 partition_id = 173; + optional uint64 partition_id = 172; optional int32 selection_index = 167; optional fixed64 block_id = 168; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/968ce3f7/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index c09bcbe..a6cba02 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -107,7 +107,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::BuildHashWorkOrder::relation_id)), move(join_key_attributes), proto.GetExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable), - proto.GetExtension(serialization::BuildHashWorkOrder::num_partitions), part_id, proto.GetExtension(serialization::BuildHashWorkOrder::block_id), query_context->getJoinHashTable( @@ -199,9 +198,6 @@ WorkOrder* 
WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder const block_id lookup_block_id = proto.GetExtension(serialization::HashJoinWorkOrder::block_id); - const std::size_t num_partitions = - proto.GetExtension(serialization::HashJoinWorkOrder::num_partitions); - const Predicate *residual_predicate = nullptr; if (hash_join_work_order_type != serialization::HashJoinWorkOrder::HASH_OUTER_JOIN) { residual_predicate = @@ -233,7 +229,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder probe_relation, move(join_key_attributes), any_join_key_attributes_nullable, - num_partitions, part_id, lookup_block_id, residual_predicate, @@ -251,7 +246,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder probe_relation, move(join_key_attributes), any_join_key_attributes_nullable, - num_partitions, part_id, lookup_block_id, residual_predicate, @@ -277,7 +271,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder probe_relation, move(join_key_attributes), any_join_key_attributes_nullable, - num_partitions, part_id, lookup_block_id, selection, @@ -295,7 +288,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder probe_relation, move(join_key_attributes), any_join_key_attributes_nullable, - num_partitions, part_id, lookup_block_id, residual_predicate, @@ -544,7 +536,6 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, return proto.HasExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable) && proto.HasExtension(serialization::BuildHashWorkOrder::block_id) && proto.HasExtension(serialization::BuildHashWorkOrder::join_hash_table_index) && - proto.HasExtension(serialization::BuildHashWorkOrder::num_partitions) && proto.HasExtension(serialization::BuildHashWorkOrder::partition_id) && query_context.isValidJoinHashTableId( proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index), @@ 
-648,7 +639,6 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, query_context.isValidInsertDestinationId( proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)) && proto.HasExtension(serialization::HashJoinWorkOrder::join_hash_table_index) && - proto.HasExtension(serialization::HashJoinWorkOrder::num_partitions) && proto.HasExtension(serialization::HashJoinWorkOrder::partition_id) && query_context.isValidJoinHashTableId( proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index),