westonpace commented on a change in pull request #10568:
URL: https://github.com/apache/arrow/pull/10568#discussion_r665828836



##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -347,6 +348,175 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+namespace csv {
+
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
+    auto parser = std::make_shared<BlockParser>(
+        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
+
+    std::shared_ptr<Buffer> straddling;
+    std::vector<util::string_view> views;
+    if (block.partial->size() != 0 || block.completion->size() != 0) {
+      if (block.partial->size() == 0) {
+        straddling = block.completion;
+      } else if (block.completion->size() == 0) {
+        straddling = block.partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            straddling,
+            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
+      }
+      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
+    } else {
+      views = {util::string_view(*block.buffer)};
+    }
+    uint32_t parsed_size;
+    if (block.is_final) {
+      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
+    } else {
+      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
+    }
+    if (count_rows_) {
+      num_rows_seen_ += parser->num_rows();
+    }
+    RETURN_NOT_OK(block.consume_bytes(parsed_size));
+    return ParsedBlock{std::move(parser), block.block_index,
+                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
+  }
+
+ private:
+  io::IOContext io_context_;
+  ParseOptions parse_options_;
+  int num_csv_cols_;
+  bool count_rows_;
+  int num_rows_seen_ = 0;
+};
+
+class BlockDecodingOperator {
+ public:
+  Future<std::shared_ptr<RecordBatch>> operator()(const ParsedBlock& block) {
+    DCHECK(!state_->column_decoders.empty());
+    std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
+    for (auto& decoder : state_->column_decoders) {
+      decoded_array_futs.push_back(decoder->Decode(block.parser));
+    }
+    auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
+    auto decoded_arrays_fut = All(decoded_array_futs);
+    auto state = state_;
+    return decoded_arrays_fut.Then(
+        [state, bytes_parsed_or_skipped](
+            const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
+            -> Result<std::shared_ptr<RecordBatch>> {
+          state->bytes_decoded_->fetch_add(bytes_parsed_or_skipped);
+          ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
+                                internal::UnwrapOrRaise(maybe_decoded_arrays));
+          return state->DecodedArraysToBatch(decoded_arrays);
+        });
+  }
+
+  static Result<BlockDecodingOperator> Make(io::IOContext io_context,
+                                            ConvertOptions convert_options,
+                                            ConversionSchema conversion_schema,
+                                            std::atomic<int64_t>* bytes_decoded) {
+    BlockDecodingOperator op(std::move(io_context), std::move(convert_options),
+                             std::move(conversion_schema), bytes_decoded);
+    RETURN_NOT_OK(op.state_->MakeColumnDecoders());
+    return op;
+  }
+
+ private:
+  BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options,
+                        ConversionSchema conversion_schema,
+                        std::atomic<int64_t>* bytes_decoded)
+      : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options),
+                                       std::move(conversion_schema), bytes_decoded)) {}
+
+  struct State {
+    State(io::IOContext io_context, ConvertOptions convert_options,
+          ConversionSchema conversion_schema, std::atomic<int64_t>* bytes_decoded)
+        : io_context(std::move(io_context)),
+          convert_options(std::move(convert_options)),
+          conversion_schema(std::move(conversion_schema)),
+          bytes_decoded_(bytes_decoded) {}
+
+    Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch(
+        std::vector<std::shared_ptr<Array>>& arrays) {
+      if (schema == nullptr) {
+        FieldVector fields(arrays.size());
+        for (size_t i = 0; i < arrays.size(); ++i) {
+          fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type());
+        }
+        schema = arrow::schema(std::move(fields));
+      }
+      const auto n_rows = arrays[0]->length();
+      return RecordBatch::Make(schema, n_rows, std::move(arrays));
+    }
+
+    // Make column decoders from conversion schema
+    Status MakeColumnDecoders() {
+      for (const auto& column : conversion_schema.columns) {
+        std::shared_ptr<ColumnDecoder> decoder;
+        if (column.is_missing) {
+          ARROW_ASSIGN_OR_RAISE(decoder,
+                                ColumnDecoder::MakeNull(io_context.pool(), column.type));
+        } else if (column.type != nullptr) {
+          ARROW_ASSIGN_OR_RAISE(
+              decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index,
+                                           convert_options));
+        } else {
+          ARROW_ASSIGN_OR_RAISE(
+              decoder,
+              ColumnDecoder::Make(io_context.pool(), column.index, convert_options));
+        }
+        }
+        column_decoders.push_back(std::move(decoder));
+      }
+      return Status::OK();
+    }
+
+    io::IOContext io_context;
+    ConvertOptions convert_options;
+    ConversionSchema conversion_schema;
+    std::vector<std::shared_ptr<ColumnDecoder>> column_decoders;
+    std::shared_ptr<Schema> schema;
+    std::atomic<int64_t>* bytes_decoded_;

Review comment:
       I ended up getting rid of this member but yes, it should have been 
removed.

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -347,6 +348,175 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+namespace csv {
+
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
+    auto parser = std::make_shared<BlockParser>(
+        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
+
+    std::shared_ptr<Buffer> straddling;
+    std::vector<util::string_view> views;
+    if (block.partial->size() != 0 || block.completion->size() != 0) {
+      if (block.partial->size() == 0) {
+        straddling = block.completion;
+      } else if (block.completion->size() == 0) {
+        straddling = block.partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            straddling,
+            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
+      }
+      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
+    } else {
+      views = {util::string_view(*block.buffer)};
+    }
+    uint32_t parsed_size;
+    if (block.is_final) {
+      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
+    } else {
+      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
+    }
+    if (count_rows_) {
+      num_rows_seen_ += parser->num_rows();
+    }
+    RETURN_NOT_OK(block.consume_bytes(parsed_size));
+    return ParsedBlock{std::move(parser), block.block_index,
+                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
+  }
+
+ private:
+  io::IOContext io_context_;
+  ParseOptions parse_options_;
+  int num_csv_cols_;
+  bool count_rows_;
+  int num_rows_seen_ = 0;
+};
+
+class BlockDecodingOperator {
+ public:
+  Future<std::shared_ptr<RecordBatch>> operator()(const ParsedBlock& block) {
+    DCHECK(!state_->column_decoders.empty());
+    std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
+    for (auto& decoder : state_->column_decoders) {
+      decoded_array_futs.push_back(decoder->Decode(block.parser));
+    }
+    auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
+    auto decoded_arrays_fut = All(decoded_array_futs);
+    auto state = state_;
+    return decoded_arrays_fut.Then(
+        [state, bytes_parsed_or_skipped](
+            const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
+            -> Result<std::shared_ptr<RecordBatch>> {
+          state->bytes_decoded_->fetch_add(bytes_parsed_or_skipped);
+          ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
+                                internal::UnwrapOrRaise(maybe_decoded_arrays));
+          return state->DecodedArraysToBatch(decoded_arrays);
+        });
+  }
+
+  static Result<BlockDecodingOperator> Make(io::IOContext io_context,
+                                            ConvertOptions convert_options,
+                                            ConversionSchema conversion_schema,
+                                            std::atomic<int64_t>* bytes_decoded) {
+    BlockDecodingOperator op(std::move(io_context), std::move(convert_options),
+                             std::move(conversion_schema), bytes_decoded);
+    RETURN_NOT_OK(op.state_->MakeColumnDecoders());
+    return op;
+  }
+
+ private:
+  BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options,
+                        ConversionSchema conversion_schema,
+                        std::atomic<int64_t>* bytes_decoded)
+      : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options),
+                                       std::move(conversion_schema), bytes_decoded)) {}
+
+  struct State {
+    State(io::IOContext io_context, ConvertOptions convert_options,
+          ConversionSchema conversion_schema, std::atomic<int64_t>* bytes_decoded)
+        : io_context(std::move(io_context)),
+          convert_options(std::move(convert_options)),
+          conversion_schema(std::move(conversion_schema)),
+          bytes_decoded_(bytes_decoded) {}
+
+    Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch(
+        std::vector<std::shared_ptr<Array>>& arrays) {
+      if (schema == nullptr) {
+        FieldVector fields(arrays.size());
+        for (size_t i = 0; i < arrays.size(); ++i) {
+          fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type());
+        }
+        schema = arrow::schema(std::move(fields));
+      }
+      const auto n_rows = arrays[0]->length();
+      return RecordBatch::Make(schema, n_rows, std::move(arrays));
+    }
+
+    // Make column decoders from conversion schema
+    Status MakeColumnDecoders() {
+      for (const auto& column : conversion_schema.columns) {
+        std::shared_ptr<ColumnDecoder> decoder;
+        if (column.is_missing) {
+          ARROW_ASSIGN_OR_RAISE(decoder,
+                                ColumnDecoder::MakeNull(io_context.pool(), column.type));
+        } else if (column.type != nullptr) {
+          ARROW_ASSIGN_OR_RAISE(
+              decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index,
+                                           convert_options));
+        } else {
+          ARROW_ASSIGN_OR_RAISE(
+              decoder,
+              ColumnDecoder::Make(io_context.pool(), column.index, convert_options));
+        }
+        }
+        column_decoders.push_back(std::move(decoder));
+      }
+      return Status::OK();
+    }
+
+    io::IOContext io_context;

Review comment:
       Good observation, I've done this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to