Updates to transitive closure

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2aefd7bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2aefd7bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2aefd7bc

Branch: refs/heads/transitive-closure
Commit: 2aefd7bce5bad9bb6063b4fd71ec37876d58662d
Parents: 734ddc1
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Mon Dec 11 14:45:08 2017 -0600
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Mon Dec 11 16:07:23 2017 -0600

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          |  19 ++-
 query_optimizer/PhysicalGenerator.cpp           |   2 +-
 .../BuildTransitiveClosureOperator.cpp          |   2 -
 relational_operators/CMakeLists.txt             |   5 +
 .../InitializeTransitiveClosureOperator.cpp     |   6 +-
 .../TransitiveClosureOperator.cpp               | 158 +++++++++++++++++++
 .../TransitiveClosureOperator.hpp               |  86 +++++++++-
 storage/TransitiveClosureState.hpp              |   8 +
 types/containers/ColumnVector.hpp               |   7 +
 .../BarrieredReadWriteConcurrentBitVector.hpp   |   7 +
 utility/BitVector.hpp                           |  28 +++-
 11 files changed, 314 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 8f29271..648b937 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -2488,18 +2488,23 @@ void ExecutionGenerator::convertTransitiveClosure(
                                  &output_relation,
                                  insert_destination_proto);
 
+  const QueryPlan::DAGNodeIndex tc_operator_index =
+      execution_plan_->addRelationalOperator(
+          new TransitiveClosureOperator(query_handle_->query_id(),
+                                        transitive_closure_state_index,
+                                        *output_relation,
+                                        insert_destination_index));
+  insert_destination_proto->set_relational_op_index(tc_operator_index);
 
-  (void)insert_destination_index;
+  execution_plan_->addDirectDependency(tc_operator_index,
+                                       build_tc_operator_index,
+                                       true /* is_pipeline_breaker */);
 
-  // TODO: fix
-  insert_destination_proto->set_relational_op_index(build_tc_operator_index /* 
FIX */);
   physical_to_output_relation_map_.emplace(
       std::piecewise_construct,
       std::forward_as_tuple(physical_plan),
-      std::forward_as_tuple(build_tc_operator_index /* FIX */, 
output_relation));
-
-  temporary_relation_info_vec_.emplace_back(build_tc_operator_index /* FIX */,
-                                            output_relation);
+      std::forward_as_tuple(tc_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(tc_operator_index, 
output_relation);
 }
 
 }  // namespace optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index b7b0db0..865cd11 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -194,7 +194,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
              << physical_plan_->toString();
   }
 
-  std::cerr << "Optimized physical plan:\n" << physical_plan_->toString();
+  DVLOG(4) << "Optimized physical plan:\n" << physical_plan_->toString();
 
   if (FLAGS_visualize_plan) {
     quickstep::PlanVisualizer plan_visualizer;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/BuildTransitiveClosureOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildTransitiveClosureOperator.cpp b/relational_operators/BuildTransitiveClosureOperator.cpp
index e151756..919a974 100644
--- a/relational_operators/BuildTransitiveClosureOperator.cpp
+++ b/relational_operators/BuildTransitiveClosureOperator.cpp
@@ -107,7 +107,6 @@ void BuildTransitiveClosureWorkOrder::execute() {
 }
 
 void BuildTransitiveClosureWorkOrder::buildStartRelation(ValueAccessor 
*accessor) {
-  std::cout << "BuildStartRelation: " << block_ << "\n";
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {
@@ -119,7 +118,6 @@ void 
BuildTransitiveClosureWorkOrder::buildStartRelation(ValueAccessor *accessor
 }
 
 void BuildTransitiveClosureWorkOrder::buildEdgeRelation(ValueAccessor 
*accessor) {
-  std::cout << "BuildEdgeRelation: " << block_ << "\n";
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 6cc7f08..e85eb4e 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -585,11 +585,16 @@ endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_relationaloperators_TransitiveClosureOperator
                       glog
                       quickstep_catalog_CatalogRelation
+                      quickstep_cli_Flags
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_TransitiveClosureState
+                      quickstep_utility_BitVector
                       quickstep_utility_Macros
                       quickstep_utility_Range
                       tmb)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/InitializeTransitiveClosureOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeTransitiveClosureOperator.cpp b/relational_operators/InitializeTransitiveClosureOperator.cpp
index a6ffe6f..ff21cf9 100644
--- a/relational_operators/InitializeTransitiveClosureOperator.cpp
+++ b/relational_operators/InitializeTransitiveClosureOperator.cpp
@@ -45,10 +45,15 @@ bool InitializeTransitiveClosureOperator::getAllWorkOrders(
   if (started_) {
     return true;
   }
+  started_ = true;
 
   TransitiveClosureState *state =
       
query_context->getTransitiveClosureState(transitive_closure_context_index_);
 
+  if (state->range() == 0) {
+    return true;
+  }
+
   constexpr std::size_t kMinBatchSize = 1024ul * 1024ul * 4ul;
   const std::size_t range = state->range();
   const std::size_t num_batches =
@@ -66,7 +71,6 @@ bool InitializeTransitiveClosureOperator::getAllWorkOrders(
         op_index_);
   }
 
-  started_ = true;
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/TransitiveClosureOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TransitiveClosureOperator.cpp b/relational_operators/TransitiveClosureOperator.cpp
index e69de29..2d2776a 100644
--- a/relational_operators/TransitiveClosureOperator.cpp
+++ b/relational_operators/TransitiveClosureOperator.cpp
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/TransitiveClosureOperator.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "cli/Flags.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/TransitiveClosureState.hpp"
+#include "types/IntType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BitVector.hpp"
+#include "utility/Range.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool TransitiveClosureOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // All work orders are generated by the first call; later calls are no-ops.
+  if (started_) {
+    return true;
+  }
+  started_ = true;
+
+  TransitiveClosureState *state =
+      query_context->getTransitiveClosureState(transitive_closure_context_index_);
+
+  // An empty value domain yields no output tuples.
+  const int range = state->range();
+  if (range <= 0) {
+    return true;
+  }
+
+  InsertDestination *output_destination =
+      query_context->getInsertDestination(output_destination_index_);
+
+  // Aim for about two batches per worker and never more batches than values.
+  // Always use at least one batch, which guards against a non-positive FLAGS_num_workers.
+  const std::size_t num_batches =
+      static_cast<std::size_t>(std::max(1, std::min(range, FLAGS_num_workers * 2)));
+  const RangeSplitter splitter =
+      RangeSplitter::CreateWithNumPartitions(0, range, num_batches);
+
+  for (std::size_t i = 0; i < splitter.getNumPartitions(); ++i) {
+    container->addNormalWorkOrder(
+        new TransitiveClosureWorkOrder(query_id_,
+                                       splitter.getPartition(i),
+                                       state,
+                                       output_destination),
+        op_index_);
+  }
+
+  return true;
+}
+
+bool TransitiveClosureOperator::getAllWorkOrderProtos(
+    WorkOrderProtosContainer *container) {
+  // The distributed (proto-based) execution path is not implemented here.
+  LOG(FATAL) << "Not supported";
+  return false;
+}
+
+void TransitiveClosureWorkOrder::execute() {
+  // Scratch buffers, reused for every start vertex in interval_.
+  std::vector<int> delta;
+  delta.reserve(range_);
+  BitVector<false> next(range_, false);
+  BitVector<false> result(range_, false);
+
+  // A single source can reach at most range_ vertices, so sizing the column
+  // vectors to at least range_ guarantees one source's output always fits.
+  const int batch_capacity = std::max(0x10000, range_);
+
+  std::shared_ptr<NativeColumnVector> src_cv =
+      std::make_shared<NativeColumnVector>(IntType::InstanceNonNullable(),
+                                           batch_capacity);
+  std::shared_ptr<NativeColumnVector> dst_cv =
+      std::make_shared<NativeColumnVector>(IntType::InstanceNonNullable(),
+                                           batch_capacity);
+
+  int total = 0;
+  for (int src = interval_.begin(); src < interval_.end(); ++src) {
+    if (!state_->hasStart(src)) {
+      continue;
+    }
+
+    // Evaluate single source transitive closure.
+    evaluateSingleSource(src, &delta, &next, &result);
+
+    // Flush the buffered pairs if this source's pairs would overflow them.
+    const int num_values = static_cast<int>(result.onesCount());
+    if (total + num_values > batch_capacity) {
+      bulkInsert(src_cv, dst_cv);
+      src_cv->clear();
+      dst_cv->clear();
+      total = 0;
+    }
+
+    // Append one (src, dst) pair per set bit of the closure, in ascending dst.
+    std::size_t search_from = 0;
+    for (int i = 0; i < num_values; ++i) {
+      const std::size_t dst = result.firstOne(search_from);
+      *static_cast<int*>(src_cv->getPtrForDirectWrite()) = src;
+      *static_cast<int*>(dst_cv->getPtrForDirectWrite()) = static_cast<int>(dst);
+      search_from = dst + 1;
+    }
+
+    total += num_values;
+  }
+
+  if (total > 0) {
+    bulkInsert(src_cv, dst_cv);
+  }
+}
+
+void TransitiveClosureWorkOrder::evaluateSingleSource(
+    const int start,
+    std::vector<int> *delta,
+    BitVector<false> *next,
+    BitVector<false> *result) const {
+  // Frontier expansion: each round only follows edges out of the vertices that
+  // were first reached in the previous round. On return, `result` holds every
+  // vertex reachable from `start` through one or more edges.
+  delta->clear();
+  delta->emplace_back(start);
+  result->clear();
+
+  while (!delta->empty()) {
+    next->clear();
+    for (const int source : *delta) {
+      next->unionWith(state_->getEdgeData(source));
+    }
+    // The new frontier is every vertex in `next` that is not yet in `result`.
+    delta->clear();
+    next->subtractTo(*result, delta);
+    result->unionWith(*next);
+  }
+}
+
+void TransitiveClosureWorkOrder::bulkInsert(const ColumnVectorPtr &src_cv,
+                                            const ColumnVectorPtr &dst_cv) {
+  // Expose the two buffered columns as (src, dst) rows and insert them all.
+  ColumnVectorsValueAccessor columns;
+  columns.addColumn(src_cv);
+  columns.addColumn(dst_cv);
+  output_destination_->bulkInsertTuples(&columns);
+}
+
+}  // namespace quickstep
+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/relational_operators/TransitiveClosureOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TransitiveClosureOperator.hpp b/relational_operators/TransitiveClosureOperator.hpp
index f983a4a..d4ac13c 100644
--- a/relational_operators/TransitiveClosureOperator.hpp
+++ b/relational_operators/TransitiveClosureOperator.hpp
@@ -30,6 +30,8 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/TransitiveClosureState.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/BitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/Range.hpp"
 
@@ -41,6 +43,8 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class NativeColumnVector;
+class InsertDestination;
 class StorageManager;
 class WorkOrderProtosContainer;
 class WorkOrdersContainer;
@@ -49,9 +53,122 @@ class WorkOrdersContainer;
  *  @{
  */
 
+/**
+ * @brief An operator that computes the transitive closure described by a
+ *        TransitiveClosureState, for the start vertices recorded in that
+ *        state, and writes the resulting (src, dst) pairs to output_relation.
+ **/
+class TransitiveClosureOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param transitive_closure_context_index The index of the
+   *        TransitiveClosureState in the QueryContext.
+   * @param output_relation The relation that receives the closure pairs.
+   * @param output_destination_index The index of the InsertDestination in the
+   *        QueryContext used to write into output_relation.
+   **/
+  TransitiveClosureOperator(const std::size_t query_id,
+                            const std::size_t transitive_closure_context_index,
+                            const CatalogRelation &output_relation,
+                            const QueryContext::insert_destination_id output_destination_index)
+      : RelationalOperator(query_id, 1u),
+        transitive_closure_context_index_(transitive_closure_context_index),
+        output_relation_(output_relation),
+        output_destination_index_(output_destination_index),
+        started_(false) {
+  }
+
+  ~TransitiveClosureOperator() override {}
+
+  OperatorType getOperatorType() const override {
+    return kTransitiveClosure;
+  }
+
+  std::string getName() const override {
+    return "TransitiveClosureOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  QueryContext::insert_destination_id getInsertDestinationID() const override {
+    return output_destination_index_;
+  }
+
+  const relation_id getOutputRelationID() const override {
+    return output_relation_.getID();
+  }
+
+ private:
+  const std::size_t transitive_closure_context_index_;
+  const CatalogRelation &output_relation_;
+  const QueryContext::insert_destination_id output_destination_index_;
+
+  // Becomes true on the first call to getAllWorkOrders().
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(TransitiveClosureOperator);
+};
+
+/**
+ * @brief A work order that evaluates the single-source transitive closure of
+ *        every start vertex in a sub-range of the value domain and bulk-inserts
+ *        the resulting (src, dst) pairs into an InsertDestination.
+ **/
+class TransitiveClosureWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this work order belongs.
+   * @param interval The sub-range of source vertices handled by this work order.
+   * @param state The shared TransitiveClosureState (start set and edges).
+   * @param output_destination The InsertDestination that receives the pairs.
+   **/
+  TransitiveClosureWorkOrder(const std::size_t query_id,
+                             const Range &interval,
+                             TransitiveClosureState *state,
+                             InsertDestination *output_destination)
+      : WorkOrder(query_id, 1u),
+        interval_(interval),
+        range_(state->range()),
+        state_(state),
+        output_destination_(output_destination) {}
+
+  ~TransitiveClosureWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  // Computes into `result` all vertices reachable from `start` via one or more
+  // edges; `delta` and `next` are caller-provided scratch buffers.
+  void evaluateSingleSource(const int start,
+                            std::vector<int> *delta,
+                            BitVector<false> *next,
+                            BitVector<false> *result) const;
+
+  // Bulk-inserts the buffered (src, dst) columns into output_destination_.
+  void bulkInsert(const ColumnVectorPtr &src_cv, const ColumnVectorPtr &dst_cv);
+
+  const Range interval_;
+  // Size of the vertex domain, cached from state_->range().
+  const int range_;
+  TransitiveClosureState *state_;
+  InsertDestination *output_destination_;
+
+  DISALLOW_COPY_AND_ASSIGN(TransitiveClosureWorkOrder);
+};
 
 /** @} */
 
 }  // namespace quickstep
 
-#endif  // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_TRANSITIVE_CLOSURE_OPERATOR_HPP_
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_TRANSITIVE_CLOSURE_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/storage/TransitiveClosureState.hpp
----------------------------------------------------------------------
diff --git a/storage/TransitiveClosureState.hpp b/storage/TransitiveClosureState.hpp
index 852972a..7cc06a8 100644
--- a/storage/TransitiveClosureState.hpp
+++ b/storage/TransitiveClosureState.hpp
@@ -69,6 +69,27 @@ class TransitiveClosureState {
     edges_[source]->setBit(destination);
   }
 
+  /**
+   * @brief Check whether a value is in the start set.
+   *
+   * @param value The value to test.
+   * @return True if the bit for value is set in the start set.
+   **/
+  inline bool hasStart(const int value) const {
+    return starts_->getBit(value);
+  }
+
+  /**
+   * @brief Get the raw storage of the edge bit vector of a source vertex.
+   *
+   * @param source The source vertex.
+   * @return The underlying bytes of the bit vector whose set bits are the
+   *         destinations recorded for source.
+   **/
+  inline const void *getEdgeData(const int source) const {
+    return edges_[source]->getData();
+  }
+
  private:
   const int range_;
   std::unique_ptr<BarrieredReadWriteConcurrentBitVector> starts_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index 5ef9871..029a409 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -172,6 +172,17 @@ class NativeColumnVector : public ColumnVector {
     return true;
   }
 
+  /**
+   * @brief Remove all values from this NativeColumnVector so that it can be
+   *        refilled from the start. The reserved storage is not released.
+   **/
+  void clear() {
+    actual_length_ = 0;
+    if (null_bitmap_ != nullptr) {
+      null_bitmap_->clear();
+    }
+  }
+
   /**
    * @brief Determine if this NativeColumnVector's Type is nullable.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/utility/BarrieredReadWriteConcurrentBitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BarrieredReadWriteConcurrentBitVector.hpp b/utility/BarrieredReadWriteConcurrentBitVector.hpp
index 1dcb58e..b52aa9f 100644
--- a/utility/BarrieredReadWriteConcurrentBitVector.hpp
+++ b/utility/BarrieredReadWriteConcurrentBitVector.hpp
@@ -118,6 +118,13 @@ class BarrieredReadWriteConcurrentBitVector {
   }
 
   /**
+   * @return A read-only pointer to the raw storage (data_array_) of this bit vector.
+   **/
+  inline const void *getData() const {
+    return data_array_;
+  }
+
+  /**
    * @brief Clear this bit vector, setting all bits to zero.
    **/
   inline void clear() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2aefd7bc/utility/BitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BitVector.hpp b/utility/BitVector.hpp
index 4472407..e245dce 100644
--- a/utility/BitVector.hpp
+++ b/utility/BitVector.hpp
@@ -25,6 +25,7 @@
 #include <cstdlib>
 #include <cstring>
 #include <limits>
+#include <vector>
 
 #include "utility/BitManipulation.hpp"
 #include "utility/Macros.hpp"
@@ -75,7 +76,7 @@ class BitVector {
    *
    * @param num_bits The length of the BitVector in bits.
    **/
-  explicit BitVector(const std::size_t num_bits)
+  explicit BitVector(const std::size_t num_bits, const bool initialize = true)
       : owned_(true),
         short_version_(enable_short_version && (num_bits < 33)),
         // NOTE(chasseur): If 'num_bits' is 0, we put 'this' in 'data_array_'
@@ -86,7 +87,9 @@ class BitVector {
                                                             : this)),
         num_bits_(num_bits),
         data_array_size_((num_bits >> kHigherOrderShift) + (num_bits & 
kLowerOrderMask ? 1 : 0)) {
-    clear();
+    if (initialize) {
+      clear();
+    }
   }
 
   /**
@@ -855,6 +858,47 @@ class BitVector {
     return num_bits_;
   }
 
+  /**
+   * @brief Bitwise-OR raw bit data into this BitVector.
+   * @warning Only usable on a long BitVector (enable_short_version == false).
+   *
+   * @param other Pointer to at least data_array_size_ std::size_t words that
+   *        use the same bit layout as this BitVector. The caller is
+   *        responsible for ensuring the buffer is large enough.
+   **/
+  inline void unionWith(const void *other) {
+    DCHECK(!enable_short_version);
+    const std::size_t *other_data_array = static_cast<const std::size_t*>(other);
+    for (std::size_t array_idx = 0; array_idx < data_array_size_; ++array_idx) {
+      data_array_[array_idx] |= other_data_array[array_idx];
+    }
+  }
+
+  /**
+   * @brief Append the positions of all bits that are set in this BitVector but
+   *        not in other (i.e. this AND NOT other) to output, in ascending order.
+   * @warning Only usable on a long BitVector (enable_short_version == false).
+   *
+   * @param other A BitVector of the same length as this one.
+   * @param output The vector to append bit positions to; it is not cleared.
+   **/
+  inline void subtractTo(const BitVector &other, std::vector<int> *output) const {
+    DCHECK(!enable_short_version);
+    DCHECK_EQ(num_bits_, other.num_bits_);
+    for (std::size_t array_idx = 0; array_idx < data_array_size_; ++array_idx) {
+      const std::size_t base = array_idx << kHigherOrderShift;
+      std::size_t value = data_array_[array_idx] & ~other.data_array_[array_idx];
+      while (value != 0) {
+        // Position base + k maps to (TopBit >> k), so the leading-zero count is
+        // the lowest remaining set position within this word.
+        const std::size_t offset = leading_zero_count<std::size_t>(value);
+        value ^= TopBit<std::size_t>() >> offset;
+        DCHECK_LT(base + offset, num_bits_);
+        output->emplace_back(static_cast<int>(base + offset));
+      }
+    }
+  }
+
  private:
   // This works as long as the bit-width of size_t is power of 2:
   static const std::size_t kLowerOrderMask = (sizeof(std::size_t) << 3) - 1;

Reply via email to