Removed an unused message type (kWorkOrdersAvailableMessage / WorkOrdersAvailableMessage).
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/61689962 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/61689962 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/61689962 Branch: refs/heads/partitioned-aggregation Commit: 6168996216af8278d5c789c67aa4ec8325fab483 Parents: 2c0ce6a Author: Zuyu Zhang <zu...@twitter.com> Authored: Mon Aug 8 15:32:34 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Tue Aug 9 10:44:50 2016 -0700 ---------------------------------------------------------------------- query_execution/ForemanSingleNode.cpp | 4 +--- query_execution/PolicyEnforcerBase.cpp | 13 ------------- query_execution/QueryExecutionMessages.proto | 5 ----- query_execution/QueryExecutionTypedefs.hpp | 1 - query_execution/Shiftboss.cpp | 1 - query_execution/Shiftboss.hpp | 1 - query_execution/Worker.hpp | 1 - 7 files changed, 1 insertion(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp index 23db379..d064a6f 100644 --- a/query_execution/ForemanSingleNode.cpp +++ b/query_execution/ForemanSingleNode.cpp @@ -87,7 +87,6 @@ ForemanSingleNode::ForemanSingleNode( kPoisonMessage, kRebuildWorkOrderCompleteMessage, kWorkOrderFeedbackMessage, - kWorkOrdersAvailableMessage, kWorkOrderCompleteMessage}; for (const auto message_type : receiver_message_types) { @@ -122,8 +121,7 @@ void ForemanSingleNode::run() { case kDataPipelineMessage: case kRebuildWorkOrderCompleteMessage: case kWorkOrderCompleteMessage: - case kWorkOrderFeedbackMessage: - case kWorkOrdersAvailableMessage: { + case kWorkOrderFeedbackMessage: { 
policy_enforcer_->processMessage(tagged_message); break; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index 78f7b44..4174bd6 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -107,19 +107,6 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { op_index, proto.block_id(), proto.relation_id()); break; } - case kWorkOrdersAvailableMessage: { - serialization::WorkOrdersAvailableMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - query_id = proto.query_id(); - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - - op_index = proto.operator_index(); - - // Check if new work orders are available. - admitted_queries_[query_id]->fetchNormalWorkOrders(op_index); - break; - } case kWorkOrderFeedbackMessage: { WorkOrder::FeedbackMessage msg( const_cast<void *>(tagged_message.message()), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 20b684e..060efa1 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -74,11 +74,6 @@ message DataPipelineMessage { required uint64 query_id = 4; } -message WorkOrdersAvailableMessage { - required uint64 operator_index = 1; - required uint64 query_id = 2; -} - // Distributed version related messages. 
message ShiftbossRegistrationMessage { // The total Work Order processing capacity in Shiftboss, which equals to the http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index d154d84..33a93b0 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -69,7 +69,6 @@ enum QueryExecutionMessageType : message_type_id { kWorkOrderCompleteMessage, // From Worker to Foreman. kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman. kDataPipelineMessage, // From InsertDestination or some WorkOrders to Foreman. - kWorkOrdersAvailableMessage, // From some WorkOrders to Foreman. kWorkOrderFeedbackMessage, // From some WorkOrders to Foreman on behalf of // their corresponding RelationalOperators. kRebuildWorkOrderMessage, // From Foreman to Worker. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index 24c91fe..bd83dd4 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -152,7 +152,6 @@ void Shiftboss::run() { case kWorkOrderCompleteMessage: // Fall through. 
case kRebuildWorkOrderCompleteMessage: case kDataPipelineMessage: - case kWorkOrdersAvailableMessage: case kWorkOrderFeedbackMessage: { LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ << "') forwarded typed '" << annotated_message.tagged_message.message_type() http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp index 9464a4d..30a8d1a 100644 --- a/query_execution/Shiftboss.hpp +++ b/query_execution/Shiftboss.hpp @@ -103,7 +103,6 @@ class Shiftboss : public Thread { // Message sent to Foreman. bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage); // Forward the following message types from Foreman to Workers. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Worker.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp index 44a7447..aa39bb3 100644 --- a/query_execution/Worker.hpp +++ b/query_execution/Worker.hpp @@ -68,7 +68,6 @@ class Worker : public Thread { kRebuildWorkOrderCompleteMessage); bus_->RegisterClientAsSender(worker_client_id_, kCatalogRelationNewBlockMessage); bus_->RegisterClientAsSender(worker_client_id_, kDataPipelineMessage); - bus_->RegisterClientAsSender(worker_client_id_, kWorkOrdersAvailableMessage); bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderFeedbackMessage); bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);