westonpace commented on a change in pull request #10568:
URL: https://github.com/apache/arrow/pull/10568#discussion_r665828703
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -640,264 +815,112 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
 /////////////////////////////////////////////////////////////////////////
 // Base class for streaming readers
-class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
+class StreamingReaderImpl : public ReaderMixin,
+                            public csv::StreamingReader,
+                            public std::enable_shared_from_this<StreamingReaderImpl> {
  public:
-  BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
-                      std::shared_ptr<io::InputStream> input,
+  StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
                       const ReadOptions& read_options, const ParseOptions& parse_options,
                       const ConvertOptions& convert_options, bool count_rows)
       : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                     convert_options, count_rows),
-        cpu_executor_(cpu_executor) {}
-
-  virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
-
-  std::shared_ptr<Schema> schema() const override { return schema_; }
-
-  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
-    auto next_fut = ReadNextAsync();
-    auto next_result = next_fut.result();
-    return std::move(next_result).Value(batch);
-  }
+        bytes_decoded_(0) {}
- protected:
-  // Make column decoders from conversion schema
-  Status MakeColumnDecoders() {
-    for (const auto& column : conversion_schema_.columns) {
-      std::shared_ptr<ColumnDecoder> decoder;
-      if (column.is_missing) {
-        ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(),
-                                                               column.type, task_group_));
-      } else if (column.type != nullptr) {
-        ARROW_ASSIGN_OR_RAISE(
-            decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index,
-                                         convert_options_, task_group_));
-      } else {
-        ARROW_ASSIGN_OR_RAISE(decoder,
-                              ColumnDecoder::Make(io_context_.pool(), column.index,
-                                                  convert_options_, task_group_));
-      }
-      column_decoders_.push_back(std::move(decoder));
-    }
-    return Status::OK();
-  }
-
-  Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                                 const std::shared_ptr<Buffer>& completion,
-                                 const std::shared_ptr<Buffer>& block,
-                                 int64_t block_index, bool is_final) {
-    ARROW_ASSIGN_OR_RAISE(auto result,
-                          Parse(partial, completion, block, block_index, is_final));
-    RETURN_NOT_OK(ProcessData(result.parser, block_index));
-    return result.parsed_bytes;
-  }
-
-  // Trigger conversion of parsed block data
-  Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
-    for (auto& decoder : column_decoders_) {
-      decoder->Insert(block_index, parser);
-    }
-    return Status::OK();
-  }
-
-  Result<std::shared_ptr<RecordBatch>> DecodeNextBatch() {
-    DCHECK(!column_decoders_.empty());
-    ArrayVector arrays;
-    arrays.reserve(column_decoders_.size());
-    Status st;
-    for (auto& decoder : column_decoders_) {
-      auto maybe_array = decoder->NextChunk();
-      if (!maybe_array.ok()) {
-        // If there's an error, still fetch results from other decoders to
-        // keep them in sync.
-        st &= maybe_array.status();
-      } else {
-        arrays.push_back(*std::move(maybe_array));
-      }
-    }
-    RETURN_NOT_OK(st);
-    DCHECK_EQ(arrays.size(), column_decoders_.size());
-    const bool is_null = (arrays[0] == nullptr);
-#ifndef NDEBUG
-    for (const auto& array : arrays) {
-      DCHECK_EQ(array == nullptr, is_null);
-    }
-#endif
-    if (is_null) {
-      eof_ = true;
-      return nullptr;
-    }
-
-    if (schema_ == nullptr) {
-      FieldVector fields(arrays.size());
-      for (size_t i = 0; i < arrays.size(); ++i) {
-        fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type());
-      }
-      schema_ = arrow::schema(std::move(fields));
-    }
-    const auto n_rows = arrays[0]->length();
-    return RecordBatch::Make(schema_, n_rows, std::move(arrays));
-  }
-
-  // Column decoders (in ConversionSchema order)
-  std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
-  std::shared_ptr<Schema> schema_;
-  std::shared_ptr<RecordBatch> pending_batch_;
-  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
-  Executor* cpu_executor_;
-  bool eof_ = false;
-};
-
-/////////////////////////////////////////////////////////////////////////
-// Serial StreamingReader implementation
-
-class SerialStreamingReader : public BaseStreamingReader,
-                              public std::enable_shared_from_this<SerialStreamingReader> {
- public:
-  using BaseStreamingReader::BaseStreamingReader;
-
-  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
+  Future<> Init(Executor* cpu_executor) {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, read_options_.block_size));
     // TODO Consider exposing readahead as a read option (ARROW-12090)
     ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                               io_context_.executor()));
-    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
+    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor);
-    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
-    task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
+    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+    int max_readahead = cpu_executor->GetCapacity();
     auto self = shared_from_this();
-    // Read schema from first batch
-    return ReadNextAsync(true).Then(
-        [self](const std::shared_ptr<RecordBatch>& first_batch)
-            -> Result<std::shared_ptr<csv::StreamingReader>> {
-          self->pending_batch_ = first_batch;
-          DCHECK_NE(self->schema_, nullptr);
-          return self;
-        });
-  }
-  Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
-    auto maybe_batch = DecodeNextBatch();
-    if (schema_ == nullptr && maybe_batch.ok()) {
-      schema_ = (*maybe_batch)->schema();
-    }
-    return maybe_batch;
+    return buffer_generator().Then([self, buffer_generator, max_readahead](
+                                       const std::shared_ptr<Buffer>& first_buffer) {
+      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead);
+    });
   }
-  Future<std::shared_ptr<RecordBatch>> DoReadNext(
-      std::shared_ptr<SerialStreamingReader> self) {
-    auto batch = std::move(pending_batch_);
-    if (batch != nullptr) {
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-    }
+  std::shared_ptr<Schema> schema() const override { return schema_; }
-    if (!source_eof_) {
-      return block_generator_()
-          .Then([self](const CSVBlock& maybe_block) -> Status {
-            if (!IsIterationEnd(maybe_block)) {
-              self->bytes_parsed_ += maybe_block.bytes_skipped;
-              self->last_block_index_ = maybe_block.block_index;
-              auto maybe_parsed = self->ParseAndInsert(
-                  maybe_block.partial, maybe_block.completion, maybe_block.buffer,
-                  maybe_block.block_index, maybe_block.is_final);
-              if (!maybe_parsed.ok()) {
-                // Parse error => bail out
-                self->eof_ = true;
-                return maybe_parsed.status();
-              }
-              self->bytes_parsed_ += *maybe_parsed;
-              RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
-            } else {
-              self->source_eof_ = true;
-              for (auto& decoder : self->column_decoders_) {
-                decoder->SetEOF(self->last_block_index_ + 1);
-              }
-            }
-            return Status::OK();
-          })
-          .Then([self]() -> Result<std::shared_ptr<RecordBatch>> {
-            return self->DecodeBatchAndUpdateSchema();
-          });
-    }
-    return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
-        DecodeBatchAndUpdateSchema());
-  }
+  int64_t bytes_read() const override { return bytes_decoded_.load(); }
-  Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
-      std::shared_ptr<SerialStreamingReader> self, bool internal_read) {
-    return DoReadNext(self).Then(
-        [self, internal_read](const std::shared_ptr<RecordBatch>& batch) {
-          if (batch != nullptr && batch->num_rows() == 0) {
-            return self->ReadNextSkippingEmpty(self, internal_read);
-          }
-          if (!internal_read) {
-            self->bytes_decoded_ += self->bytes_parsed_;
-            self->bytes_parsed_ = 0;
-          }
-          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-        });
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    auto next_fut = ReadNextAsync();
+    auto next_result = next_fut.result();
+    return std::move(next_result).Value(batch);
   }
   Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
-    return ReadNextAsync(false);
-  };
-
-  int64_t bytes_read() const override { return bytes_decoded_; }
+    return record_batch_gen_();
+  }
  protected:
-  Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
-    return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
-      if (first_buffer == nullptr) {
-        return Status::Invalid("Empty CSV file");
-      }
-      auto own_first_buffer = first_buffer;
-      auto start = own_first_buffer->data();
-      RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
-      self->bytes_decoded_ = own_first_buffer->data() - start;
-      RETURN_NOT_OK(self->MakeColumnDecoders());
-
-      self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
-          std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
-          std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
-      return Status::OK();
-    });
+  Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
+                                AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
+                                int max_readahead) {
+    if (first_buffer == nullptr) {
+      return Status::Invalid("Empty CSV file");
+    }
+
+    std::shared_ptr<Buffer> after_header;
+    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
+                          ProcessHeader(first_buffer, &after_header));
+    bytes_decoded_.fetch_add(header_bytes_consumed);
+    auto parser_op =
+        BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_);
+    ARROW_ASSIGN_OR_RAISE(auto decoder_op, BlockDecodingOperator::Make(
+                                               io_context_, convert_options_,
+                                               conversion_schema_, &bytes_decoded_));
+    auto block_gen = SerialBlockReader::MakeAsyncIterator(
+        std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
+        read_options_.skip_rows_after_names);
+    auto parsed_block_gen =
+        MakeMappedGenerator<ParsedBlock>(std::move(block_gen), std::move(parser_op));
+    auto rb_gen = MakeMappedGenerator<std::shared_ptr<RecordBatch>>(
+        std::move(parsed_block_gen), decoder_op);
+    auto self = shared_from_this();
+    return rb_gen().Then(
+        [self, rb_gen, max_readahead](const std::shared_ptr<RecordBatch>& first_batch) {
+          return self->InitAfterFirstBatch(first_batch, std::move(rb_gen), max_readahead);
+        });
   }
-  Future<std::shared_ptr<RecordBatch>> ReadNextAsync(bool internal_read) {
-    if (eof_) {
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr);
-    }
-    if (io_context_.stop_token().IsStopRequested()) {
-      eof_ = true;
-      return io_context_.stop_token().Poll();
+  Status InitAfterFirstBatch(const std::shared_ptr<RecordBatch>& first_batch,
+                             AsyncGenerator<std::shared_ptr<RecordBatch>> batch_gen,
+                             int max_readahead) {
+    schema_ = first_batch->schema();
+
+    AsyncGenerator<std::shared_ptr<RecordBatch>> readahead_gen;
+    if (read_options_.use_threads) {
+      readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead);
+    } else {
+      readahead_gen = std::move(batch_gen);
     }
-    auto self = shared_from_this();
-    if (!block_generator_) {
-      return SetupReader(self).Then(
-          [self, internal_read]() -> Future<std::shared_ptr<RecordBatch>> {
-            return self->ReadNextSkippingEmpty(self, internal_read);
-          },
-          [self](const Status& err) -> Result<std::shared_ptr<RecordBatch>> {
-            self->eof_ = true;
-            return err;
-          });
+
+    AsyncGenerator<std::shared_ptr<RecordBatch>> restarted_gen;
+    // Streaming reader should not emit empty record batches
+    if (first_batch->num_rows() > 0) {
+      restarted_gen = MakeGeneratorStartsWith({first_batch}, std::move(readahead_gen));
     } else {
-      return self->ReadNextSkippingEmpty(self, internal_read);
+      restarted_gen = std::move(readahead_gen);
     }
+    record_batch_gen_ =
+        MakeCancellable(std::move(restarted_gen), io_context_.stop_token());
+    return Status::OK();
   }
-  bool source_eof_ = false;
-  int64_t last_block_index_ = 0;
-  AsyncGenerator<CSVBlock> block_generator_;
-  // bytes of data parsed but not yet decoded
-  int64_t bytes_parsed_ = 0;
+  std::shared_ptr<Schema> schema_;
+  AsyncGenerator<std::shared_ptr<RecordBatch>> record_batch_gen_;
   // bytes which have been decoded for caller
-  int64_t bytes_decoded_ = 0;
-};
+  std::atomic<int64_t> bytes_decoded_;
+};  // namespace
Review comment:
Oops. Removed.
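
For context on the hunk above: the new `InitAfterFirstBuffer` replaces the hand-rolled read loop with a chain of mapped generators (buffers -> parsed blocks -> record batches). The sketch below is a minimal synchronous analogue of that composition pattern only; it is not Arrow's actual `AsyncGenerator`/`MakeMappedGenerator` API. The names `Generator` and `MakeMapped` and both toy stages are illustrative stand-ins, and Arrow's generators return `Future<T>` rather than `std::optional<T>`.

// Toy synchronous analogue of the mapped-generator pipeline (a sketch,
// not Arrow's API): a generator is a callable yielding the next item,
// or nullopt at end-of-stream.
#include <cstddef>
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

template <typename T>
using Generator = std::function<std::optional<T>()>;

// Analogue of MakeMappedGenerator: applies `map` to each source item.
template <typename T, typename U>
Generator<U> MakeMapped(Generator<T> source, std::function<U(const T&)> map) {
  return [source = std::move(source),
          map = std::move(map)]() -> std::optional<U> {
    auto next = source();
    if (!next) return std::nullopt;  // propagate end-of-stream
    return map(*next);
  };
}

int main() {
  // Stand-in for the CSV buffer/block generator.
  std::vector<std::string> blocks = {"1,2", "3,4", "5,6"};
  std::size_t i = 0;
  Generator<std::string> block_gen = [&]() -> std::optional<std::string> {
    if (i >= blocks.size()) return std::nullopt;
    return blocks[i++];
  };
  // Two mapped stages, mirroring parser_op and then decoder_op.
  auto parsed_gen = MakeMapped<std::string, std::size_t>(
      block_gen, [](const std::string& s) { return s.size(); });
  auto decoded_gen = MakeMapped<std::size_t, std::string>(
      parsed_gen, [](const std::size_t& n) {
        return "batch of " + std::to_string(n) + " bytes";
      });
  while (auto item = decoded_gen()) std::cout << *item << "\n";
  return 0;
}

Each stage only pulls from the stage before it when polled, which is what lets the real implementation bolt readahead and cancellation onto the final record-batch generator without the stages knowing about each other.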
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -640,264 +815,112 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
 /////////////////////////////////////////////////////////////////////////
 // Base class for streaming readers
-class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
+class StreamingReaderImpl : public ReaderMixin,
+                            public csv::StreamingReader,
+                            public std::enable_shared_from_this<StreamingReaderImpl> {
  public:
-  BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
-                      std::shared_ptr<io::InputStream> input,
+  StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
                       const ReadOptions& read_options, const ParseOptions& parse_options,
                       const ConvertOptions& convert_options, bool count_rows)
       : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                     convert_options, count_rows),
-        cpu_executor_(cpu_executor) {}
-
-  virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
-
-  std::shared_ptr<Schema> schema() const override { return schema_; }
-
-  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
-    auto next_fut = ReadNextAsync();
-    auto next_result = next_fut.result();
-    return std::move(next_result).Value(batch);
-  }
+        bytes_decoded_(0) {}
- protected:
-  // Make column decoders from conversion schema
-  Status MakeColumnDecoders() {
-    for (const auto& column : conversion_schema_.columns) {
-      std::shared_ptr<ColumnDecoder> decoder;
-      if (column.is_missing) {
-        ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(),
-                                                               column.type, task_group_));
-      } else if (column.type != nullptr) {
-        ARROW_ASSIGN_OR_RAISE(
-            decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index,
-                                         convert_options_, task_group_));
-      } else {
-        ARROW_ASSIGN_OR_RAISE(decoder,
-                              ColumnDecoder::Make(io_context_.pool(), column.index,
-                                                  convert_options_, task_group_));
-      }
-      column_decoders_.push_back(std::move(decoder));
-    }
-    return Status::OK();
-  }
-
-  Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                                 const std::shared_ptr<Buffer>& completion,
-                                 const std::shared_ptr<Buffer>& block,
-                                 int64_t block_index, bool is_final) {
-    ARROW_ASSIGN_OR_RAISE(auto result,
-                          Parse(partial, completion, block, block_index, is_final));
-    RETURN_NOT_OK(ProcessData(result.parser, block_index));
-    return result.parsed_bytes;
-  }
-
-  // Trigger conversion of parsed block data
-  Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
-    for (auto& decoder : column_decoders_) {
-      decoder->Insert(block_index, parser);
-    }
-    return Status::OK();
-  }
-
-  Result<std::shared_ptr<RecordBatch>> DecodeNextBatch() {
-    DCHECK(!column_decoders_.empty());
-    ArrayVector arrays;
-    arrays.reserve(column_decoders_.size());
-    Status st;
-    for (auto& decoder : column_decoders_) {
-      auto maybe_array = decoder->NextChunk();
-      if (!maybe_array.ok()) {
-        // If there's an error, still fetch results from other decoders to
-        // keep them in sync.
-        st &= maybe_array.status();
-      } else {
-        arrays.push_back(*std::move(maybe_array));
-      }
-    }
-    RETURN_NOT_OK(st);
-    DCHECK_EQ(arrays.size(), column_decoders_.size());
-    const bool is_null = (arrays[0] == nullptr);
-#ifndef NDEBUG
-    for (const auto& array : arrays) {
-      DCHECK_EQ(array == nullptr, is_null);
-    }
-#endif
-    if (is_null) {
-      eof_ = true;
-      return nullptr;
-    }
-
-    if (schema_ == nullptr) {
-      FieldVector fields(arrays.size());
-      for (size_t i = 0; i < arrays.size(); ++i) {
-        fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type());
-      }
-      schema_ = arrow::schema(std::move(fields));
-    }
-    const auto n_rows = arrays[0]->length();
-    return RecordBatch::Make(schema_, n_rows, std::move(arrays));
-  }
-
-  // Column decoders (in ConversionSchema order)
-  std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
-  std::shared_ptr<Schema> schema_;
-  std::shared_ptr<RecordBatch> pending_batch_;
-  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
-  Executor* cpu_executor_;
-  bool eof_ = false;
-};
-
-/////////////////////////////////////////////////////////////////////////
-// Serial StreamingReader implementation
-
-class SerialStreamingReader : public BaseStreamingReader,
-                              public std::enable_shared_from_this<SerialStreamingReader> {
- public:
-  using BaseStreamingReader::BaseStreamingReader;
-
-  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
+  Future<> Init(Executor* cpu_executor) {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, read_options_.block_size));
     // TODO Consider exposing readahead as a read option (ARROW-12090)
     ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                               io_context_.executor()));
-    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
+    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor);
-    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
-    task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
+    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+    int max_readahead = cpu_executor->GetCapacity();
     auto self = shared_from_this();
-    // Read schema from first batch
-    return ReadNextAsync(true).Then(
-        [self](const std::shared_ptr<RecordBatch>& first_batch)
-            -> Result<std::shared_ptr<csv::StreamingReader>> {
-          self->pending_batch_ = first_batch;
-          DCHECK_NE(self->schema_, nullptr);
-          return self;
-        });
-  }
-  Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
-    auto maybe_batch = DecodeNextBatch();
-    if (schema_ == nullptr && maybe_batch.ok()) {
-      schema_ = (*maybe_batch)->schema();
-    }
-    return maybe_batch;
+    return buffer_generator().Then([self, buffer_generator, max_readahead](
+                                       const std::shared_ptr<Buffer>& first_buffer) {
+      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead);
+    });
   }
-  Future<std::shared_ptr<RecordBatch>> DoReadNext(
-      std::shared_ptr<SerialStreamingReader> self) {
-    auto batch = std::move(pending_batch_);
-    if (batch != nullptr) {
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-    }
+  std::shared_ptr<Schema> schema() const override { return schema_; }
-    if (!source_eof_) {
-      return block_generator_()
-          .Then([self](const CSVBlock& maybe_block) -> Status {
-            if (!IsIterationEnd(maybe_block)) {
-              self->bytes_parsed_ += maybe_block.bytes_skipped;
-              self->last_block_index_ = maybe_block.block_index;
-              auto maybe_parsed = self->ParseAndInsert(
-                  maybe_block.partial, maybe_block.completion, maybe_block.buffer,
-                  maybe_block.block_index, maybe_block.is_final);
-              if (!maybe_parsed.ok()) {
-                // Parse error => bail out
-                self->eof_ = true;
-                return maybe_parsed.status();
-              }
-              self->bytes_parsed_ += *maybe_parsed;
-              RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
-            } else {
-              self->source_eof_ = true;
-              for (auto& decoder : self->column_decoders_) {
-                decoder->SetEOF(self->last_block_index_ + 1);
-              }
-            }
-            return Status::OK();
-          })
-          .Then([self]() -> Result<std::shared_ptr<RecordBatch>> {
-            return self->DecodeBatchAndUpdateSchema();
-          });
-    }
-    return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
-        DecodeBatchAndUpdateSchema());
-  }
+  int64_t bytes_read() const override { return bytes_decoded_.load(); }
-  Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
-      std::shared_ptr<SerialStreamingReader> self, bool internal_read) {
-    return DoReadNext(self).Then(
-        [self, internal_read](const std::shared_ptr<RecordBatch>& batch) {
-          if (batch != nullptr && batch->num_rows() == 0) {
-            return self->ReadNextSkippingEmpty(self, internal_read);
-          }
-          if (!internal_read) {
-            self->bytes_decoded_ += self->bytes_parsed_;
-            self->bytes_parsed_ = 0;
-          }
-          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-        });
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    auto next_fut = ReadNextAsync();
+    auto next_result = next_fut.result();
+    return std::move(next_result).Value(batch);
   }
   Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
-    return ReadNextAsync(false);
-  };
-
-  int64_t bytes_read() const override { return bytes_decoded_; }
+    return record_batch_gen_();
+  }
  protected:
-  Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
-    return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
-      if (first_buffer == nullptr) {
-        return Status::Invalid("Empty CSV file");
-      }
-      auto own_first_buffer = first_buffer;
-      auto start = own_first_buffer->data();
-      RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
-      self->bytes_decoded_ = own_first_buffer->data() - start;
-      RETURN_NOT_OK(self->MakeColumnDecoders());
-
-      self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
-          std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
-          std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
-      return Status::OK();
-    });
+  Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
+                                AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
+                                int max_readahead) {
+    if (first_buffer == nullptr) {
+      return Status::Invalid("Empty CSV file");
+    }
+
+    std::shared_ptr<Buffer> after_header;
+    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
+                          ProcessHeader(first_buffer, &after_header));
+    bytes_decoded_.fetch_add(header_bytes_consumed);
+    auto parser_op =
+        BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_);
+    ARROW_ASSIGN_OR_RAISE(auto decoder_op, BlockDecodingOperator::Make(
+                                               io_context_, convert_options_,
+                                               conversion_schema_, &bytes_decoded_));
+    auto block_gen = SerialBlockReader::MakeAsyncIterator(
+        std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
+        read_options_.skip_rows_after_names);
+    auto parsed_block_gen =
+        MakeMappedGenerator<ParsedBlock>(std::move(block_gen), std::move(parser_op));
+    auto rb_gen = MakeMappedGenerator<std::shared_ptr<RecordBatch>>(
+        std::move(parsed_block_gen), decoder_op);
Review comment:
Thanks, added.
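
One design point worth noting in the `InitAfterFirstBatch` step of the first hunk: `Init` eagerly pulls the first record batch to learn the schema, then re-queues it via `MakeGeneratorStartsWith` so the caller still receives it from `ReadNextAsync`. Below is a minimal stand-alone sketch of that re-queuing pattern; it is a toy analogue under the same assumptions as the earlier sketch, with `Generator` and `StartsWith` as hypothetical names rather than Arrow's API.

// Toy analogue of re-queuing an already-consumed head item in front of
// a stream (a sketch, not Arrow's MakeGeneratorStartsWith).
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>

template <typename T>
using Generator = std::function<std::optional<T>()>;

template <typename T>
Generator<T> StartsWith(std::deque<T> head, Generator<T> source) {
  auto pending = std::make_shared<std::deque<T>>(std::move(head));
  return [pending, source = std::move(source)]() -> std::optional<T> {
    if (!pending->empty()) {
      T item = std::move(pending->front());  // drain re-queued items first
      pending->pop_front();
      return item;
    }
    return source();  // then fall through to the underlying stream
  };
}

int main() {
  int next = 2;
  Generator<int> rest = [&]() -> std::optional<int> {
    return next <= 4 ? std::optional<int>(next++) : std::nullopt;
  };
  // "1" plays the role of the first batch consumed during Init().
  auto all = StartsWith<int>({1}, rest);
  while (auto v = all()) std::cout << *v << "\n";  // prints 1 2 3 4
  return 0;
}

As in the diff, the head item is only re-queued when it is worth emitting (the real code skips an empty first batch), so consumers see one uninterrupted stream.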
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]