Updates to transitive closure
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2aefd7bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2aefd7bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2aefd7bc Branch: refs/heads/transitive-closure Commit: 2aefd7bce5bad9bb6063b4fd71ec37876d58662d Parents: 734ddc1 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Mon Dec 11 14:45:08 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Mon Dec 11 16:07:23 2017 -0600 ---------------------------------------------------------------------- query_optimizer/ExecutionGenerator.cpp | 19 ++- query_optimizer/PhysicalGenerator.cpp | 2 +- .../BuildTransitiveClosureOperator.cpp | 2 - relational_operators/CMakeLists.txt | 5 + .../InitializeTransitiveClosureOperator.cpp | 6 +- .../TransitiveClosureOperator.cpp | 158 +++++++++++++++++++ .../TransitiveClosureOperator.hpp | 86 +++++++++- storage/TransitiveClosureState.hpp | 8 + types/containers/ColumnVector.hpp | 7 + .../BarrieredReadWriteConcurrentBitVector.hpp | 7 + utility/BitVector.hpp | 28 +++- 11 files changed, 314 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 8f29271..648b937 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -2488,18 +2488,23 @@ void ExecutionGenerator::convertTransitiveClosure( &output_relation, insert_destination_proto); + const QueryPlan::DAGNodeIndex tc_operator_index = + execution_plan_->addRelationalOperator( + new TransitiveClosureOperator(query_handle_->query_id(), + transitive_closure_state_index, + 
*output_relation, + insert_destination_index)); + insert_destination_proto->set_relational_op_index(tc_operator_index); - (void)insert_destination_index; + execution_plan_->addDirectDependency(tc_operator_index, + build_tc_operator_index, + true /* is_pipeline_breaker */); - // TODO: fix - insert_destination_proto->set_relational_op_index(build_tc_operator_index /* FIX */); physical_to_output_relation_map_.emplace( std::piecewise_construct, std::forward_as_tuple(physical_plan), - std::forward_as_tuple(build_tc_operator_index /* FIX */, output_relation)); - - temporary_relation_info_vec_.emplace_back(build_tc_operator_index /* FIX */, - output_relation); + std::forward_as_tuple(tc_operator_index, output_relation)); + temporary_relation_info_vec_.emplace_back(tc_operator_index, output_relation); } } // namespace optimizer http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/query_optimizer/PhysicalGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp index b7b0db0..865cd11 100644 --- a/query_optimizer/PhysicalGenerator.cpp +++ b/query_optimizer/PhysicalGenerator.cpp @@ -194,7 +194,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() { << physical_plan_->toString(); } - std::cerr << "Optimized physical plan:\n" << physical_plan_->toString(); + DVLOG(4) << "Optimized physical plan:\n" << physical_plan_->toString(); if (FLAGS_visualize_plan) { quickstep::PlanVisualizer plan_visualizer; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/BuildTransitiveClosureOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildTransitiveClosureOperator.cpp b/relational_operators/BuildTransitiveClosureOperator.cpp index e151756..919a974 100644 --- a/relational_operators/BuildTransitiveClosureOperator.cpp +++ 
b/relational_operators/BuildTransitiveClosureOperator.cpp @@ -107,7 +107,6 @@ void BuildTransitiveClosureWorkOrder::execute() { } void BuildTransitiveClosureWorkOrder::buildStartRelation(ValueAccessor *accessor) { - std::cout << "BuildStartRelation: " << block_ << "\n"; InvokeOnAnyValueAccessor( accessor, [&](auto *accessor) -> void { @@ -119,7 +118,6 @@ void BuildTransitiveClosureWorkOrder::buildStartRelation(ValueAccessor *accessor } void BuildTransitiveClosureWorkOrder::buildEdgeRelation(ValueAccessor *accessor) { - std::cout << "BuildEdgeRelation: " << block_ << "\n"; InvokeOnAnyValueAccessor( accessor, [&](auto *accessor) -> void { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 6cc7f08..e85eb4e 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -585,11 +585,16 @@ endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS) target_link_libraries(quickstep_relationaloperators_TransitiveClosureOperator glog quickstep_catalog_CatalogRelation + quickstep_cli_Flags quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer quickstep_relationaloperators_RelationalOperator quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto quickstep_storage_StorageBlockInfo quickstep_storage_TransitiveClosureState + quickstep_utility_BitVector quickstep_utility_Macros quickstep_utility_Range tmb) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/InitializeTransitiveClosureOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeTransitiveClosureOperator.cpp 
b/relational_operators/InitializeTransitiveClosureOperator.cpp index a6ffe6f..ff21cf9 100644 --- a/relational_operators/InitializeTransitiveClosureOperator.cpp +++ b/relational_operators/InitializeTransitiveClosureOperator.cpp @@ -45,10 +45,15 @@ bool InitializeTransitiveClosureOperator::getAllWorkOrders( if (started_) { return true; } + started_ = true; TransitiveClosureState *state = query_context->getTransitiveClosureState(transitive_closure_context_index_); + if (state->range() == 0) { + return true; + } + constexpr std::size_t kMinBatchSize = 1024ul * 1024ul * 4ul; const std::size_t range = state->range(); const std::size_t num_batches = @@ -66,7 +71,6 @@ bool InitializeTransitiveClosureOperator::getAllWorkOrders( op_index_); } - started_ = true; return true; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/TransitiveClosureOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TransitiveClosureOperator.cpp b/relational_operators/TransitiveClosureOperator.cpp index e69de29..2d2776a 100644 --- a/relational_operators/TransitiveClosureOperator.cpp +++ b/relational_operators/TransitiveClosureOperator.cpp @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/TransitiveClosureOperator.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "cli/Flags.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/TransitiveClosureState.hpp"
+#include "types/IntType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BitVector.hpp"
+#include "utility/Range.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+// Generates every work order on the first call and always returns true.
+// The value range [0, state->range()) is split into partitions and one
+// TransitiveClosureWorkOrder is created per partition. Nothing is generated
+// when the range is empty.
+bool TransitiveClosureOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (started_) {
+    return true;
+  }
+  started_ = true;
+
+  TransitiveClosureState *state =
+      query_context->getTransitiveClosureState(transitive_closure_context_index_);
+
+  if (state->range() == 0) {
+    return true;
+  }
+
+  InsertDestination *output_destination =
+      query_context->getInsertDestination(output_destination_index_);
+
+  // At most two partitions per worker, never more partitions than values.
+  // Clamp to at least one partition so that a zero or negative value of
+  // FLAGS_num_workers can neither request zero partitions nor wrap around
+  // when converted to std::size_t.
+  const std::size_t num_batches =
+      std::max(1, std::min(state->range(), FLAGS_num_workers * 2));
+  const RangeSplitter splitter =
+      RangeSplitter::CreateWithNumPartitions(0, state->range(), num_batches);
+
+  for (std::size_t i = 0; i < splitter.getNumPartitions(); ++i) {
+    container->addNormalWorkOrder(
+        new TransitiveClosureWorkOrder(query_id_,
+                                       splitter.getPartition(i),
+                                       state,
+                                       output_destination),
+        op_index_);
+  }
+
+  return true;
+}
+
+// Work order protos are not implemented for this operator; calling this
+// aborts via LOG(FATAL).
+bool TransitiveClosureOperator::getAllWorkOrderProtos(
+    WorkOrderProtosContainer *container) {
+  LOG(FATAL) << "Not supported";
+}
+
+// For every value in interval_ that is marked as a start, computes the set of
+// values reachable from it and emits one (src, dst) int pair per reachable
+// value. Pairs are buffered in two column vectors and flushed to the insert
+// destination whenever the next source's results would overflow the buffer.
+void TransitiveClosureWorkOrder::execute() {
+  // Scratch structures reused across sources. The bit vectors are created
+  // without zero-initialization because evaluateSingleSource() clears them
+  // before reading.
+  std::vector<int> delta;
+  delta.reserve(range_);
+  BitVector<false> next(range_, false);
+  BitVector<false> result(range_, false);
+
+  // A single source yields at most range_ destinations, so a buffer of at
+  // least range_ entries always has room for one source's full result.
+  const int bulk_insert_batch_size = std::max(0x10000, range_);
+
+  std::shared_ptr<NativeColumnVector> src_cv =
+      std::make_shared<NativeColumnVector>(IntType::InstanceNonNullable(),
+                                           bulk_insert_batch_size);
+  std::shared_ptr<NativeColumnVector> dst_cv =
+      std::make_shared<NativeColumnVector>(IntType::InstanceNonNullable(),
+                                           bulk_insert_batch_size);
+
+  const int src_begin = static_cast<int>(interval_.begin());
+  const int src_end = static_cast<int>(interval_.end());
+
+  int total = 0;
+  for (int src = src_begin; src < src_end; ++src) {
+    if (!state_->hasStart(src)) {
+      continue;
+    }
+
+    // Evaluate single source transitive closure.
+    evaluateSingleSource(src, &delta, &next, &result);
+
+    const int num_values = static_cast<int>(result.onesCount());
+    if (total + num_values > bulk_insert_batch_size) {
+      bulkInsert(src_cv, dst_cv);
+      src_cv->clear();
+      dst_cv->clear();
+      total = 0;
+    }
+
+    // Walk the set bits of 'result' in order; 'scan_from' is the position at
+    // which the search for the next set bit resumes.
+    std::size_t scan_from = 0;
+    for (int i = 0; i < num_values; ++i) {
+      const std::size_t dst = result.firstOne(scan_from);
+      *static_cast<int*>(src_cv->getPtrForDirectWrite()) = src;
+      *static_cast<int*>(dst_cv->getPtrForDirectWrite()) = static_cast<int>(dst);
+      scan_from = dst + 1;
+    }
+
+    total += num_values;
+  }
+  if (total > 0) {
+    bulkInsert(src_cv, dst_cv);
+  }
+}
+
+// Computes the set of values reachable from 'start' by repeatedly expanding a
+// frontier ('delta') until no new value is discovered.
+//   delta:  scratch frontier; holds the values discovered in the last round.
+//   next:   scratch bit vector; union of the edge sets of the frontier.
+//   result: output; on return, the bits of all values reached via one or more
+//           edges. 'start' itself is set only if some edge leads back to it.
+void TransitiveClosureWorkOrder::evaluateSingleSource(
+    const int start,
+    std::vector<int> *delta,
+    BitVector<false> *next,
+    BitVector<false> *result) const {
+  delta->clear();
+  delta->emplace_back(start);
+  result->clear();
+
+  while (!delta->empty()) {
+    next->clear();
+    for (const int source : *delta) {
+      next->unionWith(state_->getEdgeData(source));
+    }
+    delta->clear();
+    // New frontier = values in 'next' that are not yet in 'result'.
+    next->subtractTo(*result, delta);
+    result->unionWith(*next);
+  }
+}
+
+// Wraps the two columns in a value accessor and bulk-inserts them as tuples
+// into the output destination.
+void TransitiveClosureWorkOrder::bulkInsert(const ColumnVectorPtr &src_cv,
+                                            const ColumnVectorPtr &dst_cv) {
+  ColumnVectorsValueAccessor columns;
+  columns.addColumn(src_cv);
+  columns.addColumn(dst_cv);
+  output_destination_->bulkInsertTuples(&columns);
+}
+
+}  // namespace quickstep
+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/TransitiveClosureOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TransitiveClosureOperator.hpp b/relational_operators/TransitiveClosureOperator.hpp
index f983a4a..d4ac13c 100644
--- a/relational_operators/TransitiveClosureOperator.hpp
+++ b/relational_operators/TransitiveClosureOperator.hpp
@@ -30,6 +30,8 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/TransitiveClosureState.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/BitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/Range.hpp"
 
@@ -41,6 +43,8 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class NativeColumnVector;
+class InsertDestination;
 class StorageManager;
 class WorkOrderProtosContainer;
 class WorkOrdersContainer;
@@ -49,9 +53,89 @@ class WorkOrdersContainer;
  *  @{
  */
 
+/**
+ * @brief An operator that evaluates transitive closure over a
+ *        TransitiveClosureState and writes the resulting pairs to an
+ *        InsertDestination.
+ **/
+class TransitiveClosureOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param transitive_closure_context_index The index used to look up the
+   *        TransitiveClosureState in the QueryContext.
+   * @param output_relation The output relation.
+   * @param output_destination_index The index used to look up the
+   *        InsertDestination in the QueryContext.
+   **/
+  TransitiveClosureOperator(const std::size_t query_id,
+                            const std::size_t transitive_closure_context_index,
+                            const CatalogRelation &output_relation,
+                            const QueryContext::insert_destination_id output_destination_index)
+      : RelationalOperator(query_id, 1u),
+        transitive_closure_context_index_(transitive_closure_context_index),
+        output_relation_(output_relation),
+        output_destination_index_(output_destination_index),
+        started_(false) {
+  }
+
+  ~TransitiveClosureOperator() override {}
+
+  OperatorType getOperatorType() const override {
+    return kTransitiveClosure;
+  }
+
+  std::string getName() const override {
+    return "TransitiveClosureOperator";
+  }
+
+  /**
+   * @brief Generate all TransitiveClosureWorkOrders on the first call;
+   *        later calls generate nothing.
+   *
+   * @return Always true.
+   **/
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  /**
+   * @brief Not supported; the implementation aborts with LOG(FATAL).
+   **/
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  QueryContext::insert_destination_id getInsertDestinationID() const override {
+    return output_destination_index_;
+  }
+
+  const relation_id getOutputRelationID() const override {
+    return output_relation_.getID();
+  }
+
+ private:
+  const std::size_t transitive_closure_context_index_;
+  const CatalogRelation &output_relation_;
+  const QueryContext::insert_destination_id output_destination_index_;
+
+  // Set once getAllWorkOrders() has run, so work orders are generated only once.
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(TransitiveClosureOperator);
+};
+
+/**
+ * @brief A WorkOrder that evaluates transitive closure for the start values
+ *        that fall inside one interval of the value range.
+ **/
+class TransitiveClosureWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this work order belongs.
+   * @param interval The interval of source values handled by this work order.
+   * @param state The shared TransitiveClosureState (stored as a raw pointer;
+   *        NOTE(review): presumably owned by the QueryContext -- confirm).
+   * @param output_destination The InsertDestination that receives the
+   *        output tuples.
+   **/
+  TransitiveClosureWorkOrder(const std::size_t query_id,
+                             const Range &interval,
+                             TransitiveClosureState *state,
+                             InsertDestination *output_destination)
+      : WorkOrder(query_id, 1u),
+        interval_(interval),
+        range_(state->range()),
+        state_(state),
+        output_destination_(output_destination) {}
+
+  ~TransitiveClosureWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  // Computes into '*result' the values reachable from 'start'. 'delta' and
+  // 'next' are caller-provided scratch space that is cleared before use.
+  void evaluateSingleSource(const int start,
+                            std::vector<int> *delta,
+                            BitVector<false> *next,
+                            BitVector<false> *result) const;
+
+  // Bulk-inserts the two columns as tuples into output_destination_.
+  void bulkInsert(const ColumnVectorPtr &src_cv, const ColumnVectorPtr &dst_cv);
+
+  const Range interval_;
+  // Copy of state->range() taken at construction time.
+  const int range_;
+  TransitiveClosureState *state_;
+  InsertDestination *output_destination_;
+
+  DISALLOW_COPY_AND_ASSIGN(TransitiveClosureWorkOrder);
+};
 
 /** @} */
 
 }  // namespace quickstep
 
-#endif  // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_TRANSITIVE_CLOSURE_OPERATOR_HPP_
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_TRANSITIVE_CLOSURE_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/storage/TransitiveClosureState.hpp
----------------------------------------------------------------------
diff --git a/storage/TransitiveClosureState.hpp b/storage/TransitiveClosureState.hpp
index 852972a..7cc06a8 100644
--- a/storage/TransitiveClosureState.hpp
+++ b/storage/TransitiveClosureState.hpp
@@ -69,6 +69,14 @@
class TransitiveClosureState {
     edges_[source]->setBit(destination);
   }
 
+  /**
+   * @brief Check whether a value is marked in the start bit vector.
+   *
+   * @param value The bit position to test.
+   * @return The bit of the start vector at position \p value.
+   **/
+  inline bool hasStart(const int value) const {
+    return starts_->getBit(value);
+  }
+
+  /**
+   * @brief Get the raw storage of the edge bit vector of a source value.
+   *
+   * @param source The index of the edge bit vector to look up.
+   * @return The pointer returned by getData() on that bit vector. The
+   *         storage remains owned by this state.
+   **/
+  inline const void* getEdgeData(const int source) const {
+    return edges_[source]->getData();
+  }
+
  private:
   const int range_;
   std::unique_ptr<BarrieredReadWriteConcurrentBitVector> starts_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index 5ef9871..029a409 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -172,6 +172,13 @@ class NativeColumnVector : public ColumnVector {
     return true;
   }
 
+  /**
+   * @brief Reset this NativeColumnVector to zero length so that it can be
+   *        refilled. The null bitmap, if there is one, is cleared as well.
+   **/
+  void clear() {
+    actual_length_ = 0;
+    if (null_bitmap_ != nullptr) {
+      null_bitmap_->clear();
+    }
+  }
+
   /**
    * @brief Determine if this NativeColumnVector's Type is nullable.
    *
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/utility/BarrieredReadWriteConcurrentBitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BarrieredReadWriteConcurrentBitVector.hpp b/utility/BarrieredReadWriteConcurrentBitVector.hpp
index 1dcb58e..b52aa9f 100644
--- a/utility/BarrieredReadWriteConcurrentBitVector.hpp
+++ b/utility/BarrieredReadWriteConcurrentBitVector.hpp
@@ -118,6 +118,13 @@ class BarrieredReadWriteConcurrentBitVector {
   }
 
   /**
+   * @brief Get read-only access to the raw storage of this bit vector.
+   *
+   * @return The underlying bytes of this bit vector.
+   **/
+  inline const void *getData() const {
+    return data_array_;
+  }
+
+  /**
    * @brief Clear this bit vector, setting all bits to zero.
    **/
   inline void clear() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/utility/BitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BitVector.hpp b/utility/BitVector.hpp
index 4472407..e245dce 100644
--- a/utility/BitVector.hpp
+++ b/utility/BitVector.hpp
@@ -25,6 +25,7 @@
 #include <cstdlib>
 #include <cstring>
 #include <limits>
+#include <vector>
 
 #include "utility/BitManipulation.hpp"
 #include "utility/Macros.hpp"
@@ -75,7 +76,7 @@ class BitVector {
    *
    * @param num_bits The length of the BitVector in bits.
+   * @param initialize If true (the default), all bits are cleared on
+   *        construction. If false, the contents are left uninitialized and
+   *        the caller must call clear() before reading any bit.
    **/
-  explicit BitVector(const std::size_t num_bits)
+  explicit BitVector(const std::size_t num_bits, const bool initialize = true)
       : owned_(true),
         short_version_(enable_short_version && (num_bits < 33)),
         // NOTE(chasseur): If 'num_bits' is 0, we put 'this' in 'data_array_'
@@ -86,7 +87,9 @@ class BitVector {
                                      : this)),
         num_bits_(num_bits),
         data_array_size_((num_bits >> kHigherOrderShift) + (num_bits & kLowerOrderMask ? 1 : 0)) {
-    clear();
+    if (initialize) {
+      clear();
+    }
   }
 
   /**
@@ -855,6 +858,27 @@ class BitVector {
     return num_bits_;
   }
 
+  /**
+   * @brief Bitwise-OR raw bit data into this BitVector, word by word.
+   * @warning Only usable when the short version is disabled.
+   *
+   * @param other Raw bit data. It must be readable as an array of at least
+   *        as many std::size_t words as this BitVector uses, with the same
+   *        bit layout. This is not checked.
+   **/
+  inline void unionWith(const void *other) {
+    DCHECK(!enable_short_version);
+    const std::size_t *other_data_array = static_cast<const std::size_t*>(other);
+    for (std::size_t array_idx = 0; array_idx < data_array_size_; ++array_idx) {
+      data_array_[array_idx] |= other_data_array[array_idx];
+    }
+  }
+
+  /**
+   * @brief Append to a vector the position of every bit that is set in this
+   *        BitVector but not set in another one. Neither BitVector is
+   *        modified.
+   * @warning Only usable when the short version is disabled.
+   *
+   * @param other The BitVector whose set bits are excluded. It must have the
+   *        same length as this BitVector.
+   * @param output The vector that the resulting bit positions are appended
+   *        to, in increasing order. Existing elements are kept.
+   **/
+  inline void subtractTo(const BitVector &other, std::vector<int> *output) const {
+    DCHECK(!enable_short_version);
+    DCHECK_EQ(num_bits_, other.num_bits_);
+    for (std::size_t array_idx = 0; array_idx < data_array_size_; ++array_idx) {
+      const std::size_t base = array_idx << kHigherOrderShift;
+      std::size_t value = data_array_[array_idx] & ~other.data_array_[array_idx];
+      while (value != 0) {
+        // The highest remaining set bit is the lowest remaining position.
+        const std::size_t offset = leading_zero_count<std::size_t>(value);
+        value ^= TopBit<std::size_t>() >> offset;
+        output->emplace_back(static_cast<int>(base + offset));
+      }
+    }
+  }
+
  private:
   // This works as long as the bit-width of size_t is power of 2:
   static const std::size_t kLowerOrderMask = (sizeof(std::size_t) << 3) - 1;