Parallel work order generation support.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ad8e6cf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ad8e6cf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ad8e6cf7

Branch: refs/heads/partitioned-aggregation
Commit: ad8e6cf7c596ecc9c2fe9c34e5b45c60a6606d75
Parents: af7465e
Author: Harshad Deshmukh <hbdeshm...@apache.org>
Authored: Fri Aug 19 10:35:09 2016 -0500
Committer: Harshad Deshmukh <hbdeshm...@apache.org>
Committed: Fri Sep 16 13:24:50 2016 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             | 35 ++++++++++++++++----
 .../FinalizeAggregationOperator.hpp             | 10 ++++--
 2 files changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8e6cf7/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp 
b/relational_operators/FinalizeAggregationOperator.cpp
index 7e337de..55d1357 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
-    container->addNormalWorkOrder(
-        new FinalizeAggregationWorkOrder(
-            query_id_,
-            query_context->getAggregationState(aggr_state_index_),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
+    DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr);
+    if 
(query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned())
 {
+      // The same AggregationState is shared across all the WorkOrders.
+      for (std::size_t part_id = 0;
+           part_id < query_context->getAggregationState(aggr_state_index_)
+                         ->getNumPartitions();
+           ++part_id) {
+        container->addNormalWorkOrder(
+            new FinalizeAggregationWorkOrder(
+                query_id_,
+                query_context->getAggregationState(aggr_state_index_),
+                query_context->getInsertDestination(output_destination_index_),
+                static_cast<int>(part_id)),
+            op_index_);
+      }
+    } else {
+      container->addNormalWorkOrder(
+          new FinalizeAggregationWorkOrder(
+              query_id_,
+              query_context->getAggregationState(aggr_state_index_),
+              query_context->getInsertDestination(output_destination_index_)),
+          op_index_);
+    }
   }
   return started_;
 }
@@ -70,7 +87,11 @@ bool 
FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 
 
 void FinalizeAggregationWorkOrder::execute() {
-  state_->finalizeAggregate(output_destination_);
+  if (state_->isAggregatePartitioned()) {
+    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+  } else {
+    state_->finalizeAggregate(output_destination_);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8e6cf7/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp 
b/relational_operators/FinalizeAggregationOperator.hpp
index 0aeac2a..7517d58 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -119,13 +119,18 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
+   * @param part_id The partition ID for which the Finalize aggregation work
+   *        order is issued. Ignore this field if the aggregation is not
+   *        partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination)
+                               InsertDestination *output_destination,
+                               int part_id = -1)
       : WorkOrder(query_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        part_id_(part_id) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
@@ -134,6 +139,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
  private:
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
+  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };

Reply via email to