http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp 
b/query_execution/PolicyEnforcerBase.cpp
deleted file mode 100644
index b799d5f..0000000
--- a/query_execution/PolicyEnforcerBase.cpp
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/PolicyEnforcerBase.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-namespace quickstep {
-
-// Switch for per-work-order profiling; read by the PolicyEnforcerBase
-// constructor.
-// NOTE(review): the help text misspells "execution" as "exceution". It is
-// reproduced unchanged here because it is user-visible output.
-DEFINE_bool(
-    profile_and_report_workorder_perf,
-    false,
-    "If true, Quickstep will record the exceution time of all the "
-    "individual normal work orders and report it at the end of query "
-    "execution.");
-
-// Switch for DAG visualization; setting it also turns on per-work-order
-// timing in the PolicyEnforcerBase constructor.
-DEFINE_bool(
-    visualize_execution_dag,
-    false,
-    "If true, visualize the execution plan DAG into a graph in DOT format "
-    "(DOT is a plain text graph description language) which is then "
-    "printed via stderr.");
-
-// Stores the catalog pointer and decides once, at construction time, whether
-// individual work orders are timed: timing is on when either the profiling
-// flag or the DAG-visualization flag is set.
-PolicyEnforcerBase::PolicyEnforcerBase(CatalogDatabaseLite *catalog_database)
-    : catalog_database_(catalog_database),
-      profile_individual_workorders_(FLAGS_visualize_execution_dag ||
-                                     FLAGS_profile_and_report_workorder_perf) {}
-
-// Routes one message to the query manager that owns it. If handling the
-// message leaves that query fully executed, the query is completed and
-// removed, and the oldest waiting query (if any) is admitted in its place.
-// New-block catalog messages are applied to the catalog and return early;
-// an unrecognized message type is fatal.
-void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
-  // Set by every case that leaves the switch through 'break'.
-  std::size_t query_id;
-  QueryManagerBase::dag_node_index op_index;
-
-  switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage: {
-      // The proto also carries the WorkOrder's execution times, which are
-      // recorded below when profiling is enabled.
-      serialization::WorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      decrementNumQueuedWorkOrders(proto);
-      if (profile_individual_workorders_) {
-        recordTimeForWorkOrder(proto);
-      }
-
-      query_id = proto.query_id();
-      op_index = proto.operator_index();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
-      break;
-    }
-    case kRebuildWorkOrderCompleteMessage: {
-      // Same proto type as above; rebuild work orders are not profiled here.
-      serialization::WorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      decrementNumQueuedWorkOrders(proto);
-
-      query_id = proto.query_id();
-      op_index = proto.operator_index();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(
-          op_index);
-      break;
-    }
-    case kCatalogRelationNewBlockMessage: {
-      serialization::CatalogRelationNewBlockMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      const block_id new_block = proto.block_id();
-      CatalogDatabase *database =
-          static_cast<CatalogDatabase*>(catalog_database_);
-      CatalogRelation *target_relation =
-          database->getRelationByIdMutable(proto.relation_id());
-      target_relation->addBlock(new_block);
-      if (proto.has_partition_id()) {
-        target_relation->getPartitionSchemeMutable()->addBlockToPartition(
-            proto.partition_id(), new_block);
-      }
-      // Catalog bookkeeping only: no query manager is consulted, so the
-      // completion check after the switch is skipped.
-      return;
-    }
-    case kDataPipelineMessage: {
-      serialization::DataPipelineMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      query_id = proto.query_id();
-      op_index = proto.operator_index();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-      admitted_queries_[query_id]->processDataPipelineMessage(
-          op_index, proto.block_id(), proto.relation_id());
-      break;
-    }
-    case kWorkOrderFeedbackMessage: {
-      // Feedback messages are not protos; they wrap the raw payload.
-      WorkOrder::FeedbackMessage feedback(
-          const_cast<void *>(tagged_message.message()),
-          tagged_message.message_bytes());
-
-      query_id = feedback.header().query_id;
-      op_index = feedback.header().rel_op_index;
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-      admitted_queries_[query_id]->processFeedbackMessage(op_index, feedback);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
-  }
-
-  QueryManagerBase *manager = admitted_queries_[query_id].get();
-  if (manager->queryStatus(op_index) !=
-          QueryManagerBase::QueryStatusCode::kQueryExecuted) {
-    return;
-  }
-
-  // The query has finished: run the completion hook, then retire it.
-  onQueryCompletion(manager);
-  removeQuery(query_id);
-
-  if (waiting_queries_.empty()) {
-    return;
-  }
-  // Admit the earliest waiting query.
-  QueryHandle *next_in_line = waiting_queries_.front();
-  waiting_queries_.pop();
-  admitQuery(next_in_line);
-}
-
-// Erases the manager for 'query_id' from the admitted set and records the ID
-// in removed_query_ids_. Logs a warning when the query's execution state
-// does not report it as finished.
-void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-  const bool execution_finished = admitted_queries_[query_id]
-                                      ->getQueryExecutionState()
-                                      .hasQueryExecutionFinished();
-  if (!execution_finished) {
-    LOG(WARNING) << "Removing query with ID " << query_id
-                 << " that hasn't finished its execution";
-  }
-
-  admitted_queries_.erase(query_id);
-  removed_query_ids_.insert(query_id);
-}
-
-// Offers each handle to admitQuery() in order. Returns false as soon as one
-// is not admitted (later handles are not attempted), true if all were.
-bool PolicyEnforcerBase::admitQueries(
-    const std::vector<QueryHandle*> &query_handles) {
-  for (auto handle_it = query_handles.begin();
-       handle_it != query_handles.end();
-       ++handle_it) {
-    const bool admitted = admitQuery(*handle_it);
-    if (!admitted) {
-      return false;
-    }
-  }
-  return true;
-}
-
-// Appends one timing entry for a completed work order to the per-query list
-// in workorder_time_recorder_.
-//
-// @param proto The completion message; its worker thread index, operator
-//        index and execution start/end times are copied into the entry.
-void PolicyEnforcerBase::recordTimeForWorkOrder(
-    const serialization::WorkOrderCompletionMessage &proto) {
-  const std::size_t query_id = proto.query_id();
-  std::vector<WorkOrderTimeEntry> &workorder_time_entries
-      = workorder_time_recorder_[query_id];
-  workorder_time_entries.emplace_back();
-
-  // Each field is set in its own statement; the original chained these
-  // assignments with comma operators, which looked like a typo.
-  WorkOrderTimeEntry &entry = workorder_time_entries.back();
-  entry.worker_id = proto.worker_thread_index();
-  entry.operator_id = proto.operator_index();
-  entry.start_time = proto.execution_start_time();
-  entry.end_time = proto.execution_end_time();
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp 
b/query_execution/PolicyEnforcerBase.hpp
deleted file mode 100644
index baf9c68..0000000
--- a/query_execution/PolicyEnforcerBase.hpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <unordered_map>
-#include <unordered_set>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-
-namespace serialization { class WorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A base class that ensures that a high level policy is maintained
- *        in sharing resources among concurrent queries.
- *
- * @note Subclasses must implement admitQuery() and
- *       decrementNumQueuedWorkOrders(), and may override
- *       onQueryCompletion().
- **/
-class PolicyEnforcerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param catalog_database The CatalogDatabase used.
-   **/
-  explicit PolicyEnforcerBase(CatalogDatabaseLite *catalog_database);
-
-  /**
-   * @brief Virtual Destructor. Logs a warning if any admitted or waiting
-   *        query is still present.
-   **/
-  virtual ~PolicyEnforcerBase() {
-    if (hasQueries()) {
-      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
-                      "waiting queries";
-    }
-  }
-
-  /**
-   * @brief Admit multiple queries in the system.
-   *
-   * @note In the current simple implementation, we only allow one active
-   *       query in the system. Other queries will have to wait.
-   * @note Admission stops at the first query that admitQuery() rejects; the
-   *       remaining handles are not attempted.
-   *
-   * @param query_handles A vector of QueryHandles for the queries to be
-   *        admitted.
-   *
-   * @return True if all the queries were admitted, false if at least one query
-   *         was not admitted.
-   **/
-  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
-
-  /**
-   * @brief Remove a given query that is under execution.
-   *
-   * @note This function is made public so that it is possible for a query
-   *       to be killed. Otherwise, it should only be used privately by the
-   *       class.
-   *
-   * TODO(harshad) - Extend this function to support removal of waiting
-   *                 queries.
-   *
-   * @param query_id The ID of the query to be removed.
-   **/
-  void removeQuery(const std::size_t query_id);
-
-  /**
-   * @brief Process a message sent to the Foreman, which gets passed on to the
-   *        policy enforcer.
-   *
-   * @param tagged_message The message.
-   **/
-  void processMessage(const TaggedMessage &tagged_message);
-
-  /**
-   * @brief Check whether the given query ID is known to this policy enforcer,
-   *        i.e. the query is currently admitted or has already been removed.
-   *
-   * @param query_id The ID of the query to look up.
-   *
-   * @return True if the query is admitted or was removed earlier, otherwise
-   *         false.
-   **/
-  inline bool existQuery(const std::size_t query_id) const {
-    return admitted_queries_.find(query_id) != admitted_queries_.end() ||
-           removed_query_ids_.find(query_id) != removed_query_ids_.end();
-  }
-
-  /**
-   * @brief Check if there are any queries to be executed.
-   *
-   * @return True if there is at least one active or waiting query, false if
-   *         the policy enforcer doesn't have any query.
-   **/
-  inline bool hasQueries() const {
-    return !(admitted_queries_.empty() && waiting_queries_.empty());
-  }
-
-  /**
-   * @brief Get the profiling results for individual work order execution for a
-   *        given query.
-   *
-   * @note This function should only be called if profiling individual work
-   *       orders option is enabled.
-   * @note The query must already have at least one recorded entry; this is
-   *       only DCHECKed before the at() lookup.
-   *
-   * @param query_id The ID of the query for which the profiling results are
-   *        requested.
-   *
-   * @return A vector of records, each being a single profiling entry.
-   **/
-  inline const std::vector<WorkOrderTimeEntry>& getProfilingResults(
-      const std::size_t query_id) const {
-    DCHECK(profile_individual_workorders_);
-    DCHECK(workorder_time_recorder_.find(query_id) !=
-           workorder_time_recorder_.end());
-    return workorder_time_recorder_.at(query_id);
-  }
-
-  /**
-   * @brief Admit a query to the system.
-   *
-   * @param query_handle The QueryHandle for the new query.
-   *
-   * @return Whether the query was admitted to the system.
-   **/
-  virtual bool admitQuery(QueryHandle *query_handle) = 0;
-
- protected:
-  static constexpr std::size_t kMaxConcurrentQueries = 1;  // Cap on simultaneously admitted queries.
-
-  /**
-   * @brief Add custom actions upon the completion of a query. The default
-   *        implementation does nothing.
-   *
-   * @param query_manager The query manager.
-   **/
-  virtual void onQueryCompletion(QueryManagerBase *query_manager) {}
-
-  /**
-   * @brief Record the execution time for a finished WorkOrder.
-   *
-   * TODO(harshad) - Extend the functionality to rebuild work orders.
-   *
-   * @param proto The completion message proto sent after the WorkOrder
-   *        execution.
-   **/
-  void recordTimeForWorkOrder(
-      const serialization::WorkOrderCompletionMessage &proto);
-
-  CatalogDatabaseLite *catalog_database_;  // The catalog passed to the constructor.
-
-  const bool profile_individual_workorders_;  // Fixed at construction from the profiling/visualization flags.
-
-  // Key = query ID, value = QueryManagerBase* for the key query.
-  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> 
admitted_queries_;
-
-  // TODO(quickstep-team): Delete a 'query_id' after receiving all
-  // 'QueryInitiateResponseMessage's for the 'query_id'.
-  std::unordered_set<std::size_t> removed_query_ids_;
-
-  // The queries which haven't been admitted yet.
-  std::queue<QueryHandle*> waiting_queries_;
-
-  WorkOrderTimeRecorder workorder_time_recorder_;  // Per-query timing entries appended by recordTimeForWorkOrder().
-
- private:
-  /**
-   * @brief Decrement the number of queued workorders for the given worker by
-   *        1.
-   *
-   * @param proto The completion message proto received after the WorkOrder
-   *        execution.
-   **/
-  virtual void decrementNumQueuedWorkOrders(
-      const serialization::WorkOrderCompletionMessage &proto) = 0;
-
-  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp 
b/query_execution/PolicyEnforcerDistributed.cpp
deleted file mode 100644
index c06fd86..0000000
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/PolicyEnforcerDistributed.hpp"
-
-#include <cstddef>
-#include <cstdlib>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/Catalog.pb.h"
-#include "catalog/CatalogRelation.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/QueryManagerDistributed.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-namespace S = serialization;
-
-// Budget of work order messages handed out per dispatch round; divided
-// among the admitted queries in getWorkOrderProtoMessages().
-// NOTE(review): PolicyEnforcerSingleNode.cpp DEFINEs a flag with this same
-// name -- confirm the two translation units are never linked into one binary.
-DEFINE_uint64(max_msgs_per_dispatch_round, 20,
-              "Maximum number of messages that can be allocated in a single "
-              "round of dispatch of messages to the workers.");
-
-// Fills 'work_order_proto_messages' (which must be empty on entry) with work
-// order messages pulled from the admitted queries. Each query may contribute
-// at most FLAGS_max_msgs_per_dispatch_round / <number of admitted queries>
-// messages. A query that has nothing more to offer and whose execution has
-// finished is completed and removed before this function returns.
-void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
-    vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) {
-  DCHECK(work_order_proto_messages->empty());
-  // TODO(harshad) - Make this function generic enough so that it
-  // works well when multiple queries are getting executed.
-  if (admitted_queries_.empty()) {
-    LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
-    return;
-  }
-
-  const std::size_t per_query_share =
-      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
-  DCHECK_GT(per_query_share, 0u);
-
-  // Removal is deferred until after the scan so that admitted_queries_ is
-  // not modified while it is being iterated.
-  vector<std::size_t> drained_query_ids;
-
-  for (const auto &query_entry : admitted_queries_) {
-    QueryManagerBase *curr_query_manager = query_entry.second.get();
-    DCHECK(curr_query_manager != nullptr);
-    QueryManagerDistributed *distributed_manager =
-        static_cast<QueryManagerDistributed*>(curr_query_manager);
-
-    for (std::size_t collected = 0; collected < per_query_share; ++collected) {
-      unique_ptr<S::WorkOrderMessage> next_message(
-          distributed_manager->getNextWorkOrderMessage(0));
-      if (next_message == nullptr) {
-        // Nothing more from this query right now; if it has also finished
-        // executing, schedule it for removal.
-        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          drained_query_ids.push_back(query_entry.first);
-        }
-        break;
-      }
-      work_order_proto_messages->push_back(move(next_message));
-    }
-  }
-
-  for (std::size_t i = 0; i < drained_query_ids.size(); ++i) {
-    const std::size_t finished_qid = drained_query_ids[i];
-    onQueryCompletion(admitted_queries_[finished_qid].get());
-    removeQuery(finished_qid);
-  }
-}
-
-// Tries to start running the query described by 'query_handle'.
-//  - At capacity: the handle is queued in waiting_queries_, returns false.
-//  - ID already admitted: an error is logged, returns false.
-//  - Otherwise: the query is announced to the Shiftbosses, a
-//    QueryManagerDistributed is created for it, returns true.
-bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
-  if (!(admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries)) {
-    // This query will have to wait.
-    waiting_queries_.push(query_handle);
-    return false;
-  }
-
-  const std::size_t query_id = query_handle->query_id();
-  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
-    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-    return false;
-  }
-
-  // NOTE(zuyu): Should call before constructing a 'QueryManager'.
-  // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
-  // initializes.
-  initiateQueryInShiftboss(query_handle);
-
-  admitted_queries_[query_id].reset(
-      new QueryManagerDistributed(query_handle,
-                                  shiftboss_directory_,
-                                  foreman_client_id_,
-                                  bus_));
-  return true;
-}
-
-// Handles a Shiftboss's reply to an initiate-rebuild request: forwards the
-// rebuild work order count to the query's manager, adds that count to the
-// Shiftboss's queued-work-order tally, and, if the query has now finished,
-// completes and removes it and admits the oldest waiting query.
-void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(
-    const tmb::TaggedMessage &tagged_message) {
-  S::InitiateRebuildResponseMessage proto;
-  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-  const std::size_t query_id = proto.query_id();
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-  QueryManagerDistributed *rebuild_manager =
-      static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
-
-  const std::size_t rebuild_count = proto.num_rebuild_work_orders();
-  rebuild_manager->processInitiateRebuildResponseMessage(proto.operator_index(),
-                                                         rebuild_count);
-  shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(),
-                                               rebuild_count);
-
-  if (!rebuild_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-    return;
-  }
-
-  onQueryCompletion(rebuild_manager);
-  removeQuery(query_id);
-
-  if (waiting_queries_.empty()) {
-    return;
-  }
-  // Admit the earliest waiting query.
-  QueryHandle *next_in_line = waiting_queries_.front();
-  waiting_queries_.pop();
-  admitQuery(next_in_line);
-}
-
-// Broadcasts a QueryInitiateMessage for 'query_handle' to every Shiftboss in
-// the directory. The message carries the query ID together with the query's
-// catalog database cache and query context protos.
-void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
-  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-  tmb::Address shiftboss_addresses;
-  for (std::size_t shiftboss = 0; shiftboss < shiftboss_directory_->size(); ++shiftboss) {
-    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(shiftboss));
-  }
-
-  S::QueryInitiateMessage proto;
-  proto.set_query_id(query_handle->query_id());
-  proto.mutable_catalog_database_cache()->MergeFrom(
-      query_handle->getCatalogDatabaseCacheProto());
-  proto.mutable_query_context()->MergeFrom(
-      query_handle->getQueryContextProto());
-
-  // Serialize into a scratch buffer that is released as soon as the
-  // TaggedMessage has been built from it.
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kQueryInitiateMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '"
-             << kQueryInitiateMessage
-             << "') to all Shiftbosses";
-  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                       shiftboss_addresses,
-                                       move(message),
-                                       bus_);
-}
-
-// Announces that the query owned by 'query_manager' has completed.
-//  - No result relation: broadcasts a QueryTeardownMessage to every
-//    Shiftboss and sends a QueryExecutionSuccessMessage to the CLI client
-//    recorded in the query handle.
-//  - Otherwise: broadcasts a SaveQueryResultMessage carrying the result
-//    relation's ID, a snapshot of its blocks and the CLI client ID.
-//
-// @param query_manager The manager of the completed query.
-void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
-  const QueryHandle *query_handle = query_manager->query_handle();
-
-  const CatalogRelation *query_result = query_handle->getQueryResultRelation();
-  const tmb::client_id cli_id = query_handle->getClientId();
-  const std::size_t query_id = query_handle->query_id();
-
-  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-  tmb::Address shiftboss_addresses;
-  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
-    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
-  }
-
-  if (query_result == nullptr) {
-    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
-    serialization::QueryTeardownMessage proto;
-    proto.set_query_id(query_id);
-
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kQueryTeardownMessage);
-    // Release the scratch buffer once the message has been built, exactly as
-    // the other serialization sites in this file do. Without this the buffer
-    // leaked on every query that had no result relation.
-    free(proto_bytes);
-
-    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '"
-               << kQueryTeardownMessage
-               << "') to all Shiftbosses";
-    QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                         shiftboss_addresses,
-                                         move(message),
-                                         bus_);
-
-    TaggedMessage cli_message(kQueryExecutionSuccessMessage);
-
-    // Notify the CLI query execution successfully.
-    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
-               << kQueryExecutionSuccessMessage
-               << "') to CLI with TMB client id " << cli_id;
-    const tmb::MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           foreman_client_id_,
-                                           cli_id,
-                                           move(cli_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-    return;
-  }
-
-  // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in
-  // Shiftboss.
-  S::SaveQueryResultMessage proto;
-  proto.set_query_id(query_id);
-  proto.set_relation_id(query_result->getID());
-
-  const vector<block_id> blocks(query_result->getBlocksSnapshot());
-  for (const block_id block : blocks) {
-    proto.add_blocks(block);
-  }
-
-  proto.set_cli_id(cli_id);
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kSaveQueryResultMessage);
-  free(proto_bytes);
-
-  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-  DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '"
-             << kSaveQueryResultMessage
-             << "') to all Shiftbosses";
-  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                       shiftboss_addresses,
-                                       move(message),
-                                       bus_);
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp 
b/query_execution/PolicyEnforcerDistributed.hpp
deleted file mode 100644
index 146e9af..0000000
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "query_execution/PolicyEnforcerBase.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/ShiftbossDirectory.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb {
-class MessageBus;
-class TaggedMessage;
-}
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that ensures that a high level policy is maintained
- *        in sharing resources among concurrent queries. This is the
- *        distributed variant: it talks to Shiftbosses over the TMB.
- **/
-class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the Foreman.
-   * @param catalog_database The CatalogDatabase used.
-   * @param shiftboss_directory The directory of Shiftbosses, used to address
-   *        them and to track their queued work order counts.
-   * @param bus The TMB.
-   **/
-  PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
-                            CatalogDatabaseLite *catalog_database,
-                            ShiftbossDirectory *shiftboss_directory,
-                            tmb::MessageBus *bus)
-      : PolicyEnforcerBase(catalog_database),
-        foreman_client_id_(foreman_client_id),
-        shiftboss_directory_(shiftboss_directory),
-        bus_(bus) {}
-
-  /**
-   * @brief Destructor.
-   **/
-  ~PolicyEnforcerDistributed() override {}
-
-  bool admitQuery(QueryHandle *query_handle) override;  // Returns false when the query is queued to wait or its ID is already admitted.
-
-  /**
-   * @brief Get work order messages to be dispatched. These messages come from
-   *        the active queries.
-   *
-   * @note The output vector must be empty on entry. Queries found to have
-   *       finished while collecting messages are completed and removed.
-   *
-   * @param work_order_proto_messages The work order messages to be dispatched.
-   **/
-  void getWorkOrderProtoMessages(
-      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> 
*work_order_proto_messages);
-
-  /**
-   * @brief Process the initiate rebuild work order response message.
-   *
-   * @param tagged_message The message.
-   **/
-  void processInitiateRebuildResponseMessage(const tmb::TaggedMessage 
&tagged_message);
-
- private:
-  void decrementNumQueuedWorkOrders(const 
serialization::WorkOrderCompletionMessage &proto) override {
-    
shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index());
-  }
-
-  void onQueryCompletion(QueryManagerBase *query_manager) override;  // Sends teardown / save-result messages for a finished query.
-
-  void initiateQueryInShiftboss(QueryHandle *query_handle);  // Broadcasts a QueryInitiateMessage to all Shiftbosses.
-
-  const tmb::client_id foreman_client_id_;  // Sender ID used for outgoing TMB messages.
-
-  ShiftbossDirectory *shiftboss_directory_;  // Directory passed to the constructor.
-
-  tmb::MessageBus *bus_;  // Message bus passed to the constructor.
-
-  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp 
b/query_execution/PolicyEnforcerSingleNode.cpp
deleted file mode 100644
index 0aa2ca8..0000000
--- a/query_execution/PolicyEnforcerSingleNode.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/PolicyEnforcerSingleNode.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/QueryManagerSingleNode.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-namespace quickstep {
-
-DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages 
that"
-              " can be allocated in a single round of dispatch of messages to"
-              " the workers.");  // Divided evenly among the admitted queries in getWorkerMessages().
-
-void PolicyEnforcerSingleNode::getWorkerMessages(
-    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
-  // Visit every admitted query once, taking at most per_query_share
-  // messages from each; a query is left early as soon as it has no
-  // message available. worker_messages must be empty on entry.
-  DCHECK(worker_messages->empty());
-  // TODO(harshad) - Make this function generic enough so that it
-  // works well when multiple queries are getting executed.
-  std::size_t per_query_share = 0;
-  if (!admitted_queries_.empty()) {
-    per_query_share = FLAGS_max_msgs_per_dispatch_round / 
admitted_queries_.size();  // Integer division: equal share per admitted query.
-  } else {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
-    return;
-  }
-  DCHECK_GT(per_query_share, 0u);  // NOTE(review): the share is 0 when the flag is smaller than admitted_queries_.size(); the loop below then collects nothing - confirm that cannot happen.
-  std::vector<std::size_t> finished_queries_ids;  // Finished queries are removed only after the loop over admitted_queries_.
-
-  for (const auto &admitted_query_info : admitted_queries_) {
-    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
-    DCHECK(curr_query_manager != nullptr);
-    std::size_t messages_collected_curr_query = 0;
-    while (messages_collected_curr_query < per_query_share) {
-      WorkerMessage *next_worker_message =
-          
static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0,
 kAnyNUMANodeID);
-      if (next_worker_message != nullptr) {
-        ++messages_collected_curr_query;
-        
worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));  // The output vector now owns the message.
-      } else {
-        // No more work orders from the current query at this time.
-        // Check if the query's execution is over.
-        if 
(curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          // The query has finished executing: remember its ID for removal.
-          finished_queries_ids.push_back(admitted_query_info.first);
-        }
-        break;  // Move on to the next admitted query.
-      }
-    }
-  }
-  for (const std::size_t finished_qid : finished_queries_ids) {
-    removeQuery(finished_qid);
-  }
-}
-
-bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {  // Returns true only when a query manager is created for the query.
-  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
-    // Below the concurrency limit, so the query may be admitted.
-    const std::size_t query_id = query_handle->query_id();
-    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-      // No admitted query has this ID: create its query manager.
-      admitted_queries_[query_id].reset(
-          new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, 
query_handle,
-                                     catalog_database_, storage_manager_, 
bus_));
-      return true;
-    } else {
-      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-      return false;  // Rejected outright; the handle is not queued.
-    }
-  } else {
-    // At the concurrency limit: this query will have to wait.
-    waiting_queries_.push(query_handle);
-    return false;  // Queued in waiting_queries_, not admitted yet.
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp 
b/query_execution/PolicyEnforcerSingleNode.hpp
deleted file mode 100644
index f87d670..0000000
--- a/query_execution/PolicyEnforcerSingleNode.hpp
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "query_execution/PolicyEnforcerBase.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/WorkerDirectory.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that ensures that a high level policy is maintained
- *        in sharing resources among concurrent queries. This single-node
- *        variant hands work out as WorkerMessages via getWorkerMessages().
- **/
-class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the Foreman.
-   * @param num_numa_nodes Number of NUMA nodes used by the system.
-   * @param catalog_database The CatalogDatabase used.
-   * @param storage_manager The StorageManager used.
-   * @param worker_directory The WorkerDirectory updated by
-   *        decrementNumQueuedWorkOrders().
-   * @param bus The TMB.
-   **/
-  PolicyEnforcerSingleNode(const tmb::client_id foreman_client_id,
-                           const std::size_t num_numa_nodes,
-                           CatalogDatabaseLite *catalog_database,
-                           StorageManager *storage_manager,
-                           WorkerDirectory *worker_directory,
-                           tmb::MessageBus *bus)
-      : PolicyEnforcerBase(catalog_database),
-        foreman_client_id_(foreman_client_id),
-        num_numa_nodes_(num_numa_nodes),
-        storage_manager_(storage_manager),
-        worker_directory_(worker_directory),
-        bus_(bus) {}
-
-  /**
-   * @brief Destructor.
-   **/
-  ~PolicyEnforcerSingleNode() override {}
-
-  bool admitQuery(QueryHandle *query_handle) override;  // Defined in PolicyEnforcerSingleNode.cpp.
-
-  /**
-   * @brief Get worker messages to be dispatched. These worker messages come
-   *        from the active queries.
-   *
-   * @param worker_messages The worker messages to be dispatched. The
-   *        implementation DCHECKs that the vector is empty on entry.
-   **/
-  void getWorkerMessages(
-      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
-
- private:
-  void decrementNumQueuedWorkOrders(const 
serialization::WorkOrderCompletionMessage &proto) override {  // Delegates to the WorkerDirectory.
-    
worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());  // Keyed by the worker thread index carried in the completion message.
-  }
-
-  const tmb::client_id foreman_client_id_;
-  const std::size_t num_numa_nodes_;
-
-  StorageManager *storage_manager_;
-  WorkerDirectory *worker_directory_;
-
-  tmb::MessageBus *bus_;
-
-  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
deleted file mode 100644
index 0e6636d..0000000
--- a/query_execution/QueryContext.cpp
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryContext.hpp"
-
-#include <memory>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabaseLite.hpp"
-#include "catalog/CatalogRelationSchema.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/ExpressionFactories.hpp"
-#include "expressions/table_generator/GeneratorFunction.pb.h"
-#include "expressions/table_generator/GeneratorFunctionFactory.hpp"
-#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "storage/AggregationOperationState.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "types/TypedValue.hpp"
-#include "types/containers/Tuple.hpp"
-#include "utility/SortConfiguration.hpp"
-#include "utility/lip_filter/LIPFilter.hpp"
-#include "utility/lip_filter/LIPFilterDeployment.hpp"
-#include "utility/lip_filter/LIPFilterFactory.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-using std::move;
-using std::unique_ptr;
-using std::vector;
-
-namespace quickstep {
-
-QueryContext::QueryContext(const serialization::QueryContext &proto,
-                           const CatalogDatabaseLite &database,
-                           StorageManager *storage_manager,
-                           const tmb::client_id scheduler_client_id,
-                           tmb::MessageBus *bus) {  // Rebuilds each kind of per-query object from proto, one loop per repeated field.
-  DCHECK(ProtoIsValid(proto, database))
-      << "Attempted to create QueryContext from an invalid proto 
description:\n"
-      << proto.DebugString();  // Validity is only DCHECKed here, not handled.
-
-  for (int i = 0; i < proto.aggregation_states_size(); ++i) {
-    aggregation_states_.emplace_back(
-        
AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
-                                                        database,
-                                                        storage_manager));
-  }
-
-  for (int i = 0; i < proto.generator_functions_size(); ++i) {
-    const GeneratorFunctionHandle *func_handle =
-        
GeneratorFunctionFactory::Instance().reconstructFromProto(proto.generator_functions(i));
-    DCHECK(func_handle != nullptr);
-    generator_functions_.emplace_back(
-        std::unique_ptr<const GeneratorFunctionHandle>(func_handle));  // Takes ownership of the raw handle.
-  }
-
-  for (int i = 0; i < proto.join_hash_tables_size(); ++i) {
-    join_hash_tables_.emplace_back(
-        
JoinHashTableFactory::CreateResizableFromProto(proto.join_hash_tables(i),
-                                                       storage_manager));
-  }
-
-  for (int i = 0; i < proto.insert_destinations_size(); ++i) {
-    const serialization::InsertDestination &insert_destination_proto = 
proto.insert_destinations(i);
-    insert_destinations_.emplace_back(InsertDestination::ReconstructFromProto(
-        proto.query_id(),
-        insert_destination_proto,
-        database.getRelationSchemaById(insert_destination_proto.relation_id()),
-        storage_manager,
-        scheduler_client_id,
-        bus));
-  }
-
-  for (int i = 0; i < proto.lip_filters_size(); ++i) {
-    lip_filters_.emplace_back(
-        std::unique_ptr<LIPFilter>(
-            LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i))));
-  }
-
-  for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
-    lip_deployments_.emplace_back(
-        std::make_unique<LIPFilterDeployment>(
-            proto.lip_filter_deployments(i), lip_filters_));  // Receives the filters built by the loop above.
-  }
-
-  for (int i = 0; i < proto.predicates_size(); ++i) {
-    predicates_.emplace_back(
-        PredicateFactory::ReconstructFromProto(proto.predicates(i), database));
-  }
-
-  for (int i = 0; i < proto.scalar_groups_size(); ++i) {
-    vector<unique_ptr<const Scalar>> scalar_group;
-
-    const serialization::QueryContext::ScalarGroup &scalar_group_proto = 
proto.scalar_groups(i);
-    for (int j = 0; j < scalar_group_proto.scalars_size(); ++j) {
-      scalar_group.emplace_back(
-          ScalarFactory::ReconstructFromProto(scalar_group_proto.scalars(j), 
database));
-    }
-
-    scalar_groups_.push_back(move(scalar_group));
-  }
-
-  for (int i = 0; i < proto.sort_configs_size(); ++i) {
-    sort_configs_.emplace_back(
-        SortConfiguration::ReconstructFromProto(proto.sort_configs(i), 
database));
-  }
-
-  for (int i = 0; i < proto.tuples_size(); ++i) {
-    tuples_.emplace_back(Tuple::ReconstructFromProto(proto.tuples(i)));
-  }
-
-  for (int i = 0; i < proto.update_groups_size(); ++i) {
-    const serialization::QueryContext::UpdateGroup &update_group_proto = 
proto.update_groups(i);
-
-    std::unordered_map<attribute_id, std::unique_ptr<const Scalar>> 
update_group;
-    for (int j = 0; j < update_group_proto.update_assignments_size(); ++j) {
-      const serialization::QueryContext::UpdateGroup::UpdateAssignment 
&update_assignment_proto =
-          update_group_proto.update_assignments(j);
-
-      unique_ptr<const Scalar> scalar(
-          
ScalarFactory::ReconstructFromProto(update_assignment_proto.scalar(), 
database));
-
-      update_group.emplace(update_assignment_proto.attribute_id(), 
move(scalar));  // Keyed by attribute id; emplace() keeps the first entry if an id repeats.
-    }
-
-    update_groups_.push_back(move(update_group));
-  }
-
-  for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
-    window_aggregation_states_.emplace_back(
-        
WindowAggregationOperationState::ReconstructFromProto(proto.window_aggregation_states(i),
-                                                              database,
-                                                              
storage_manager));
-  }
-}
-
-bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
-                                const CatalogDatabaseLite &database) {  // Returns false at the first invalid component found.
-  for (int i = 0; i < proto.aggregation_states_size(); ++i) {
-    if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i), 
database)) {
-      return false;
-    }
-  }
-
-  // Each GeneratorFunctionHandle object is serialized as a function name with
-  // a list of arguments. Here checks that the arguments are valid 
TypedValue's.
-  for (int i = 0; i < proto.generator_functions_size(); ++i) {
-    const serialization::GeneratorFunctionHandle &func_proto = 
proto.generator_functions(i);
-    for (int j = 0; j < func_proto.args_size(); ++j) {
-      if (!TypedValue::ProtoIsValid(func_proto.args(j))) {
-        return false;
-      }
-    }
-  }
-
-  for (int i = 0; i < proto.join_hash_tables_size(); ++i) {
-    if (!JoinHashTableFactory::ProtoIsValid(proto.join_hash_tables(i))) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.insert_destinations_size(); ++i) {
-    const serialization::InsertDestination &insert_destination_proto = 
proto.insert_destinations(i);
-    const relation_id rel_id = insert_destination_proto.relation_id();
-
-    if (!database.hasRelationWithId(rel_id) ||  // Checked first, so the schema lookup below only runs for known relation ids.
-        !InsertDestination::ProtoIsValid(insert_destination_proto,
-                                         
database.getRelationSchemaById(rel_id))) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.lip_filters_size(); ++i) {
-    if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
-    if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.predicates_size(); ++i) {
-    if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.scalar_groups_size(); ++i) {
-    const serialization::QueryContext::ScalarGroup &scalar_group_proto = 
proto.scalar_groups(i);
-    for (int j = 0; j < scalar_group_proto.scalars_size(); ++j) {
-      if (!ScalarFactory::ProtoIsValid(scalar_group_proto.scalars(j), 
database)) {
-        return false;
-      }
-    }
-  }
-
-  for (int i = 0; i < proto.sort_configs_size(); ++i) {
-    if (!SortConfiguration::ProtoIsValid(proto.sort_configs(i), database)) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.tuples_size(); ++i) {
-    if (!Tuple::ProtoIsValid(proto.tuples(i))) {
-      return false;
-    }
-  }
-
-  for (int i = 0; i < proto.update_groups_size(); ++i) {
-    const serialization::QueryContext::UpdateGroup &update_group_proto = 
proto.update_groups(i);
-
-    const relation_id rel_id = update_group_proto.relation_id();
-    if (!database.hasRelationWithId(rel_id)) {
-      return false;
-    }
-    const CatalogRelationSchema &rel = database.getRelationSchemaById(rel_id);
-
-    for (int j = 0; j < update_group_proto.update_assignments_size(); ++j) {
-      const serialization::QueryContext::UpdateGroup::UpdateAssignment 
&update_assignment_proto =
-          update_group_proto.update_assignments(j);
-
-      if (!rel.hasAttributeWithId(update_assignment_proto.attribute_id()) ||  // Each assignment must target an attribute of rel.
-          !ScalarFactory::ProtoIsValid(update_assignment_proto.scalar(), 
database)) {
-        return false;
-      }
-    }
-  }
-
-  for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
-    if 
(!WindowAggregationOperationState::ProtoIsValid(proto.window_aggregation_states(i),
-                                                       database)) {
-      return false;
-    }
-  }
-
-  return proto.IsInitialized();  // Every component passed; the result is the proto's own IsInitialized() check.
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
deleted file mode 100644
index 7ad8fa1..0000000
--- a/query_execution/QueryContext.hpp
+++ /dev/null
@@ -1,585 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_
-
-#include <cstddef>
-#include <cstdint>
-#include <memory>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/predicate/Predicate.hpp"
-#include "expressions/scalar/Scalar.hpp"
-#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
-#include "storage/AggregationOperationState.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/WindowAggregationOperationState.hpp"
-#include "types/containers/Tuple.hpp"
-#include "utility/Macros.hpp"
-#include "utility/SortConfiguration.hpp"
-#include "utility/lip_filter/LIPFilter.hpp"
-#include "utility/lip_filter/LIPFilterDeployment.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class StorageManager;
-
-namespace serialization { class QueryContext; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief The QueryContext stores stateful execution info per query.
- **/
-class QueryContext {
- public:
-  /**
-   * @brief A unique identifier for an AggregationOperationState per query.
-   **/
-  typedef std::uint32_t aggregation_state_id;
-
-  /**
-   * @brief A unique identifier for a GeneratorFunctionHandle per query.
-   **/
-  typedef std::uint32_t generator_function_id;
-
-  /**
-   * @brief A unique identifier for an InsertDestination per query.
-   *
-   * @note A negative value indicates a nonexistent InsertDestination.
-   **/
-  typedef std::int32_t insert_destination_id;
-  static constexpr insert_destination_id kInvalidInsertDestinationId = 
static_cast<insert_destination_id>(-1);
-
-  /**
-   * @brief A unique identifier for a JoinHashTable per query.
-   **/
-  typedef std::uint32_t join_hash_table_id;
-
-  /**
-   * @brief A unique identifier for a LIPFilterDeployment per query.
-   **/
-  typedef std::int32_t lip_deployment_id;
-  static constexpr lip_deployment_id kInvalidLIPDeploymentId = 
static_cast<lip_deployment_id>(-1);
-
-  /**
-   * @brief A unique identifier for a LIPFilter per query.
-   **/
-  typedef std::uint32_t lip_filter_id;
-
-  /**
-   * @brief A unique identifier for a Predicate per query.
-   *
-   * @note A negative value indicates a null Predicate.
-   **/
-  typedef std::int32_t predicate_id;
-  static constexpr predicate_id kInvalidPredicateId = 
static_cast<predicate_id>(-1);
-
-  /**
-   * @brief A unique identifier for a group of Scalars per query.
-   *
-   * @note A negative value indicates a nonexistent ScalarGroup.
-   **/
-  typedef std::int32_t scalar_group_id;
-  static constexpr scalar_group_id kInvalidScalarGroupId = 
static_cast<scalar_group_id>(-1);
-
-  /**
-   * @brief A unique identifier for a SortConfiguration per query.
-   **/
-  typedef std::uint32_t sort_config_id;
-
-  /**
-   * @brief A unique identifier for a Tuple to be inserted per query.
-   **/
-  typedef std::uint32_t tuple_id;
-
-  /**
-   * @brief A unique identifier for a group of UpdateAssignments per query.
-   **/
-  typedef std::uint32_t update_group_id;
-
-  /**
-   * @brief A unique identifier for a window aggregation state.
-   **/
-  typedef std::uint32_t window_aggregation_state_id;
-
-  /**
-   * @brief Constructor.
-   *
-   * @param proto A serialized Protocol Buffer representation of a
-   *        QueryContext, originally generated by the optimizer.
-   * @param database The Database to resolve relation and attribute references
-   *        in.
-   * @param storage_manager The StorageManager to use.
-   * @param scheduler_client_id The TMB client ID of the scheduler thread.
-   * @param bus A pointer to the TMB.
-   **/
-  QueryContext(const serialization::QueryContext &proto,
-               const CatalogDatabaseLite &database,
-               StorageManager *storage_manager,
-               const tmb::client_id scheduler_client_id,
-               tmb::MessageBus *bus);
-
-  ~QueryContext() {}
-
-  /**
-   * @brief Check whether a serialization::QueryContext is fully-formed and
-   *        all parts are valid.
-   *
-   * @param proto A serialized Protocol Buffer representation of a 
QueryContext,
-   *        originally generated by the optimizer.
-   * @param database The Database to resolve relation and attribute references
-   *        in.
-   * @return Whether proto is fully-formed and valid.
-   **/
-  static bool ProtoIsValid(const serialization::QueryContext &proto,
-                           const CatalogDatabaseLite &database);
-
-  /**
-   * @brief Whether the given AggregationOperationState id is valid.
-   *
-   * @note Only the range is checked, so the id of a state already passed to
-   *       destroyAggregationState() still counts as valid.
-   *
-   * @param id The AggregationOperationState id.
-   *
-   * @return True if id is less than the number of aggregation states,
-   *         otherwise false.
-   **/
-  bool isValidAggregationStateId(const aggregation_state_id id) const {
-    return id < aggregation_states_.size();
-  }
-
-  /**
-   * @brief Get the AggregationOperationState.
-   *
-   * @param id The AggregationOperationState id in the query. DCHECKed to be
-   *        in range and to refer to a state that has not been destroyed.
-   *
-   * @return The AggregationOperationState, already created in the
-   *         constructor.
-   **/
-  inline AggregationOperationState* getAggregationState(const 
aggregation_state_id id) {
-    DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    return aggregation_states_[id].get();
-  }
-
-  /**
-   * @brief Destroy the given aggregation state.
-   *
-   * @note The slot is reset to nullptr rather than erased, so other ids keep
-   *       their positions. Both DCHECKs below require a live state, so
-   *       destroying the same id twice trips the second DCHECK.
-   *
-   * @param id The ID of the AggregationOperationState to destroy.
-   **/
-  inline void destroyAggregationState(const aggregation_state_id id) {
-    DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    aggregation_states_[id].reset(nullptr);
-  }
-
-  /**
-   * @brief Destroy the payloads from the aggregation hash tables by
-   *        forwarding to the AggregationOperationState with the given id.
-   *
-   * @warning After calling this method, the hash table will be in an invalid
-   *          state. No other operation should be performed on it.
-   *
-   * @param id The ID of the AggregationOperationState. DCHECKed to be in
-   *        range and to refer to a state that has not been destroyed.
-   **/
-  inline void destroyAggregationHashTablePayload(const aggregation_state_id 
id) {
-    DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    aggregation_states_[id]->destroyAggregationHashTablePayload();
-  }
-
-  /**
-   * @brief Whether the given GeneratorFunctionHandle id is valid.
-   *
-   * @param id The GeneratorFunctionHandle id.
-   *
-   * @return True if id is less than the number of generator functions,
-   *         otherwise false.
-   **/
-  bool isValidGeneratorFunctionId(const generator_function_id id) const {
-    return id < generator_functions_.size();
-  }
-
-  /**
-   * @brief Get the GeneratorFunctionHandle.
-   *
-   * @param id The GeneratorFunctionHandle id in the query. DCHECKed to be in
-   *        range.
-   *
-   * @return A const reference to the GeneratorFunctionHandle, already
-   *         created in the constructor.
-   **/
-  inline const GeneratorFunctionHandle& getGeneratorFunctionHandle(
-      const generator_function_id id) {
-    DCHECK_LT(static_cast<std::size_t>(id), generator_functions_.size());
-    return *generator_functions_[id];
-  }
-
-  /**
-   * @brief Whether the given InsertDestination id is valid.
-   *
-   * @param id The InsertDestination id.
-   *
-   * @return True if id is non-negative (which also excludes
-   *         kInvalidInsertDestinationId) and less than the number of
-   *         insert destinations, otherwise false.
-   **/
-  bool isValidInsertDestinationId(const insert_destination_id id) const {
-    return id != kInvalidInsertDestinationId
-        && id >= 0
-        && static_cast<std::size_t>(id) < insert_destinations_.size();
-  }
-
-  /**
-   * @brief Get the InsertDestination.
-   *
-   * @param id The InsertDestination id in the query. DCHECKed to be
-   *        non-negative and in range.
-   *
-   * @return The InsertDestination, already created in the constructor.
-   *         There is no null check here; presumably this is nullptr once
-   *         destroyInsertDestination(id) has been called.
-   **/
-  inline InsertDestination* getInsertDestination(const insert_destination_id 
id) {
-    DCHECK_GE(id, 0);
-    DCHECK_LT(static_cast<std::size_t>(id), insert_destinations_.size());
-    return insert_destinations_[id].get();
-  }
-
-  /**
-   * @brief Destroy the given InsertDestination. The slot is reset in place,
-   *        so the ids of the other InsertDestinations are unchanged.
-   *
-   * @param id The id of the InsertDestination to destroy. DCHECKed to be
-   *        non-negative and in range.
-   **/
-  inline void destroyInsertDestination(const insert_destination_id id) {
-    DCHECK_GE(id, 0);
-    DCHECK_LT(static_cast<std::size_t>(id), insert_destinations_.size());
-    insert_destinations_[id].reset();
-  }
-
-  /**
-   * @brief Whether the given JoinHashTable id is valid.
-   *
-   * @param id The JoinHashTable id.
-   *
-   * @return True if id is less than the number of join hash tables,
-   *         otherwise false.
-   **/
-  bool isValidJoinHashTableId(const join_hash_table_id id) const {
-    return id < join_hash_tables_.size();
-  }
-
-  /**
-   * @brief Get the JoinHashTable.
-   *
-   * @param id The JoinHashTable id in the query. DCHECKed to be in range.
-   *
-   * @return The JoinHashTable, already created in the constructor. There is
-   *         no null check here; presumably this is nullptr once
-   *         destroyJoinHashTable(id) has been called.
-   **/
-  inline JoinHashTable* getJoinHashTable(const join_hash_table_id id) {
-    DCHECK_LT(id, join_hash_tables_.size());
-    return join_hash_tables_[id].get();
-  }
-
-  /**
-   * @brief Destroy the given JoinHashTable. The slot is reset in place, so
-   *        the ids of the other JoinHashTables are unchanged.
-   *
-   * @param id The id of the JoinHashTable to destroy. DCHECKed to be in range.
-   **/
-  inline void destroyJoinHashTable(const join_hash_table_id id) {
-    DCHECK_LT(id, join_hash_tables_.size());
-    join_hash_tables_[id].reset();
-  }
-
-  /**
-   * @brief Whether the given LIPFilterDeployment id is valid.
-   *
-   * @param id The LIPFilterDeployment id.
-   *
-   * @return True if id, converted to std::size_t, is less than the number
-   *         of LIPFilterDeployments, otherwise false.
-   **/
-  bool isValidLIPDeploymentId(const lip_deployment_id id) const {
-    return static_cast<std::size_t>(id) < lip_deployments_.size();  // A negative id such as kInvalidLIPDeploymentId converts to a very large value and so fails.
-  }
-
-  /**
-   * @brief Get a constant pointer to the LIPFilterDeployment.
-   *
-   * @param id The LIPFilterDeployment id. DCHECKed to be in range after
-   *        conversion to std::size_t.
-   *
-   * @return The constant pointer to LIPFilterDeployment that is
-   *         already created in the constructor.
-   **/
-  inline const LIPFilterDeployment* getLIPDeployment(
-      const lip_deployment_id id) const {
-    DCHECK_LT(static_cast<std::size_t>(id), lip_deployments_.size());
-    return lip_deployments_[id].get();
-  }
-
-  /**
-   * @brief Destroy the given LIPFilterDeployment. The slot is reset in
-   *        place, so the ids of the other deployments are unchanged.
-   *
-   * @param id The id of the LIPFilterDeployment to destroy. DCHECKed to be
-   *        in range after conversion to std::size_t.
-   **/
-  inline void destroyLIPDeployment(const lip_deployment_id id) {
-    DCHECK_LT(static_cast<std::size_t>(id), lip_deployments_.size());
-    lip_deployments_[id].reset();
-  }
-
-  /**
-   * @brief Whether the given LIPFilter id is valid.
-   *
-   * @param id The LIPFilter id.
-   *
-   * @return True if id is less than the number of LIPFilters, otherwise
-   *         false.
-   **/
-  bool isValidLIPFilterId(const lip_filter_id id) const {
-    return id < lip_filters_.size();
-  }
-
-  /**
-   * @brief Get a mutable pointer to the LIPFilter.
-   *
-   * @param id The LIPFilter id. DCHECKed to be in range.
-   *
-   * @return The LIPFilter, already created in the constructor.
-   **/
-  inline LIPFilter* getLIPFilterMutable(const lip_filter_id id) {
-    DCHECK_LT(id, lip_filters_.size());
-    return lip_filters_[id].get();
-  }
-
-  /**
-   * @brief Get a constant pointer to the LIPFilter.
-   *
-   * @param id The LIPFilter id. DCHECKed to be in range.
-   *
-   * @return The constant pointer to LIPFilter that is
-   *         already created in the constructor.
-   **/
-  inline const LIPFilter* getLIPFilter(const lip_filter_id id) const {
-    DCHECK_LT(id, lip_filters_.size());
-    return lip_filters_[id].get();
-  }
-
-  /**
-   * @brief Destroy the given LIPFilter. The slot is reset in place, so the
-   *        ids of the other LIPFilters are unchanged.
-   *
-   * @param id The id of the LIPFilter to destroy. DCHECKed to be in range.
-   **/
-  inline void destroyLIPFilter(const lip_filter_id id) {
-    DCHECK_LT(id, lip_filters_.size());
-    lip_filters_[id].reset();
-  }
-
-  /**
-   * @brief Whether the given Predicate id is valid or no predicate.
-   *
-   * @param id The Predicate id.
-   *
-   * @return True if id is kInvalidPredicateId (meaning no predicate) or a
-   *         non-negative index less than the number of predicates,
-   *         otherwise false.
-   **/
-  bool isValidPredicate(const predicate_id id) const {
-    return (id == kInvalidPredicateId)  // No predicate.
-        || (id >= 0 && static_cast<std::size_t>(id) < predicates_.size());
-  }
-
-  /**
-   * @brief Get the const Predicate.
-   *
-   * @param id The Predicate id in the query.
-   *
-   * @return The const Predicate (alreadly created in the constructor), or
-   *         nullptr for the given invalid id.
-   **/
-  inline const Predicate* getPredicate(const predicate_id id) {
-    if (id == kInvalidPredicateId) {
-      return nullptr;
-    }
-
-    DCHECK_GE(id, 0);
-    DCHECK_LT(static_cast<std::size_t>(id), predicates_.size());
-    return predicates_[id].get();
-  }
-
-  /**
-   * @brief Whether the given Scalar group id is valid.
-   *
-   * @param id The Scalar group id.
-   *
-   * @return True if valid, otherwise false.
-   **/
-  bool isValidScalarGroupId(const scalar_group_id id) const {
-    return id != kInvalidScalarGroupId
-        && id >= 0
-        && static_cast<std::size_t>(id) < scalar_groups_.size();
-  }
-
-  /**
-   * @brief Get the group of Scalars.
-   *
-   * @param id The Scalar group id in the query.
-   *
-   * @return The reference to the Scalar group, alreadly created in the
-   *         constructor.
-   **/
-  inline const std::vector<std::unique_ptr<const Scalar>>& getScalarGroup(const scalar_group_id id) {
-    DCHECK_GE(id, 0);
-    DCHECK_LT(static_cast<std::size_t>(id), scalar_groups_.size());
-    return scalar_groups_[id];
-  }
-
- /**
-   * @brief Whether the given SortConfiguration id is valid.
-   *
-   * @param id The SortConfiguration id.
-   *
-   * @return True if valid, otherwise false.
-   **/
-  bool isValidSortConfigId(const sort_config_id id) const {
-    return id < sort_configs_.size();
-  }
-
-  /**
-   * @brief Get the SortConfiguration.
-   *
-   * @param id The SortConfiguration id in the query.
-   *
-   * @return The SortConfiguration, alreadly created in the constructor.
-   **/
-  inline const SortConfiguration& getSortConfig(const sort_config_id id) {
-    DCHECK_LT(id, sort_configs_.size());
-    return *sort_configs_[id];
-  }
-
-  /**
-   * @brief Whether the given Tuple id is valid.
-   *
-   * @param id The Tuple id.
-   *
-   * @return True if valid, otherwise false.
-   **/
-  bool isValidTupleId(const tuple_id id) const {
-    return id < tuples_.size();
-  }
-
-  /**
-   * @brief Release the ownership of the Tuple referenced by the id.
-   *
-   * @note Each id should use only once.
-   *
-   * @param id The Tuple id in the query.
-   *
-   * @return The Tuple, alreadly created in the constructor.
-   **/
-  inline Tuple* releaseTuple(const tuple_id id) {
-    DCHECK_LT(id, tuples_.size());
-    DCHECK(tuples_[id]);
-    return tuples_[id].release();
-  }
-
-  /**
-   * @brief Whether the given update assignments group id is valid.
-   *
-   * @param id The group id of the update assignments.
-   *
-   * @return True if valid, otherwise false.
-   **/
-  bool isValidUpdateGroupId(const update_group_id id) const {
-    return static_cast<std::size_t>(id) < update_groups_.size();
-  }
-
-  /**
-   * @brief Get the group of the update assignments for UpdateWorkOrder.
-   *
-   * @param id The group id of the update assignments in the query.
-   *
-   * @return The reference to the update assignments group, alreadly created in the
-   *         constructor.
-   **/
-  inline const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>& getUpdateGroup(
-      const update_group_id id) {
-    DCHECK_LT(static_cast<std::size_t>(id), update_groups_.size());
-    DCHECK(!update_groups_[id].empty());
-    return update_groups_[id];
-  }
-
-  /**
-   * @brief Whether the given WindowAggregationOperationState id is valid.
-   *
-   * @param id The WindowAggregationOperationState id.
-   *
-   * @return True if valid, otherwise false.
-   **/
-  bool isValidWindowAggregationStateId(const window_aggregation_state_id id) const {
-    return id < window_aggregation_states_.size();
-  }
-
-  /**
-   * @brief Get the WindowAggregationOperationState.
-   *
-   * @param id The WindowAggregationOperationState id in the query.
-   *
-   * @return The WindowAggregationOperationState, already created in the
-   *         constructor.
-   **/
-  inline WindowAggregationOperationState* getWindowAggregationState(
-      const window_aggregation_state_id id) {
-    DCHECK_LT(id, window_aggregation_states_.size());
-    DCHECK(window_aggregation_states_[id]);
-    return window_aggregation_states_[id].get();
-  }
-
-  /**
-   * @brief Release the given WindowAggregationOperationState.
-   *
-   * @param id The id of the WindowAggregationOperationState to destroy.
-   *
-   * @return The WindowAggregationOperationState, already created in the
-   *         constructor.
-   **/
-  inline WindowAggregationOperationState* releaseWindowAggregationState(
-      const window_aggregation_state_id id) {
-    DCHECK_LT(id, window_aggregation_states_.size());
-    DCHECK(window_aggregation_states_[id]);
-    return window_aggregation_states_[id].release();
-  }
-
- private:
-  std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
-  std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
-  std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
-  std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;
-  std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_;
-  std::vector<std::unique_ptr<LIPFilter>> lip_filters_;
-  std::vector<std::unique_ptr<const Predicate>> predicates_;
-  std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_;
-  std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;
-  std::vector<std::unique_ptr<Tuple>> tuples_;
-  std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
-  std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryContext);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
deleted file mode 100644
index ab0f520..0000000
--- a/query_execution/QueryContext.proto
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-syntax = "proto2";
-
-package quickstep.serialization;
-
-import "expressions/Expressions.proto";
-import "expressions/table_generator/GeneratorFunction.proto";
-import "storage/AggregationOperationState.proto";
-import "storage/HashTable.proto";
-import "storage/InsertDestination.proto";
-import "storage/WindowAggregationOperationState.proto";
-import "types/containers/Tuple.proto";
-import "utility/SortConfiguration.proto";
-import "utility/lip_filter/LIPFilter.proto";
-
-message QueryContext {
-  message ScalarGroup {
-    repeated Scalar scalars = 1;
-  }
-
-  message UpdateGroup {
-    message UpdateAssignment {
-      required int32 attribute_id = 1;
-      required Scalar scalar = 2;
-    }
-
-    // NOTE(zuyu): Only used for validating UpdateAssignment's attribute_id.
-    required int32 relation_id = 1;
-    repeated UpdateAssignment update_assignments = 2;
-  }
-
-  repeated AggregationOperationState aggregation_states = 1;
-  repeated GeneratorFunctionHandle generator_functions = 2;
-  repeated HashTable join_hash_tables = 3;
-  repeated InsertDestination insert_destinations = 4;
-  repeated LIPFilter lip_filters = 5;
-  repeated LIPFilterDeployment lip_filter_deployments = 6;
-  repeated Predicate predicates = 7;
-  repeated ScalarGroup scalar_groups = 8;
-  repeated SortConfiguration sort_configs = 9;
-  repeated Tuple tuples = 10;
-
-  // NOTE(zuyu): For UpdateWorkOrder only.
-  repeated UpdateGroup update_groups = 11;
-
-  repeated WindowAggregationOperationState window_aggregation_states = 12;
-
-  required uint64 query_id = 13;
-}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
deleted file mode 100644
index e6d741a..0000000
--- a/query_execution/QueryExecutionMessages.proto
+++ /dev/null
@@ -1,162 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-syntax = "proto2";
-
-package quickstep.serialization;
-
-import "catalog/Catalog.proto";
-import "query_execution/QueryContext.proto";
-import "relational_operators/WorkOrder.proto";
-
-// Note: There are different types of completion messages for normal work orders
-// rebuild work orders. This can be potentially helpful when we want to collect
-// different statistics for executing different types of work orders.
-// e.g. In select normal work order completion message, we could be interested
-// in the selectivity of the block whose work order got execute. In rebuild work
-// order completion message, we may be interested in adding the compression
-// ratio or dictionary size of the rebuilt block.
-
-message WorkOrderCompletionMessage {
-  enum WorkOrderType {
-    NORMAL = 0;
-    REBUILD = 1;
-  }
-
-  required WorkOrderType work_order_type = 1;
-
-  required uint64 operator_index = 2;
-  required uint64 worker_thread_index = 3;
-  required uint64 query_id = 4;
-
-  // Epoch time in microseconds.
-  optional uint64 execution_start_time = 5;
-  optional uint64 execution_end_time = 6;
-
-  // Required in the distributed version.
-  optional uint64 shiftboss_index = 7;
-}
-
-message CatalogRelationNewBlockMessage {
-  required int32 relation_id = 1;
-  required fixed64 block_id = 2;
-
-  // Used by PartitionAwareInsertDestination.
-  optional uint64 partition_id = 3;
-  required uint64 query_id = 4;
-}
-
-message DataPipelineMessage {
-  required uint64 operator_index = 1;
-  required fixed64 block_id = 2;
-  required int32 relation_id = 3;
-  required uint64 query_id = 4;
-}
-
-// Distributed version related messages.
-message ShiftbossRegistrationMessage {
-  // The total Work Order processing capacity in Shiftboss, which equals to the
-  // sum of the capacity of each worker managed by Shiftboss.
-  required uint64 work_order_capacity = 1;
-}
-
-message ShiftbossRegistrationResponseMessage {
-  required uint64 shiftboss_index = 1;
-}
-
-message QueryInitiateMessage {
-  required uint64 query_id = 1;
-  required CatalogDatabase catalog_database_cache = 2;
-  required QueryContext query_context = 3;
-}
-
-message QueryInitiateResponseMessage {
-  required uint64 query_id = 1;
-}
-
-message WorkOrderMessage {
-  required uint64 query_id = 1;
-  required uint64 operator_index = 2;
-  required WorkOrder work_order = 3;
-}
-
-message InitiateRebuildMessage {
-  required uint64 query_id = 1;
-  required uint64 operator_index = 2;
-  required uint64 insert_destination_index = 3;
-  required  int32 relation_id = 4;
-}
-
-message InitiateRebuildResponseMessage {
-  required uint64 query_id = 1;
-  required uint64 operator_index = 2;
-  required uint64 num_rebuild_work_orders = 3;
-  required uint64 shiftboss_index = 4;
-}
-
-message QueryTeardownMessage {
-  required uint64 query_id = 1;
-}
-
-message SaveQueryResultMessage {
-  required uint64 query_id = 1;
-  required int32 relation_id = 2;
-  repeated fixed64 blocks = 3 [packed=true];
-
-  required uint32 cli_id = 4;  // tmb::client_id.
-}
-
-message SaveQueryResultResponseMessage {
-  required uint64 query_id = 1;
-  required int32 relation_id = 2;
-  required uint32 cli_id = 3;  // tmb::client_id.
-  required uint64 shiftboss_index = 4;
-}
-
-message QueryExecutionSuccessMessage {
-  optional CatalogRelationSchema result_relation = 1;
-}
-
-// BlockLocator related messages.
-message BlockDomainRegistrationMessage {
-  // Format IP:Port, i.e., "0.0.0.0:0".
-  required string domain_network_address = 1;
-}
-
-// Used for RegistrationResponse, Unregistration, and FailureReport.
-message BlockDomainMessage {
-  required uint32 block_domain = 1;
-}
-
-// Used when StorageManager loads or evicts a block or a blob from its buffer
-// pool.
-message BlockLocationMessage {
-  required fixed64 block_id = 1;
-  required uint32 block_domain = 2;
-}
-
-message BlockMessage {
-  required fixed64 block_id = 1;
-}
-
-message LocateBlockResponseMessage {
-  repeated uint32 block_domains = 1;
-}
-
-message GetPeerDomainNetworkAddressesResponseMessage {
-  repeated string domain_network_addresses = 1;
-}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionModule.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionModule.hpp b/query_execution/QueryExecutionModule.hpp
deleted file mode 100644
index 89979f1..0000000
--- a/query_execution/QueryExecutionModule.hpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-/** @defgroup QueryExecution
- *
- * The components of query execution including Workers, Foreman (co-ordinator),
- * message classes for queues and for inter thread communication.
-**/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionState.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp
deleted file mode 100644
index f5281d5..0000000
--- a/query_execution/QueryExecutionState.hpp
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_STATE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_STATE_HPP_
-
-#include <cstddef>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that tracks the state of the execution of a query which
- *        includes status of operators, number of dispatched work orders etc.
- **/
-class QueryExecutionState {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param num_operators Number of relational operators in the query.
-   **/
-  explicit QueryExecutionState(const std::size_t num_operators)
-      : num_operators_(num_operators),
-        num_operators_finished_(0),
-        queued_workorders_per_op_(num_operators, 0),
-        rebuild_required_(num_operators, false),
-        done_gen_(num_operators, false),
-        execution_finished_(num_operators, false) {}
-
-  /**
-   * @brief Get the number of operators in the query.
-   **/
-  inline const std::size_t getNumOperators() const {
-    return num_operators_;
-  }
-
-  /**
-   * @brief Get the number of operators who have finished their execution.
-   **/
-  inline const std::size_t getNumOperatorsFinished() const {
-    return num_operators_finished_;
-  }
-
-  /**
-   * @brief Check if the query has finished its execution.
-   *
-   * @return True if the query has finished its execution, false otherwise.
-   **/
-  inline bool hasQueryExecutionFinished() const {
-    return num_operators_finished_ == num_operators_;
-  }
-
-  /**
-   * @brief Set the rebuild status of the given operator that includes the
-   *        flag for whether the rebuild has been initiated and if so, the
-   *        number of pending rebuild work orders.
-   *
-   * @param operator_index The index of the given operator.
-   * @param num_rebuild_workorders The number of rebuild workorders of the given
-   *        operator.
-   * @param rebuild_initiated True if the rebuild has been initiated, false
-   *        otherwise.
-   **/
-  inline void setRebuildStatus(const std::size_t operator_index,
-                               const std::size_t num_rebuild_workorders,
-                               const bool rebuild_initiated) {
-    DCHECK(operator_index < num_operators_);
-    auto search_res = rebuild_status_.find(operator_index);
-    if (search_res != rebuild_status_.end()) {
-      search_res->second.has_initiated = rebuild_initiated;
-      search_res->second.num_pending_workorders = num_rebuild_workorders;
-    } else {
-      RebuildStatus rebuild_status(rebuild_initiated, num_rebuild_workorders);
-
-      rebuild_status_.emplace(operator_index, std::move(rebuild_status));
-    }
-  }
-
-  /**
-   * @brief Check if the rebuild has been initiated for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   *
-   * @return True if the rebuild has been initiated, false otherwise.
-   **/
-  inline bool hasRebuildInitiated(const std::size_t operator_index) const {
-    DCHECK(operator_index < num_operators_);
-    const auto search_res = rebuild_status_.find(operator_index);
-    if (search_res != rebuild_status_.end()) {
-      return search_res->second.has_initiated;
-    }
-    return false;
-  }
-
-  /**
-   * @brief Get the number of pending rebuild workorders for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   *
-   * @return The number of pending rebuild workorders for the given operator.
-   **/
-  inline const std::size_t getNumRebuildWorkOrders(
-      const std::size_t operator_index) const {
-    DCHECK(operator_index < num_operators_);
-    const auto search_res = rebuild_status_.find(operator_index);
-    if (search_res != rebuild_status_.end()) {
-      return search_res->second.num_pending_workorders;
-    }
-    LOG(WARNING) << "Called QueryExecutionState::getNumRebuildWorkOrders() "
-                    "for an operator whose rebuild entry doesn't exist.";
-    return 0;
-  }
-
-  /**
-   * @brief Increment the number of rebuild WorkOrders for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   * @param num_rebuild_workorders The number of rebuild workorders of the given
-   *        operator.
-   **/
-  inline void incrementNumRebuildWorkOrders(const std::size_t operator_index,
-                                            const std::size_t num_rebuild_workorders) {
-    DCHECK_LT(operator_index, num_operators_);
-    auto search_res = rebuild_status_.find(operator_index);
-    DCHECK(search_res != rebuild_status_.end())
-        << "Called for an operator whose rebuild status does not exist.";
-    DCHECK(search_res->second.has_initiated);
-
-    search_res->second.num_pending_workorders += num_rebuild_workorders;
-  }
-
-  /**
-   * @brief Decrement the number of rebuild WorkOrders for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline void decrementNumRebuildWorkOrders(const std::size_t operator_index) {
-    DCHECK(operator_index < num_operators_);
-    auto search_res = rebuild_status_.find(operator_index);
-    CHECK(search_res != rebuild_status_.end())
-        << "Called QueryExecutionState::decrementNumRebuildWorkOrders() for an "
-           "operator whose rebuild entry doesn't exist.";
-
-    DCHECK(search_res->second.has_initiated);
-    DCHECK_GE(search_res->second.num_pending_workorders, 1u);
-
-    --(search_res->second.num_pending_workorders);
-  }
-
-  /**
-   * @brief Increment the number of queued (normal) WorkOrders for the given
-   *        operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline void incrementNumQueuedWorkOrders(const std::size_t operator_index) {
-    DCHECK(operator_index < num_operators_);
-    ++queued_workorders_per_op_[operator_index];
-  }
-
-  /**
-   * @brief Decrement the number of queued (normal) WorkOrders for the given
-   *        operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline void decrementNumQueuedWorkOrders(const std::size_t operator_index) {
-    DCHECK(operator_index < num_operators_);
-    DCHECK_GT(queued_workorders_per_op_[operator_index], 0u);
-    --queued_workorders_per_op_[operator_index];
-  }
-
-  /**
-   * @brief Get the number of queued (normal) WorkOrders for the given operator.
-   *
-   * @note Queued WorkOrders mean those WorkOrders which have been dispatched
-   *       for execution by the Foreman and haven't yet completed. These are
-   *       different from pending WorkOrders which mean the WorkOrders that
-   *       haven't been dispatched for execution yet.
-   *
-   * @param operator_index The index of the given operator.
-   *
-   * @return The number of queued (normal) WorkOrders for the given operators.
-   **/
-  inline const std::size_t getNumQueuedWorkOrders(
-      const std::size_t operator_index) const {
-    DCHECK(operator_index < num_operators_);
-    return queued_workorders_per_op_[operator_index];
-  }
-
-  /**
-   * @brief Set the rebuild required flag as true for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline void setRebuildRequired(const std::size_t operator_index) {
-    DCHECK(operator_index < num_operators_);
-    rebuild_required_[operator_index] = true;
-  }
-
-  /**
-   * @brief Get the rebuild required flag for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline bool isRebuildRequired(const std::size_t operator_index) const {
-    DCHECK(operator_index < num_operators_);
-    return rebuild_required_[operator_index];
-  }
-
-  /**
-   * @brief Set the execution finished flag for the given operator as true.
-   *
-   * @note By default this flag is false.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline void setExecutionFinished(const std::size_t operator_index) {
-    DCHECK(operator_index < num_operators_);
-    execution_finished_[operator_index] = true;
-    ++num_operators_finished_;
-  }
-
-  /**
-   * @brief Get the execution finished flag for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline bool hasExecutionFinished(const std::size_t operator_index) const {
-    DCHECK(operator_index < num_operators_);
-    return execution_finished_[operator_index];
-  }
-
-  /**
-   * @brief Set the "done generation of workorders" flag as true for the given
-   *        operator.
-   *
-   * @note By default this flag is false.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline void setDoneGenerationWorkOrders(const std::size_t operator_index) {
-    DCHECK(operator_index < num_operators_);
-    done_gen_[operator_index] = true;
-  }
-
-  /**
-   * @brief Get the "done generation of workorders" flag for the given operator.
-   *
-   * @param operator_index The index of the given operator.
-   **/
-  inline bool hasDoneGenerationWorkOrders(const std::size_t operator_index)
-      const {
-    DCHECK(operator_index < num_operators_);
-    return done_gen_[operator_index];
-  }
-
- private:
-  // Total number of operators in the query.
-  const std::size_t num_operators_;
-
-  // Number of operators who've finished their execution.
-  std::size_t num_operators_finished_;
-
-  // A vector to track the number of normal WorkOrders in execution.
-  std::vector<std::size_t> queued_workorders_per_op_;
-
-  // The ith bit denotes if the operator with ID = i requires generation of
-  // rebuild WorkOrders.
-  std::vector<bool> rebuild_required_;
-
-  // The ith bit denotes if the operator with ID = i has finished generating
-  // work orders.
-  std::vector<bool> done_gen_;
-
-  // The ith bit denotes if the operator with ID = i has finished its execution.
-  std::vector<bool> execution_finished_;
-
-  struct RebuildStatus {
-    RebuildStatus(const bool initiated,
-                  const std::size_t num_workorders)
-        : has_initiated(initiated),
-          num_pending_workorders(num_workorders) {}
-
-    // Whether rebuild for operator at index i has been initiated.
-    bool has_initiated;
-    // The number of pending rebuild workorders for the operator.
-    // Valid if and only if 'has_initiated' is true.
-    std::size_t num_pending_workorders;
-  };
-
-  // Key is dag_node_index for which rebuild is required.
-  std::unordered_map<std::size_t, RebuildStatus> rebuild_status_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryExecutionState);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_STATE_HPP_


Reply via email to