Repository: incubator-quickstep Updated Branches: refs/heads/feedInputBlock-part-id [created] b553e0115
Added partition_id in feedInputBlock. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b553e011 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b553e011 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b553e011 Branch: refs/heads/feedInputBlock-part-id Commit: b553e0115fe4130fc54d9419d0a1e6ab3ccfbfc4 Parents: b0e5968 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Jan 15 19:53:54 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Mon Jan 16 09:34:24 2017 -0800 ---------------------------------------------------------------------- query_execution/PolicyEnforcerBase.cpp | 2 +- query_execution/QueryExecutionMessages.proto | 3 +++ query_execution/QueryManagerBase.cpp | 5 +++-- query_execution/QueryManagerBase.hpp | 5 ++++- query_execution/tests/QueryManagerSingleNode_unittest.cpp | 4 ++-- relational_operators/AggregationOperator.hpp | 3 ++- relational_operators/BuildHashOperator.hpp | 4 ++-- relational_operators/DeleteOperator.hpp | 3 ++- relational_operators/HashJoinOperator.hpp | 4 ++-- relational_operators/NestedLoopsJoinOperator.hpp | 3 ++- relational_operators/RelationalOperator.hpp | 6 +++--- relational_operators/SampleOperator.hpp | 3 ++- relational_operators/SaveBlocksOperator.hpp | 3 ++- relational_operators/SelectOperator.hpp | 5 ++--- relational_operators/SortMergeRunOperator.hpp | 4 ++-- relational_operators/SortRunGenerationOperator.hpp | 3 ++- relational_operators/tests/SortMergeRunOperator_unittest.cpp | 2 +- storage/InsertDestination.cpp | 2 +- storage/InsertDestination.hpp | 4 +++- 19 files changed, 41 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git 
a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index 1a2ab46..a26b84e 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -118,7 +118,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { op_index = proto.operator_index(); admitted_queries_[query_id]->processDataPipelineMessage( - op_index, proto.block_id(), proto.relation_id()); + op_index, proto.block_id(), proto.relation_id(), proto.partition_id()); break; } case kWorkOrderFeedbackMessage: { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 28b5ebd..115a9a3 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -65,6 +65,9 @@ message DataPipelineMessage { required fixed64 block_id = 2; required int32 relation_id = 3; required uint64 query_id = 4; + + // Used by PartitionAwareInsertDestination. + optional uint64 partition_id = 5 [default = 0]; } // Distributed version related messages. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp index 8e37da8..5f8c6a3 100644 --- a/query_execution/QueryManagerBase.cpp +++ b/query_execution/QueryManagerBase.cpp @@ -192,13 +192,14 @@ void QueryManagerBase::processOperator(const dag_node_index index, void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index, const block_id block, - const relation_id rel_id) { + const relation_id rel_id, + const partition_id part_id) { for (const dag_node_index consumer_index : output_consumers_[op_index]) { // Feed the streamed block to the consumer. Note that 'output_consumers_' // only contain those dependents of operator with index = op_index which are // eligible to receive streamed input. - query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id); + query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id); // Because of the streamed input just fed, check if there are any new // WorkOrders available and if so, fetch them. fetchNormalWorkOrders(consumer_index); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index ddb76d5..d0bb0ea 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -109,10 +109,13 @@ class QueryManagerBase { * for the pipelining block. * @param block The block id. * @param rel_id The ID of the relation that produced 'block'. + * @param part_id The partition ID of 'block', if any. By default, a block + * belongs to the only partition (aka, no partition). 
**/ void processDataPipelineMessage(const dag_node_index op_index, const block_id block, - const relation_id rel_id); + const relation_id rel_id, + const partition_id part_id = 0); /** * @brief Fetch all work orders currently available in relational operator and http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/tests/QueryManagerSingleNode_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp index 87b8934..28ab388 100644 --- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp +++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp @@ -177,8 +177,8 @@ class MockOperator: public RelationalOperator { return true; } - void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { ++num_calls_feedblock_; MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")"; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/AggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp index ce6015d..cc1009a 100644 --- a/relational_operators/AggregationOperator.hpp +++ b/relational_operators/AggregationOperator.hpp @@ -98,7 +98,8 @@ class AggregationOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { 
input_relation_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/BuildHashOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp index f0f42e3..9d2319a 100644 --- a/relational_operators/BuildHashOperator.hpp +++ b/relational_operators/BuildHashOperator.hpp @@ -114,8 +114,8 @@ class BuildHashOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { input_relation_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/DeleteOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp index 9c3f357..7b69d9c 100644 --- a/relational_operators/DeleteOperator.hpp +++ b/relational_operators/DeleteOperator.hpp @@ -100,7 +100,8 @@ class DeleteOperator : public RelationalOperator { return relation_.getID(); } - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { DCHECK(!relation_is_stored_); relation_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp 
b/relational_operators/HashJoinOperator.hpp index 8829d1f..508cd03 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -190,8 +190,8 @@ class HashJoinOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { DCHECK(input_relation_id == probe_relation_.getID()); probe_relation_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/NestedLoopsJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp index 951851d..f8eb080 100644 --- a/relational_operators/NestedLoopsJoinOperator.hpp +++ b/relational_operators/NestedLoopsJoinOperator.hpp @@ -141,7 +141,8 @@ class NestedLoopsJoinOperator : public RelationalOperator { } } - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { if (input_relation_id == left_input_relation_.getID()) { left_relation_block_ids_.push_back(input_block_id); } else if (input_relation_id == right_input_relation_.getID()) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/RelationalOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp index a7f4177..fdea307 100644 --- a/relational_operators/RelationalOperator.hpp +++ 
b/relational_operators/RelationalOperator.hpp @@ -138,11 +138,11 @@ class RelationalOperator { * @brief Receive input blocks for this RelationalOperator. * * @param input_block_id The ID of the input block. - * * @param relation_id The ID of the relation that produced this input_block. + * @param part_id The partition ID of 'input_block_id'. **/ - virtual void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) {} + virtual void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) {} /** * @brief Signal the end of feeding of input blocks for this http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SampleOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp index ccf6595..e56201a 100644 --- a/relational_operators/SampleOperator.hpp +++ b/relational_operators/SampleOperator.hpp @@ -108,7 +108,8 @@ class SampleOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { input_relation_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SaveBlocksOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp index 27fd911..cd79733 100644 --- a/relational_operators/SaveBlocksOperator.hpp +++ b/relational_operators/SaveBlocksOperator.hpp @@ -83,7 +83,8 @@ class SaveBlocksOperator : public RelationalOperator { bool 
getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { destination_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SelectOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp index 5846eda..c8e6162 100644 --- a/relational_operators/SelectOperator.hpp +++ b/relational_operators/SelectOperator.hpp @@ -210,10 +210,9 @@ class SelectOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { if (input_relation_.hasPartitionScheme()) { - const partition_id part_id = - input_relation_.getPartitionScheme().getPartitionForBlock(input_block_id); input_relation_block_ids_in_partition_[part_id].push_back(input_block_id); } else { input_relation_block_ids_.push_back(input_block_id); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SortMergeRunOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp index aff7d8d..d2d9a2a 100644 --- a/relational_operators/SortMergeRunOperator.hpp +++ b/relational_operators/SortMergeRunOperator.hpp @@ -144,8 +144,8 @@ class SortMergeRunOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) 
override; - void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { input_relation_block_ids_.push_back(input_block_id); if (started_) { initializeInputRuns(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SortRunGenerationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp index a2ffb2b..25a1273 100644 --- a/relational_operators/SortRunGenerationOperator.hpp +++ b/relational_operators/SortRunGenerationOperator.hpp @@ -124,7 +124,8 @@ class SortRunGenerationOperator : public RelationalOperator { bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { DCHECK(input_relation_id == input_relation_.getID()); input_relation_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/tests/SortMergeRunOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp index 74fecec..7a46e6e 100644 --- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp +++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp @@ -1602,7 +1602,7 @@ class SortMergeRunOperatorTest : public ::testing::Test { // Feed blocks. 
DVLOG(1) << "Feeding " << to_feed.size() << " blocks."; for (const block_id block : to_feed) { - merge_op_->feedInputBlock(block, input_table_->getID()); + merge_op_->feedInputBlock(block, input_table_->getID(), 0 /* partition_id */); } // Remove fed blocks. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/storage/InsertDestination.cpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp index 19bb356..944998f 100644 --- a/storage/InsertDestination.cpp +++ b/storage/InsertDestination.cpp @@ -789,7 +789,7 @@ void PartitionAwareInsertDestination::returnBlockInPartition(MutableBlockReferen << "invalidated one or more IndexSubBlocks."); } // Note that the block will only be sent if it's full (true). - sendBlockFilledMessage(block->getID()); + sendBlockFilledMessage(block->getID(), part_id); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/storage/InsertDestination.hpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp index 3487638..c3c40bd 100644 --- a/storage/InsertDestination.hpp +++ b/storage/InsertDestination.hpp @@ -216,13 +216,15 @@ class InsertDestination : public InsertDestinationInterface { * scheduler. * * @param id The id of the StorageBlock to be pipelined. + * @param part_id The partition id of Block <id>, if any. **/ - void sendBlockFilledMessage(const block_id id) const { + void sendBlockFilledMessage(const block_id id, const partition_id part_id = 0) const { serialization::DataPipelineMessage proto; proto.set_operator_index(relational_op_index_); proto.set_block_id(id); proto.set_relation_id(relation_.getID()); proto.set_query_id(query_id_); + proto.set_partition_id(part_id); // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. 
const std::size_t proto_length = proto.ByteSize();