http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/parser/preprocessed/SqlParser_gen.hpp
----------------------------------------------------------------------
diff --git a/parser/preprocessed/SqlParser_gen.hpp 
b/parser/preprocessed/SqlParser_gen.hpp
index 142059d..96c649d 100644
--- a/parser/preprocessed/SqlParser_gen.hpp
+++ b/parser/preprocessed/SqlParser_gen.hpp
@@ -156,29 +156,30 @@ extern int quickstep_yydebug;
     TOKEN_SMA = 366,
     TOKEN_SMALLINT = 367,
     TOKEN_STDERR = 368,
-    TOKEN_STDOUT = 369,
-    TOKEN_SUBSTRING = 370,
-    TOKEN_TABLE = 371,
-    TOKEN_THEN = 372,
-    TOKEN_TIME = 373,
-    TOKEN_TIMESTAMP = 374,
-    TOKEN_TO = 375,
-    TOKEN_TRUE = 376,
-    TOKEN_TUPLESAMPLE = 377,
-    TOKEN_UNBOUNDED = 378,
-    TOKEN_UNIQUE = 379,
-    TOKEN_UPDATE = 380,
-    TOKEN_USING = 381,
-    TOKEN_VALUES = 382,
-    TOKEN_VARCHAR = 383,
-    TOKEN_WHEN = 384,
-    TOKEN_WHERE = 385,
-    TOKEN_WINDOW = 386,
-    TOKEN_WITH = 387,
-    TOKEN_YEAR = 388,
-    TOKEN_YEARMONTH = 389,
-    TOKEN_EOF = 390,
-    TOKEN_LEX_ERROR = 391
+    TOKEN_STDIN = 369,
+    TOKEN_STDOUT = 370,
+    TOKEN_SUBSTRING = 371,
+    TOKEN_TABLE = 372,
+    TOKEN_THEN = 373,
+    TOKEN_TIME = 374,
+    TOKEN_TIMESTAMP = 375,
+    TOKEN_TO = 376,
+    TOKEN_TRUE = 377,
+    TOKEN_TUPLESAMPLE = 378,
+    TOKEN_UNBOUNDED = 379,
+    TOKEN_UNIQUE = 380,
+    TOKEN_UPDATE = 381,
+    TOKEN_USING = 382,
+    TOKEN_VALUES = 383,
+    TOKEN_VARCHAR = 384,
+    TOKEN_WHEN = 385,
+    TOKEN_WHERE = 386,
+    TOKEN_WINDOW = 387,
+    TOKEN_WITH = 388,
+    TOKEN_YEAR = 389,
+    TOKEN_YEARMONTH = 390,
+    TOKEN_EOF = 391,
+    TOKEN_LEX_ERROR = 392
   };
 #endif
 
@@ -289,7 +290,7 @@ union YYSTYPE
 
   quickstep::ParsePriority *opt_priority_clause_;
 
-#line 293 "SqlParser_gen.hpp" /* yacc.c:1915  */
+#line 294 "SqlParser_gen.hpp" /* yacc.c:1915  */
 };
 
 typedef union YYSTYPE YYSTYPE;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp 
b/query_optimizer/ExecutionGenerator.cpp
index 14d8949..3bca344 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1212,10 +1212,13 @@ void ExecutionGenerator::convertCopyFrom(
         ->MergeFrom(output_relation->getPartitionScheme()->getProto());
   } else {
     
insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
-
-    const vector<block_id> blocks(output_relation->getBlocksSnapshot());
-    for (const block_id block : blocks) {
-      
insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, 
block);
+    const StorageBlockLayout &layout = 
output_relation->getDefaultStorageBlockLayout();
+    const auto sub_block_type = 
layout.getDescription().tuple_store_description().sub_block_type();
+    if (sub_block_type != 
TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE) {
+      const vector<block_id> blocks(output_relation->getBlocksSnapshot());
+      for (const block_id block : blocks) {
+        
insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, 
block);
+      }
     }
   }
 
@@ -1880,7 +1883,6 @@ void ExecutionGenerator::convertAggregate(
       use_parallel_initialization = true;
       aggr_state_num_partitions = 
CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups);
 
-      DCHECK(!group_by_aggrs_info.empty());
       CalculateCollisionFreeAggregationInfo(max_num_groups, 
group_by_aggrs_info,
                                             
aggr_state_proto->mutable_collision_free_vector_info());
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp 
b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index e0e3dff..2c84fc5 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -486,7 +486,7 @@ std::size_t StarSchemaSimpleCostModel::getNumDistinctValues(
       return stat.getNumDistinctValues(rel_attr_id);
     }
   }
-  return estimateCardinalityForTableReference(table_reference);
+  return estimateCardinalityForTableReference(table_reference) * 0.5;
 }
 
 bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
@@ -520,7 +520,7 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
           std::static_pointer_cast<const P::TableReference>(physical_plan);
       const CatalogRelationStatistics &stat =
           table_reference->relation()->getStatistics();
-      if (stat.hasNumTuples()) {
+      if (stat.isExact() && stat.hasNumTuples()) {
         const std::size_t num_tuples = stat.getNumTuples();
         for (const auto &attr : attributes) {
           const attribute_id rel_attr_id =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp 
b/query_optimizer/resolver/Resolver.cpp
index 0b6dc22..17198c2 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -1019,10 +1019,13 @@ L::LogicalPtr Resolver::resolveInsertSelection(
     if (destination_type.equals(selection_type)) {
       cast_expressions.emplace_back(selection_attributes[aid]);
     } else {
-      // TODO(jianqiao): implement Cast operation for non-numeric types.
+      // TODO(jianqiao): Implement Cast operation for non-numeric types.
+      // TODO(jianqiao): We temporarily disable the safely-coercible check for
+      // tricks that work around "argmin". Will switch it back once the "Cast"
+      // function is supported.
       if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
           selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
-          destination_type.isSafelyCoercibleFrom(selection_type)) {
+          destination_type.isCoercibleFrom(selection_type)) {
         // Add cast operation
         const E::AttributeReferencePtr attr = selection_attributes[aid];
         const E::ExpressionPtr cast_expr =
@@ -1038,7 +1041,7 @@ L::LogicalPtr Resolver::resolveInsertSelection(
             << insert_statement.relation_name()->value() << "."
             << destination_attributes[aid]->attribute_name() << " has type "
             << selection_attributes[aid]->getValueType().getName()
-            << ", which cannot be safely coerced to the column's type "
+            << ", which cannot be coerced to the column's type "
             << destination_attributes[aid]->getValueType().getName();
      }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/relational_operators/TableExportOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableExportOperator.cpp 
b/relational_operators/TableExportOperator.cpp
index f6a73bf..cb90d72 100644
--- a/relational_operators/TableExportOperator.cpp
+++ b/relational_operators/TableExportOperator.cpp
@@ -120,6 +120,8 @@ void TableExportOperator::receiveFeedbackMessage(
     } else if (lo_file_name == "$stderr") {
       file_ = stderr;
     } else {
+      DCHECK(!file_name_.empty());
+      DCHECK_EQ('@', file_name_.front());
       file_ = std::fopen(file_name_.substr(1).c_str(), "wb");
       // TODO(quickstep-team): Decent handling of exceptions at query runtime.
       if (file_ == nullptr) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp 
b/relational_operators/TextScanOperator.cpp
index 66137d8..88b7214 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -57,6 +57,7 @@
 #include "types/containers/Tuple.hpp"
 #include "utility/BulkIoConfiguration.hpp"
 #include "utility/Glob.hpp"
+#include "utility/ScopedBuffer.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -110,19 +111,36 @@ bool TextScanOperator::getAllWorkOrders(
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
   DCHECK(query_context != nullptr);
+  DCHECK(!file_pattern_.empty());
 
-  const std::vector<std::string> files = 
utility::file::GlobExpand(file_pattern_);
-
-  CHECK_NE(files.size(), 0u)
-      << "No files matched '" << file_pattern_ << "'. Exiting.";
+  if (work_generated_) {
+    return true;
+  }
 
   InsertDestination *output_destination =
       query_context->getInsertDestination(output_destination_index_);
 
-  if (work_generated_) {
+  if (file_pattern_ == "$stdin") {
+    container->addNormalWorkOrder(
+        new TextScanWorkOrder(query_id_,
+                              file_pattern_,
+                              0,
+                              -1 /* text_segment_size */,
+                              options_->getDelimiter(),
+                              options_->escapeStrings(),
+                              output_destination),
+        op_index_);
+    work_generated_ = true;
     return true;
   }
 
+  DCHECK_EQ('@', file_pattern_.front());
+  const std::vector<std::string> files =
+      utility::file::GlobExpand(file_pattern_.substr(1));
+
+  CHECK_NE(files.size(), 0u)
+      << "No files matched '" << file_pattern_ << "'. Exiting.";
+
   for (const std::string &file : files) {
 #ifdef QUICKSTEP_HAVE_UNISTD
     // Check file permissions before trying to open it.
@@ -221,6 +239,12 @@ serialization::WorkOrder* 
TextScanOperator::createWorkOrderProto(
 }
 
 void TextScanWorkOrder::execute() {
+  DCHECK(!filename_.empty());
+  if (filename_ == "$stdin") {
+    executeInputStream();
+    return;
+  }
+
   const CatalogRelationSchema &relation = output_destination_->getRelation();
   std::vector<Tuple> tuples;
   bool is_faulty;
@@ -436,6 +460,76 @@ void TextScanWorkOrder::execute() {
   output_destination_->bulkInsertTuples(&column_vectors);
 }
 
+void TextScanWorkOrder::executeInputStream() {
+  std::string data;
+  const int len = std::ftell(stdin);
+  if (len >= 0) {
+    ScopedBuffer buffer(len, false);
+    std::rewind(stdin);
+    std::fread(buffer.get(), 1, len, stdin);
+    data = std::string(static_cast<char*>(buffer.get()), len);
+  } else {
+    std::unique_ptr<std::ostringstream> oss = 
std::make_unique<std::ostringstream>();
+    *oss << std::cin.rdbuf();
+    data = oss->str();
+    oss.reset();
+  }
+
+  if (data.back() != '\n') {
+    data.push_back('\n');
+  }
+
+  const CatalogRelationSchema &relation = output_destination_->getRelation();
+  std::vector<Tuple> tuples;
+  std::vector<TypedValue> row_tuple;
+  bool is_faulty;
+
+  const char *row_ptr = data.c_str();
+  const char *end_ptr = row_ptr + data.length();
+
+  while (row_ptr < end_ptr) {
+    if (*row_ptr == '\r' || *row_ptr == '\n') {
+      // Skip empty lines.
+      ++row_ptr;
+    } else {
+      row_tuple = parseRow(&row_ptr, relation, &is_faulty);
+      if (is_faulty) {
+        // Skip faulty rows
+        LOG(INFO) << "Faulty row found. Hence switching to next row.";
+      } else {
+        // Convert vector returned to tuple only when a valid row is 
encountered.
+        tuples.emplace_back(Tuple(std::move(row_tuple)));
+      }
+    }
+  }
+
+  // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
+  ColumnVectorsValueAccessor column_vectors;
+  std::size_t attr_id = 0;
+  for (const auto &attribute : relation) {
+    const Type &attr_type = attribute.getType();
+    if (attr_type.isVariableLength()) {
+      std::unique_ptr<IndirectColumnVector> column(
+          new IndirectColumnVector(attr_type, tuples.size()));
+      for (const auto &tuple : tuples) {
+        column->appendTypedValue(tuple.getAttributeValue(attr_id));
+      }
+      column_vectors.addColumn(column.release());
+    } else {
+      std::unique_ptr<NativeColumnVector> column(
+          new NativeColumnVector(attr_type, tuples.size()));
+      for (const auto &tuple : tuples) {
+        column->appendTypedValue(tuple.getAttributeValue(attr_id));
+      }
+      column_vectors.addColumn(column.release());
+    }
+    ++attr_id;
+  }
+
+  // Bulk insert the tuples.
+  output_destination_->bulkInsertTuples(&column_vectors);
+}
+
 std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
                                                     const 
CatalogRelationSchema &relation,
                                                     bool *is_faulty) const {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp 
b/relational_operators/TextScanOperator.hpp
index 01c559c..30462d7 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -225,6 +225,8 @@ class TextScanWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
+  void executeInputStream();
+
   /**
    * @brief Extract a field string starting at \p *field_ptr. This method also
    *        expands escape sequences if \p process_escape_sequences_ is true.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp 
b/storage/AggregationOperationState.cpp
index 73f1983..9d58107 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -393,8 +393,7 @@ bool AggregationOperationState::ProtoIsValid(
       }
 
       const S::CollisionFreeVectorInfo &proto_collision_free_vector_info = 
proto.collision_free_vector_info();
-      if (!proto_collision_free_vector_info.IsInitialized() ||
-          proto_collision_free_vector_info.state_offsets_size() != 
group_by_expressions_size) {
+      if (!proto_collision_free_vector_info.IsInitialized()) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aec7623a/utility/ExecutionDAGVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/ExecutionDAGVisualizer.cpp 
b/utility/ExecutionDAGVisualizer.cpp
index 8059ef3..f009a72 100644
--- a/utility/ExecutionDAGVisualizer.cpp
+++ b/utility/ExecutionDAGVisualizer.cpp
@@ -234,9 +234,6 @@ void ExecutionDAGVisualizer::bindProfilingStats(
         std::max(time_end[relop_index], workorder_end_time);
     time_elapsed[relop_index] += (workorder_end_time - workorder_start_time);
 
-    if (workorders_count.find(relop_index) == workorders_count.end()) {
-      workorders_count[relop_index] = 0;
-    }
     ++workorders_count[relop_index];
     if (mean_time_per_workorder.find(relop_index) ==
         mean_time_per_workorder.end()) {
@@ -292,8 +289,7 @@ void ExecutionDAGVisualizer::bindProfilingStats(
       node_info.labels.emplace_back(
           "effective concurrency: " + FormatDigits(concurrency, 2));
 
-      DCHECK(workorders_count.find(node_index) != workorders_count.end());
-      const std::size_t workorders_count_for_node = 
workorders_count.at(node_index);
+      const std::size_t workorders_count_for_node = 
workorders_count[node_index];
       if (workorders_count_for_node > 0) {
         mean_time_per_workorder[node_index] =
             mean_time_per_workorder[node_index] /

Reply via email to