Fixed the distributed version due to query execution engine simplification.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e496cb58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e496cb58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e496cb58 Branch: refs/heads/trace Commit: e496cb58e10d32de9dc83d69ece84df3f5b62747 Parents: 0898a77 Author: Zuyu Zhang <[email protected]> Authored: Fri Oct 6 22:33:02 2017 -0500 Committer: Zuyu Zhang <[email protected]> Committed: Fri Oct 6 22:33:02 2017 -0500 ---------------------------------------------------------------------- query_execution/QueryManagerDistributed.cpp | 24 +++++++++--------------- relational_operators/WorkOrderFactory.cpp | 4 ++-- 2 files changed, 11 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e496cb58/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index 30a1396..97b451f 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -70,8 +70,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle, // Collect all the workorders from all the non-blocking relational operators in the DAG. for (const dag_node_index index : non_dependent_operators_) { if (!fetchNormalWorkOrders(index)) { - DCHECK(!checkRebuildRequired(index) || initiateRebuild(index)); - markOperatorFinished(index); + if (checkRebuildRequired(index)) { + initiateRebuild(index); + } else { + markOperatorFinished(index); + } } } @@ -201,21 +204,12 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no const std::size_t shiftboss_index) { query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index); - if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) { - // Wait for the rebuild work orders to finish. - return; + if (query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) { + // No needs for rebuilds, or the rebuild has finished. + markOperatorFinished(op_index); } - // No needs for rebuilds, or the rebuild has finished. - markOperatorFinished(op_index); - - for (const std::pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - fetchNormalWorkOrders(dependent_op_index); - } - } + // Wait for the rebuild work orders to finish. } bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e496cb58/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 5baa21b..25cc81a 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -741,7 +741,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) && proto.HasExtension(serialization::DeleteWorkOrder::block_id) && proto.HasExtension(serialization::DeleteWorkOrder::operator_index) && - proto.GetExtension(serialization::DeleteWorkOrder::partition_id); + proto.HasExtension(serialization::DeleteWorkOrder::partition_id); } case serialization::DESTROY_AGGREGATION_STATE: { return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) && @@ -1033,7 +1033,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) && proto.HasExtension(serialization::UpdateWorkOrder::operator_index) && proto.HasExtension(serialization::UpdateWorkOrder::block_id) && - proto.GetExtension(serialization::UpdateWorkOrder::partition_id); + proto.HasExtension(serialization::UpdateWorkOrder::partition_id); } case serialization::WINDOW_AGGREGATION: { return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&
