Added the query print support in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/68fc7456 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/68fc7456 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/68fc7456 Branch: refs/heads/LIP-time-decomposition Commit: 68fc745648fa51b13631510682054dedc01ab094 Parents: 6909e7c Author: Zuyu Zhang <zu...@apache.org> Authored: Mon Mar 13 13:37:18 2017 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Mon Mar 13 16:41:53 2017 -0700 ---------------------------------------------------------------------- cli/Flags.cpp | 4 ++ cli/Flags.hpp | 2 + cli/QuickstepCli.cpp | 3 -- cli/distributed/Conductor.cpp | 16 +++++-- relational_operators/WorkOrderFactory.cpp | 64 ++++++++++++++++---------- 5 files changed, 59 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/Flags.cpp ---------------------------------------------------------------------- diff --git a/cli/Flags.cpp b/cli/Flags.cpp index 74915ae..362eac3 100644 --- a/cli/Flags.cpp +++ b/cli/Flags.cpp @@ -32,6 +32,10 @@ using std::fprintf; namespace quickstep { +DEFINE_bool(print_query, false, + "Print each input query statement. This is useful when running a " + "large number of queries in a batch."); + DEFINE_bool(initialize_db, false, "If true, initialize a database."); static bool ValidateNumWorkers(const char *flagname, int value) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/Flags.hpp ---------------------------------------------------------------------- diff --git a/cli/Flags.hpp b/cli/Flags.hpp index a268e39..1ae37c4 100644 --- a/cli/Flags.hpp +++ b/cli/Flags.hpp @@ -33,6 +33,8 @@ namespace quickstep { * single-node and the distributed version. 
**/ +DECLARE_bool(print_query); + DECLARE_bool(initialize_db); DECLARE_int32(num_workers); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/QuickstepCli.cpp ---------------------------------------------------------------------- diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp index 26cb154..c2634bc 100644 --- a/cli/QuickstepCli.cpp +++ b/cli/QuickstepCli.cpp @@ -119,9 +119,6 @@ using tmb::client_id; namespace quickstep { -DEFINE_bool(print_query, false, - "Print each input query statement. This is useful when running a " - "large number of queries in a batch."); DEFINE_string(profile_file_name, "", "If nonempty, enable profiling using GOOGLE CPU Profiler, and write " "its output to the given file name. This flag has no effect if " http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index a8408ef..a13ab21 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -214,9 +214,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); } else { - auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), - sender, - statement.getPriority()); + if (FLAGS_print_query) { + printf("\nQuery %zu: %s\n", query_processor_->query_id(), command_string->c_str()); + } + + auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority()); query_processor_->generateQueryHandle(statement, query_handle.get()); DCHECK(query_handle->getQueryPlanMutable() != nullptr); @@ -299,6 +301,10 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars query_string->append(rel_name); query_string->append("\";"); 
+ if (FLAGS_print_query) { + printf("\nQuery %zu: %s\n", query_processor_->query_id(), query_string->c_str()); + } + parser_wrapper.feedNextBuffer(query_string); const ParseResult parse_result = parser_wrapper.getNextStatement(); DCHECK_EQ(ParseResult::kSuccess, parse_result.condition); @@ -322,6 +328,10 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars query_string->append(rel_name); query_string->append("\";"); + if (FLAGS_print_query) { + printf("\nQuery %zu: %s\n", query_processor_->query_id(), query_string->c_str()); + } + parser_wrapper.feedNextBuffer(query_string); const ParseResult parse_result = parser_wrapper.getNextStatement(); DCHECK_EQ(ParseResult::kSuccess, parse_result.condition); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index ae57e6f..56f431b 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -84,7 +84,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder switch (proto.work_order_type()) { case serialization::AGGREGATION: { - LOG(INFO) << "Creating AggregationWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating AggregationWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new AggregationWorkOrder( proto.query_id(), proto.GetExtension(serialization::AggregationWorkOrder::block_id), @@ -94,7 +95,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context)); } case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: { - LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) 
<< "Creating BuildAggregationExistenceMapWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new BuildAggregationExistenceMapWorkOrder( proto.query_id(), @@ -107,7 +109,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager); } case serialization::BUILD_LIP_FILTER: { - LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; const QueryContext::lip_deployment_id lip_deployment_index = proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index); @@ -124,7 +127,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder CreateLIPFilterBuilderHelper(lip_deployment_index, query_context)); } case serialization::BUILD_HASH: { - LOG(INFO) << "Creating BuildHashWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating BuildHashWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; vector<attribute_id> join_key_attributes; for (int i = 0; i < proto.ExtensionSize(serialization::BuildHashWorkOrder::join_key_attributes); ++i) { join_key_attributes.push_back( @@ -149,7 +152,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context)); } case serialization::DELETE: { - LOG(INFO) << "Creating DeleteWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating DeleteWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; return new DeleteWorkOrder( proto.query_id(), catalog_database->getRelationSchemaById( @@ -163,7 +166,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder bus); } case serialization::DESTROY_AGGREGATION_STATE: { - LOG(INFO) << "Creating DestroyAggregationStateWorkOrder 
in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new DestroyAggregationStateWorkOrder( proto.query_id(), proto.GetExtension( @@ -171,7 +175,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder query_context); } case serialization::DESTROY_HASH: { - LOG(INFO) << "Creating DestroyHashWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating DestroyHashWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new DestroyHashWorkOrder( proto.query_id(), proto.GetExtension( @@ -181,7 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder query_context); } case serialization::DROP_TABLE: { - LOG(INFO) << "Creating DropTableWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating DropTableWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; vector<block_id> blocks; for (int i = 0; i < proto.ExtensionSize(serialization::DropTableWorkOrder::block_ids); ++i) { blocks.push_back( @@ -198,7 +203,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder catalog_database); } case serialization::FINALIZE_AGGREGATION: { - LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; // TODO(quickstep-team): Handle inner-table partitioning in the distributed // setting. 
return new FinalizeAggregationWorkOrder( @@ -258,7 +264,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder switch (hash_join_work_order_type) { case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: { - LOG(INFO) << "Creating HashAntiJoinWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating HashAntiJoinWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new HashAntiJoinWorkOrder( proto.query_id(), build_relation, @@ -275,7 +282,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder lip_filter_adaptive_prober); } case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: { - LOG(INFO) << "Creating HashInnerJoinWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating HashInnerJoinWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new HashInnerJoinWorkOrder( proto.query_id(), build_relation, @@ -300,7 +308,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::HashJoinWorkOrder::is_selection_on_build, i)); } - LOG(INFO) << "Creating HashOuterJoinWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating HashOuterJoinWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new HashOuterJoinWorkOrder( proto.query_id(), build_relation, @@ -317,7 +326,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder lip_filter_adaptive_prober); } case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: { - LOG(INFO) << "Creating HashSemiJoinWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating HashSemiJoinWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new HashSemiJoinWorkOrder( proto.query_id(), build_relation, @@ -338,7 +348,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder } } case 
serialization::INSERT: { - LOG(INFO) << "Creating InsertWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating InsertWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; return new InsertWorkOrder( proto.query_id(), query_context->getInsertDestination( @@ -347,7 +357,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::InsertWorkOrder::tuple_index))); } case serialization::NESTED_LOOP_JOIN: { - LOG(INFO) << "Creating NestedLoopsJoinWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new NestedLoopsJoinWorkOrder( proto.query_id(), catalog_database->getRelationSchemaById( @@ -365,7 +376,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager); } case serialization::SAMPLE: { - LOG(INFO) << "Creating SampleWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating SampleWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; return new SampleWorkOrder( proto.query_id(), catalog_database->getRelationSchemaById( @@ -378,7 +389,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager); } case serialization::SAVE_BLOCKS: { - LOG(INFO) << "Creating SaveBlocksWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new SaveBlocksWorkOrder( proto.query_id(), proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id), @@ -386,7 +398,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager); } case serialization::SELECT: { - LOG(INFO) << "Creating SelectWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating SelectWorkOrder for Query " << 
proto.query_id() << " in Shiftboss " << shiftboss_index; const bool simple_projection = proto.GetExtension(serialization::SelectWorkOrder::simple_projection); vector<attribute_id> simple_selection; @@ -414,7 +426,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context)); } case serialization::SORT_MERGE_RUN: { - LOG(INFO) << "Creating SortMergeRunWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating SortMergeRunWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; vector<merge_run_operator::Run> runs; for (int i = 0; i < proto.ExtensionSize(serialization::SortMergeRunWorkOrder::runs); ++i) { merge_run_operator::Run run; @@ -443,7 +456,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder bus); } case serialization::SORT_RUN_GENERATION: { - LOG(INFO) << "Creating SortRunGenerationWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new SortRunGenerationWorkOrder( proto.query_id(), catalog_database->getRelationSchemaById( @@ -456,7 +470,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager); } case serialization::TABLE_GENERATOR: { - LOG(INFO) << "Creating SortRunGenerationWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating TableGeneratorWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; return new TableGeneratorWorkOrder( proto.query_id(), query_context->getGeneratorFunctionHandle( @@ -465,7 +480,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index))); } case serialization::TEXT_SCAN: { - LOG(INFO) << "Creating TextScanWorkOrder in 
Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating TextScanWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; return new TextScanWorkOrder( proto.query_id(), proto.GetExtension(serialization::TextScanWorkOrder::filename), @@ -478,7 +493,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder hdfs); } case serialization::UPDATE: { - LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; return new UpdateWorkOrder( proto.query_id(), catalog_database->getRelationSchemaById( @@ -496,7 +511,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder bus); } case serialization::WINDOW_AGGREGATION: { - LOG(INFO) << "Creating WindowAggregationWorkOrder in Shiftboss " << shiftboss_index; + LOG(INFO) << "Creating WindowAggregationWorkOrder for Query " << proto.query_id() + << " in Shiftboss " << shiftboss_index; vector<block_id> blocks; for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) { blocks.push_back(