http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/parser/preprocessed/SqlParser_gen.hpp ---------------------------------------------------------------------- diff --git a/parser/preprocessed/SqlParser_gen.hpp b/parser/preprocessed/SqlParser_gen.hpp index 142059d..96c649d 100644 --- a/parser/preprocessed/SqlParser_gen.hpp +++ b/parser/preprocessed/SqlParser_gen.hpp @@ -156,29 +156,30 @@ extern int quickstep_yydebug; TOKEN_SMA = 366, TOKEN_SMALLINT = 367, TOKEN_STDERR = 368, - TOKEN_STDOUT = 369, - TOKEN_SUBSTRING = 370, - TOKEN_TABLE = 371, - TOKEN_THEN = 372, - TOKEN_TIME = 373, - TOKEN_TIMESTAMP = 374, - TOKEN_TO = 375, - TOKEN_TRUE = 376, - TOKEN_TUPLESAMPLE = 377, - TOKEN_UNBOUNDED = 378, - TOKEN_UNIQUE = 379, - TOKEN_UPDATE = 380, - TOKEN_USING = 381, - TOKEN_VALUES = 382, - TOKEN_VARCHAR = 383, - TOKEN_WHEN = 384, - TOKEN_WHERE = 385, - TOKEN_WINDOW = 386, - TOKEN_WITH = 387, - TOKEN_YEAR = 388, - TOKEN_YEARMONTH = 389, - TOKEN_EOF = 390, - TOKEN_LEX_ERROR = 391 + TOKEN_STDIN = 369, + TOKEN_STDOUT = 370, + TOKEN_SUBSTRING = 371, + TOKEN_TABLE = 372, + TOKEN_THEN = 373, + TOKEN_TIME = 374, + TOKEN_TIMESTAMP = 375, + TOKEN_TO = 376, + TOKEN_TRUE = 377, + TOKEN_TUPLESAMPLE = 378, + TOKEN_UNBOUNDED = 379, + TOKEN_UNIQUE = 380, + TOKEN_UPDATE = 381, + TOKEN_USING = 382, + TOKEN_VALUES = 383, + TOKEN_VARCHAR = 384, + TOKEN_WHEN = 385, + TOKEN_WHERE = 386, + TOKEN_WINDOW = 387, + TOKEN_WITH = 388, + TOKEN_YEAR = 389, + TOKEN_YEARMONTH = 390, + TOKEN_EOF = 391, + TOKEN_LEX_ERROR = 392 }; #endif @@ -289,7 +290,7 @@ union YYSTYPE quickstep::ParsePriority *opt_priority_clause_; -#line 293 "SqlParser_gen.hpp" /* yacc.c:1915 */ +#line 294 "SqlParser_gen.hpp" /* yacc.c:1915 */ }; typedef union YYSTYPE YYSTYPE;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 14d8949..3bca344 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1212,10 +1212,13 @@ void ExecutionGenerator::convertCopyFrom( ->MergeFrom(output_relation->getPartitionScheme()->getProto()); } else { insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); - - const vector<block_id> blocks(output_relation->getBlocksSnapshot()); - for (const block_id block : blocks) { - insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); + const StorageBlockLayout &layout = output_relation->getDefaultStorageBlockLayout(); + const auto sub_block_type = layout.getDescription().tuple_store_description().sub_block_type(); + if (sub_block_type != TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE) { + const vector<block_id> blocks(output_relation->getBlocksSnapshot()); + for (const block_id block : blocks) { + insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); + } } } @@ -1880,7 +1883,6 @@ void ExecutionGenerator::convertAggregate( use_parallel_initialization = true; aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups); - DCHECK(!group_by_aggrs_info.empty()); CalculateCollisionFreeAggregationInfo(max_num_groups, group_by_aggrs_info, aggr_state_proto->mutable_collision_free_vector_info()); } else { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp index e0e3dff..2c84fc5 100644 --- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp +++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp @@ -486,7 +486,7 @@ std::size_t StarSchemaSimpleCostModel::getNumDistinctValues( return stat.getNumDistinctValues(rel_attr_id); } } - return estimateCardinalityForTableReference(table_reference); + return estimateCardinalityForTableReference(table_reference) * 0.5; } bool StarSchemaSimpleCostModel::impliesUniqueAttributes( @@ -520,7 +520,7 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes( std::static_pointer_cast<const P::TableReference>(physical_plan); const CatalogRelationStatistics &stat = table_reference->relation()->getStatistics(); - if (stat.hasNumTuples()) { + if (stat.isExact() && stat.hasNumTuples()) { const std::size_t num_tuples = stat.getNumTuples(); for (const auto &attr : attributes) { const attribute_id rel_attr_id = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/query_optimizer/resolver/Resolver.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp index 0b6dc22..17198c2 100644 --- a/query_optimizer/resolver/Resolver.cpp +++ b/query_optimizer/resolver/Resolver.cpp @@ -1019,10 +1019,13 @@ L::LogicalPtr Resolver::resolveInsertSelection( if (destination_type.equals(selection_type)) { cast_expressions.emplace_back(selection_attributes[aid]); } else { - // TODO(jianqiao): implement Cast operation for non-numeric types. + // TODO(jianqiao): Implement Cast operation for non-numeric types. + // TODO(jianqiao): We temporarily disable the safely-coercible check for + // tricks that work around "argmin". Will switch it back once the "Cast" + // function is supported. if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric && selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric && - destination_type.isSafelyCoercibleFrom(selection_type)) { + destination_type.isCoercibleFrom(selection_type)) { // Add cast operation const E::AttributeReferencePtr attr = selection_attributes[aid]; const E::ExpressionPtr cast_expr = @@ -1038,7 +1041,7 @@ L::LogicalPtr Resolver::resolveInsertSelection( << insert_statement.relation_name()->value() << "." << destination_attributes[aid]->attribute_name() << " has type " << selection_attributes[aid]->getValueType().getName() - << ", which cannot be safely coerced to the column's type " + << ", which cannot be coerced to the column's type " << destination_attributes[aid]->getValueType().getName(); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/relational_operators/TableExportOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TableExportOperator.cpp b/relational_operators/TableExportOperator.cpp index f6a73bf..cb90d72 100644 --- a/relational_operators/TableExportOperator.cpp +++ b/relational_operators/TableExportOperator.cpp @@ -120,6 +120,8 @@ void TableExportOperator::receiveFeedbackMessage( } else if (lo_file_name == "$stderr") { file_ = stderr; } else { + DCHECK(!file_name_.empty()); + DCHECK_EQ('@', file_name_.front()); file_ = std::fopen(file_name_.substr(1).c_str(), "wb"); // TODO(quickstep-team): Decent handling of exceptions at query runtime. if (file_ == nullptr) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/relational_operators/TextScanOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp index 66137d8..88b7214 100644 --- a/relational_operators/TextScanOperator.cpp +++ b/relational_operators/TextScanOperator.cpp @@ -57,6 +57,7 @@ #include "types/containers/Tuple.hpp" #include "utility/BulkIoConfiguration.hpp" #include "utility/Glob.hpp" +#include "utility/ScopedBuffer.hpp" #include "gflags/gflags.h" #include "glog/logging.h" @@ -110,19 +111,36 @@ bool TextScanOperator::getAllWorkOrders( const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) { DCHECK(query_context != nullptr); + DCHECK(!file_pattern_.empty()); - const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_); - - CHECK_NE(files.size(), 0u) - << "No files matched '" << file_pattern_ << "'. Exiting."; + if (work_generated_) { + return true; + } InsertDestination *output_destination = query_context->getInsertDestination(output_destination_index_); - if (work_generated_) { + if (file_pattern_ == "$stdin") { + container->addNormalWorkOrder( + new TextScanWorkOrder(query_id_, + file_pattern_, + 0, + -1 /* text_segment_size */, + options_->getDelimiter(), + options_->escapeStrings(), + output_destination), + op_index_); + work_generated_ = true; return true; } + DCHECK_EQ('@', file_pattern_.front()); + const std::vector<std::string> files = + utility::file::GlobExpand(file_pattern_.substr(1)); + + CHECK_NE(files.size(), 0u) + << "No files matched '" << file_pattern_ << "'. Exiting."; + for (const std::string &file : files) { #ifdef QUICKSTEP_HAVE_UNISTD // Check file permissions before trying to open it. @@ -221,6 +239,12 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto( } void TextScanWorkOrder::execute() { + DCHECK(!filename_.empty()); + if (filename_ == "$stdin") { + executeInputStream(); + return; + } + const CatalogRelationSchema &relation = output_destination_->getRelation(); std::vector<Tuple> tuples; bool is_faulty; @@ -436,6 +460,76 @@ void TextScanWorkOrder::execute() { output_destination_->bulkInsertTuples(&column_vectors); } +void TextScanWorkOrder::executeInputStream() { + std::string data; + const int len = std::ftell(stdin); + if (len >= 0) { + ScopedBuffer buffer(len, false); + std::rewind(stdin); + std::fread(buffer.get(), 1, len, stdin); + data = std::string(static_cast<char*>(buffer.get()), len); + } else { + std::unique_ptr<std::ostringstream> oss = std::make_unique<std::ostringstream>(); + *oss << std::cin.rdbuf(); + data = oss->str(); + oss.reset(); + } + + if (data.back() != '\n') { + data.push_back('\n'); + } + + const CatalogRelationSchema &relation = output_destination_->getRelation(); + std::vector<Tuple> tuples; + std::vector<TypedValue> row_tuple; + bool is_faulty; + + const char *row_ptr = data.c_str(); + const char *end_ptr = row_ptr + data.length(); + + while (row_ptr < end_ptr) { + if (*row_ptr == '\r' || *row_ptr == '\n') { + // Skip empty lines. + ++row_ptr; + } else { + row_tuple = parseRow(&row_ptr, relation, &is_faulty); + if (is_faulty) { + // Skip faulty rows + LOG(INFO) << "Faulty row found. Hence switching to next row."; + } else { + // Convert vector returned to tuple only when a valid row is encountered. + tuples.emplace_back(Tuple(std::move(row_tuple))); + } + } + } + + // Store the tuples in a ColumnVectorsValueAccessor for bulk insert. + ColumnVectorsValueAccessor column_vectors; + std::size_t attr_id = 0; + for (const auto &attribute : relation) { + const Type &attr_type = attribute.getType(); + if (attr_type.isVariableLength()) { + std::unique_ptr<IndirectColumnVector> column( + new IndirectColumnVector(attr_type, tuples.size())); + for (const auto &tuple : tuples) { + column->appendTypedValue(tuple.getAttributeValue(attr_id)); + } + column_vectors.addColumn(column.release()); + } else { + std::unique_ptr<NativeColumnVector> column( + new NativeColumnVector(attr_type, tuples.size())); + for (const auto &tuple : tuples) { + column->appendTypedValue(tuple.getAttributeValue(attr_id)); + } + column_vectors.addColumn(column.release()); + } + ++attr_id; + } + + // Bulk insert the tuples. + output_destination_->bulkInsertTuples(&column_vectors); +} + std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr, const CatalogRelationSchema &relation, bool *is_faulty) const { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/relational_operators/TextScanOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp index 01c559c..30462d7 100644 --- a/relational_operators/TextScanOperator.hpp +++ b/relational_operators/TextScanOperator.hpp @@ -225,6 +225,8 @@ class TextScanWorkOrder : public WorkOrder { void execute() override; private: + void executeInputStream(); + /** * @brief Extract a field string starting at \p *field_ptr. This method also * expands escape sequences if \p process_escape_sequences_ is true. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 73f1983..9d58107 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -393,8 +393,7 @@ bool AggregationOperationState::ProtoIsValid( } const S::CollisionFreeVectorInfo &proto_collision_free_vector_info = proto.collision_free_vector_info(); - if (!proto_collision_free_vector_info.IsInitialized() || - proto_collision_free_vector_info.state_offsets_size() != group_by_expressions_size) { + if (!proto_collision_free_vector_info.IsInitialized()) { return false; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/utility/ExecutionDAGVisualizer.cpp ---------------------------------------------------------------------- diff --git a/utility/ExecutionDAGVisualizer.cpp b/utility/ExecutionDAGVisualizer.cpp index 8059ef3..f009a72 100644 --- a/utility/ExecutionDAGVisualizer.cpp +++ b/utility/ExecutionDAGVisualizer.cpp @@ -234,9 +234,6 @@ void ExecutionDAGVisualizer::bindProfilingStats( std::max(time_end[relop_index], workorder_end_time); time_elapsed[relop_index] += (workorder_end_time - workorder_start_time); - if (workorders_count.find(relop_index) == workorders_count.end()) { - workorders_count[relop_index] = 0; - } ++workorders_count[relop_index]; if (mean_time_per_workorder.find(relop_index) == mean_time_per_workorder.end()) { @@ -292,8 +289,7 @@ void ExecutionDAGVisualizer::bindProfilingStats( node_info.labels.emplace_back( "effective concurrency: " + FormatDigits(concurrency, 2)); - DCHECK(workorders_count.find(node_index) != workorders_count.end()); - const std::size_t workorders_count_for_node = workorders_count.at(node_index); + const std::size_t workorders_count_for_node = workorders_count[node_index]; if (workorders_count_for_node > 0) { mean_time_per_workorder[node_index] = mean_time_per_workorder[node_index] /
