Repository: incubator-quickstep
Updated Branches:
  refs/heads/copy-to 9eae02253 -> cbd18561c (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/TableExportOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableExportOperator.hpp 
b/relational_operators/TableExportOperator.hpp
new file mode 100644
index 0000000..a8152c8
--- /dev/null
+++ b/relational_operators/TableExportOperator.hpp
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_TABLE_EXPORT_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_TABLE_EXPORT_OPERATOR_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/SpinMutex.hpp"
+#include "utility/BulkIOConfiguration.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class InsertDestination;
+class StorageManager;
+class ValueAccessor;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+class TableExportOperator : public RelationalOperator {  // Table export operator (OperatorType kTableExport).
+ public:
+  enum FeedbackMessageType : WorkOrder::FeedbackMessageType {  // Feedback message kinds specific to this operator.
+      kBlockOutputMessage,  // NOTE(review): presumably "output for one block is ready" -- confirm in the .cpp.
+  };
+
+  TableExportOperator(const std::size_t query_id,  // Forwarded to the RelationalOperator base class.
+                      const CatalogRelation &input_relation,  // Input relation; kept as a reference, not owned.
+                      const bool input_relation_is_stored,  // True: block ids are snapshotted right here.
+                      const std::string &file_name,  // Copied into file_name_.
+                      const BulkIOConfigurationPtr &options)  // Shared pointer to const bulk I/O settings.
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        input_relation_is_stored_(input_relation_is_stored),
+        file_name_(file_name),
+        options_(options),
+        input_relation_block_ids_(input_relation_is_stored  // Stored input: begin with the current snapshot.
+                                      ? input_relation.getBlocksSnapshot()
+                                      : std::vector<block_id>()),  // Otherwise empty; feedInputBlock() appends.
+        num_workorders_generated_(0),
+        started_(false),
+        query_context_(nullptr),
+        num_blocks_written_(0),
+        file_(nullptr) {}
+
+  ~TableExportOperator() override {}  // NOTE(review): does not touch file_; confirm it is closed elsewhere.
+
+  OperatorType getOperatorType() const override {
+    return kTableExport;
+  }
+
+  std::string getName() const override {  // Human-readable operator name.
+    return "TableExportOperator";
+  }
+
+  const CatalogRelation& input_relation() const {  // Accessor for the relation passed to the constructor.
+    return input_relation_;
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;  // Defined out of line.
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;  // Defined out of line.
+
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {  // part_id is not used by this override.
+    if (input_relation_id == input_relation_.getID()) {  // Blocks of any other relation are ignored.
+      SpinMutexLock lock(block_ids_mutex_);  // Guards the append below.
+      input_relation_block_ids_.emplace_back(input_block_id);
+    }
+  }
+
+  void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override;  // Defined out of line.
+
+  void updateCatalogOnCompletion() override;  // Defined out of line.
+
+ private:
+  const CatalogRelation &input_relation_;  // Not owned; must outlive this operator.
+  const bool input_relation_is_stored_;
+  const std::string file_name_;
+  const BulkIOConfigurationPtr options_;
+
+  std::vector<block_id> input_relation_block_ids_;  // Appended to by feedInputBlock() under block_ids_mutex_.
+  std::size_t num_workorders_generated_;  // Starts at 0. NOTE(review): presumably indexes the vector above.
+
+  bool started_;  // Starts false; updated outside this header.
+
+  QueryContext *query_context_;  // Starts null; assigned outside this header.
+  std::size_t num_blocks_written_;  // Starts at 0.
+  std::unordered_map<block_id, std::unique_ptr<std::string>> outputs_;  // Owned string buffers keyed by block id.
+  SpinMutex block_ids_mutex_;  // Taken in feedInputBlock() around the block-id append.
+
+  FILE *file_;  // Starts null. NOTE(review): open/close sites are not visible here -- confirm ownership.
+
+  DISALLOW_COPY_AND_ASSIGN(TableExportOperator);
+};
+
+/**
+ * @brief A WorkOrder that carries everything needed to export one input block:
+ *        the block id, the text formatting settings and an output buffer.
+ *
+ * The conversion itself is done by execute(), which is defined out of line.
+ **/
+class TableExportWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param input_relation Schema of the input relation (kept by reference,
+   *        not owned).
+   * @param input_block_id The ID of the block handled by this WorkOrder.
+   * @param format The bulk I/O format.
+   * @param column_delimiter The column delimiter character.
+   * @param escape_strings The escape-strings setting.
+   * @param quote_character The quote character.
+   * @param null_string The text used for NULL values; copied into this
+   *        WorkOrder, so the argument need not outlive the call.
+   * @param operator_index The index of the originating operator.
+   *        NOTE(review): presumably its index in the query plan DAG -- confirm.
+   * @param scheduler_client_id The TMB client ID of the scheduler thread.
+   * @param storage_manager The StorageManager to use (not owned).
+   * @param bus The TMB message bus (not owned).
+   * @param output_buffer The string buffer for this block's output. Stored as
+   *        a raw pointer and not owned. NOTE(review): presumably execute()
+   *        writes into it and the operator owns it -- confirm in the .cpp.
+   **/
+  TableExportWorkOrder(const std::size_t query_id,
+                       const CatalogRelationSchema &input_relation,
+                       const block_id input_block_id,
+                       const BulkIOFormat format,
+                       const char column_delimiter,
+                       const bool escape_strings,
+                       const char quote_character,
+                       const std::string &null_string,
+                       const std::size_t operator_index,
+                       const tmb::client_id scheduler_client_id,
+                       StorageManager *storage_manager,
+                       tmb::MessageBus *bus,
+                       std::string *output_buffer)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        input_block_id_(input_block_id),
+        format_(format),
+        column_delimiter_(column_delimiter),
+        escape_strings_(escape_strings),
+        quote_character_(quote_character),
+        null_string_(null_string),
+        operator_index_(operator_index),
+        scheduler_client_id_(scheduler_client_id),
+        storage_manager_(storage_manager),
+        bus_(bus),
+        output_buffer_(output_buffer) {
+  }
+
+  ~TableExportWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  // Field transforms with a common signature, so that either one can be
+  // passed as the 'transform' template argument of the helpers below.
+  inline std::string quoteCSVField(std::string &&field) const;
+  inline std::string escapeTextField(std::string &&field) const;
+
+  // Helper parameterized at compile time on the field transform to apply.
+  // Defined out of line.
+  template <std::string (TableExportWorkOrder::*transform)(std::string&&) const,
+            typename Container, typename Functor>
+  inline void writeEachToString(const Container &container,
+                                std::string *output,
+                                const Functor &functor) const;
+
+  // Helper taking a ValueAccessor and an output string, parameterized on the
+  // field transform. Defined out of line.
+  template <std::string (TableExportWorkOrder::*transform)(std::string&&) const>
+  void writeToString(ValueAccessor *accessor, std::string *output) const;
+
+  const CatalogRelationSchema &input_relation_;
+  const block_id input_block_id_;
+
+  const BulkIOFormat format_;
+  const char column_delimiter_;
+  const bool escape_strings_;
+  const char quote_character_;
+  const std::string null_string_;
+
+  const std::size_t operator_index_;
+  const tmb::client_id scheduler_client_id_;
+  StorageManager *storage_manager_;
+  tmb::MessageBus *bus_;
+
+  std::string *output_buffer_;
+
+  DISALLOW_COPY_AND_ASSIGN(TableExportWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_TABLE_EXPORT_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp 
b/relational_operators/TextScanOperator.cpp
index a133e0c..d016ada 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -31,6 +31,7 @@
 #include <cstdint>
 #include <cstdio>
 #include <cstdlib>
+#include <exception>
 #include <memory>
 #include <string>
 #include <utility>
@@ -54,6 +55,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/Glob.hpp"
 
 #include "gflags/gflags.h"
@@ -90,6 +92,10 @@ namespace {
 size_t getFileSize(const string &file_name) {
   // Use standard C libary to retrieve the file size.
   FILE *fp = std::fopen(file_name.c_str(), "rb");
+  // TODO(quickstep-team): Decent handling of exceptions at query runtime.
+  if (fp == nullptr) {
+    throw std::runtime_error("Can not open file " + file_name + " for 
reading");
+  }
   std::fseek(fp, 0, SEEK_END);
   const std::size_t file_size = std::ftell(fp);
   std::fclose(fp);
@@ -135,8 +141,8 @@ bool TextScanOperator::getAllWorkOrders(
                                   file,
                                   text_offset,
                                   FLAGS_textscan_text_segment_size,
-                                  field_terminator_,
-                                  process_escape_sequences_,
+                                  options_->getDelimiter(),
+                                  options_->escapeStrings(),
                                   output_destination),
             op_index_);
       }
@@ -149,8 +155,8 @@ bool TextScanOperator::getAllWorkOrders(
                                   file,
                                   text_offset,
                                   file_size - text_offset,
-                                  field_terminator_,
-                                  process_escape_sequences_,
+                                  options_->getDelimiter(),
+                                  options_->escapeStrings(),
                                   output_destination),
             op_index_);
       }
@@ -196,9 +202,10 @@ serialization::WorkOrder* 
TextScanOperator::createWorkOrderProto(const string &f
   proto->SetExtension(serialization::TextScanWorkOrder::filename, filename);
   proto->SetExtension(serialization::TextScanWorkOrder::text_offset, 
text_offset);
   proto->SetExtension(serialization::TextScanWorkOrder::text_segment_size, 
text_segment_size);
-  proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, 
field_terminator_);
+  proto->SetExtension(serialization::TextScanWorkOrder::field_terminator,
+                      options_->getDelimiter());
   
proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences,
-                      process_escape_sequences_);
+                      options_->escapeStrings());
   
proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index,
                       output_destination_index_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp 
b/relational_operators/TextScanOperator.hpp
index f6be8c8..bc18d9a 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -32,6 +32,7 @@
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BulkIOConfiguration.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -121,14 +122,12 @@ class TextScanOperator : public RelationalOperator {
    **/
   TextScanOperator(const std::size_t query_id,
                    const std::string &file_pattern,
-                   const char field_terminator,
-                   const bool process_escape_sequences,
+                   const BulkIOConfigurationPtr &options,
                    const CatalogRelation &output_relation,
                    const QueryContext::insert_destination_id 
output_destination_index)
       : RelationalOperator(query_id),
         file_pattern_(file_pattern),
-        field_terminator_(field_terminator),
-        process_escape_sequences_(process_escape_sequences),
+        options_(options),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         work_generated_(false) {}
@@ -165,8 +164,7 @@ class TextScanOperator : public RelationalOperator {
                                                  const std::size_t 
text_segment_size);
 
   const std::string file_pattern_;
-  const char field_terminator_;
-  const bool process_escape_sequences_;
+  const BulkIOConfigurationPtr options_;
 
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto 
b/relational_operators/WorkOrder.proto
index 42a0e7d..7025c16 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -21,7 +21,7 @@ package quickstep.serialization;
 
 import "relational_operators/SortMergeRunOperator.proto";
 
-// Next tag: 26.
+// Next tag: 27.
 enum WorkOrderType {
   AGGREGATION = 1;
   BUILD_AGGREGATION_EXISTENCE_MAP = 23;
@@ -43,6 +43,7 @@ enum WorkOrderType {
   SELECT = 15;
   SORT_MERGE_RUN = 16;
   SORT_RUN_GENERATION = 17;
+  TABLE_EXPORT = 26;
   TABLE_GENERATOR = 18;
   TEXT_SCAN = 19;
   UNION_ALL = 24;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/utility/BulkIOConfiguration.cpp
----------------------------------------------------------------------
diff --git a/utility/BulkIOConfiguration.cpp b/utility/BulkIOConfiguration.cpp
new file mode 100644
index 0000000..af95dca
--- /dev/null
+++ b/utility/BulkIOConfiguration.cpp
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/BulkIOConfiguration.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Assigns every tunable member the default value for the given format.
+// Any format other than kCSV / kText is a fatal error.
+void BulkIOConfiguration::initializeDefaultParameters(
+    const BulkIOFormat format) {
+  switch (format) {
+    case BulkIOFormat::kCSV:
+      // CSV: ',' delimiter, '"' quote character, header flag on,
+      // escape flag off, empty string for NULL.
+      null_string_ = "";
+      quote_ = '"';
+      header_ = true;
+      escape_strings_ = false;
+      delimiter_ = ',';
+      return;
+    case BulkIOFormat::kText:
+      // Text: tab delimiter, no quote character (NUL), header flag off,
+      // escape flag on, backslash-N for NULL.
+      null_string_ = "\\N";
+      quote_ = 0;
+      header_ = false;
+      escape_strings_ = true;
+      delimiter_ = '\t';
+      return;
+    default:
+      break;
+  }
+  LOG(FATAL) << "Unexpected format in "
+             << "BulkIOConfiguration::initializeDefaultParameters()";
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/utility/BulkIOConfiguration.hpp
----------------------------------------------------------------------
diff --git a/utility/BulkIOConfiguration.hpp b/utility/BulkIOConfiguration.hpp
new file mode 100644
index 0000000..d5f190f
--- /dev/null
+++ b/utility/BulkIOConfiguration.hpp
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_BULK_IO_CONFIGURATION_HPP_
+#define QUICKSTEP_UTILITY_BULK_IO_CONFIGURATION_HPP_
+
+#include <memory>
+#include <string>
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+enum class BulkIOFormat {  // Formats for which BulkIOConfiguration provides default settings.
+  kCSV,  // Defaults: ',' delimiter, '"' quote character, header on, escapes off, "" for NULL.
+  kText  // Defaults: tab delimiter, no quote character, header off, escapes on, "\N" for NULL.
+};
+
+class BulkIOConfiguration;
+typedef std::shared_ptr<const BulkIOConfiguration> BulkIOConfigurationPtr;
+
+/**
+ * @brief Settings for bulk text I/O in a given BulkIOFormat.
+ *
+ * The format is fixed at construction. All other settings start at the
+ * per-format defaults assigned by initializeDefaultParameters() and can be
+ * overridden through the setters.
+ **/
+class BulkIOConfiguration {
+ public:
+  /**
+   * @brief Constructor. Marked explicit so that a BulkIOFormat is never
+   *        silently converted into a configuration object.
+   *
+   * @param format The format whose default settings are loaded.
+   **/
+  explicit BulkIOConfiguration(const BulkIOFormat format)
+      : format_(format) {
+    initializeDefaultParameters(format);
+  }
+
+  /**
+   * @return The format given at construction.
+   **/
+  inline BulkIOFormat getFormat() const {
+    return format_;
+  }
+
+  /**
+   * @return "CSV" or "TEXT" for the two known formats, "UNKNOWN" otherwise.
+   **/
+  inline std::string getFormatName() const {
+    // No default label, so the compiler can still warn when a new
+    // BulkIOFormat enumerator is added without a case here.
+    switch (format_) {
+      case BulkIOFormat::kCSV:
+        return "CSV";
+      case BulkIOFormat::kText:
+        return "TEXT";
+    }
+    // Not reached for a valid BulkIOFormat. Returning a value here keeps
+    // control from flowing off the end of a non-void function, which would
+    // be undefined behavior.
+    return "UNKNOWN";
+  }
+
+  /**
+   * @return The field delimiter character.
+   **/
+  inline char getDelimiter() const {
+    return delimiter_;
+  }
+
+  /**
+   * @param delimiter The new field delimiter character.
+   **/
+  inline void setDelimiter(const char delimiter) {
+    delimiter_ = delimiter;
+  }
+
+  /**
+   * @return The escape-strings flag.
+   **/
+  inline bool escapeStrings() const {
+    return escape_strings_;
+  }
+
+  /**
+   * @param escape_strings The new value of the escape-strings flag.
+   **/
+  inline void setEscapeStrings(const bool escape_strings) {
+    escape_strings_ = escape_strings;
+  }
+
+  /**
+   * @return The header flag.
+   **/
+  inline bool hasHeader() const {
+    return header_;
+  }
+
+  /**
+   * @param header The new value of the header flag.
+   **/
+  inline void setHeader(const bool header) {
+    header_ = header;
+  }
+
+  /**
+   * @return The quote character (0 by default for the text format).
+   **/
+  inline char getQuoteCharacter() const {
+    return quote_;
+  }
+
+  /**
+   * @param quote The new quote character.
+   **/
+  inline void setQuoteCharacter(const char quote) {
+    quote_ = quote;
+  }
+
+  /**
+   * @return The string that stands for NULL. The reference stays valid only
+   *         as long as this object is alive and setNullString() is not called.
+   **/
+  inline const std::string& getNullString() const {
+    return null_string_;
+  }
+
+  /**
+   * @param null_string The new string that stands for NULL (copied).
+   **/
+  inline void setNullString(const std::string &null_string) {
+    null_string_ = null_string;
+  }
+
+ private:
+  // Loads the defaults for the given format into the members below.
+  // Defined in BulkIOConfiguration.cpp.
+  void initializeDefaultParameters(const BulkIOFormat format);
+
+  const BulkIOFormat format_;
+
+  char delimiter_;
+  bool escape_strings_;
+  bool header_;
+  char quote_;
+  std::string null_string_;
+
+  DISALLOW_COPY_AND_ASSIGN(BulkIOConfiguration);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_BULK_IO_CONFIGURATION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbd18561/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 16a83ee..59d843d 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -168,6 +168,7 @@ add_library(quickstep_utility_BloomFilter ../empty_src.cpp 
BloomFilter.hpp)
 add_library(quickstep_utility_BloomFilter_proto
             ${quickstep_utility_BloomFilter_proto_srcs}
             ${quickstep_utility_BloomFilter_proto_hdrs})
+add_library(quickstep_utility_BulkIOConfiguration BulkIOConfiguration.cpp 
BulkIOConfiguration.hpp)
 add_library(quickstep_utility_CalculateInstalledMemory 
CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
@@ -249,6 +250,8 @@ target_link_libraries(quickstep_utility_CompositeHash
 target_link_libraries(quickstep_utility_BarrieredReadWriteConcurrentBitVector
                       quickstep_utility_BitManipulation
                       quickstep_utility_Macros)
+# BulkIOConfiguration.cpp includes glog/logging.h and uses LOG(FATAL).
+target_link_libraries(quickstep_utility_BulkIOConfiguration
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -347,6 +350,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_BitVector
                       quickstep_utility_BloomFilter
                       quickstep_utility_BloomFilter_proto
+                      quickstep_utility_BulkIOConfiguration
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf

Reply via email to