http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/parser/preprocessed/SqlParser_gen.hpp
----------------------------------------------------------------------
diff --git a/parser/preprocessed/SqlParser_gen.hpp 
b/parser/preprocessed/SqlParser_gen.hpp
index a6d12e2..f6b5247 100644
--- a/parser/preprocessed/SqlParser_gen.hpp
+++ b/parser/preprocessed/SqlParser_gen.hpp
@@ -94,90 +94,91 @@ extern int quickstep_yydebug;
     TOKEN_DECIMAL = 304,
     TOKEN_DEFAULT = 305,
     TOKEN_DELETE = 306,
-    TOKEN_DELIMITER = 307,
-    TOKEN_DESC = 308,
-    TOKEN_DISTINCT = 309,
-    TOKEN_DOUBLE = 310,
-    TOKEN_DROP = 311,
-    TOKEN_ELSE = 312,
-    TOKEN_END = 313,
-    TOKEN_ESCAPE_STRINGS = 314,
-    TOKEN_EXISTS = 315,
-    TOKEN_EXTRACT = 316,
-    TOKEN_FALSE = 317,
-    TOKEN_FIRST = 318,
-    TOKEN_FLOAT = 319,
-    TOKEN_FOLLOWING = 320,
-    TOKEN_FOR = 321,
-    TOKEN_FOREIGN = 322,
-    TOKEN_FROM = 323,
-    TOKEN_FULL = 324,
-    TOKEN_GROUP = 325,
-    TOKEN_HASH = 326,
-    TOKEN_HAVING = 327,
-    TOKEN_HOUR = 328,
-    TOKEN_IN = 329,
-    TOKEN_INDEX = 330,
-    TOKEN_INNER = 331,
-    TOKEN_INSERT = 332,
-    TOKEN_INTEGER = 333,
-    TOKEN_INTERVAL = 334,
-    TOKEN_INTO = 335,
-    TOKEN_JOIN = 336,
-    TOKEN_KEY = 337,
-    TOKEN_LAST = 338,
-    TOKEN_LEFT = 339,
-    TOKEN_LIMIT = 340,
-    TOKEN_LONG = 341,
-    TOKEN_MINUTE = 342,
-    TOKEN_MONTH = 343,
-    TOKEN_NULL = 344,
-    TOKEN_NULLS = 345,
-    TOKEN_OFF = 346,
-    TOKEN_ON = 347,
-    TOKEN_ORDER = 348,
-    TOKEN_OUTER = 349,
-    TOKEN_OVER = 350,
-    TOKEN_PARTITION = 351,
-    TOKEN_PARTITIONS = 352,
-    TOKEN_PERCENT = 353,
-    TOKEN_PRECEDING = 354,
-    TOKEN_PRIMARY = 355,
-    TOKEN_PRIORITY = 356,
-    TOKEN_QUIT = 357,
-    TOKEN_RANGE = 358,
-    TOKEN_REAL = 359,
-    TOKEN_REFERENCES = 360,
-    TOKEN_RIGHT = 361,
-    TOKEN_ROW = 362,
-    TOKEN_ROW_DELIMITER = 363,
-    TOKEN_ROWS = 364,
-    TOKEN_SECOND = 365,
-    TOKEN_SELECT = 366,
-    TOKEN_SET = 367,
-    TOKEN_SMA = 368,
-    TOKEN_SMALLINT = 369,
+    TOKEN_DESC = 307,
+    TOKEN_DISTINCT = 308,
+    TOKEN_DOUBLE = 309,
+    TOKEN_DROP = 310,
+    TOKEN_ELSE = 311,
+    TOKEN_END = 312,
+    TOKEN_EXISTS = 313,
+    TOKEN_EXTRACT = 314,
+    TOKEN_FALSE = 315,
+    TOKEN_FIRST = 316,
+    TOKEN_FLOAT = 317,
+    TOKEN_FOLLOWING = 318,
+    TOKEN_FOR = 319,
+    TOKEN_FOREIGN = 320,
+    TOKEN_FROM = 321,
+    TOKEN_FULL = 322,
+    TOKEN_GROUP = 323,
+    TOKEN_HASH = 324,
+    TOKEN_HAVING = 325,
+    TOKEN_HOUR = 326,
+    TOKEN_IN = 327,
+    TOKEN_INDEX = 328,
+    TOKEN_INNER = 329,
+    TOKEN_INSERT = 330,
+    TOKEN_INTEGER = 331,
+    TOKEN_INTERVAL = 332,
+    TOKEN_INTO = 333,
+    TOKEN_JOIN = 334,
+    TOKEN_KEY = 335,
+    TOKEN_LAST = 336,
+    TOKEN_LEFT = 337,
+    TOKEN_LIMIT = 338,
+    TOKEN_LONG = 339,
+    TOKEN_MINUTE = 340,
+    TOKEN_MONTH = 341,
+    TOKEN_NULL = 342,
+    TOKEN_NULLS = 343,
+    TOKEN_OFF = 344,
+    TOKEN_ON = 345,
+    TOKEN_ORDER = 346,
+    TOKEN_OUTER = 347,
+    TOKEN_OVER = 348,
+    TOKEN_PARTITION = 349,
+    TOKEN_PARTITIONS = 350,
+    TOKEN_PERCENT = 351,
+    TOKEN_PRECEDING = 352,
+    TOKEN_PRIMARY = 353,
+    TOKEN_PRIORITY = 354,
+    TOKEN_QUIT = 355,
+    TOKEN_RANGE = 356,
+    TOKEN_REAL = 357,
+    TOKEN_REFERENCES = 358,
+    TOKEN_RIGHT = 359,
+    TOKEN_ROW = 360,
+    TOKEN_ROW_DELIMITER = 361,
+    TOKEN_ROWS = 362,
+    TOKEN_SECOND = 363,
+    TOKEN_SELECT = 364,
+    TOKEN_SET = 365,
+    TOKEN_SMA = 366,
+    TOKEN_SMALLINT = 367,
+    TOKEN_STDERR = 368,
+    TOKEN_STDOUT = 369,
     TOKEN_SUBSTRING = 370,
     TOKEN_TABLE = 371,
     TOKEN_THEN = 372,
     TOKEN_TIME = 373,
     TOKEN_TIMESTAMP = 374,
-    TOKEN_TRUE = 375,
-    TOKEN_TUPLESAMPLE = 376,
-    TOKEN_UNBOUNDED = 377,
-    TOKEN_UNIQUE = 378,
-    TOKEN_UPDATE = 379,
-    TOKEN_USING = 380,
-    TOKEN_VALUES = 381,
-    TOKEN_VARCHAR = 382,
-    TOKEN_WHEN = 383,
-    TOKEN_WHERE = 384,
-    TOKEN_WINDOW = 385,
-    TOKEN_WITH = 386,
-    TOKEN_YEAR = 387,
-    TOKEN_YEARMONTH = 388,
-    TOKEN_EOF = 389,
-    TOKEN_LEX_ERROR = 390
+    TOKEN_TO = 375,
+    TOKEN_TRUE = 376,
+    TOKEN_TUPLESAMPLE = 377,
+    TOKEN_UNBOUNDED = 378,
+    TOKEN_UNIQUE = 379,
+    TOKEN_UPDATE = 380,
+    TOKEN_USING = 381,
+    TOKEN_VALUES = 382,
+    TOKEN_VARCHAR = 383,
+    TOKEN_WHEN = 384,
+    TOKEN_WHERE = 385,
+    TOKEN_WINDOW = 386,
+    TOKEN_WITH = 387,
+    TOKEN_YEAR = 388,
+    TOKEN_YEARMONTH = 389,
+    TOKEN_EOF = 390,
+    TOKEN_LEX_ERROR = 391
   };
 #endif
 
@@ -237,8 +238,7 @@ union YYSTYPE
   quickstep::ParseKeyStringValue *key_string_value_;
   quickstep::ParseKeyStringList *key_string_list_;
   quickstep::ParseKeyIntegerValue *key_integer_value_;
-
-  quickstep::ParseCopyFromParams *copy_from_params_;
+  quickstep::ParseKeyBoolValue *key_bool_value_;
 
   quickstep::ParseAssignment *assignment_;
   quickstep::PtrList<quickstep::ParseAssignment> *assignment_list_;
@@ -251,7 +251,7 @@ union YYSTYPE
   quickstep::ParseStatementUpdate *update_statement_;
   quickstep::ParseStatementInsert *insert_statement_;
   quickstep::ParseStatementDelete *delete_statement_;
-  quickstep::ParseStatementCopyFrom *copy_from_statement_;
+  quickstep::ParseStatementCopy *copy_statement_;
   quickstep::ParseStatementCreateTable *create_table_statement_;
   quickstep::ParsePartitionClause *partition_clause_;
   quickstep::ParseBlockProperties *block_properties_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 4c3b52a..e5eaccf 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -215,6 +215,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_storage_HashTableFactory
                       quickstep_storage_InsertDestination
                       quickstep_storage_InsertDestination_proto
+                      quickstep_storage_StorageBlockInfo
                       quickstep_storage_WindowAggregationOperationState
                       quickstep_threading_SpinSharedMutex
                       quickstep_types_TypedValue

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7876821..b146e13 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <string>
 #include <unordered_map>
 #include <vector>
 
@@ -33,6 +34,7 @@
 #include "storage/AggregationOperationState.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "storage/WindowAggregationOperationState.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "types/containers/Tuple.hpp"
@@ -571,6 +573,36 @@ class QueryContext {
   }
 
   /**
+   * @brief Release the output text buffer for the specified block.
+   *
+   * @param id The id of the block to release its output text buffer.
+   *
+   * @return The output text buffer for the specified block. Caller should take
+   *         ownership of the returned object.
+   **/
+  inline std::string* releaseBlockOutputTextBuffer(const block_id id) {
+    SpinSharedMutexExclusiveLock<false> lock(block_output_text_buffer_mutex_);
+    auto it = block_output_text_buffers_.find(id);
+    DCHECK(it != block_output_text_buffers_.end());
+    std::unique_ptr<std::string> output = std::move(it->second);
+    block_output_text_buffers_.erase(it);
+    return output.release();
+  }
+
+  /**
+   * @brief Set the output text buffer for the specified block.
+   *
+   * @param id The id of the block.
+   * @param output The output buffer to set. Caller relinquishes ownership of
+   *        \p output by calling this method.
+   **/
+  inline void setBlockOutputTextBuffer(const block_id id, std::string *output) 
{
+    SpinSharedMutexExclusiveLock<false> lock(block_output_text_buffer_mutex_);
+    DCHECK(block_output_text_buffers_.find(id) == 
block_output_text_buffers_.end());
+    block_output_text_buffers_.emplace(id, 
std::unique_ptr<std::string>(output));
+  }
+
+  /**
    * @brief Get the total memory footprint of the temporary data structures
    *        used for query execution (e.g. join hash tables, aggregation hash
    *        tables) in bytes.
@@ -634,10 +666,12 @@ class QueryContext {
   std::vector<std::unique_ptr<Tuple>> tuples_;
   std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> 
update_groups_;
   std::vector<std::unique_ptr<WindowAggregationOperationState>> 
window_aggregation_states_;
+  std::unordered_map<block_id, std::unique_ptr<std::string>> 
block_output_text_buffers_;
 
   mutable SpinSharedMutex<false> hash_tables_mutex_;
   mutable SpinSharedMutex<false> aggregation_states_mutex_;
   mutable SpinSharedMutex<false> insert_destinations_mutex_;
+  mutable SpinSharedMutex<false> block_output_text_buffer_mutex_;
 
   DISALLOW_COPY_AND_ASSIGN(QueryContext);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index fdf8796..4ea21b2 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -94,6 +94,7 @@ 
target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       
quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_CopyFrom
+                      quickstep_queryoptimizer_physical_CopyTo
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
                       
quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
@@ -140,6 +141,7 @@ 
target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_SelectOperator
                       quickstep_relationaloperators_SortMergeRunOperator
                       quickstep_relationaloperators_SortRunGenerationOperator
+                      quickstep_relationaloperators_TableExportOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UnionAllOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp 
b/query_optimizer/ExecutionGenerator.cpp
index 9bfd136..bf26a5b 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -78,6 +78,7 @@
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
+#include "query_optimizer/physical/CopyTo.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
@@ -124,6 +125,7 @@
 #include "relational_operators/SelectOperator.hpp"
 #include "relational_operators/SortMergeRunOperator.hpp"
 #include "relational_operators/SortRunGenerationOperator.hpp"
+#include "relational_operators/TableExportOperator.hpp"
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
 #include "relational_operators/UnionAllOperator.hpp"
@@ -409,6 +411,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kCopyFrom:
       return convertCopyFrom(
           std::static_pointer_cast<const P::CopyFrom>(physical_plan));
+    case P::PhysicalType::kCopyTo:
+      return convertCopyTo(
+          std::static_pointer_cast<const P::CopyTo>(physical_plan));
     case P::PhysicalType::kCreateIndex:
       return convertCreateIndex(
           std::static_pointer_cast<const P::CreateIndex>(physical_plan));
@@ -1238,8 +1243,7 @@ void ExecutionGenerator::convertCopyFrom(
           new TextScanOperator(
               query_handle_->query_id(),
               physical_plan->file_name(),
-              physical_plan->column_delimiter(),
-              physical_plan->escape_strings(),
+              physical_plan->options(),
               *output_relation,
               insert_destination_index));
   insert_destination_proto->set_relational_op_index(scan_operator_index);
@@ -1254,6 +1258,40 @@ void ExecutionGenerator::convertCopyFrom(
                                        false /* is_pipeline_breaker */);
 }
 
+void ExecutionGenerator::convertCopyTo(const P::CopyToPtr &physical_plan) {
+  // CopyTo is converted to a TableExport operator.
+
+  const CatalogRelation *input_relation;
+  bool input_relation_is_stored;
+
+  const P::PhysicalPtr &input = physical_plan->input();
+  P::TableReferencePtr table_reference;
+  const CatalogRelationInfo *input_relation_info = nullptr;
+  if (P::SomeTableReference::MatchesWithConditionalCast(input, 
&table_reference)) {
+    input_relation = table_reference->relation();
+    input_relation_is_stored = true;
+  } else {
+    input_relation_info = findRelationInfoOutputByPhysical(input);
+    input_relation = input_relation_info->relation;
+    input_relation_is_stored = false;
+  }
+
+  DCHECK(input_relation != nullptr);
+  const QueryPlan::DAGNodeIndex table_export_operator_index =
+      execution_plan_->addRelationalOperator(
+          new TableExportOperator(query_handle_->query_id(),
+                                  *input_relation,
+                                  input_relation_is_stored,
+                                  physical_plan->file_name(),
+                                  physical_plan->options()));
+  if (!input_relation_is_stored) {
+    DCHECK(input_relation_info != nullptr);
+    execution_plan_->addDirectDependency(table_export_operator_index,
+                                         
input_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+}
+
 void ExecutionGenerator::convertCreateIndex(
   const P::CreateIndexPtr &physical_plan) {
   // CreateIndex is converted to a CreateIndex operator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp 
b/query_optimizer/ExecutionGenerator.hpp
index 19e75c1..bc9f88b 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -44,6 +44,7 @@
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
+#include "query_optimizer/physical/CopyTo.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
@@ -282,6 +283,13 @@ class ExecutionGenerator {
   void convertCopyFrom(const physical::CopyFromPtr &physical_plan);
 
   /**
+   * @brief Converts a CopyTo to a TableExport operator.
+   *
+   * @param physical_plan The CopyTo to be converted.
+   */
+  void convertCopyTo(const physical::CopyToPtr &physical_plan);
+
+  /**
    * @brief Converts a CreateIndex to a CreateIndex operator.
    *
    * @param physical_plan The CreateIndex to be converted.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CMakeLists.txt 
b/query_optimizer/logical/CMakeLists.txt
index 4480e0b..b3c9e36 100644
--- a/query_optimizer/logical/CMakeLists.txt
+++ b/query_optimizer/logical/CMakeLists.txt
@@ -19,6 +19,7 @@
 add_library(quickstep_queryoptimizer_logical_Aggregate Aggregate.cpp 
Aggregate.hpp)
 add_library(quickstep_queryoptimizer_logical_BinaryJoin BinaryJoin.cpp 
BinaryJoin.hpp)
 add_library(quickstep_queryoptimizer_logical_CopyFrom CopyFrom.cpp 
CopyFrom.hpp)
+add_library(quickstep_queryoptimizer_logical_CopyTo CopyTo.cpp CopyTo.hpp)
 add_library(quickstep_queryoptimizer_logical_CreateIndex CreateIndex.cpp 
CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_logical_CreateTable CreateTable.cpp 
CreateTable.hpp)
 add_library(quickstep_queryoptimizer_logical_DeleteTuples DeleteTuples.cpp 
DeleteTuples.hpp)
@@ -74,6 +75,16 @@ 
target_link_libraries(quickstep_queryoptimizer_logical_CopyFrom
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
+                      quickstep_utility_BulkIOConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
+target_link_libraries(quickstep_queryoptimizer_logical_CopyTo
+                      glog
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_logical_Logical
+                      quickstep_queryoptimizer_logical_LogicalType
+                      quickstep_utility_BulkIOConfiguration
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 target_link_libraries(quickstep_queryoptimizer_logical_CreateIndex
@@ -290,6 +301,7 @@ target_link_libraries(quickstep_queryoptimizer_logical
                       quickstep_queryoptimizer_logical_Aggregate
                       quickstep_queryoptimizer_logical_BinaryJoin
                       quickstep_queryoptimizer_logical_CopyFrom
+                      quickstep_queryoptimizer_logical_CopyTo
                       quickstep_queryoptimizer_logical_CreateIndex
                       quickstep_queryoptimizer_logical_CreateTable
                       quickstep_queryoptimizer_logical_DeleteTuples

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyFrom.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyFrom.cpp 
b/query_optimizer/logical/CopyFrom.cpp
index b0a1423..a80c701 100644
--- a/query_optimizer/logical/CopyFrom.cpp
+++ b/query_optimizer/logical/CopyFrom.cpp
@@ -44,11 +44,11 @@ void CopyFrom::getFieldStringItems(
   inline_field_values->push_back(file_name_);
 
   inline_field_names->push_back("column_delimiter");
-  inline_field_values->push_back("\"" + EscapeSpecialChars(std::string(1, 
column_delimiter_)) +
-                                 "\"");
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + 
"\"");
 
   inline_field_names->push_back("escape_strings");
-  inline_field_values->push_back(escape_strings_ ? "true" : "false");
+  inline_field_values->push_back(options_->escapeStrings() ? "true" : "false");
 }
 
 }  // namespace logical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyFrom.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyFrom.hpp 
b/query_optimizer/logical/CopyFrom.hpp
index 7c5907f..e0545dc 100644
--- a/query_optimizer/logical/CopyFrom.hpp
+++ b/query_optimizer/logical/CopyFrom.hpp
@@ -17,8 +17,8 @@
  * under the License.
  **/
 
-#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_
-#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_
 
 #include <memory>
 #include <string>
@@ -28,6 +28,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/logical/LogicalType.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -66,20 +67,14 @@ class CopyFrom : public Logical {
   const std::string& file_name() const { return file_name_; }
 
   /**
-   * @return The delimiter used in the text file to separate columns.
+   * @return The options for this COPY FROM statement.
    */
-  const char column_delimiter() const { return column_delimiter_; }
-
-  /**
-   * @return Whether to decode escape sequences in the text file.
-   */
-  bool escape_strings() const { return escape_strings_; }
+  BulkIOConfigurationPtr options() const { return options_; }
 
   LogicalPtr copyWithNewChildren(
       const std::vector<LogicalPtr> &new_children) const override {
     DCHECK(new_children.empty());
-    return Create(catalog_relation_, file_name_, column_delimiter_,
-                  escape_strings_);
+    return Create(catalog_relation_, file_name_, options_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const 
override {
@@ -95,19 +90,13 @@ class CopyFrom : public Logical {
    *
    * @param catalog_relation The catalog relation to insert the tuples to.
    * @param file_name The name of the file to read the data from.
-   * @param column_delimiter The delimiter used in the text file to separate
-   *                         columns.
-   * @param escape_strings Whether to decode escape sequences in the text file.
+   * @param options The options for this COPY FROM statement.
    * @return An immutable CopyFrom logical node.
    */
   static CopyFromPtr Create(const CatalogRelation *catalog_relation,
                             const std::string &file_name,
-                            const char column_delimiter,
-                            bool escape_strings) {
-    return CopyFromPtr(new CopyFrom(catalog_relation,
-                                    file_name,
-                                    column_delimiter,
-                                    escape_strings));
+                            const BulkIOConfigurationPtr &options) {
+    return CopyFromPtr(new CopyFrom(catalog_relation, file_name, options));
   }
 
  protected:
@@ -122,18 +111,14 @@ class CopyFrom : public Logical {
  private:
   CopyFrom(const CatalogRelation *catalog_relation,
            const std::string &file_name,
-           const char column_delimiter,
-           bool escape_strings)
+           const BulkIOConfigurationPtr &options)
       : catalog_relation_(catalog_relation),
         file_name_(file_name),
-        column_delimiter_(column_delimiter),
-        escape_strings_(escape_strings) {}
+        options_(options) {}
 
   const CatalogRelation *catalog_relation_;
-  std::string file_name_;
-
-  const char column_delimiter_;
-  const bool escape_strings_;
+  const std::string file_name_;
+  const BulkIOConfigurationPtr options_;
 
   DISALLOW_COPY_AND_ASSIGN(CopyFrom);
 };
@@ -144,4 +129,4 @@ class CopyFrom : public Logical {
 }  // namespace optimizer
 }  // namespace quickstep
 
-#endif /* QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_ */
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyTo.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyTo.cpp 
b/query_optimizer/logical/CopyTo.cpp
new file mode 100644
index 0000000..369f732
--- /dev/null
+++ b/query_optimizer/logical/CopyTo.cpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/logical/CopyTo.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "utility/StringUtil.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+void CopyTo::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> 
*container_child_fields) const {
+  inline_field_names->push_back("file_name");
+  inline_field_values->push_back(file_name_);
+
+  non_container_child_field_names->push_back("input");
+  non_container_child_fields->push_back(input_);
+
+  inline_field_names->push_back("format");
+  inline_field_values->push_back(options_->getFormatName());
+
+  inline_field_names->push_back("column_delimiter");
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + 
"\"");
+
+  if (options_->escapeStrings()) {
+    inline_field_names->push_back("escape_strings");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->hasHeader()) {
+    inline_field_names->push_back("header");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->getQuoteCharacter() != 0) {
+    inline_field_names->push_back("quote");
+    inline_field_values->push_back(std::string(1, 
options_->getQuoteCharacter()));
+  }
+
+  if (options_->getNullString() != "") {
+    inline_field_names->push_back("null_string");
+    inline_field_values->push_back(options_->getNullString());
+  }
+}
+
+}  // namespace logical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/CopyTo.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyTo.hpp 
b/query_optimizer/logical/CopyTo.hpp
new file mode 100644
index 0000000..eafef0e
--- /dev/null
+++ b/query_optimizer/logical/CopyTo.hpp
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/logical/LogicalType.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+/** \addtogroup OptimizerLogical
+ *  @{
+ */
+
+class CopyTo;
+typedef std::shared_ptr<const CopyTo> CopyToPtr;
+
+/**
+ * @brief Represents an operation that copies data from a relation to a text 
file.
+ */
+class CopyTo : public Logical {
+ public:
+  LogicalType getLogicalType() const override {
+    return LogicalType::kCopyTo;
+  }
+
+  std::string getName() const override {
+    return "CopyTo";
+  }
+
+  /**
+   * @return The input relation whose data is to be exported.
+   */
+  const LogicalPtr& input() const {
+    return input_;
+  }
+
+  /**
+   * @return The name of the file to write the data to.
+   */
+  const std::string& file_name() const {
+    return file_name_;
+  }
+
+  /**
+   * @return The options for this COPY TO statement.
+   */
+  BulkIOConfigurationPtr options() const {
+    return options_;
+  }
+
+  LogicalPtr copyWithNewChildren(
+      const std::vector<LogicalPtr> &new_children) const override {
+    DCHECK(new_children.size() == 1);
+    return Create(new_children.front(), file_name_, options_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const 
override {
+    return {};
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() 
const override {
+    return input_->getOutputAttributes();
+  }
+
+  /**
+   * @brief Creates a CopyTo logical node.
+   *
+   * @param input The input relation whose data is to be exported.
+   * @param file_name The name of the file to write the data to.
+   * @param options The options for this COPY TO statement.
+   * @return An immutable CopyTo logical node.
+   */
+  static CopyToPtr Create(const LogicalPtr &input,
+                          const std::string &file_name,
+                          const BulkIOConfigurationPtr &options) {
+    return CopyToPtr(new CopyTo(input, file_name, options));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> 
*container_child_fields) const override;
+
+ private:
+  CopyTo(const LogicalPtr &input,
+         const std::string &file_name,
+         const BulkIOConfigurationPtr &options)
+      : input_(input),
+        file_name_(file_name),
+        options_(options) {
+    addChild(input);
+  }
+
+  const LogicalPtr input_;
+  const std::string file_name_;
+  const BulkIOConfigurationPtr options_;
+
+  DISALLOW_COPY_AND_ASSIGN(CopyTo);
+};
+
+/** @} */
+
+}  // namespace logical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/logical/LogicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/LogicalType.hpp 
b/query_optimizer/logical/LogicalType.hpp
index 21ffdca..a629637 100644
--- a/query_optimizer/logical/LogicalType.hpp
+++ b/query_optimizer/logical/LogicalType.hpp
@@ -34,6 +34,7 @@ namespace logical {
 enum class LogicalType {
   kAggregate,
   kCopyFrom,
+  kCopyTo,
   kCreateIndex,
   kCreateTable,
   kDeleteTuples,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt 
b/query_optimizer/physical/CMakeLists.txt
index e510f6b..1c8dc3d 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -19,6 +19,7 @@
 add_library(quickstep_queryoptimizer_physical_Aggregate Aggregate.cpp 
Aggregate.hpp)
 add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp 
BinaryJoin.hpp)
 add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp 
CopyFrom.hpp)
+add_library(quickstep_queryoptimizer_physical_CopyTo CopyTo.cpp CopyTo.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp 
CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp 
CreateTable.hpp)
 add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
@@ -81,7 +82,18 @@ 
target_link_libraries(quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
-                      quickstep_utility_Macros)
+                      quickstep_utility_BulkIOConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
+target_link_libraries(quickstep_queryoptimizer_physical_CopyTo
+                      glog
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_BulkIOConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
 target_link_libraries(quickstep_queryoptimizer_physical_CreateIndex
                       glog
                       quickstep_queryoptimizer_OptimizerTree
@@ -327,6 +339,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_BinaryJoin
                       quickstep_queryoptimizer_physical_CopyFrom
+                      quickstep_queryoptimizer_physical_CopyTo
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
                       
quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyFrom.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyFrom.cpp 
b/query_optimizer/physical/CopyFrom.cpp
index 8448d4e..65279fe 100644
--- a/query_optimizer/physical/CopyFrom.cpp
+++ b/query_optimizer/physical/CopyFrom.cpp
@@ -24,6 +24,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "query_optimizer/OptimizerTree.hpp"
+#include "utility/StringUtil.hpp"
 
 namespace quickstep {
 namespace optimizer {
@@ -43,10 +44,11 @@ void CopyFrom::getFieldStringItems(
   inline_field_values->push_back(file_name_);
 
   inline_field_names->push_back("column_delimiter");
-  inline_field_values->push_back(std::string(1, column_delimiter_));
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + 
"\"");
 
   inline_field_names->push_back("escape_strings");
-  inline_field_values->push_back(escape_strings_ ? "true" : "false");
+  inline_field_values->push_back(options_->escapeStrings() ? "true" : "false");
 }
 
 }  // namespace physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyFrom.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyFrom.hpp 
b/query_optimizer/physical/CopyFrom.hpp
index ecbf318..6aa60cb 100644
--- a/query_optimizer/physical/CopyFrom.hpp
+++ b/query_optimizer/physical/CopyFrom.hpp
@@ -30,6 +30,7 @@
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -68,22 +69,16 @@ class CopyFrom : public Physical {
   const std::string& file_name() const { return file_name_; }
 
   /**
-   * @return The delimiter used in the text file to separate columns.
+   * @return The options for this COPY FROM statement.
    */
-  const char column_delimiter() const { return column_delimiter_; }
-
-  /**
-   * @return Whether to decode escape sequences in the text file.
-   */
-  bool escape_strings() const { return escape_strings_; }
+  BulkIOConfigurationPtr options() const { return options_; }
 
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK(new_children.empty());
     return Create(catalog_relation_,
                   file_name_,
-                  column_delimiter_,
-                  escape_strings_);
+                  options_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const 
override {
@@ -112,12 +107,8 @@ class CopyFrom : public Physical {
    */
   static CopyFromPtr Create(const CatalogRelation *catalog_relation,
                             const std::string &file_name,
-                            const char &column_delimiter,
-                            bool escape_strings) {
-    return CopyFromPtr(new CopyFrom(catalog_relation,
-                                    file_name,
-                                    column_delimiter,
-                                    escape_strings));
+                            const BulkIOConfigurationPtr &options) {
+    return CopyFromPtr(new CopyFrom(catalog_relation, file_name, options));
   }
 
  protected:
@@ -132,18 +123,14 @@ class CopyFrom : public Physical {
  private:
   CopyFrom(const CatalogRelation *catalog_relation,
            const std::string &file_name,
-           const char column_delimiter,
-           bool escape_strings)
+           const BulkIOConfigurationPtr &options)
       : catalog_relation_(catalog_relation),
         file_name_(file_name),
-        column_delimiter_(column_delimiter),
-        escape_strings_(escape_strings) {}
+        options_(options) {}
 
   const CatalogRelation *catalog_relation_;
-  std::string file_name_;
-
-  const char column_delimiter_;
-  const bool escape_strings_;
+  const std::string file_name_;
+  const BulkIOConfigurationPtr options_;
 
   DISALLOW_COPY_AND_ASSIGN(CopyFrom);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyTo.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyTo.cpp 
b/query_optimizer/physical/CopyTo.cpp
new file mode 100644
index 0000000..9cd954e
--- /dev/null
+++ b/query_optimizer/physical/CopyTo.cpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CopyTo.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "utility/StringUtil.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+void CopyTo::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> 
*container_child_fields) const {
+  inline_field_names->push_back("file_name");
+  inline_field_values->push_back(file_name_);
+
+  non_container_child_field_names->push_back("input");
+  non_container_child_fields->push_back(input_);
+
+  inline_field_names->push_back("format");
+  inline_field_values->push_back(options_->getFormatName());
+
+  inline_field_names->push_back("column_delimiter");
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + 
"\"");
+
+  if (options_->escapeStrings()) {
+    inline_field_names->push_back("escape_strings");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->hasHeader()) {
+    inline_field_names->push_back("header");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->getQuoteCharacter() != 0) {
+    inline_field_names->push_back("quote");
+    inline_field_values->push_back(std::string(1, 
options_->getQuoteCharacter()));
+  }
+
+  if (options_->getNullString() != "") {
+    inline_field_names->push_back("null_string");
+    inline_field_values->push_back(options_->getNullString());
+  }
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/CopyTo.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyTo.hpp 
b/query_optimizer/physical/CopyTo.hpp
new file mode 100644
index 0000000..69d5c7a
--- /dev/null
+++ b/query_optimizer/physical/CopyTo.hpp
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class CopyTo;
+typedef std::shared_ptr<const CopyTo> CopyToPtr;
+
+/**
+ * @brief Represents an operation that copies data from a relation to a text 
file.
+ */
+class CopyTo : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCopyTo;
+  }
+
+  std::string getName() const override {
+    return "CopyTo";
+  }
+
+  /**
+   * @return The input relation whose data is to be exported.
+   */
+  const PhysicalPtr& input() const {
+    return input_;
+  }
+
+  /**
+   * @return The name of the file to write the data to.
+   */
+  const std::string& file_name() const {
+    return file_name_;
+  }
+
+  /**
+   * @return The options for this COPY TO statement.
+   */
+  BulkIOConfigurationPtr options() const {
+    return options_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK(new_children.size() == 1);
+    return Create(new_children.front(), file_name_, options_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const 
override {
+    return {};
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() 
const override {
+    return input_->getOutputAttributes();
+  }
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;
+  }
+
+  /**
+   * @brief Creates a CopyTo physical node.
+   *
+   * @param input The input relation whose data is to be exported.
+   * @param file_name The name of the file to write the data to.
+   * @param options The options for this COPY TO statement.
+   * @return An immutable CopyTo physical node.
+   */
+  static CopyToPtr Create(const PhysicalPtr &input,
+                          const std::string &file_name,
+                          const BulkIOConfigurationPtr &options) {
+    return CopyToPtr(new CopyTo(input, file_name, options));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> 
*container_child_fields) const override;
+
+ private:
+  CopyTo(const PhysicalPtr &input,
+         const std::string &file_name,
+         const BulkIOConfigurationPtr &options)
+      : input_(input),
+        file_name_(file_name),
+        options_(options) {
+    addChild(input);
+  }
+
+  const PhysicalPtr input_;
+  const std::string file_name_;
+  const BulkIOConfigurationPtr options_;
+
+  DISALLOW_COPY_AND_ASSIGN(CopyTo);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp 
b/query_optimizer/physical/PhysicalType.hpp
index 47db7ec..ac4376e 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -34,6 +34,7 @@ namespace physical {
 enum class PhysicalType {
   kAggregate,
   kCopyFrom,
+  kCopyTo,
   kCreateIndex,
   kCreateTable,
   kCrossReferenceCoalesceAggregate,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt 
b/query_optimizer/resolver/CMakeLists.txt
index 4e364a6..18a03c7 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -97,6 +97,7 @@ 
target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       
quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       quickstep_queryoptimizer_logical_Aggregate
                       quickstep_queryoptimizer_logical_CopyFrom
+                      quickstep_queryoptimizer_logical_CopyTo
                       quickstep_queryoptimizer_logical_CreateIndex
                       quickstep_queryoptimizer_logical_CreateTable
                       quickstep_queryoptimizer_logical_DeleteTuples
@@ -131,6 +132,7 @@ 
target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       
quickstep_types_operations_unaryoperations_DateExtractOperation
                       
quickstep_types_operations_unaryoperations_SubstringOperation
                       quickstep_types_operations_unaryoperations_UnaryOperation
+                      quickstep_utility_BulkIOConfiguration
                       quickstep_utility_Macros
                       quickstep_utility_PtrList
                       quickstep_utility_PtrVector

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp 
b/query_optimizer/resolver/Resolver.cpp
index 0f65255..4bccf89 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -93,6 +93,7 @@
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/logical/Aggregate.hpp"
 #include "query_optimizer/logical/CopyFrom.hpp"
+#include "query_optimizer/logical/CopyTo.hpp"
 #include "query_optimizer/logical/CreateIndex.hpp"
 #include "query_optimizer/logical/CreateTable.hpp"
 #include "query_optimizer/logical/DeleteTuples.hpp"
@@ -126,6 +127,7 @@
 #include "types/operations/unary_operations/DateExtractOperation.hpp"
 #include "types/operations/unary_operations/SubstringOperation.hpp"
 #include "types/operations/unary_operations/UnaryOperation.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/PtrList.hpp"
 #include "utility/PtrVector.hpp"
 #include "utility/SqlError.hpp"
@@ -143,6 +145,44 @@ namespace E = ::quickstep::optimizer::expressions;
 namespace L = ::quickstep::optimizer::logical;
 namespace S = ::quickstep::serialization;
 
+namespace {
+
+attribute_id GetAttributeIdFromName(const PtrList<ParseAttributeDefinition> 
&attribute_definition_list,
+                                    const std::string &attribute_name) {
+  const std::string lower_attribute_name = ToLower(attribute_name);
+
+  attribute_id attr_id = 0;
+  for (const ParseAttributeDefinition &attribute_definition : 
attribute_definition_list) {
+    if (lower_attribute_name == ToLower(attribute_definition.name()->value())) 
{
+      return attr_id;
+    }
+
+    ++attr_id;
+  }
+
+  return kInvalidAttributeID;
+}
+
+const ParseString* GetKeyValueString(const ParseKeyValue &key_value) {
+  if (key_value.getKeyValueType() != ParseKeyValue::kStringString) {
+      THROW_SQL_ERROR_AT(&key_value)
+          << "Invalid value type for " << key_value.key()->value()
+          << ", expected a string.";
+  }
+  return static_cast<const ParseKeyStringValue&>(key_value).value();
+}
+
+bool GetKeyValueBool(const ParseKeyValue &key_value) {
+  if (key_value.getKeyValueType() != ParseKeyValue::kStringBool) {
+      THROW_SQL_ERROR_AT(&key_value)
+          << "Invalid value for " << key_value.key()->value()
+          << ", expected true or false.";
+  }
+  return static_cast<const ParseKeyBoolValue&>(key_value).value();
+}
+
+}  // namespace
+
 struct Resolver::ExpressionResolutionInfo {
   /**
    * @brief Constructs an ExpressionResolutionInfo that disallows aggregate
@@ -316,11 +356,25 @@ struct Resolver::SelectListInfo {
 
 L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) {
   switch (parse_query.getStatementType()) {
-    case ParseStatement::kCopyFrom:
-      context_->set_is_catalog_changed();
-      logical_plan_ = resolveCopyFrom(
-          static_cast<const ParseStatementCopyFrom&>(parse_query));
+    case ParseStatement::kCopy: {
+      const ParseStatementCopy &copy_statemnt =
+          static_cast<const ParseStatementCopy&>(parse_query);
+      if (copy_statemnt.getCopyDirection() == ParseStatementCopy::kFrom) {
+        context_->set_is_catalog_changed();
+        logical_plan_ = resolveCopyFrom(copy_statemnt);
+      } else {
+        DCHECK(copy_statemnt.getCopyDirection() == ParseStatementCopy::kTo);
+        if (copy_statemnt.with_clause() != nullptr) {
+          resolveWithClause(*copy_statemnt.with_clause());
+        }
+        logical_plan_ = resolveCopyTo(copy_statemnt);
+
+        if (copy_statemnt.with_clause() != nullptr) {
+          reportIfWithClauseUnused(*copy_statemnt.with_clause());
+        }
+      }
       break;
+    }
     case ParseStatement::kCreateTable:
       context_->set_is_catalog_changed();
       logical_plan_ = resolveCreateTable(
@@ -359,16 +413,7 @@ L::LogicalPtr Resolver::resolve(const ParseStatement 
&parse_query) {
         logical_plan_ = resolveInsertSelection(insert_selection_statement);
 
         if (insert_selection_statement.with_clause() != nullptr) {
-          // Report an error if there is a WITH query that is not actually 
used.
-          if (!with_queries_info_.unreferenced_query_indexes.empty()) {
-            int unreferenced_with_query_index = 
*with_queries_info_.unreferenced_query_indexes.begin();
-            const ParseSubqueryTableReference &unreferenced_with_query =
-                
(*insert_selection_statement.with_clause())[unreferenced_with_query_index];
-            THROW_SQL_ERROR_AT(&unreferenced_with_query)
-                << "WITH query "
-                << 
unreferenced_with_query.table_reference_signature()->table_alias()->value()
-                << " is defined but not used";
-          }
+          reportIfWithClauseUnused(*insert_selection_statement.with_clause());
         }
       }
       break;
@@ -385,16 +430,7 @@ L::LogicalPtr Resolver::resolve(const ParseStatement 
&parse_query) {
                               nullptr /* type_hints */,
                               nullptr /* parent_resolver */);
       if (set_operation_statement.with_clause() != nullptr) {
-        // Report an error if there is a WITH query that is not actually used.
-        if (!with_queries_info_.unreferenced_query_indexes.empty()) {
-          int unreferenced_with_query_index = 
*with_queries_info_.unreferenced_query_indexes.begin();
-          const ParseSubqueryTableReference &unreferenced_with_query =
-              
(*set_operation_statement.with_clause())[unreferenced_with_query_index];
-          THROW_SQL_ERROR_AT(&unreferenced_with_query)
-              << "WITH query "
-              << 
unreferenced_with_query.table_reference_signature()->table_alias()->value()
-              << " is defined but not used";
-        }
+        reportIfWithClauseUnused(*set_operation_statement.with_clause());
       }
       break;
     }
@@ -418,27 +454,157 @@ L::LogicalPtr Resolver::resolve(const ParseStatement 
&parse_query) {
 }
 
 L::LogicalPtr Resolver::resolveCopyFrom(
-    const ParseStatementCopyFrom &copy_from_statement) {
-  // Default parameters.
-  std::string column_delimiter_ = "\t";
-  bool escape_strings_ = true;
+    const ParseStatementCopy &copy_from_statement) {
+  DCHECK(copy_from_statement.getCopyDirection() == ParseStatementCopy::kFrom);
+  const PtrList<ParseKeyValue> *params = copy_from_statement.params();
 
-  const ParseCopyFromParams *params = copy_from_statement.params();
+  BulkIOFormat file_format = BulkIOFormat::kText;
   if (params != nullptr) {
-    if (params->delimiter != nullptr) {
-      column_delimiter_ = params->delimiter->value();
-      if (column_delimiter_.size() != 1) {
-        THROW_SQL_ERROR_AT(params->delimiter)
-            << "DELIMITER is not a single character";
+    for (const ParseKeyValue &param : *params) {
+      const std::string &key = ToLower(param.key()->value());
+      if (key == "format") {
+        const ParseString *parse_format = GetKeyValueString(param);
+        const std::string format = ToLower(parse_format->value());
+        // TODO(jianqiao): Support other bulk load formats such as CSV.
+        if (format != "text") {
+          THROW_SQL_ERROR_AT(parse_format) << "Unsupported file format: " << 
format;
+        }
+        // Update file_format when other formats get supported.
+        break;
+      }
+    }
+  }
+
+  std::unique_ptr<BulkIOConfiguration> options =
+      std::make_unique<BulkIOConfiguration>(file_format);
+  if (params != nullptr) {
+    for (const ParseKeyValue &param : *params) {
+      const std::string &key = ToLower(param.key()->value());
+      if (key == "delimiter") {
+        const ParseString *parse_delimiter = GetKeyValueString(param);
+        const std::string &delimiter = parse_delimiter->value();
+        if (delimiter.size() != 1) {
+          THROW_SQL_ERROR_AT(parse_delimiter)
+              << "Delimiter must be a single character";
+        }
+        options->setDelimiter(delimiter.front());
+      } else if (key == "escape_strings") {
+        options->setEscapeStrings(GetKeyValueBool(param));
+      } else if (key != "format") {
+        THROW_SQL_ERROR_AT(&param) << "Unsupported copy option: " << key;
       }
     }
-    escape_strings_ = params->escape_strings;
   }
 
   return 
L::CopyFrom::Create(resolveRelationName(copy_from_statement.relation_name()),
-                             copy_from_statement.source_filename()->value(),
-                             column_delimiter_[0],
-                             escape_strings_);
+                             copy_from_statement.file_name()->value(),
+                             BulkIOConfigurationPtr(options.release()));
+}
+
+L::LogicalPtr Resolver::resolveCopyTo(
+    const ParseStatementCopy &copy_to_statement) {
+  DCHECK(copy_to_statement.getCopyDirection() == ParseStatementCopy::kTo);
+  const PtrList<ParseKeyValue> *params = copy_to_statement.params();
+
+  // Check if copy format is explicitly specified.
+  BulkIOFormat file_format = BulkIOFormat::kText;
+  bool format_specified = false;
+  if (params != nullptr) {
+    for (const ParseKeyValue &param : *params) {
+      const std::string &key = ToLower(param.key()->value());
+      if (key == "format") {
+        const ParseString *parse_format = GetKeyValueString(param);
+        const std::string format = ToLower(parse_format->value());
+        if (format == "csv") {
+          file_format = BulkIOFormat::kCSV;
+        } else if (format == "text") {
+          file_format = BulkIOFormat::kText;
+        } else {
+          THROW_SQL_ERROR_AT(parse_format) << "Unsupported file format: " << 
format;
+        }
+        format_specified = true;
+        break;
+      }
+    }
+  }
+
+  const std::string &file_name = copy_to_statement.file_name()->value();
+  if (file_name.length() <= 1) {
+    THROW_SQL_ERROR_AT(copy_to_statement.file_name())
+        << "File name can not be empty";
+  }
+
+  // Infer copy format from file name extension.
+  if (!format_specified) {
+    if (file_name.length() > 4) {
+      if (ToLower(file_name.substr(file_name.length() - 4)) == ".csv") {
+        file_format = BulkIOFormat::kCSV;
+      }
+    }
+  }
+
+  // Resolve the copy options.
+  std::unique_ptr<BulkIOConfiguration> options =
+      std::make_unique<BulkIOConfiguration>(file_format);
+  if (params != nullptr) {
+    for (const ParseKeyValue &param : *params) {
+      const std::string &key = ToLower(param.key()->value());
+      if (key == "delimiter") {
+        const ParseString *parse_delimiter = GetKeyValueString(param);
+        const std::string &delimiter = parse_delimiter->value();
+        if (delimiter.size() != 1) {
+          THROW_SQL_ERROR_AT(parse_delimiter)
+              << "Delimiter must be a single character";
+        }
+        options->setDelimiter(delimiter.front());
+      } else if (file_format == BulkIOFormat::kText && key == 
"escape_strings") {
+        options->setEscapeStrings(GetKeyValueBool(param));
+      } else if (file_format == BulkIOFormat::kCSV && key == "header") {
+        options->setHeader(GetKeyValueBool(param));
+      } else if (file_format == BulkIOFormat::kCSV && key == "quote") {
+        const ParseString *parse_quote = GetKeyValueString(param);
+        const std::string &quote = parse_quote->value();
+        if (quote.size() != 1) {
+          THROW_SQL_ERROR_AT(parse_quote)
+              << "Quote must be a single character";
+        }
+        options->setQuoteCharacter(quote.front());
+      } else if (key == "null_string") {
+        const ParseString *parse_null_string = GetKeyValueString(param);
+        options->setNullString(parse_null_string->value());
+      } else if (key != "format") {
+        THROW_SQL_ERROR_AT(&param)
+            << "Unsupported copy option \"" << key
+            << "\" for file format " << options->getFormatName();
+      }
+    }
+  }
+
+  // Resolve the source relation.
+  L::LogicalPtr input;
+  if (copy_to_statement.set_operation_query() != nullptr) {
+    input = resolveSetOperation(*copy_to_statement.set_operation_query(),
+                                "" /* set_operation_name */,
+                                nullptr /* type_hints */,
+                                nullptr /* parent_resolver */);
+  } else {
+    const ParseString *relation_name = copy_to_statement.relation_name();
+    DCHECK(relation_name != nullptr);
+    std::unique_ptr<ParseTableReference> table_reference =
+        std::make_unique<ParseSimpleTableReference>(
+            relation_name->line_number(),
+            relation_name->column_number(),
+            new ParseString(relation_name->line_number(),
+                            relation_name->column_number(),
+                            relation_name->value()),
+            nullptr /* sample */);
+    NameResolver name_resolver;
+    input = resolveTableReference(*table_reference, &name_resolver);
+  }
+
+  return L::CopyTo::Create(input,
+                           copy_to_statement.file_name()->value(),
+                           BulkIOConfigurationPtr(options.release()));
 }
 
 L::LogicalPtr Resolver::resolveCreateTable(
@@ -491,26 +657,6 @@ L::LogicalPtr Resolver::resolveCreateTable(
   return L::CreateTable::Create(relation_name, attributes, block_properties, 
partition_scheme_header_proto);
 }
 
-namespace {
-
-attribute_id GetAttributeIdFromName(const PtrList<ParseAttributeDefinition> 
&attribute_definition_list,
-                                    const std::string &attribute_name) {
-  const std::string lower_attribute_name = ToLower(attribute_name);
-
-  attribute_id attr_id = 0;
-  for (const ParseAttributeDefinition &attribute_definition : 
attribute_definition_list) {
-    if (lower_attribute_name == ToLower(attribute_definition.name()->value())) 
{
-      return attr_id;
-    }
-
-    ++attr_id;
-  }
-
-  return kInvalidAttributeID;
-}
-
-}  // namespace
-
 StorageBlockLayoutDescription* Resolver::resolveBlockProperties(
     const ParseStatementCreateTable &create_table_statement) {
   const ParseBlockProperties *block_properties
@@ -1595,6 +1741,19 @@ void 
Resolver::appendProjectIfNeedPrecomputationAfterAggregation(
   }
 }
 
+void Resolver::reportIfWithClauseUnused(
+    const PtrVector<ParseSubqueryTableReference> &with_list) const {
+  if (!with_queries_info_.unreferenced_query_indexes.empty()) {
+    int unreferenced_with_query_index = 
*with_queries_info_.unreferenced_query_indexes.begin();
+    const ParseSubqueryTableReference &unreferenced_with_query =
+        with_list[unreferenced_with_query_index];
+    THROW_SQL_ERROR_AT(&unreferenced_with_query)
+        << "WITH query "
+        << 
unreferenced_with_query.table_reference_signature()->table_alias()->value()
+        << " is defined but not used";
+  }
+}
+
 void Resolver::validateSelectExpressionsForAggregation(
     const ParseSelectionClause &parse_selection,
     const std::vector<E::NamedExpressionPtr> &select_list_expressions,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp 
b/query_optimizer/resolver/Resolver.hpp
index 1ae565a..a7c1a80 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -55,7 +55,7 @@ class ParseSimpleCaseExpression;
 class ParseSimpleTableReference;
 class ParseSubqueryTableReference;
 class ParseStatement;
-class ParseStatementCopyFrom;
+class ParseStatementCopy;
 class ParseStatementCreateTable;
 class ParseStatementCreateIndex;
 class ParseStatementDelete;
@@ -283,7 +283,16 @@ class Resolver {
    * @return A logical plan for the COPY FROM query.
    */
   logical::LogicalPtr resolveCopyFrom(
-      const ParseStatementCopyFrom &copy_from_statement);
+      const ParseStatementCopy &copy_from_statement);
+
+  /**
+   * @brief Resolves a COPY TO query and returns a logical plan.
+   *
+   * @param copy_to_statement The COPY TO parse tree.
+   * @return A logical plan for the COPY TO query.
+   */
+  logical::LogicalPtr resolveCopyTo(
+      const ParseStatementCopy &copy_to_statement);
 
   /**
    * @brief Resolves a UPDATE query and returns a logical plan.
@@ -621,6 +630,14 @@ class Resolver {
   static std::string GenerateOrderingAttributeAlias(int index);
 
   /**
+   * @brief Report an error if there is a WITH query that is not actually used.
+   *
+   * @param with_list The list of subqueries in WITH clause.
+   */
+  void reportIfWithClauseUnused(
+      const PtrVector<ParseSubqueryTableReference> &with_list) const;
+
+  /**
    * @brief Validates each SELECT-list expression to ensure that it does not
    *        reference a named expression with an ID not in \p 
valid_expr_id_set.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/strategy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/CMakeLists.txt 
b/query_optimizer/strategy/CMakeLists.txt
index e011126..20a4eb4 100644
--- a/query_optimizer/strategy/CMakeLists.txt
+++ b/query_optimizer/strategy/CMakeLists.txt
@@ -76,6 +76,7 @@ 
target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExpressionUtil
                       quickstep_queryoptimizer_logical_CopyFrom
+                      quickstep_queryoptimizer_logical_CopyTo
                       quickstep_queryoptimizer_logical_CreateIndex
                       quickstep_queryoptimizer_logical_CreateTable
                       quickstep_queryoptimizer_logical_DeleteTuples
@@ -95,6 +96,7 @@ 
target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne
                       quickstep_queryoptimizer_logical_WindowAggregate
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_CopyFrom
+                      quickstep_queryoptimizer_physical_CopyTo
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_DeleteTuples

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/query_optimizer/strategy/OneToOne.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/OneToOne.cpp 
b/query_optimizer/strategy/OneToOne.cpp
index af4e150..3cfe013 100644
--- a/query_optimizer/strategy/OneToOne.cpp
+++ b/query_optimizer/strategy/OneToOne.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/logical/CopyFrom.hpp"
+#include "query_optimizer/logical/CopyTo.hpp"
 #include "query_optimizer/logical/CreateIndex.hpp"
 #include "query_optimizer/logical/CreateTable.hpp"
 #include "query_optimizer/logical/DeleteTuples.hpp"
@@ -45,6 +46,7 @@
 #include "query_optimizer/logical/WindowAggregate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
+#include "query_optimizer/physical/CopyTo.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
@@ -104,19 +106,28 @@ bool OneToOne::generatePlan(const L::LogicalPtr 
&logical_input,
     case L::LogicalType::kCopyFrom: {
       const L::CopyFromPtr copy_from =
           std::static_pointer_cast<const L::CopyFrom>(logical_input);
-      *physical_output = P::CopyFrom::Create(
-          copy_from->catalog_relation(), copy_from->file_name(),
-          copy_from->column_delimiter(), copy_from->escape_strings());
+      *physical_output = P::CopyFrom::Create(copy_from->catalog_relation(),
+                                             copy_from->file_name(),
+                                             copy_from->options());
+      return true;
+    }
+    case L::LogicalType::kCopyTo: {
+      const L::CopyToPtr copy_to =
+          std::static_pointer_cast<const L::CopyTo>(logical_input);
+      *physical_output = P::CopyTo::Create(
+          physical_mapper_->createOrGetPhysicalFromLogical(copy_to->input()),
+          copy_to->file_name(),
+          copy_to->options());
       return true;
     }
     case L::LogicalType::kCreateIndex: {
       const L::CreateIndexPtr create_index =
           std::static_pointer_cast<const L::CreateIndex>(logical_input);
-      *physical_output = 
P::CreateIndex::Create(physical_mapper_->createOrGetPhysicalFromLogical(
-                                                                    
create_index->input()),
-                                                create_index->index_name(),
-                                                
create_index->index_attributes(),
-                                                
create_index->index_description());
+      *physical_output = P::CreateIndex::Create(
+          
physical_mapper_->createOrGetPhysicalFromLogical(create_index->input()),
+          create_index->index_name(),
+          create_index->index_attributes(),
+          create_index->index_description());
       return true;
     }
     case L::LogicalType::kCreateTable: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt 
b/relational_operators/CMakeLists.txt
index 5ad9c3b..0d0fe41 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -71,6 +71,7 @@ 
add_library(quickstep_relationaloperators_SortMergeRunOperatorHelpers SortMergeR
             SortMergeRunOperatorHelpers.hpp)
 add_library(quickstep_relationaloperators_SortRunGenerationOperator 
SortRunGenerationOperator.cpp
             SortRunGenerationOperator.hpp)
+add_library(quickstep_relationaloperators_TableExportOperator 
TableExportOperator.cpp TableExportOperator.hpp)
 add_library(quickstep_relationaloperators_TableGeneratorOperator 
TableGeneratorOperator.cpp TableGeneratorOperator.hpp)
 add_library(quickstep_relationaloperators_TextScanOperator 
TextScanOperator.cpp TextScanOperator.hpp)
 add_library(quickstep_relationaloperators_UnionAllOperator 
UnionAllOperator.cpp UnionAllOperator.hpp)
@@ -473,6 +474,25 @@ 
target_link_libraries(quickstep_relationaloperators_SortRunGenerationOperator
                       quickstep_utility_Macros
                       quickstep_utility_SortConfiguration
                       tmb)
+target_link_libraries(quickstep_relationaloperators_TableExportOperator
+                      glog
+                      quickstep_catalog_CatalogAttribute
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_ValueAccessor
+                      quickstep_threading_SpinMutex
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_Tuple
+                      quickstep_utility_BulkIOConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
 target_link_libraries(quickstep_relationaloperators_TableGeneratorOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -508,6 +528,7 @@ 
target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
+                      quickstep_utility_BulkIOConfiguration
                       quickstep_utility_Glob
                       quickstep_utility_Macros
                       tmb)
@@ -635,6 +656,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_SortMergeRunOperatorHelpers
                       quickstep_relationaloperators_SortMergeRunOperator_proto
                       quickstep_relationaloperators_SortRunGenerationOperator
+                      quickstep_relationaloperators_TableExportOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UnionAllOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp 
b/relational_operators/RelationalOperator.hpp
index 425fa32..64397c7 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -85,6 +85,7 @@ class RelationalOperator {
     kSelect,
     kSortMergeRun,
     kSortRunGeneration,
+    kTableExport,
     kTableGenerator,
     kTextScan,
     kUnionAll,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/TableExportOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableExportOperator.cpp 
b/relational_operators/TableExportOperator.cpp
new file mode 100644
index 0000000..03e260f
--- /dev/null
+++ b/relational_operators/TableExportOperator.cpp
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/TableExportOperator.hpp"
+
+#include <cstdio>
+#include <exception>
+#include <string>
+#include <utility>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "threading/SpinMutex.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool TableExportOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (query_context_ == nullptr) {
+    query_context_ = query_context;
+  }
+
+  const auto add_work_order = [&](const block_id input_block_id) -> void {
+    std::unique_ptr<std::string> output_buffer = 
std::make_unique<std::string>();
+    container->addNormalWorkOrder(
+        new TableExportWorkOrder(query_id_,
+                                 input_relation_,
+                                 input_block_id,
+                                 options_->getFormat(),
+                                 options_->getDelimiter(),
+                                 options_->escapeStrings(),
+                                 options_->getQuoteCharacter(),
+                                 options_->getNullString(),
+                                 op_index_,
+                                 scheduler_client_id,
+                                 storage_manager,
+                                 bus,
+                                 output_buffer.get()),
+        op_index_);
+    query_context->setBlockOutputTextBuffer(input_block_id, 
output_buffer.release());
+  };
+
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        add_work_order(input_block_id);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      add_work_order(input_relation_block_ids_[num_workorders_generated_]);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+bool TableExportOperator::getAllWorkOrderProtos(
+    WorkOrderProtosContainer *container) {
+  // TODO(quickstep-team): Implement TextExportOperator for the distributed 
case.
+  LOG(FATAL) << "TableExportOperator::getAllWorkOrderProtos() is not 
supported";
+}
+
+void TableExportOperator::receiveFeedbackMessage(
+    const WorkOrder::FeedbackMessage &msg) {
+  DCHECK(TableExportOperator::kBlockOutputMessage == msg.type());
+  DCHECK(msg.payload_size() == sizeof(block_id));
+
+  if (file_ == nullptr) {
+    const std::string lo_file_name = ToLower(file_name_);
+    if (lo_file_name == "$stdout") {
+      file_ = stdout;
+    } else if (lo_file_name == "$stderr") {
+      file_ = stderr;
+    } else {
+      file_ = std::fopen(file_name_.substr(1).c_str(), "wb");
+      // TODO(quickstep-team): Decent handling of exceptions at query runtime.
+      if (file_ == nullptr) {
+        throw std::runtime_error("Can not open file " + file_name_ + " for 
writing");
+      }
+    }
+  }
+
+  block_id completed_block_id = *static_cast<const block_id*>(msg.payload());
+  outputs_.emplace(completed_block_id, std::unique_ptr<std::string>(
+      query_context_->releaseBlockOutputTextBuffer(completed_block_id)));
+
+  while (true) {
+    block_id next_block_id;
+    {
+      SpinMutexLock lock(block_ids_mutex_);
+      next_block_id = input_relation_block_ids_[num_blocks_written_];
+    }
+    auto it = outputs_.find(next_block_id);
+    if (it == outputs_.end()) {
+      break;
+    }
+    std::fwrite(it->second->c_str(), 1, it->second->length(), file_);
+    ++num_blocks_written_;
+    outputs_.erase(it);
+  }
+}
+
+void TableExportOperator::updateCatalogOnCompletion() {
+  if (file_ != nullptr && file_ != stdout && file_ != stderr) {
+    std::fclose(file_);
+  }
+  file_ = nullptr;
+}
+
+void TableExportWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(input_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  switch (format_) {
+    case BulkIOFormat::kCSV:
+      writeToString<&TableExportWorkOrder::quoteCSVField>(
+          accessor.get(), output_buffer_);
+      break;
+    case BulkIOFormat::kText:
+      writeToString<&TableExportWorkOrder::escapeTextField>(
+          accessor.get(), output_buffer_);
+      break;
+    default:
+      LOG(FATAL) << "Unsupported export format in 
TableExportWorkOrder::execute()";
+  }
+
+  // Send completion message to operator.
+  FeedbackMessage msg(TableExportOperator::kBlockOutputMessage,
+                      getQueryID(),
+                      operator_index_,
+                      new block_id(input_block_id_),
+                      sizeof(input_block_id_));
+  SendFeedbackMessage(
+      bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg);
+}
+
+inline std::string TableExportWorkOrder::quoteCSVField(std::string &&field) 
const {
+  bool need_quote = false;
+  for (const char c : field) {
+    if (c == column_delimiter_ || c == quote_character_) {
+      need_quote = true;
+      break;
+    }
+  }
+  if (!need_quote) {
+    return std::move(field);
+  }
+
+  std::string quoted;
+  quoted.push_back(quote_character_);
+  for (const char c : field) {
+    if (c == quote_character_) {
+      quoted.push_back(c);
+    }
+    quoted.push_back(c);
+  }
+  quoted.push_back(quote_character_);
+  return quoted;
+}
+
+
+inline std::string TableExportWorkOrder::escapeTextField(std::string &&field) 
const {
+  if (escape_strings_ == false || field == "\\N") {
+    return std::move(field);
+  }
+  bool need_escape = false;
+  for (const unsigned char c : field) {
+    if (c < ' ' || c == '\\' || c == column_delimiter_) {
+      need_escape = true;
+      break;
+    }
+  }
+  if (!need_escape) {
+    return std::move(field);
+  }
+
+  std::string escaped;
+  for (const unsigned char c : field) {
+    if (c < 32) {
+      switch (c) {
+        case '\b':
+          // Backspace.
+          escaped.append("\\b");
+          break;
+        case '\f':
+          // Form-feed.
+          escaped.append("\\f");
+          break;
+        case '\n':
+          // Newline.
+          escaped.append("\\n");
+          break;
+        case '\r':
+          // Carriage return.
+          escaped.append("\\r");
+          break;
+        case '\t':
+          // Tab.
+          escaped.append("\\t");
+          break;
+        case '\v':
+          // Vertical tab
+          escaped.append("\\v");
+          break;
+        default: {
+          // Use hexidecimal representation.
+          static const std::string digits = "0123456789ABCDEF";
+          escaped.append("\\x");
+          escaped.push_back(digits.at(c >> 4));
+          escaped.push_back(digits.at(c & 0xF));
+          break;
+        }
+      }
+    } else {
+      if (c == '\\' || c == column_delimiter_) {
+        escaped.push_back('\\');
+      }
+      escaped.push_back(c);
+    }
+  }
+  return escaped;
+}
+
+template <std::string (TableExportWorkOrder::*transform)(std::string&&) const,
+          typename Container, typename Functor>
+inline void TableExportWorkOrder::writeEachToString(const Container &container,
+                                                    std::string *output,
+                                                    const Functor &functor) 
const {
+  auto it = container.begin();
+  if (it != container.end()) {
+    std::size_t idx = 0;
+    output->append((this->*transform)(functor(*it, idx++)));
+    while ((++it) != container.end()) {
+      output->push_back(column_delimiter_);
+      output->append((this->*transform)(functor(*it, idx++)));
+    }
+  }
+}
+
+template <std::string (TableExportWorkOrder::*transform)(std::string&&) const>
+void TableExportWorkOrder::writeToString(
+    ValueAccessor *accessor, std::string *output) const {
+  std::vector<const Type*> value_types;
+  value_types.reserve(input_relation_.size());
+  for (const CatalogAttribute &attribute : input_relation_) {
+    value_types.emplace_back(&attribute.getType());
+  }
+
+  accessor->beginIterationVirtual();
+  while (accessor->nextVirtual()) {
+    std::unique_ptr<Tuple> tuple(accessor->getTupleVirtual());
+    writeEachToString<transform>(
+        *tuple, output,
+        [&](const TypedValue &value, const std::size_t idx) -> std::string {
+      if (value.isNull()) {
+        return null_string_;
+      } else {
+        return value_types[idx]->printValueToString(value);
+      }
+    });
+    output->push_back('\n');
+  }
+}
+
+}  // namespace quickstep

Reply via email to