Repository: incubator-quickstep Updated Branches: refs/heads/master b237969cb -> 3595bc1fd
Fix the number of work orders generated when inserting multiple tuples. (Also added unit tests.) Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3595bc1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3595bc1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3595bc1f Branch: refs/heads/master Commit: 3595bc1fdd55bf3979b5a7e98e2263f5bf420406 Parents: b237969 Author: Robert Claus <[email protected]> Authored: Fri Nov 3 15:11:11 2017 -0500 Committer: Robert Claus <[email protected]> Committed: Mon Nov 20 14:30:17 2017 -0600 ---------------------------------------------------------------------- query_execution/QueryContext.hpp | 16 +++ query_optimizer/ExecutionGenerator.cpp | 105 ++++++++++--------- .../tests/execution_generator/Insert.test | 15 +++ query_optimizer/tests/resolver/Insert.test | 29 +++++ relational_operators/InsertOperator.cpp | 18 +++- relational_operators/InsertOperator.hpp | 13 +-- relational_operators/WorkOrder.proto | 2 +- relational_operators/WorkOrderFactory.cpp | 28 ++++- 8 files changed, 160 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index 7876821..e65f096 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -489,6 +489,22 @@ class QueryContext { } /** + * @brief Whether the given vector of Tuple ids is valid. + * + * @param ids The vector of Tuple ids. + * + * @return True if valid, otherwise false.
+ **/ + bool areValidTupleIds(const std::vector<tuple_id> &ids) const { + for (const tuple_id id : ids) { + if (id >= tuples_.size()) { + return false; + } + } + return true; + } + + /** * @brief Release the ownership of the Tuple referenced by the id. * * @note Each id should use only once. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 14d8949..b0d3c48 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1461,72 +1461,75 @@ void ExecutionGenerator::convertInsertTuple( *catalog_database_->getRelationById( input_relation_info->relation->getID()); + + // Construct the tuple proto to be inserted. + std::vector<QueryContext::tuple_id> tuple_indexes; + for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) { - // Construct the tuple proto to be inserted. const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size(); - S::Tuple *tuple_proto = query_context_proto_->add_tuples(); for (const E::ScalarLiteralPtr &literal : tuple) { tuple_proto->add_attribute_values()->CopyFrom(literal->value().getProto()); } + tuple_indexes.push_back(tuple_index); + } - // FIXME(qzeng): A better way is using a traits struct to look up whether a storage - // block supports ad-hoc insertion instead of hard-coding the block types. 
- const StorageBlockLayout &storage_block_layout = - input_relation.getDefaultStorageBlockLayout(); - if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() == - TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE || - storage_block_layout.getDescription().tuple_store_description().sub_block_type() == - TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) { - THROW_SQL_ERROR() << "INSERT statement is not supported for the relation " - << input_relation.getName() - << ", because its storage blocks do not support ad-hoc insertion"; - } + // FIXME(qzeng): A better way is using a traits struct to look up whether a storage + // block supports ad-hoc insertion instead of hard-coding the block types. + const StorageBlockLayout &storage_block_layout = + input_relation.getDefaultStorageBlockLayout(); + if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() == + TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE || + storage_block_layout.getDescription().tuple_store_description().sub_block_type() == + TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) { + THROW_SQL_ERROR() << "INSERT statement is not supported for the relation " + << input_relation.getName() + << ", because its storage blocks do not support ad-hoc insertion"; + } - // Create InsertDestination proto. - const QueryContext::insert_destination_id insert_destination_index = - query_context_proto_->insert_destinations_size(); - S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); + // Create InsertDestination proto. 
+ const QueryContext::insert_destination_id insert_destination_index = + query_context_proto_->insert_destinations_size(); + S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); - insert_destination_proto->set_relation_id(input_relation.getID()); - insert_destination_proto->mutable_layout()->MergeFrom( - input_relation.getDefaultStorageBlockLayout().getDescription()); + insert_destination_proto->set_relation_id(input_relation.getID()); + insert_destination_proto->mutable_layout()->MergeFrom( + input_relation.getDefaultStorageBlockLayout().getDescription()); - if (input_relation.hasPartitionScheme()) { - insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); - insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) - ->MergeFrom(input_relation.getPartitionScheme()->getProto()); - } else { - insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); + if (input_relation.hasPartitionScheme()) { + insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); + insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) + ->MergeFrom(input_relation.getPartitionScheme()->getProto()); + } else { + insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); - const vector<block_id> blocks(input_relation.getBlocksSnapshot()); - for (const block_id block : blocks) { - insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); - } + const vector<block_id> blocks(input_relation.getBlocksSnapshot()); + for (const block_id block : blocks) { + insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); } + } - const QueryPlan::DAGNodeIndex insert_operator_index = - execution_plan_->addRelationalOperator( - new InsertOperator(query_handle_->query_id(), - 
input_relation, - insert_destination_index, - tuple_index)); - insert_destination_proto->set_relational_op_index(insert_operator_index); + const QueryPlan::DAGNodeIndex insert_operator_index = + execution_plan_->addRelationalOperator( + new InsertOperator(query_handle_->query_id(), + input_relation, + insert_destination_index, + tuple_indexes)); + insert_destination_proto->set_relational_op_index(insert_operator_index); - CatalogRelation *mutable_relation = - catalog_database_->getRelationByIdMutable(input_relation.getID()); - const QueryPlan::DAGNodeIndex save_blocks_index = - execution_plan_->addRelationalOperator( - new SaveBlocksOperator(query_handle_->query_id(), mutable_relation)); - if (!input_relation_info->isStoredRelation()) { - execution_plan_->addDirectDependency(insert_operator_index, - input_relation_info->producer_operator_index, - true /* is_pipeline_breaker */); - } - execution_plan_->addDirectDependency(save_blocks_index, - insert_operator_index, - false /* is_pipeline_breaker */); + CatalogRelation *mutable_relation = + catalog_database_->getRelationByIdMutable(input_relation.getID()); + const QueryPlan::DAGNodeIndex save_blocks_index = + execution_plan_->addRelationalOperator( + new SaveBlocksOperator(query_handle_->query_id(), mutable_relation)); + if (!input_relation_info->isStoredRelation()) { + execution_plan_->addDirectDependency(insert_operator_index, + input_relation_info->producer_operator_index, + true /* is_pipeline_breaker */); } + execution_plan_->addDirectDependency(save_blocks_index, + insert_operator_index, + false /* is_pipeline_breaker */); } void ExecutionGenerator::convertInsertSelection( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/query_optimizer/tests/execution_generator/Insert.test ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/execution_generator/Insert.test b/query_optimizer/tests/execution_generator/Insert.test index 1be7be9..8131fb2 
100644 --- a/query_optimizer/tests/execution_generator/Insert.test +++ b/query_optimizer/tests/execution_generator/Insert.test @@ -132,3 +132,18 @@ SELECT * FROM bar4; | 5| 2016-01-01T00:00:00| NULL| abc| +-----------+-----------------------------------------+------------------------+--------------------------------+ == + +CREATE TABLE bar5 (x INT NULL, y INT); + +INSERT INTO bar5 VALUES (1,2),(3,4),(5,6); + +SELECT * FROM bar5; +-- ++-----------+-----------+ +|x |y | ++-----------+-----------+ +| 1| 2| +| 3| 4| +| 5| 6| ++-----------+-----------+ +== http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/query_optimizer/tests/resolver/Insert.test ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/resolver/Insert.test b/query_optimizer/tests/resolver/Insert.test index 88fff53..f21ce5c 100644 --- a/query_optimizer/tests/resolver/Insert.test +++ b/query_optimizer/tests/resolver/Insert.test @@ -153,3 +153,32 @@ insert into undefined_table values (1, 2) ERROR: Unrecognized relation undefined_table (1 : 13) insert into undefined_table values (1, 2) ^ +== + +insert into test values (null, 1, 2, 3, 'foo', 'foo'),(null, 4, 5, 6, 'foo', 'foo'); +-- +TopLevelPlan ++-plan=InsertTuple +| +-input=TableReference[relation_name=Test,relation_alias=test] +| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL] +| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long] +| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float] +| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL] +| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)] +| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL] +| +-column_values= +| | +-Literal[value=NULL,type=Int NULL] +| | +-Literal[value=1,type=Long] +| | +-Literal[value=2,type=Float] +| | +-Literal[value=3,type=Double NULL] +| | +-Literal[value=foo,type=Char(20)] +| | 
+-Literal[value=foo,type=VarChar(20) NULL] +| +-column_values= +| +-Literal[value=NULL,type=Int NULL] +| +-Literal[value=4,type=Long] +| +-Literal[value=5,type=Float] +| +-Literal[value=6,type=Double NULL] +| +-Literal[value=foo,type=Char(20)] +| +-Literal[value=foo,type=VarChar(20) NULL] ++-output_attributes= + +-[] http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/relational_operators/InsertOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp index fbd3a07..b8c9f07 100644 --- a/relational_operators/InsertOperator.cpp +++ b/relational_operators/InsertOperator.cpp @@ -20,6 +20,7 @@ #include "relational_operators/InsertOperator.hpp" #include <memory> +#include <vector> #include "query_execution/QueryContext.hpp" #include "query_execution/WorkOrderProtosContainer.hpp" @@ -43,12 +44,19 @@ bool InsertOperator::getAllWorkOrders( return true; } + std::vector<std::unique_ptr<Tuple>> tuples; + + for (const QueryContext::tuple_id tuple_index : tuple_indexes_) { + std::unique_ptr<Tuple> newTuple(query_context->releaseTuple(tuple_index)); + tuples.push_back(std::move(newTuple)); + } + DCHECK(query_context != nullptr); container->addNormalWorkOrder( new InsertWorkOrder( query_id_, query_context->getInsertDestination(output_destination_index_), - query_context->releaseTuple(tuple_index_)), + std::move(tuples)), op_index_); work_generated_ = true; @@ -64,7 +72,9 @@ bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) proto->set_work_order_type(serialization::INSERT); proto->set_query_id(query_id_); proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_); - proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_); + for (const QueryContext::tuple_id tuple_index : tuple_indexes_) { + 
proto->AddExtension(serialization::InsertWorkOrder::tuple_indexes, tuple_index); + } container->addWorkOrderProto(proto, op_index_); @@ -74,7 +84,9 @@ bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) void InsertWorkOrder::execute() { - output_destination_->insertTuple(*tuple_); + for (const auto &tuple : tuples_) { + output_destination_->insertTuple(*tuple); + } } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/relational_operators/InsertOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp index b103538..3865a7f 100644 --- a/relational_operators/InsertOperator.hpp +++ b/relational_operators/InsertOperator.hpp @@ -23,6 +23,7 @@ #include <cstddef> #include <string> #include <memory> +#include <vector> #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" @@ -67,11 +68,11 @@ class InsertOperator : public RelationalOperator { const std::size_t query_id, const CatalogRelation &output_relation, const QueryContext::insert_destination_id output_destination_index, - const QueryContext::tuple_id tuple_index) + const std::vector<QueryContext::tuple_id> &tuple_indexes) : RelationalOperator(query_id, 1u, false, output_relation.getNumPartitions()), output_relation_(output_relation), output_destination_index_(output_destination_index), - tuple_index_(tuple_index), + tuple_indexes_(tuple_indexes), work_generated_(false) {} ~InsertOperator() override {} @@ -103,7 +104,7 @@ class InsertOperator : public RelationalOperator { private: const CatalogRelation &output_relation_; const QueryContext::insert_destination_id output_destination_index_; - const QueryContext::tuple_id tuple_index_; + const std::vector<QueryContext::tuple_id> tuple_indexes_; bool work_generated_; DISALLOW_COPY_AND_ASSIGN(InsertOperator); @@ -125,10 +126,10 @@ class InsertWorkOrder : 
public WorkOrder { **/ InsertWorkOrder(const std::size_t query_id, InsertDestination *output_destination, - Tuple *tuple) + std::vector<std::unique_ptr<Tuple>> &&tuples) : WorkOrder(query_id), output_destination_(DCHECK_NOTNULL(output_destination)), - tuple_(DCHECK_NOTNULL(tuple)) {} + tuples_(std::move(tuples)) {} ~InsertWorkOrder() override {} @@ -140,7 +141,7 @@ class InsertWorkOrder : public WorkOrder { private: InsertDestination *output_destination_; - std::unique_ptr<Tuple> tuple_; + std::vector<std::unique_ptr<Tuple>> tuples_; DISALLOW_COPY_AND_ASSIGN(InsertWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index aaf7929..b84e758 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -201,7 +201,7 @@ message InsertWorkOrder { extend WorkOrder { // All required. 
optional int32 insert_destination_index = 176; - optional uint32 tuple_index = 177; + repeated uint32 tuple_indexes = 177; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3595bc1f/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 3a991bd..7f11e3e 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -395,12 +395,22 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder } case serialization::INSERT: { LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index; + + const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes); + std::vector<std::unique_ptr<Tuple>> tuple_indexes; + + for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; specific_tuple_index++) { + const int tuple_index = + proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index); + tuple_indexes.emplace_back( + std::unique_ptr<Tuple>(query_context->releaseTuple(tuple_index))); + } + return new InsertWorkOrder( query_id, query_context->getInsertDestination( proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)), - query_context->releaseTuple( - proto.GetExtension(serialization::InsertWorkOrder::tuple_index))); + std::move(tuple_indexes)); } case serialization::NESTED_LOOP_JOIN: { const partition_id part_id = @@ -852,12 +862,20 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.HasExtension(serialization::InitializeAggregationWorkOrder::state_partition_id); } case serialization::INSERT: { + const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes); + std::vector<QueryContext::tuple_id> tuple_indexes; + + for (int specific_tuple_index 
= 0; specific_tuple_index < tuple_count; specific_tuple_index++) { + const int tuple_index = + proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index); + tuple_indexes.push_back(tuple_index); + } + return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) && query_context.isValidInsertDestinationId( proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)) && - proto.HasExtension(serialization::InsertWorkOrder::tuple_index) && - query_context.isValidTupleId( - proto.GetExtension(serialization::InsertWorkOrder::tuple_index)); + proto.HasExtension(serialization::InsertWorkOrder::tuple_indexes) && + query_context.areValidTupleIds(tuple_indexes); } case serialization::NESTED_LOOP_JOIN: { if (!proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id) ||
