http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/parser/preprocessed/SqlParser_gen.hpp ---------------------------------------------------------------------- diff --git a/parser/preprocessed/SqlParser_gen.hpp b/parser/preprocessed/SqlParser_gen.hpp index a6d12e2..f6b5247 100644 --- a/parser/preprocessed/SqlParser_gen.hpp +++ b/parser/preprocessed/SqlParser_gen.hpp @@ -94,90 +94,91 @@ extern int quickstep_yydebug; TOKEN_DECIMAL = 304, TOKEN_DEFAULT = 305, TOKEN_DELETE = 306, - TOKEN_DELIMITER = 307, - TOKEN_DESC = 308, - TOKEN_DISTINCT = 309, - TOKEN_DOUBLE = 310, - TOKEN_DROP = 311, - TOKEN_ELSE = 312, - TOKEN_END = 313, - TOKEN_ESCAPE_STRINGS = 314, - TOKEN_EXISTS = 315, - TOKEN_EXTRACT = 316, - TOKEN_FALSE = 317, - TOKEN_FIRST = 318, - TOKEN_FLOAT = 319, - TOKEN_FOLLOWING = 320, - TOKEN_FOR = 321, - TOKEN_FOREIGN = 322, - TOKEN_FROM = 323, - TOKEN_FULL = 324, - TOKEN_GROUP = 325, - TOKEN_HASH = 326, - TOKEN_HAVING = 327, - TOKEN_HOUR = 328, - TOKEN_IN = 329, - TOKEN_INDEX = 330, - TOKEN_INNER = 331, - TOKEN_INSERT = 332, - TOKEN_INTEGER = 333, - TOKEN_INTERVAL = 334, - TOKEN_INTO = 335, - TOKEN_JOIN = 336, - TOKEN_KEY = 337, - TOKEN_LAST = 338, - TOKEN_LEFT = 339, - TOKEN_LIMIT = 340, - TOKEN_LONG = 341, - TOKEN_MINUTE = 342, - TOKEN_MONTH = 343, - TOKEN_NULL = 344, - TOKEN_NULLS = 345, - TOKEN_OFF = 346, - TOKEN_ON = 347, - TOKEN_ORDER = 348, - TOKEN_OUTER = 349, - TOKEN_OVER = 350, - TOKEN_PARTITION = 351, - TOKEN_PARTITIONS = 352, - TOKEN_PERCENT = 353, - TOKEN_PRECEDING = 354, - TOKEN_PRIMARY = 355, - TOKEN_PRIORITY = 356, - TOKEN_QUIT = 357, - TOKEN_RANGE = 358, - TOKEN_REAL = 359, - TOKEN_REFERENCES = 360, - TOKEN_RIGHT = 361, - TOKEN_ROW = 362, - TOKEN_ROW_DELIMITER = 363, - TOKEN_ROWS = 364, - TOKEN_SECOND = 365, - TOKEN_SELECT = 366, - TOKEN_SET = 367, - TOKEN_SMA = 368, - TOKEN_SMALLINT = 369, + TOKEN_DESC = 307, + TOKEN_DISTINCT = 308, + TOKEN_DOUBLE = 309, + TOKEN_DROP = 310, + TOKEN_ELSE = 311, + TOKEN_END = 312, + 
TOKEN_EXISTS = 313, + TOKEN_EXTRACT = 314, + TOKEN_FALSE = 315, + TOKEN_FIRST = 316, + TOKEN_FLOAT = 317, + TOKEN_FOLLOWING = 318, + TOKEN_FOR = 319, + TOKEN_FOREIGN = 320, + TOKEN_FROM = 321, + TOKEN_FULL = 322, + TOKEN_GROUP = 323, + TOKEN_HASH = 324, + TOKEN_HAVING = 325, + TOKEN_HOUR = 326, + TOKEN_IN = 327, + TOKEN_INDEX = 328, + TOKEN_INNER = 329, + TOKEN_INSERT = 330, + TOKEN_INTEGER = 331, + TOKEN_INTERVAL = 332, + TOKEN_INTO = 333, + TOKEN_JOIN = 334, + TOKEN_KEY = 335, + TOKEN_LAST = 336, + TOKEN_LEFT = 337, + TOKEN_LIMIT = 338, + TOKEN_LONG = 339, + TOKEN_MINUTE = 340, + TOKEN_MONTH = 341, + TOKEN_NULL = 342, + TOKEN_NULLS = 343, + TOKEN_OFF = 344, + TOKEN_ON = 345, + TOKEN_ORDER = 346, + TOKEN_OUTER = 347, + TOKEN_OVER = 348, + TOKEN_PARTITION = 349, + TOKEN_PARTITIONS = 350, + TOKEN_PERCENT = 351, + TOKEN_PRECEDING = 352, + TOKEN_PRIMARY = 353, + TOKEN_PRIORITY = 354, + TOKEN_QUIT = 355, + TOKEN_RANGE = 356, + TOKEN_REAL = 357, + TOKEN_REFERENCES = 358, + TOKEN_RIGHT = 359, + TOKEN_ROW = 360, + TOKEN_ROW_DELIMITER = 361, + TOKEN_ROWS = 362, + TOKEN_SECOND = 363, + TOKEN_SELECT = 364, + TOKEN_SET = 365, + TOKEN_SMA = 366, + TOKEN_SMALLINT = 367, + TOKEN_STDERR = 368, + TOKEN_STDOUT = 369, TOKEN_SUBSTRING = 370, TOKEN_TABLE = 371, TOKEN_THEN = 372, TOKEN_TIME = 373, TOKEN_TIMESTAMP = 374, - TOKEN_TRUE = 375, - TOKEN_TUPLESAMPLE = 376, - TOKEN_UNBOUNDED = 377, - TOKEN_UNIQUE = 378, - TOKEN_UPDATE = 379, - TOKEN_USING = 380, - TOKEN_VALUES = 381, - TOKEN_VARCHAR = 382, - TOKEN_WHEN = 383, - TOKEN_WHERE = 384, - TOKEN_WINDOW = 385, - TOKEN_WITH = 386, - TOKEN_YEAR = 387, - TOKEN_YEARMONTH = 388, - TOKEN_EOF = 389, - TOKEN_LEX_ERROR = 390 + TOKEN_TO = 375, + TOKEN_TRUE = 376, + TOKEN_TUPLESAMPLE = 377, + TOKEN_UNBOUNDED = 378, + TOKEN_UNIQUE = 379, + TOKEN_UPDATE = 380, + TOKEN_USING = 381, + TOKEN_VALUES = 382, + TOKEN_VARCHAR = 383, + TOKEN_WHEN = 384, + TOKEN_WHERE = 385, + TOKEN_WINDOW = 386, + TOKEN_WITH = 387, + TOKEN_YEAR = 388, + TOKEN_YEARMONTH = 
389, + TOKEN_EOF = 390, + TOKEN_LEX_ERROR = 391 }; #endif @@ -237,8 +238,7 @@ union YYSTYPE quickstep::ParseKeyStringValue *key_string_value_; quickstep::ParseKeyStringList *key_string_list_; quickstep::ParseKeyIntegerValue *key_integer_value_; - - quickstep::ParseCopyFromParams *copy_from_params_; + quickstep::ParseKeyBoolValue *key_bool_value_; quickstep::ParseAssignment *assignment_; quickstep::PtrList<quickstep::ParseAssignment> *assignment_list_; @@ -251,7 +251,7 @@ union YYSTYPE quickstep::ParseStatementUpdate *update_statement_; quickstep::ParseStatementInsert *insert_statement_; quickstep::ParseStatementDelete *delete_statement_; - quickstep::ParseStatementCopyFrom *copy_from_statement_; + quickstep::ParseStatementCopy *copy_statement_; quickstep::ParseStatementCreateTable *create_table_statement_; quickstep::ParsePartitionClause *partition_clause_; quickstep::ParseBlockProperties *block_properties_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 4c3b52a..e5eaccf 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -215,6 +215,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext quickstep_storage_HashTableFactory quickstep_storage_InsertDestination quickstep_storage_InsertDestination_proto + quickstep_storage_StorageBlockInfo quickstep_storage_WindowAggregationOperationState quickstep_threading_SpinSharedMutex quickstep_types_TypedValue http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index 7876821..b146e13 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -23,6 +23,7 @@ #include <cstddef> #include <cstdint> #include <memory> +#include <string> #include <unordered_map> #include <vector> @@ -33,6 +34,7 @@ #include "storage/AggregationOperationState.hpp" #include "storage/HashTable.hpp" #include "storage/InsertDestination.hpp" +#include "storage/StorageBlockInfo.hpp" #include "storage/WindowAggregationOperationState.hpp" #include "threading/SpinSharedMutex.hpp" #include "types/containers/Tuple.hpp" @@ -571,6 +573,36 @@ class QueryContext { } /** + * @brief Release the output text buffer for the specified block. + * + * @param id The id of the block to release its output text buffer. + * + * @return The output text buffer for the specified block. Caller should take + * ownership of the returned object. 
+ **/ + inline std::string* releaseBlockOutputTextBuffer(const block_id id) { + SpinSharedMutexExclusiveLock<false> lock(block_output_text_buffer_mutex_); + auto it = block_output_text_buffers_.find(id); + DCHECK(it != block_output_text_buffers_.end()); + std::unique_ptr<std::string> output = std::move(it->second); + block_output_text_buffers_.erase(it); + return output.release(); + } + + /** + * @brief Set the output text buffer for the specified block. + * + * @param id The id of the block. + * @param output The output buffer to set. Caller relinquishes ownership of + * \p output by calling this method. + **/ + inline void setBlockOutputTextBuffer(const block_id id, std::string *output) { + SpinSharedMutexExclusiveLock<false> lock(block_output_text_buffer_mutex_); + DCHECK(block_output_text_buffers_.find(id) == block_output_text_buffers_.end()); + block_output_text_buffers_.emplace(id, std::unique_ptr<std::string>(output)); + } + + /** * @brief Get the total memory footprint of the temporary data structures * used for query execution (e.g. join hash tables, aggregation hash * tables) in bytes. 
@@ -634,10 +666,12 @@ class QueryContext { std::vector<std::unique_ptr<Tuple>> tuples_; std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_; std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_; + std::unordered_map<block_id, std::unique_ptr<std::string>> block_output_text_buffers_; mutable SpinSharedMutex<false> hash_tables_mutex_; mutable SpinSharedMutex<false> aggregation_states_mutex_; mutable SpinSharedMutex<false> insert_destinations_mutex_; + mutable SpinSharedMutex<false> block_output_text_buffer_mutex_; DISALLOW_COPY_AND_ASSIGN(QueryContext); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index fdf8796..4ea21b2 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -94,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_queryoptimizer_expressions_WindowAggregateFunction quickstep_queryoptimizer_physical_Aggregate quickstep_queryoptimizer_physical_CopyFrom + quickstep_queryoptimizer_physical_CopyTo quickstep_queryoptimizer_physical_CreateIndex quickstep_queryoptimizer_physical_CreateTable quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate @@ -140,6 +141,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_relationaloperators_SelectOperator quickstep_relationaloperators_SortMergeRunOperator quickstep_relationaloperators_SortRunGenerationOperator + quickstep_relationaloperators_TableExportOperator quickstep_relationaloperators_TableGeneratorOperator quickstep_relationaloperators_TextScanOperator quickstep_relationaloperators_UnionAllOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/ExecutionGenerator.cpp 
---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 9bfd136..bf26a5b 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -78,6 +78,7 @@ #include "query_optimizer/expressions/WindowAggregateFunction.hpp" #include "query_optimizer/physical/Aggregate.hpp" #include "query_optimizer/physical/CopyFrom.hpp" +#include "query_optimizer/physical/CopyTo.hpp" #include "query_optimizer/physical/CreateIndex.hpp" #include "query_optimizer/physical/CreateTable.hpp" #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" @@ -124,6 +125,7 @@ #include "relational_operators/SelectOperator.hpp" #include "relational_operators/SortMergeRunOperator.hpp" #include "relational_operators/SortRunGenerationOperator.hpp" +#include "relational_operators/TableExportOperator.hpp" #include "relational_operators/TableGeneratorOperator.hpp" #include "relational_operators/TextScanOperator.hpp" #include "relational_operators/UnionAllOperator.hpp" @@ -409,6 +411,9 @@ void ExecutionGenerator::generatePlanInternal( case P::PhysicalType::kCopyFrom: return convertCopyFrom( std::static_pointer_cast<const P::CopyFrom>(physical_plan)); + case P::PhysicalType::kCopyTo: + return convertCopyTo( + std::static_pointer_cast<const P::CopyTo>(physical_plan)); case P::PhysicalType::kCreateIndex: return convertCreateIndex( std::static_pointer_cast<const P::CreateIndex>(physical_plan)); @@ -1238,8 +1243,7 @@ void ExecutionGenerator::convertCopyFrom( new TextScanOperator( query_handle_->query_id(), physical_plan->file_name(), - physical_plan->column_delimiter(), - physical_plan->escape_strings(), + physical_plan->options(), *output_relation, insert_destination_index)); insert_destination_proto->set_relational_op_index(scan_operator_index); @@ -1254,6 +1258,40 @@ void ExecutionGenerator::convertCopyFrom( false /* is_pipeline_breaker */); } +void 
ExecutionGenerator::convertCopyTo(const P::CopyToPtr &physical_plan) { + // CopyTo is converted to a TableExport operator. + + const CatalogRelation *input_relation; + bool input_relation_is_stored; + + const P::PhysicalPtr &input = physical_plan->input(); + P::TableReferencePtr table_reference; + const CatalogRelationInfo *input_relation_info = nullptr; + if (P::SomeTableReference::MatchesWithConditionalCast(input, &table_reference)) { + input_relation = table_reference->relation(); + input_relation_is_stored = true; + } else { + input_relation_info = findRelationInfoOutputByPhysical(input); + input_relation = input_relation_info->relation; + input_relation_is_stored = false; + } + + DCHECK(input_relation != nullptr); + const QueryPlan::DAGNodeIndex table_export_operator_index = + execution_plan_->addRelationalOperator( + new TableExportOperator(query_handle_->query_id(), + *input_relation, + input_relation_is_stored, + physical_plan->file_name(), + physical_plan->options())); + if (!input_relation_is_stored) { + DCHECK(input_relation_info != nullptr); + execution_plan_->addDirectDependency(table_export_operator_index, + input_relation_info->producer_operator_index, + false /* is_pipeline_breaker */); + } +} + void ExecutionGenerator::convertCreateIndex( const P::CreateIndexPtr &physical_plan) { // CreateIndex is converted to a CreateIndex operator. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index 19e75c1..bc9f88b 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -44,6 +44,7 @@ #include "query_optimizer/expressions/Predicate.hpp" #include "query_optimizer/physical/Aggregate.hpp" #include "query_optimizer/physical/CopyFrom.hpp" +#include "query_optimizer/physical/CopyTo.hpp" #include "query_optimizer/physical/CreateIndex.hpp" #include "query_optimizer/physical/CreateTable.hpp" #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" @@ -282,6 +283,13 @@ class ExecutionGenerator { void convertCopyFrom(const physical::CopyFromPtr &physical_plan); /** + * @brief Converts a CopyTo to a TableExport operator. + * + * @param physical_plan The CopyTo to be converted. + */ + void convertCopyTo(const physical::CopyToPtr &physical_plan); + + /** * @brief Converts a CreateIndex to a CreateIndex operator. * * @param physical_plan The CreateIndex to be converted. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/logical/CMakeLists.txt b/query_optimizer/logical/CMakeLists.txt index 4480e0b..b3c9e36 100644 --- a/query_optimizer/logical/CMakeLists.txt +++ b/query_optimizer/logical/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(quickstep_queryoptimizer_logical_Aggregate Aggregate.cpp Aggregate.hpp) add_library(quickstep_queryoptimizer_logical_BinaryJoin BinaryJoin.cpp BinaryJoin.hpp) add_library(quickstep_queryoptimizer_logical_CopyFrom CopyFrom.cpp CopyFrom.hpp) +add_library(quickstep_queryoptimizer_logical_CopyTo CopyTo.cpp CopyTo.hpp) add_library(quickstep_queryoptimizer_logical_CreateIndex CreateIndex.cpp CreateIndex.hpp) add_library(quickstep_queryoptimizer_logical_CreateTable CreateTable.cpp CreateTable.hpp) add_library(quickstep_queryoptimizer_logical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp) @@ -74,6 +75,16 @@ target_link_libraries(quickstep_queryoptimizer_logical_CopyFrom quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_logical_Logical quickstep_queryoptimizer_logical_LogicalType + quickstep_utility_BulkIOConfiguration + quickstep_utility_Macros + quickstep_utility_StringUtil) +target_link_libraries(quickstep_queryoptimizer_logical_CopyTo + glog + quickstep_queryoptimizer_OptimizerTree + quickstep_queryoptimizer_expressions_AttributeReference + quickstep_queryoptimizer_logical_Logical + quickstep_queryoptimizer_logical_LogicalType + quickstep_utility_BulkIOConfiguration quickstep_utility_Macros quickstep_utility_StringUtil) target_link_libraries(quickstep_queryoptimizer_logical_CreateIndex @@ -290,6 +301,7 @@ target_link_libraries(quickstep_queryoptimizer_logical quickstep_queryoptimizer_logical_Aggregate quickstep_queryoptimizer_logical_BinaryJoin quickstep_queryoptimizer_logical_CopyFrom + 
quickstep_queryoptimizer_logical_CopyTo quickstep_queryoptimizer_logical_CreateIndex quickstep_queryoptimizer_logical_CreateTable quickstep_queryoptimizer_logical_DeleteTuples http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyFrom.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/logical/CopyFrom.cpp b/query_optimizer/logical/CopyFrom.cpp index b0a1423..a80c701 100644 --- a/query_optimizer/logical/CopyFrom.cpp +++ b/query_optimizer/logical/CopyFrom.cpp @@ -44,11 +44,11 @@ void CopyFrom::getFieldStringItems( inline_field_values->push_back(file_name_); inline_field_names->push_back("column_delimiter"); - inline_field_values->push_back("\"" + EscapeSpecialChars(std::string(1, column_delimiter_)) + - "\""); + inline_field_values->push_back( + "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\""); inline_field_names->push_back("escape_strings"); - inline_field_values->push_back(escape_strings_ ? "true" : "false"); + inline_field_values->push_back(options_->escapeStrings() ? "true" : "false"); } } // namespace logical http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyFrom.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/logical/CopyFrom.hpp b/query_optimizer/logical/CopyFrom.hpp index 7c5907f..e0545dc 100644 --- a/query_optimizer/logical/CopyFrom.hpp +++ b/query_optimizer/logical/CopyFrom.hpp @@ -17,8 +17,8 @@ * under the License. 
**/ -#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_ -#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_ +#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_ #include <memory> #include <string> @@ -28,6 +28,7 @@ #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/logical/Logical.hpp" #include "query_optimizer/logical/LogicalType.hpp" +#include "utility/BulkIOConfiguration.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -66,20 +67,14 @@ class CopyFrom : public Logical { const std::string& file_name() const { return file_name_; } /** - * @return The delimiter used in the text file to separate columns. + * @return The options for this COPY FROM statement. */ - const char column_delimiter() const { return column_delimiter_; } - - /** - * @return Whether to decode escape sequences in the text file. - */ - bool escape_strings() const { return escape_strings_; } + BulkIOConfigurationPtr options() const { return options_; } LogicalPtr copyWithNewChildren( const std::vector<LogicalPtr> &new_children) const override { DCHECK(new_children.empty()); - return Create(catalog_relation_, file_name_, column_delimiter_, - escape_strings_); + return Create(catalog_relation_, file_name_, options_); } std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override { @@ -95,19 +90,13 @@ class CopyFrom : public Logical { * * @param catalog_relation The catalog relation to insert the tuples to. * @param file_name The name of the file to read the data from. - * @param column_delimiter The delimiter used in the text file to separate - * columns. - * @param escape_strings Whether to decode escape sequences in the text file. + * @param options The options for this COPY FROM statement. * @return An immutable CopyFrom logical node. 
*/ static CopyFromPtr Create(const CatalogRelation *catalog_relation, const std::string &file_name, - const char column_delimiter, - bool escape_strings) { - return CopyFromPtr(new CopyFrom(catalog_relation, - file_name, - column_delimiter, - escape_strings)); + const BulkIOConfigurationPtr &options) { + return CopyFromPtr(new CopyFrom(catalog_relation, file_name, options)); } protected: @@ -122,18 +111,14 @@ class CopyFrom : public Logical { private: CopyFrom(const CatalogRelation *catalog_relation, const std::string &file_name, - const char column_delimiter, - bool escape_strings) + const BulkIOConfigurationPtr &options) : catalog_relation_(catalog_relation), file_name_(file_name), - column_delimiter_(column_delimiter), - escape_strings_(escape_strings) {} + options_(options) {} const CatalogRelation *catalog_relation_; - std::string file_name_; - - const char column_delimiter_; - const bool escape_strings_; + const std::string file_name_; + const BulkIOConfigurationPtr options_; DISALLOW_COPY_AND_ASSIGN(CopyFrom); }; @@ -144,4 +129,4 @@ class CopyFrom : public Logical { } // namespace optimizer } // namespace quickstep -#endif /* QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_ */ +#endif // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyTo.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/logical/CopyTo.cpp b/query_optimizer/logical/CopyTo.cpp new file mode 100644 index 0000000..369f732 --- /dev/null +++ b/query_optimizer/logical/CopyTo.cpp @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "query_optimizer/logical/CopyTo.hpp" + +#include <string> +#include <vector> + +#include "query_optimizer/OptimizerTree.hpp" +#include "utility/StringUtil.hpp" + +namespace quickstep { +namespace optimizer { +namespace logical { + +void CopyTo::getFieldStringItems( + std::vector<std::string> *inline_field_names, + std::vector<std::string> *inline_field_values, + std::vector<std::string> *non_container_child_field_names, + std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields, + std::vector<std::string> *container_child_field_names, + std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const { + inline_field_names->push_back("file_name"); + inline_field_values->push_back(file_name_); + + non_container_child_field_names->push_back("input"); + non_container_child_fields->push_back(input_); + + inline_field_names->push_back("format"); + inline_field_values->push_back(options_->getFormatName()); + + inline_field_names->push_back("column_delimiter"); + inline_field_values->push_back( + "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\""); + + if (options_->escapeStrings()) { + inline_field_names->push_back("escape_strings"); + inline_field_values->push_back("true"); + } + + if (options_->hasHeader()) { + inline_field_names->push_back("header"); + inline_field_values->push_back("true"); + } + + if 
(options_->getQuoteCharacter() != 0) { + inline_field_names->push_back("quote"); + inline_field_values->push_back(std::string(1, options_->getQuoteCharacter())); + } + + if (options_->getNullString() != "") { + inline_field_names->push_back("null_string"); + inline_field_values->push_back(options_->getNullString()); + } +} + +} // namespace logical +} // namespace optimizer +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyTo.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/logical/CopyTo.hpp b/query_optimizer/logical/CopyTo.hpp new file mode 100644 index 0000000..eafef0e --- /dev/null +++ b/query_optimizer/logical/CopyTo.hpp @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_ + +#include <memory> +#include <string> +#include <vector> + +#include "query_optimizer/OptimizerTree.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/logical/Logical.hpp" +#include "query_optimizer/logical/LogicalType.hpp" +#include "utility/BulkIOConfiguration.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { +namespace optimizer { +namespace logical { + +/** \addtogroup OptimizerLogical + * @{ + */ + +class CopyTo; +typedef std::shared_ptr<const CopyTo> CopyToPtr; + +/** + * @brief Represents an operation that copies data from a relation to a text file. + */ +class CopyTo : public Logical { + public: + LogicalType getLogicalType() const override { + return LogicalType::kCopyTo; + } + + std::string getName() const override { + return "CopyTo"; + } + + /** + * @return The input relation whose data is to be exported. + */ + const LogicalPtr& input() const { + return input_; + } + + /** + * @return The name of the file to write the data to. + */ + const std::string& file_name() const { + return file_name_; + } + + /** + * @return The options for this COPY TO statement. + */ + BulkIOConfigurationPtr options() const { + return options_; + } + + LogicalPtr copyWithNewChildren( + const std::vector<LogicalPtr> &new_children) const override { + DCHECK(new_children.size() == 1); + return Create(new_children.front(), file_name_, options_); + } + + std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override { + return {}; + } + + std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override { + return input_->getOutputAttributes(); + } + + /** + * @brief Creates a CopyTo logical node. + * + * @param input The input relation whose data is to be exported. + * @param file_name The name of the file to write the data to. 
+ * @param options The options for this COPY TO statement. + * @return An immutable CopyTo logical node. + */ + static CopyToPtr Create(const LogicalPtr &input, + const std::string &file_name, + const BulkIOConfigurationPtr &options) { + return CopyToPtr(new CopyTo(input, file_name, options)); + } + + protected: + void getFieldStringItems( + std::vector<std::string> *inline_field_names, + std::vector<std::string> *inline_field_values, + std::vector<std::string> *non_container_child_field_names, + std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields, + std::vector<std::string> *container_child_field_names, + std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override; + + private: + CopyTo(const LogicalPtr &input, + const std::string &file_name, + const BulkIOConfigurationPtr &options) + : input_(input), + file_name_(file_name), + options_(options) { + addChild(input); + } + + const LogicalPtr input_; + const std::string file_name_; + const BulkIOConfigurationPtr options_; + + DISALLOW_COPY_AND_ASSIGN(CopyTo); +}; + +/** @} */ + +} // namespace logical +} // namespace optimizer +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/LogicalType.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/logical/LogicalType.hpp b/query_optimizer/logical/LogicalType.hpp index 21ffdca..a629637 100644 --- a/query_optimizer/logical/LogicalType.hpp +++ b/query_optimizer/logical/LogicalType.hpp @@ -34,6 +34,7 @@ namespace logical { enum class LogicalType { kAggregate, kCopyFrom, + kCopyTo, kCreateIndex, kCreateTable, kDeleteTuples, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CMakeLists.txt ---------------------------------------------------------------------- diff --git 
a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt index e510f6b..1c8dc3d 100644 --- a/query_optimizer/physical/CMakeLists.txt +++ b/query_optimizer/physical/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(quickstep_queryoptimizer_physical_Aggregate Aggregate.cpp Aggregate.hpp) add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJoin.hpp) add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp) +add_library(quickstep_queryoptimizer_physical_CopyTo CopyTo.cpp CopyTo.hpp) add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp) add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp) add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate @@ -81,7 +82,18 @@ target_link_libraries(quickstep_queryoptimizer_physical_CopyFrom quickstep_queryoptimizer_expressions_NamedExpression quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_physical_PhysicalType - quickstep_utility_Macros) + quickstep_utility_BulkIOConfiguration + quickstep_utility_Macros + quickstep_utility_StringUtil) +target_link_libraries(quickstep_queryoptimizer_physical_CopyTo + glog + quickstep_queryoptimizer_OptimizerTree + quickstep_queryoptimizer_expressions_AttributeReference + quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_physical_PhysicalType + quickstep_utility_BulkIOConfiguration + quickstep_utility_Macros + quickstep_utility_StringUtil) target_link_libraries(quickstep_queryoptimizer_physical_CreateIndex glog quickstep_queryoptimizer_OptimizerTree @@ -327,6 +339,7 @@ target_link_libraries(quickstep_queryoptimizer_physical quickstep_queryoptimizer_physical_Aggregate quickstep_queryoptimizer_physical_BinaryJoin quickstep_queryoptimizer_physical_CopyFrom + quickstep_queryoptimizer_physical_CopyTo quickstep_queryoptimizer_physical_CreateIndex quickstep_queryoptimizer_physical_CreateTable 
quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyFrom.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CopyFrom.cpp b/query_optimizer/physical/CopyFrom.cpp index 8448d4e..65279fe 100644 --- a/query_optimizer/physical/CopyFrom.cpp +++ b/query_optimizer/physical/CopyFrom.cpp @@ -24,6 +24,7 @@ #include "catalog/CatalogRelation.hpp" #include "query_optimizer/OptimizerTree.hpp" +#include "utility/StringUtil.hpp" namespace quickstep { namespace optimizer { @@ -43,10 +44,11 @@ void CopyFrom::getFieldStringItems( inline_field_values->push_back(file_name_); inline_field_names->push_back("column_delimiter"); - inline_field_values->push_back(std::string(1, column_delimiter_)); + inline_field_values->push_back( + "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\""); inline_field_names->push_back("escape_strings"); - inline_field_values->push_back(escape_strings_ ? "true" : "false"); + inline_field_values->push_back(options_->escapeStrings() ? 
"true" : "false"); } } // namespace physical http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyFrom.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CopyFrom.hpp b/query_optimizer/physical/CopyFrom.hpp index ecbf318..6aa60cb 100644 --- a/query_optimizer/physical/CopyFrom.hpp +++ b/query_optimizer/physical/CopyFrom.hpp @@ -30,6 +30,7 @@ #include "query_optimizer/expressions/NamedExpression.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" +#include "utility/BulkIOConfiguration.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -68,22 +69,16 @@ class CopyFrom : public Physical { const std::string& file_name() const { return file_name_; } /** - * @return The delimiter used in the text file to separate columns. + * @return The options for this COPY FROM statement. */ - const char column_delimiter() const { return column_delimiter_; } - - /** - * @return Whether to decode escape sequences in the text file. 
- */ - bool escape_strings() const { return escape_strings_; } + BulkIOConfigurationPtr options() const { return options_; } PhysicalPtr copyWithNewChildren( const std::vector<PhysicalPtr> &new_children) const override { DCHECK(new_children.empty()); return Create(catalog_relation_, file_name_, - column_delimiter_, - escape_strings_); + options_); } std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override { @@ -112,12 +107,8 @@ class CopyFrom : public Physical { */ static CopyFromPtr Create(const CatalogRelation *catalog_relation, const std::string &file_name, - const char &column_delimiter, - bool escape_strings) { - return CopyFromPtr(new CopyFrom(catalog_relation, - file_name, - column_delimiter, - escape_strings)); + const BulkIOConfigurationPtr &options) { + return CopyFromPtr(new CopyFrom(catalog_relation, file_name, options)); } protected: @@ -132,18 +123,14 @@ class CopyFrom : public Physical { private: CopyFrom(const CatalogRelation *catalog_relation, const std::string &file_name, - const char column_delimiter, - bool escape_strings) + const BulkIOConfigurationPtr &options) : catalog_relation_(catalog_relation), file_name_(file_name), - column_delimiter_(column_delimiter), - escape_strings_(escape_strings) {} + options_(options) {} const CatalogRelation *catalog_relation_; - std::string file_name_; - - const char column_delimiter_; - const bool escape_strings_; + const std::string file_name_; + const BulkIOConfigurationPtr options_; DISALLOW_COPY_AND_ASSIGN(CopyFrom); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyTo.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CopyTo.cpp b/query_optimizer/physical/CopyTo.cpp new file mode 100644 index 0000000..9cd954e --- /dev/null +++ b/query_optimizer/physical/CopyTo.cpp @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "query_optimizer/physical/CopyTo.hpp" + +#include <string> +#include <vector> + +#include "query_optimizer/OptimizerTree.hpp" +#include "utility/StringUtil.hpp" + +namespace quickstep { +namespace optimizer { +namespace physical { + +void CopyTo::getFieldStringItems( + std::vector<std::string> *inline_field_names, + std::vector<std::string> *inline_field_values, + std::vector<std::string> *non_container_child_field_names, + std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields, + std::vector<std::string> *container_child_field_names, + std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const { + inline_field_names->push_back("file_name"); + inline_field_values->push_back(file_name_); + + non_container_child_field_names->push_back("input"); + non_container_child_fields->push_back(input_); + + inline_field_names->push_back("format"); + inline_field_values->push_back(options_->getFormatName()); + + inline_field_names->push_back("column_delimiter"); + inline_field_values->push_back( + "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\""); + + if (options_->escapeStrings()) { + inline_field_names->push_back("escape_strings"); + inline_field_values->push_back("true"); + } + 
+ if (options_->hasHeader()) { + inline_field_names->push_back("header"); + inline_field_values->push_back("true"); + } + + if (options_->getQuoteCharacter() != 0) { + inline_field_names->push_back("quote"); + inline_field_values->push_back(std::string(1, options_->getQuoteCharacter())); + } + + if (options_->getNullString() != "") { + inline_field_names->push_back("null_string"); + inline_field_values->push_back(options_->getNullString()); + } +} + +} // namespace physical +} // namespace optimizer +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyTo.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CopyTo.hpp b/query_optimizer/physical/CopyTo.hpp new file mode 100644 index 0000000..69d5c7a --- /dev/null +++ b/query_optimizer/physical/CopyTo.hpp @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_ + +#include <memory> +#include <string> +#include <vector> + +#include "query_optimizer/OptimizerTree.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/physical/PhysicalType.hpp" +#include "utility/BulkIOConfiguration.hpp" +#include "utility/BulkIOConfiguration.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { +namespace optimizer { +namespace physical { + +/** \addtogroup OptimizerPhysical + * @{ + */ + +class CopyTo; +typedef std::shared_ptr<const CopyTo> CopyToPtr; + +/** + * @brief Represents an operation that copies data from a relation to a text file. + */ +class CopyTo : public Physical { + public: + PhysicalType getPhysicalType() const override { + return PhysicalType::kCopyTo; + } + + std::string getName() const override { + return "CopyTo"; + } + + /** + * @return The input relation whose data is to be exported. + */ + const PhysicalPtr& input() const { + return input_; + } + + /** + * @return The name of the file to write the data to. + */ + const std::string& file_name() const { + return file_name_; + } + + /** + * @return The options for this COPY TO statement. 
+ */ + BulkIOConfigurationPtr options() const { + return options_; + } + + PhysicalPtr copyWithNewChildren( + const std::vector<PhysicalPtr> &new_children) const override { + DCHECK(new_children.size() == 1); + return Create(new_children.front(), file_name_, options_); + } + + std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override { + return {}; + } + + std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override { + return input_->getOutputAttributes(); + } + + bool maybeCopyWithPrunedExpressions( + const expressions::UnorderedNamedExpressionSet &referenced_expressions, + PhysicalPtr *output) const override { + return false; + } + + /** + * @brief Creates a CopyTo physical node. + * + * @param input The input relation whose data is to be exported. + * @param file_name The name of the file to write the data to. + * @param options The options for this COPY TO statement. + * @return An immutable CopyTo physical node. + */ + static CopyToPtr Create(const PhysicalPtr &input, + const std::string &file_name, + const BulkIOConfigurationPtr &options) { + return CopyToPtr(new CopyTo(input, file_name, options)); + } + + protected: + void getFieldStringItems( + std::vector<std::string> *inline_field_names, + std::vector<std::string> *inline_field_values, + std::vector<std::string> *non_container_child_field_names, + std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields, + std::vector<std::string> *container_child_field_names, + std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override; + + private: + CopyTo(const PhysicalPtr &input, + const std::string &file_name, + const BulkIOConfigurationPtr &options) + : input_(input), + file_name_(file_name), + options_(options) { + addChild(input); + } + + const PhysicalPtr input_; + const std::string file_name_; + const BulkIOConfigurationPtr options_; + + DISALLOW_COPY_AND_ASSIGN(CopyTo); +}; + +/** @} */ + +} // namespace physical 
+} // namespace optimizer +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/PhysicalType.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp index 47db7ec..ac4376e 100644 --- a/query_optimizer/physical/PhysicalType.hpp +++ b/query_optimizer/physical/PhysicalType.hpp @@ -34,6 +34,7 @@ namespace physical { enum class PhysicalType { kAggregate, kCopyFrom, + kCopyTo, kCreateIndex, kCreateTable, kCrossReferenceCoalesceAggregate, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/resolver/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt index 4e364a6..18a03c7 100644 --- a/query_optimizer/resolver/CMakeLists.txt +++ b/query_optimizer/resolver/CMakeLists.txt @@ -97,6 +97,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver quickstep_queryoptimizer_expressions_WindowAggregateFunction quickstep_queryoptimizer_logical_Aggregate quickstep_queryoptimizer_logical_CopyFrom + quickstep_queryoptimizer_logical_CopyTo quickstep_queryoptimizer_logical_CreateIndex quickstep_queryoptimizer_logical_CreateTable quickstep_queryoptimizer_logical_DeleteTuples @@ -131,6 +132,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver quickstep_types_operations_unaryoperations_DateExtractOperation quickstep_types_operations_unaryoperations_SubstringOperation quickstep_types_operations_unaryoperations_UnaryOperation + quickstep_utility_BulkIOConfiguration quickstep_utility_Macros quickstep_utility_PtrList quickstep_utility_PtrVector http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/resolver/Resolver.cpp 
---------------------------------------------------------------------- diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp index 0f65255..4bccf89 100644 --- a/query_optimizer/resolver/Resolver.cpp +++ b/query_optimizer/resolver/Resolver.cpp @@ -93,6 +93,7 @@ #include "query_optimizer/expressions/WindowAggregateFunction.hpp" #include "query_optimizer/logical/Aggregate.hpp" #include "query_optimizer/logical/CopyFrom.hpp" +#include "query_optimizer/logical/CopyTo.hpp" #include "query_optimizer/logical/CreateIndex.hpp" #include "query_optimizer/logical/CreateTable.hpp" #include "query_optimizer/logical/DeleteTuples.hpp" @@ -126,6 +127,7 @@ #include "types/operations/unary_operations/DateExtractOperation.hpp" #include "types/operations/unary_operations/SubstringOperation.hpp" #include "types/operations/unary_operations/UnaryOperation.hpp" +#include "utility/BulkIOConfiguration.hpp" #include "utility/PtrList.hpp" #include "utility/PtrVector.hpp" #include "utility/SqlError.hpp" @@ -143,6 +145,44 @@ namespace E = ::quickstep::optimizer::expressions; namespace L = ::quickstep::optimizer::logical; namespace S = ::quickstep::serialization; +namespace { + +attribute_id GetAttributeIdFromName(const PtrList<ParseAttributeDefinition> &attribute_definition_list, + const std::string &attribute_name) { + const std::string lower_attribute_name = ToLower(attribute_name); + + attribute_id attr_id = 0; + for (const ParseAttributeDefinition &attribute_definition : attribute_definition_list) { + if (lower_attribute_name == ToLower(attribute_definition.name()->value())) { + return attr_id; + } + + ++attr_id; + } + + return kInvalidAttributeID; +} + +const ParseString* GetKeyValueString(const ParseKeyValue &key_value) { + if (key_value.getKeyValueType() != ParseKeyValue::kStringString) { + THROW_SQL_ERROR_AT(&key_value) + << "Invalid value type for " << key_value.key()->value() + << ", expected a string."; + } + return static_cast<const 
ParseKeyStringValue&>(key_value).value(); +} + +bool GetKeyValueBool(const ParseKeyValue &key_value) { + if (key_value.getKeyValueType() != ParseKeyValue::kStringBool) { + THROW_SQL_ERROR_AT(&key_value) + << "Invalid value for " << key_value.key()->value() + << ", expected true or false."; + } + return static_cast<const ParseKeyBoolValue&>(key_value).value(); +} + +} // namespace + struct Resolver::ExpressionResolutionInfo { /** * @brief Constructs an ExpressionResolutionInfo that disallows aggregate @@ -316,11 +356,25 @@ struct Resolver::SelectListInfo { L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) { switch (parse_query.getStatementType()) { - case ParseStatement::kCopyFrom: - context_->set_is_catalog_changed(); - logical_plan_ = resolveCopyFrom( - static_cast<const ParseStatementCopyFrom&>(parse_query)); + case ParseStatement::kCopy: { + const ParseStatementCopy &copy_statemnt = + static_cast<const ParseStatementCopy&>(parse_query); + if (copy_statemnt.getCopyDirection() == ParseStatementCopy::kFrom) { + context_->set_is_catalog_changed(); + logical_plan_ = resolveCopyFrom(copy_statemnt); + } else { + DCHECK(copy_statemnt.getCopyDirection() == ParseStatementCopy::kTo); + if (copy_statemnt.with_clause() != nullptr) { + resolveWithClause(*copy_statemnt.with_clause()); + } + logical_plan_ = resolveCopyTo(copy_statemnt); + + if (copy_statemnt.with_clause() != nullptr) { + reportIfWithClauseUnused(*copy_statemnt.with_clause()); + } + } break; + } case ParseStatement::kCreateTable: context_->set_is_catalog_changed(); logical_plan_ = resolveCreateTable( @@ -359,16 +413,7 @@ L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) { logical_plan_ = resolveInsertSelection(insert_selection_statement); if (insert_selection_statement.with_clause() != nullptr) { - // Report an error if there is a WITH query that is not actually used. 
- if (!with_queries_info_.unreferenced_query_indexes.empty()) { - int unreferenced_with_query_index = *with_queries_info_.unreferenced_query_indexes.begin(); - const ParseSubqueryTableReference &unreferenced_with_query = - (*insert_selection_statement.with_clause())[unreferenced_with_query_index]; - THROW_SQL_ERROR_AT(&unreferenced_with_query) - << "WITH query " - << unreferenced_with_query.table_reference_signature()->table_alias()->value() - << " is defined but not used"; - } + reportIfWithClauseUnused(*insert_selection_statement.with_clause()); } } break; @@ -385,16 +430,7 @@ L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) { nullptr /* type_hints */, nullptr /* parent_resolver */); if (set_operation_statement.with_clause() != nullptr) { - // Report an error if there is a WITH query that is not actually used. - if (!with_queries_info_.unreferenced_query_indexes.empty()) { - int unreferenced_with_query_index = *with_queries_info_.unreferenced_query_indexes.begin(); - const ParseSubqueryTableReference &unreferenced_with_query = - (*set_operation_statement.with_clause())[unreferenced_with_query_index]; - THROW_SQL_ERROR_AT(&unreferenced_with_query) - << "WITH query " - << unreferenced_with_query.table_reference_signature()->table_alias()->value() - << " is defined but not used"; - } + reportIfWithClauseUnused(*set_operation_statement.with_clause()); } break; } @@ -418,27 +454,157 @@ L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) { } L::LogicalPtr Resolver::resolveCopyFrom( - const ParseStatementCopyFrom &copy_from_statement) { - // Default parameters. 
- std::string column_delimiter_ = "\t"; - bool escape_strings_ = true; + const ParseStatementCopy &copy_from_statement) { + DCHECK(copy_from_statement.getCopyDirection() == ParseStatementCopy::kFrom); + const PtrList<ParseKeyValue> *params = copy_from_statement.params(); - const ParseCopyFromParams *params = copy_from_statement.params(); + BulkIOFormat file_format = BulkIOFormat::kText; if (params != nullptr) { - if (params->delimiter != nullptr) { - column_delimiter_ = params->delimiter->value(); - if (column_delimiter_.size() != 1) { - THROW_SQL_ERROR_AT(params->delimiter) - << "DELIMITER is not a single character"; + for (const ParseKeyValue &param : *params) { + const std::string &key = ToLower(param.key()->value()); + if (key == "format") { + const ParseString *parse_format = GetKeyValueString(param); + const std::string format = ToLower(parse_format->value()); + // TODO(jianqiao): Support other bulk load formats such as CSV. + if (format != "text") { + THROW_SQL_ERROR_AT(parse_format) << "Unsupported file format: " << format; + } + // Update file_format when other formats get supported. 
+ break; + } + } + } + + std::unique_ptr<BulkIOConfiguration> options = + std::make_unique<BulkIOConfiguration>(file_format); + if (params != nullptr) { + for (const ParseKeyValue &param : *params) { + const std::string &key = ToLower(param.key()->value()); + if (key == "delimiter") { + const ParseString *parse_delimiter = GetKeyValueString(param); + const std::string &delimiter = parse_delimiter->value(); + if (delimiter.size() != 1) { + THROW_SQL_ERROR_AT(parse_delimiter) + << "Delimiter must be a single character"; + } + options->setDelimiter(delimiter.front()); + } else if (key == "escape_strings") { + options->setEscapeStrings(GetKeyValueBool(param)); + } else if (key != "format") { + THROW_SQL_ERROR_AT(&param) << "Unsupported copy option: " << key; } } - escape_strings_ = params->escape_strings; } return L::CopyFrom::Create(resolveRelationName(copy_from_statement.relation_name()), - copy_from_statement.source_filename()->value(), - column_delimiter_[0], - escape_strings_); + copy_from_statement.file_name()->value(), + BulkIOConfigurationPtr(options.release())); +} + +L::LogicalPtr Resolver::resolveCopyTo( + const ParseStatementCopy &copy_to_statement) { + DCHECK(copy_to_statement.getCopyDirection() == ParseStatementCopy::kTo); + const PtrList<ParseKeyValue> *params = copy_to_statement.params(); + + // Check if copy format is explicitly specified. 
+ BulkIOFormat file_format = BulkIOFormat::kText; + bool format_specified = false; + if (params != nullptr) { + for (const ParseKeyValue &param : *params) { + const std::string &key = ToLower(param.key()->value()); + if (key == "format") { + const ParseString *parse_format = GetKeyValueString(param); + const std::string format = ToLower(parse_format->value()); + if (format == "csv") { + file_format = BulkIOFormat::kCSV; + } else if (format == "text") { + file_format = BulkIOFormat::kText; + } else { + THROW_SQL_ERROR_AT(parse_format) << "Unsupported file format: " << format; + } + format_specified = true; + break; + } + } + } + + const std::string &file_name = copy_to_statement.file_name()->value(); + if (file_name.length() <= 1) { + THROW_SQL_ERROR_AT(copy_to_statement.file_name()) + << "File name can not be empty"; + } + + // Infer copy format from file name extension. + if (!format_specified) { + if (file_name.length() > 4) { + if (ToLower(file_name.substr(file_name.length() - 4)) == ".csv") { + file_format = BulkIOFormat::kCSV; + } + } + } + + // Resolve the copy options. 
+ std::unique_ptr<BulkIOConfiguration> options = + std::make_unique<BulkIOConfiguration>(file_format); + if (params != nullptr) { + for (const ParseKeyValue &param : *params) { + const std::string &key = ToLower(param.key()->value()); + if (key == "delimiter") { + const ParseString *parse_delimiter = GetKeyValueString(param); + const std::string &delimiter = parse_delimiter->value(); + if (delimiter.size() != 1) { + THROW_SQL_ERROR_AT(parse_delimiter) + << "Delimiter must be a single character"; + } + options->setDelimiter(delimiter.front()); + } else if (file_format == BulkIOFormat::kText && key == "escape_strings") { + options->setEscapeStrings(GetKeyValueBool(param)); + } else if (file_format == BulkIOFormat::kCSV && key == "header") { + options->setHeader(GetKeyValueBool(param)); + } else if (file_format == BulkIOFormat::kCSV && key == "quote") { + const ParseString *parse_quote = GetKeyValueString(param); + const std::string &quote = parse_quote->value(); + if (quote.size() != 1) { + THROW_SQL_ERROR_AT(parse_quote) + << "Quote must be a single character"; + } + options->setQuoteCharacter(quote.front()); + } else if (key == "null_string") { + const ParseString *parse_null_string = GetKeyValueString(param); + options->setNullString(parse_null_string->value()); + } else if (key != "format") { + THROW_SQL_ERROR_AT(&param) + << "Unsupported copy option \"" << key + << "\" for file format " << options->getFormatName(); + } + } + } + + // Resolve the source relation. 
+ L::LogicalPtr input; + if (copy_to_statement.set_operation_query() != nullptr) { + input = resolveSetOperation(*copy_to_statement.set_operation_query(), + "" /* set_operation_name */, + nullptr /* type_hints */, + nullptr /* parent_resolver */); + } else { + const ParseString *relation_name = copy_to_statement.relation_name(); + DCHECK(relation_name != nullptr); + std::unique_ptr<ParseTableReference> table_reference = + std::make_unique<ParseSimpleTableReference>( + relation_name->line_number(), + relation_name->column_number(), + new ParseString(relation_name->line_number(), + relation_name->column_number(), + relation_name->value()), + nullptr /* sample */); + NameResolver name_resolver; + input = resolveTableReference(*table_reference, &name_resolver); + } + + return L::CopyTo::Create(input, + copy_to_statement.file_name()->value(), + BulkIOConfigurationPtr(options.release())); } L::LogicalPtr Resolver::resolveCreateTable( @@ -491,26 +657,6 @@ L::LogicalPtr Resolver::resolveCreateTable( return L::CreateTable::Create(relation_name, attributes, block_properties, partition_scheme_header_proto); } -namespace { - -attribute_id GetAttributeIdFromName(const PtrList<ParseAttributeDefinition> &attribute_definition_list, - const std::string &attribute_name) { - const std::string lower_attribute_name = ToLower(attribute_name); - - attribute_id attr_id = 0; - for (const ParseAttributeDefinition &attribute_definition : attribute_definition_list) { - if (lower_attribute_name == ToLower(attribute_definition.name()->value())) { - return attr_id; - } - - ++attr_id; - } - - return kInvalidAttributeID; -} - -} // namespace - StorageBlockLayoutDescription* Resolver::resolveBlockProperties( const ParseStatementCreateTable &create_table_statement) { const ParseBlockProperties *block_properties @@ -1595,6 +1741,19 @@ void Resolver::appendProjectIfNeedPrecomputationAfterAggregation( } } +void Resolver::reportIfWithClauseUnused( + const PtrVector<ParseSubqueryTableReference> 
&with_list) const { + if (!with_queries_info_.unreferenced_query_indexes.empty()) { + int unreferenced_with_query_index = *with_queries_info_.unreferenced_query_indexes.begin(); + const ParseSubqueryTableReference &unreferenced_with_query = + with_list[unreferenced_with_query_index]; + THROW_SQL_ERROR_AT(&unreferenced_with_query) + << "WITH query " + << unreferenced_with_query.table_reference_signature()->table_alias()->value() + << " is defined but not used"; + } +} + void Resolver::validateSelectExpressionsForAggregation( const ParseSelectionClause &parse_selection, const std::vector<E::NamedExpressionPtr> &select_list_expressions, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/resolver/Resolver.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp index 1ae565a..a7c1a80 100644 --- a/query_optimizer/resolver/Resolver.hpp +++ b/query_optimizer/resolver/Resolver.hpp @@ -55,7 +55,7 @@ class ParseSimpleCaseExpression; class ParseSimpleTableReference; class ParseSubqueryTableReference; class ParseStatement; -class ParseStatementCopyFrom; +class ParseStatementCopy; class ParseStatementCreateTable; class ParseStatementCreateIndex; class ParseStatementDelete; @@ -283,7 +283,16 @@ class Resolver { * @return A logical plan for the COPY FROM query. */ logical::LogicalPtr resolveCopyFrom( - const ParseStatementCopyFrom &copy_from_statement); + const ParseStatementCopy &copy_from_statement); + + /** + * @brief Resolves a COPY TO query and returns a logical plan. + * + * @param copy_to_statement The COPY TO parse tree. + * @return A logical plan for the COPY TO query. + */ + logical::LogicalPtr resolveCopyTo( + const ParseStatementCopy &copy_to_statement); /** * @brief Resolves a UPDATE query and returns a logical plan. 
@@ -621,6 +630,14 @@ class Resolver { static std::string GenerateOrderingAttributeAlias(int index); /** + * @brief Report an error if there is a WITH query that is not actually used. + * + * @param with_list The list of subqueries in WITH clause. + */ + void reportIfWithClauseUnused( + const PtrVector<ParseSubqueryTableReference> &with_list) const; + + /** * @brief Validates each SELECT-list expression to ensure that it does not * reference a named expression with an ID not in \p valid_expr_id_set. * http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/strategy/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/strategy/CMakeLists.txt b/query_optimizer/strategy/CMakeLists.txt index e011126..20a4eb4 100644 --- a/query_optimizer/strategy/CMakeLists.txt +++ b/query_optimizer/strategy/CMakeLists.txt @@ -76,6 +76,7 @@ target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_expressions_ExpressionUtil quickstep_queryoptimizer_logical_CopyFrom + quickstep_queryoptimizer_logical_CopyTo quickstep_queryoptimizer_logical_CreateIndex quickstep_queryoptimizer_logical_CreateTable quickstep_queryoptimizer_logical_DeleteTuples @@ -95,6 +96,7 @@ target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne quickstep_queryoptimizer_logical_WindowAggregate quickstep_queryoptimizer_physical_Aggregate quickstep_queryoptimizer_physical_CopyFrom + quickstep_queryoptimizer_physical_CopyTo quickstep_queryoptimizer_physical_CreateIndex quickstep_queryoptimizer_physical_CreateTable quickstep_queryoptimizer_physical_DeleteTuples http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/strategy/OneToOne.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/strategy/OneToOne.cpp b/query_optimizer/strategy/OneToOne.cpp index 
af4e150..3cfe013 100644 --- a/query_optimizer/strategy/OneToOne.cpp +++ b/query_optimizer/strategy/OneToOne.cpp @@ -27,6 +27,7 @@ #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/expressions/ExpressionUtil.hpp" #include "query_optimizer/logical/CopyFrom.hpp" +#include "query_optimizer/logical/CopyTo.hpp" #include "query_optimizer/logical/CreateIndex.hpp" #include "query_optimizer/logical/CreateTable.hpp" #include "query_optimizer/logical/DeleteTuples.hpp" @@ -45,6 +46,7 @@ #include "query_optimizer/logical/WindowAggregate.hpp" #include "query_optimizer/physical/Aggregate.hpp" #include "query_optimizer/physical/CopyFrom.hpp" +#include "query_optimizer/physical/CopyTo.hpp" #include "query_optimizer/physical/CreateIndex.hpp" #include "query_optimizer/physical/CreateTable.hpp" #include "query_optimizer/physical/DeleteTuples.hpp" @@ -104,19 +106,28 @@ bool OneToOne::generatePlan(const L::LogicalPtr &logical_input, case L::LogicalType::kCopyFrom: { const L::CopyFromPtr copy_from = std::static_pointer_cast<const L::CopyFrom>(logical_input); - *physical_output = P::CopyFrom::Create( - copy_from->catalog_relation(), copy_from->file_name(), - copy_from->column_delimiter(), copy_from->escape_strings()); + *physical_output = P::CopyFrom::Create(copy_from->catalog_relation(), + copy_from->file_name(), + copy_from->options()); + return true; + } + case L::LogicalType::kCopyTo: { + const L::CopyToPtr copy_to = + std::static_pointer_cast<const L::CopyTo>(logical_input); + *physical_output = P::CopyTo::Create( + physical_mapper_->createOrGetPhysicalFromLogical(copy_to->input()), + copy_to->file_name(), + copy_to->options()); return true; } case L::LogicalType::kCreateIndex: { const L::CreateIndexPtr create_index = std::static_pointer_cast<const L::CreateIndex>(logical_input); - *physical_output = P::CreateIndex::Create(physical_mapper_->createOrGetPhysicalFromLogical( - create_index->input()), - create_index->index_name(), - 
create_index->index_attributes(), - create_index->index_description()); + *physical_output = P::CreateIndex::Create( + physical_mapper_->createOrGetPhysicalFromLogical(create_index->input()), + create_index->index_name(), + create_index->index_attributes(), + create_index->index_description()); return true; } case L::LogicalType::kCreateTable: { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 5ad9c3b..0d0fe41 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -71,6 +71,7 @@ add_library(quickstep_relationaloperators_SortMergeRunOperatorHelpers SortMergeR SortMergeRunOperatorHelpers.hpp) add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGenerationOperator.cpp SortRunGenerationOperator.hpp) +add_library(quickstep_relationaloperators_TableExportOperator TableExportOperator.cpp TableExportOperator.hpp) add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp) add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp) add_library(quickstep_relationaloperators_UnionAllOperator UnionAllOperator.cpp UnionAllOperator.hpp) @@ -473,6 +474,25 @@ target_link_libraries(quickstep_relationaloperators_SortRunGenerationOperator quickstep_utility_Macros quickstep_utility_SortConfiguration tmb) +target_link_libraries(quickstep_relationaloperators_TableExportOperator + glog + quickstep_catalog_CatalogAttribute + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + 
quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_StorageBlockInfo + quickstep_storage_ValueAccessor + quickstep_threading_SpinMutex + quickstep_types_TypedValue + quickstep_types_containers_Tuple + quickstep_utility_BulkIOConfiguration + quickstep_utility_Macros + quickstep_utility_StringUtil) target_link_libraries(quickstep_relationaloperators_TableGeneratorOperator glog quickstep_catalog_CatalogRelation @@ -508,6 +528,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor quickstep_types_containers_Tuple + quickstep_utility_BulkIOConfiguration quickstep_utility_Glob quickstep_utility_Macros tmb) @@ -635,6 +656,7 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_SortMergeRunOperatorHelpers quickstep_relationaloperators_SortMergeRunOperator_proto quickstep_relationaloperators_SortRunGenerationOperator + quickstep_relationaloperators_TableExportOperator quickstep_relationaloperators_TableGeneratorOperator quickstep_relationaloperators_TextScanOperator quickstep_relationaloperators_UnionAllOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/RelationalOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp index 425fa32..64397c7 100644 --- a/relational_operators/RelationalOperator.hpp +++ b/relational_operators/RelationalOperator.hpp @@ -85,6 +85,7 @@ class RelationalOperator { kSelect, kSortMergeRun, kSortRunGeneration, + kTableExport, kTableGenerator, kTextScan, kUnionAll, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/TableExportOperator.cpp ---------------------------------------------------------------------- diff --git 
a/relational_operators/TableExportOperator.cpp b/relational_operators/TableExportOperator.cpp new file mode 100644 index 0000000..03e260f --- /dev/null +++ b/relational_operators/TableExportOperator.cpp @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/
+
+#include "relational_operators/TableExportOperator.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <exception>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "threading/SpinMutex.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+// Generates one TableExportWorkOrder per input block.
+//
+// Every work order gets its own freshly allocated output string. The work
+// order only receives a raw pointer to it; ownership of the string is handed
+// to the query context (keyed by the block id) right after the work order has
+// been added to the container.
+//
+// Returns true once no more work orders will be generated: always for a
+// stored input relation, and done_feeding_input_relation_ otherwise.
+bool TableExportOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // Remember the query context of the first call; receiveFeedbackMessage()
+  // needs it to take the output buffers back.
+  if (query_context_ == nullptr) {
+    query_context_ = query_context;
+  }
+
+  const auto add_work_order = [&](const block_id input_block_id) -> void {
+    std::unique_ptr<std::string> output_buffer = std::make_unique<std::string>();
+    container->addNormalWorkOrder(
+        new TableExportWorkOrder(query_id_,
+                                 input_relation_,
+                                 input_block_id,
+                                 options_->getFormat(),
+                                 options_->getDelimiter(),
+                                 options_->escapeStrings(),
+                                 options_->getQuoteCharacter(),
+                                 options_->getNullString(),
+                                 op_index_,
+                                 scheduler_client_id,
+                                 storage_manager,
+                                 bus,
+                                 output_buffer.get()),
+        op_index_);
+    query_context->setBlockOutputTextBuffer(input_block_id, output_buffer.release());
+  };
+
+  if (input_relation_is_stored_) {
+    // All blocks are known up front: emit every work order exactly once.
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        add_work_order(input_block_id);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    // Emit work orders for the blocks that arrived since the previous call.
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      add_work_order(input_relation_block_ids_[num_workorders_generated_]);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+// Not supported: aborts via LOG(FATAL) when called.
+bool TableExportOperator::getAllWorkOrderProtos(
+    WorkOrderProtosContainer *container) {
+  // TODO(quickstep-team): Implement TableExportOperator for the distributed case.
+  LOG(FATAL) << "TableExportOperator::getAllWorkOrderProtos() is not supported";
+}
+
+// Handles the completion message of one TableExportWorkOrder.
+//
+// The payload of the message is the id of the block whose text has been
+// produced. The output target is opened lazily on the first message. The
+// finished text buffer is taken back from the query context and parked in
+// outputs_; buffers are then written out strictly in the order of
+// input_relation_block_ids_, as far as consecutive finished blocks allow.
+void TableExportOperator::receiveFeedbackMessage(
+    const WorkOrder::FeedbackMessage &msg) {
+  DCHECK(TableExportOperator::kBlockOutputMessage == msg.type());
+  DCHECK(msg.payload_size() == sizeof(block_id));
+
+  if (file_ == nullptr) {
+    // "$stdout" and "$stderr" are matched case-insensitively.
+    const std::string lo_file_name = ToLower(file_name_);
+    if (lo_file_name == "$stdout") {
+      file_ = stdout;
+    } else if (lo_file_name == "$stderr") {
+      file_ = stderr;
+    } else {
+      // NOTE(review): the first character of file_name_ is dropped before
+      // opening; presumably it is a one-character prefix marking the kind of
+      // target (cf. the '$' above) -- confirm against the code that builds
+      // file_name_.
+      file_ = std::fopen(file_name_.substr(1).c_str(), "wb");
+      // TODO(quickstep-team): Decent handling of exceptions at query runtime.
+      if (file_ == nullptr) {
+        throw std::runtime_error("Can not open file " + file_name_ + " for writing");
+      }
+    }
+  }
+
+  block_id completed_block_id = *static_cast<const block_id*>(msg.payload());
+  outputs_.emplace(completed_block_id, std::unique_ptr<std::string>(
+      query_context_->releaseBlockOutputTextBuffer(completed_block_id)));
+
+  while (true) {
+    block_id next_block_id;
+    {
+      SpinMutexLock lock(block_ids_mutex_);
+      // Stop once every block known so far has been written. Without this
+      // check the subscript below would read one element past the end of
+      // the vector right after the last known block has been flushed.
+      if (num_blocks_written_ >= input_relation_block_ids_.size()) {
+        break;
+      }
+      next_block_id = input_relation_block_ids_[num_blocks_written_];
+    }
+    auto it = outputs_.find(next_block_id);
+    if (it == outputs_.end()) {
+      // The next block in order has not finished yet; keep later ones parked.
+      break;
+    }
+    std::fwrite(it->second->c_str(), 1, it->second->length(), file_);
+    ++num_blocks_written_;
+    outputs_.erase(it);
+  }
+}
+
+// Closes the output file, unless the output went to stdout / stderr (or no
+// output target was ever opened), and resets file_.
+void TableExportOperator::updateCatalogOnCompletion() {
+  if (file_ != nullptr && file_ != stdout && file_ != stderr) {
+    std::fclose(file_);
+  }
+  file_ = nullptr;
+}
+
+// Renders all tuples of the input block into output_buffer_ using the
+// configured format, then notifies the operator that the block is done.
+void TableExportWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(input_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  switch (format_) {
+    case BulkIOFormat::kCSV:
+      writeToString<&TableExportWorkOrder::quoteCSVField>(
+          accessor.get(), output_buffer_);
+      break;
+    case BulkIOFormat::kText:
+      writeToString<&TableExportWorkOrder::escapeTextField>(
+          accessor.get(), output_buffer_);
+      break;
+    default:
+      LOG(FATAL) << "Unsupported export format in TableExportWorkOrder::execute()";
+  }
+
+  // Send completion message to operator.
+  // NOTE(review): the payload is allocated with `new`; confirm that
+  // FeedbackMessage takes ownership of it and releases it with a matching
+  // deallocation.
+  FeedbackMessage msg(TableExportOperator::kBlockOutputMessage,
+                      getQueryID(),
+                      operator_index_,
+                      new block_id(input_block_id_),
+                      sizeof(input_block_id_));
+  SendFeedbackMessage(
+      bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg);
+}
+
+// Returns the CSV representation of a field.
+//
+// A field is left untouched unless it contains the column delimiter, the
+// quote character, or a line break. Otherwise it is wrapped in quote
+// characters and every embedded quote character is doubled.
+inline std::string TableExportWorkOrder::quoteCSVField(std::string &&field) const {
+  bool need_quote = false;
+  for (const char c : field) {
+    // Line breaks must be quoted as well: rows are terminated by '\n', so an
+    // unquoted line break inside a field would split the record.
+    if (c == column_delimiter_ || c == quote_character_ || c == '\n' || c == '\r') {
+      need_quote = true;
+      break;
+    }
+  }
+  if (!need_quote) {
+    return std::move(field);
+  }
+
+  std::string quoted;
+  quoted.push_back(quote_character_);
+  for (const char c : field) {
+    if (c == quote_character_) {
+      quoted.push_back(c);
+    }
+    quoted.push_back(c);
+  }
+  quoted.push_back(quote_character_);
+  return quoted;
+}
+
+
+// Returns the text-format representation of a field.
+//
+// Nothing is changed when escaping is disabled or when the field is exactly
+// "\N". Otherwise control characters (< 0x20) are written as C-style escapes
+// (\b \f \n \r \t \v) or as \xHH, and the backslash as well as the column
+// delimiter are prefixed with a backslash.
+inline std::string TableExportWorkOrder::escapeTextField(std::string &&field) const {
+  if (escape_strings_ == false || field == "\\N") {
+    return std::move(field);
+  }
+  bool need_escape = false;
+  for (const unsigned char c : field) {
+    if (c < ' ' || c == '\\' || c == column_delimiter_) {
+      need_escape = true;
+      break;
+    }
+  }
+  if (!need_escape) {
+    return std::move(field);
+  }
+
+  std::string escaped;
+  for (const unsigned char c : field) {
+    if (c < 32) {
+      switch (c) {
+        case '\b':
+          // Backspace.
+          escaped.append("\\b");
+          break;
+        case '\f':
+          // Form-feed.
+          escaped.append("\\f");
+          break;
+        case '\n':
+          // Newline.
+          escaped.append("\\n");
+          break;
+        case '\r':
+          // Carriage return.
+          escaped.append("\\r");
+          break;
+        case '\t':
+          // Tab.
+          escaped.append("\\t");
+          break;
+        case '\v':
+          // Vertical tab.
+          escaped.append("\\v");
+          break;
+        default: {
+          // Use hexadecimal representation.
+          static constexpr char kHexDigits[] = "0123456789ABCDEF";
+          escaped.append("\\x");
+          escaped.push_back(kHexDigits[c >> 4]);
+          escaped.push_back(kHexDigits[c & 0xF]);
+          break;
+        }
+      }
+    } else {
+      if (c == '\\' || c == column_delimiter_) {
+        escaped.push_back('\\');
+      }
+      escaped.push_back(c);
+    }
+  }
+  return escaped;
+}
+
+// Appends the elements of `container` to `output`, separated by the column
+// delimiter. Each element is first converted to a string by
+// `functor(element, index)` and then passed through the member function
+// `transform` (quoting or escaping).
+template <std::string (TableExportWorkOrder::*transform)(std::string&&) const,
+          typename Container, typename Functor>
+inline void TableExportWorkOrder::writeEachToString(const Container &container,
+                                                    std::string *output,
+                                                    const Functor &functor) const {
+  auto it = container.begin();
+  if (it != container.end()) {
+    std::size_t idx = 0;
+    output->append((this->*transform)(functor(*it, idx++)));
+    while ((++it) != container.end()) {
+      output->push_back(column_delimiter_);
+      output->append((this->*transform)(functor(*it, idx++)));
+    }
+  }
+}
+
+// Appends one '\n'-terminated line per tuple of `accessor` to `output`.
+// Null values are printed as null_string_; all other values are printed by
+// the type of the corresponding attribute of the input relation.
+template <std::string (TableExportWorkOrder::*transform)(std::string&&) const>
+void TableExportWorkOrder::writeToString(
+    ValueAccessor *accessor, std::string *output) const {
+  // Collect the attribute types once, so they can be looked up by position.
+  std::vector<const Type*> value_types;
+  value_types.reserve(input_relation_.size());
+  for (const CatalogAttribute &attribute : input_relation_) {
+    value_types.emplace_back(&attribute.getType());
+  }
+
+  accessor->beginIterationVirtual();
+  while (accessor->nextVirtual()) {
+    std::unique_ptr<Tuple> tuple(accessor->getTupleVirtual());
+    writeEachToString<transform>(
+        *tuple, output,
+        [&](const TypedValue &value, const std::size_t idx) -> std::string {
+          if (value.isNull()) {
+            return null_string_;
+          } else {
+            return value_types[idx]->printValueToString(value);
+          }
+        });
+    output->push_back('\n');
+  }
+}
+
+}  // namespace quickstep