Repository: incubator-quickstep
Updated Branches:
  refs/heads/collision-free-agg 6986b2a3f -> 8a694213f


Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8a694213
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8a694213
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8a694213

Branch: refs/heads/collision-free-agg
Commit: 8a694213fd16082a886378b70effba45a2fc85cd
Parents: 6986b2a
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Fri Feb 3 14:45:14 2017 -0600
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Fri Feb 3 14:45:14 2017 -0600

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   2 +-
 query_optimizer/ExecutionGenerator.cpp          |   8 +-
 relational_operators/CMakeLists.txt             |  10 +-
 .../FinalizeAggregationOperator.cpp             |   8 +-
 .../InitializeAggregationOperator.cpp           |  78 ++++++++++++
 .../InitializeAggregationOperator.hpp           | 122 +++++++++++++++++++
 .../InitializeAggregationStateOperator.cpp      |  68 -----------
 .../InitializeAggregationStateOperator.hpp      | 103 ----------------
 relational_operators/WorkOrder.proto            |  10 +-
 relational_operators/WorkOrderFactory.cpp       |  13 ++
 10 files changed, 236 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7f75264..a755832 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -126,7 +126,7 @@ 
target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
-                      
quickstep_relationaloperators_InitializeAggregationStateOperator
+                      
quickstep_relationaloperators_InitializeAggregationOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_RelationalOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp 
b/query_optimizer/ExecutionGenerator.cpp
index b2ce27b..04fb105 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -106,7 +106,7 @@
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/HashJoinOperator.hpp"
-#include "relational_operators/InitializeAggregationStateOperator.hpp"
+#include "relational_operators/InitializeAggregationOperator.hpp"
 #include "relational_operators/InsertOperator.hpp"
 #include "relational_operators/NestedLoopsJoinOperator.hpp"
 #include "relational_operators/RelationalOperator.hpp"
@@ -1670,14 +1670,14 @@ void ExecutionGenerator::convertAggregate(
   }
 
   if (use_parallel_initialization) {
-    const QueryPlan::DAGNodeIndex initialize_aggregation_state_operator_index =
+    const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
         execution_plan_->addRelationalOperator(
-            new InitializeAggregationStateOperator(
+            new InitializeAggregationOperator(
                 query_handle_->query_id(),
                 aggr_state_index));
 
     execution_plan_->addDirectDependency(aggregation_operator_index,
-                                         
initialize_aggregation_state_operator_index,
+                                         initialize_aggregation_operator_index,
                                          true /* is_pipeline_breaker */);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt 
b/relational_operators/CMakeLists.txt
index bd20059..df4114d 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -47,9 +47,9 @@ 
add_library(quickstep_relationaloperators_FinalizeAggregationOperator
             FinalizeAggregationOperator.cpp
             FinalizeAggregationOperator.hpp)
 add_library(quickstep_relationaloperators_HashJoinOperator 
HashJoinOperator.cpp HashJoinOperator.hpp)
-add_library(quickstep_relationaloperators_InitializeAggregationStateOperator
-            InitializeAggregationStateOperator.cpp
-            InitializeAggregationStateOperator.hpp)
+add_library(quickstep_relationaloperators_InitializeAggregationOperator
+            InitializeAggregationOperator.cpp
+            InitializeAggregationOperator.hpp)
 add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp 
InsertOperator.hpp)
 add_library(quickstep_relationaloperators_NestedLoopsJoinOperator
             NestedLoopsJoinOperator.cpp
@@ -257,7 +257,7 @@ 
target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
-target_link_libraries(quickstep_relationaloperators_InitializeAggregationStateOperator
+target_link_libraries(quickstep_relationaloperators_InitializeAggregationOperator
                       glog
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
@@ -562,7 +562,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
-                      
quickstep_relationaloperators_InitializeAggregationStateOperator
+                      
quickstep_relationaloperators_InitializeAggregationOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_RebuildWorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp 
b/relational_operators/FinalizeAggregationOperator.cpp
index b66030b..72beb60 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,13 +44,13 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
     AggregationOperationState *agg_state =
         query_context->getAggregationState(aggr_state_index_);
     DCHECK(agg_state != nullptr);
-    for (std::size_t partition_id = 0;
-         partition_id < agg_state->getNumPartitions();
-         ++partition_id) {
+    for (std::size_t part_id = 0;
+         part_id < agg_state->getNumPartitions();
+         ++part_id) {
       container->addNormalWorkOrder(
           new FinalizeAggregationWorkOrder(
               query_id_,
-              partition_id,
+              part_id,
               agg_state,
               query_context->getInsertDestination(output_destination_index_)),
           op_index_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp 
b/relational_operators/InitializeAggregationOperator.cpp
new file mode 100644
index 0000000..3da719d
--- /dev/null
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/InitializeAggregationOperator.hpp"
+
+#include <cstddef>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool InitializeAggregationOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (!started_) {
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_);
+    DCHECK(agg_state != nullptr);
+
+    for (std::size_t part_id = 0;
+         part_id < agg_state->getNumInitializationPartitions();
+         ++part_id) {
+      container->addNormalWorkOrder(
+          new InitializeAggregationWorkOrder(query_id_,
+                                             part_id,
+                                             agg_state),
+          op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+bool 
InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer 
*container) {
+  if (!started_) {
+    started_ = true;
+
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+    proto->set_query_id(query_id_);
+    
proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index,
+                        aggr_state_index_);
+
+    container->addWorkOrderProto(proto, op_index_);
+  }
+  return started_;
+}
+
+void InitializeAggregationWorkOrder::execute() {
+  state_->initializeState(partition_id_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp 
b/relational_operators/InitializeAggregationOperator.hpp
new file mode 100644
index 0000000..4ca3bd5
--- /dev/null
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
+
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which initializes an AggregationOperationState.
+ **/
+class InitializeAggregationOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of this query.
+   * @param aggr_state_index The index of the AggregationOperationState in 
QueryContext.
+   **/
+  InitializeAggregationOperator(const std::size_t query_id,
+                                const QueryContext::aggregation_state_id 
aggr_state_index)
+      : RelationalOperator(query_id),
+        aggr_state_index_(aggr_state_index),
+        started_(false) {}
+
+  ~InitializeAggregationOperator() override {}
+
+  std::string getName() const override {
+    return "InitializeAggregationStateOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+  const QueryContext::aggregation_state_id aggr_state_index_;
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by InitializeAggregationOperator.
+ **/
+class InitializeAggregationWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param partition_id The partition ID for which the work order is issued.
+   * @param state The AggregationOperationState to be initialized.
+   */
+  InitializeAggregationWorkOrder(const std::size_t query_id,
+                                 const std::size_t partition_id,
+                                 AggregationOperationState *state)
+      : WorkOrder(query_id),
+        partition_id_(partition_id),
+        state_(DCHECK_NOTNULL(state)) {}
+
+  ~InitializeAggregationWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const std::size_t partition_id_;
+
+  AggregationOperationState *state_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.cpp 
b/relational_operators/InitializeAggregationStateOperator.cpp
deleted file mode 100644
index b041aef..0000000
--- a/relational_operators/InitializeAggregationStateOperator.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "relational_operators/InitializeAggregationStateOperator.hpp"
-
-#include <vector>
-
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/WorkOrderProtosContainer.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "relational_operators/WorkOrder.pb.h"
-#include "storage/AggregationOperationState.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace quickstep {
-
-bool InitializeAggregationStateOperator::getAllWorkOrders(
-    WorkOrdersContainer *container,
-    QueryContext *query_context,
-    StorageManager *storage_manager,
-    const tmb::client_id scheduler_client_id,
-    tmb::MessageBus *bus) {
-  if (!started_) {
-    AggregationOperationState *agg_state =
-        query_context->getAggregationState(aggr_state_index_);
-    DCHECK(agg_state != nullptr);
-
-    for (std::size_t part_id = 0;
-         part_id < agg_state->getNumInitializationPartitions();
-         ++part_id) {
-      container->addNormalWorkOrder(
-          new InitializeAggregationStateWorkOrder(query_id_,
-                                                  part_id,
-                                                  agg_state),
-          op_index_);
-    }
-    started_ = true;
-  }
-  return true;
-}
-
-bool 
InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 *container) {
-  // TODO
-  LOG(FATAL) << "Not implemented";
-}
-
-void InitializeAggregationStateWorkOrder::execute() {
-  state_->initializeState(partition_id_);
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.hpp 
b/relational_operators/InitializeAggregationStateOperator.hpp
deleted file mode 100644
index 10403b3..0000000
--- a/relational_operators/InitializeAggregationStateOperator.hpp
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef 
QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
-#define 
QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
-
-#include <string>
-
-#include "query_execution/QueryContext.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class AggregationOperationState;
-class StorageManager;
-class WorkOrderProtosContainer;
-class WorkOrdersContainer;
-
-namespace serialization { class WorkOrder; }
-
-/** \addtogroup RelationalOperators
- *  @{
- */
-
-class InitializeAggregationStateOperator : public RelationalOperator {
- public:
-  InitializeAggregationStateOperator(const std::size_t query_id,
-                                     const QueryContext::aggregation_state_id 
aggr_state_index)
-      : RelationalOperator(query_id),
-        aggr_state_index_(aggr_state_index),
-        started_(false) {}
-
-  ~InitializeAggregationStateOperator() override {}
-
-  std::string getName() const override {
-    return "InitializeAggregationStateOperator";
-  }
-
-  bool getAllWorkOrders(WorkOrdersContainer *container,
-                        QueryContext *query_context,
-                        StorageManager *storage_manager,
-                        const tmb::client_id scheduler_client_id,
-                        tmb::MessageBus *bus) override;
-
-  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
-
- private:
-  const QueryContext::aggregation_state_id aggr_state_index_;
-  bool started_;
-
-  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateOperator);
-};
-
-class InitializeAggregationStateWorkOrder : public WorkOrder {
- public:
-  InitializeAggregationStateWorkOrder(const std::size_t query_id,
-                                      const std::size_t partition_id,
-                                      AggregationOperationState *state)
-      : WorkOrder(query_id),
-        partition_id_(partition_id),
-        state_(DCHECK_NOTNULL(state)) {}
-
-  ~InitializeAggregationStateWorkOrder() override {}
-
-  void execute() override;
-
- private:
-  const std::size_t partition_id_;
-
-  AggregationOperationState *state_;
-
-  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateWorkOrder);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // 
QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto 
b/relational_operators/WorkOrder.proto
index 76753d2..83bb121 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -44,6 +44,7 @@ enum WorkOrderType {
   UPDATE = 20;
   WINDOW_AGGREGATION = 21;
   DESTROY_AGGREGATION_STATE = 22;
+  INITIALIZE_AGGREGATION = 23;
 }
 
 message WorkOrder {
@@ -278,6 +279,13 @@ message WindowAggregationWorkOrder {
 
 message DestroyAggregationStateWorkOrder {
   extend WorkOrder {
-    optional uint32 aggr_state_index = 339;
+    optional uint32 aggr_state_index = 352;
+  }
+}
+
+message InitializeAggregationWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint32 aggr_state_index = 368;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp 
b/relational_operators/WorkOrderFactory.cpp
index 5e8d03d..99bca7b 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -37,6 +37,7 @@
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationOperator.hpp"
 #include "relational_operators/InsertOperator.hpp"
 #include "relational_operators/NestedLoopsJoinOperator.hpp"
 #include "relational_operators/SampleOperator.hpp"
@@ -319,6 +320,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const 
serialization::WorkOrder
           LOG(FATAL) << "Unknown HashJoinWorkOrder Type in 
WorkOrderFactory::ReconstructFromProto";
       }
     }
+    case serialization::INITIALIZE_AGGREGATION: {
+      LOG(INFO) << "Creating InitializeAggregationWorkOrder in Shiftboss " << 
shiftboss_index;
+      return new InitializeAggregationWorkOrder(
+          proto.query_id(),
+          query_context->getAggregationState(proto.GetExtension(
+              
serialization::InitializeAggregationWorkOrder::aggr_state_index)));
+    }
     case serialization::INSERT: {
       LOG(INFO) << "Creating InsertWorkOrder in Shiftboss " << shiftboss_index;
       return new InsertWorkOrder(
@@ -693,6 +701,11 @@ bool WorkOrderFactory::ProtoIsValid(const 
serialization::WorkOrder &proto,
                  
proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) &&
              proto.HasExtension(serialization::HashJoinWorkOrder::block_id);
     }
+    case serialization::INITIALIZE_AGGREGATION: {
+      return 
proto.HasExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index)
 &&
+             query_context.isValidAggregationStateId(
+                 
proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index));
+    }
     case serialization::INSERT: {
       return 
proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(

Reply via email to