n3world commented on a change in pull request #10568: URL: https://github.com/apache/arrow/pull/10568#discussion_r662678017
########## File path: cpp/src/arrow/csv/reader_test.cc ########## @@ -227,37 +289,47 @@ TEST(StreamingReaderTest, BytesRead) { auto read_options = ReadOptions::Defaults(); read_options.block_size = 20; + read_options.use_threads = false; ASSERT_OK_AND_ASSIGN( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr<RecordBatch> batch; - int64_t bytes = 6; // Size of header + int64_t bytes = 18; // Size of header and first batch do { ASSERT_EQ(bytes, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); bytes += 12; // Add size of each row - } while (batch); + } while (bytes <= 42); + ASSERT_EQ(42, streaming_reader->bytes_read()); + // Should be able to read past the end without bumping bytes_read + ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_EQ(42, streaming_reader->bytes_read()); + ASSERT_EQ(batch.get(), nullptr); } // Interaction of skip_rows and bytes_read() { auto input = std::make_shared<io::BufferReader>(table_buffer); auto read_options = ReadOptions::Defaults(); - read_options.skip_rows = 2; + read_options.skip_rows = 1; + read_options.block_size = 32; ASSERT_OK_AND_ASSIGN( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr<RecordBatch> batch; - // first two rows and third row as header + // Skip the actual header (6 bytes) and then treat first row as header (12 bytes) + // and then streaming reader reads in first batch (12 bytes) ASSERT_EQ(30, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(42, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); + ASSERT_NE(batch.get(), nullptr); + ASSERT_EQ(42, streaming_reader->bytes_read()); + ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_EQ(batch.get(), nullptr); Review comment: I would say this changes the intent I had for `bytes_read()` when threads are used. The goal was to be able to report progress along with each batch, so that after a batch was retrieved with `ReadNext()`, `bytes_read()` could be used to calculate the progress up to that batch. In this example the second-to-last batch would be calculated as 100% complete, and this can become even more skewed with more readahead and parallel processing. However, with futures you never know when the record batch is actually retrieved from the future, which makes it impossible for `bytes_read()` to work that way. My only thought on how to solve this would be to have `ReadNextAsync()`, or a new similar method, return a future on a pair where one of the values is the bytes read, so that anybody who actually wants to associate progress with a batch can use that API.
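To make that suggestion concrete, here is a minimal C++ sketch of what such a pairing could look like. The names `ReadProgress`, `ProgressCapableReader` and `ReadNextWithProgressAsync()` are hypothetical and not part of the Arrow API; the only thing assumed from Arrow is the existing `Future<T>::Then()` continuation mechanism already used in this PR.

```cpp
// Hypothetical sketch only: ReadProgress, ProgressCapableReader and
// ReadNextWithProgressAsync() are illustrative names, not Arrow APIs.
#include <cstdint>
#include <memory>

#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/util/future.h"

// A batch together with the cumulative number of input bytes that had been
// decoded when this batch was produced. Capturing the count at decode time
// keeps it accurate regardless of readahead or parallel decoding.
struct ReadProgress {
  std::shared_ptr<arrow::RecordBatch> batch;  // nullptr at end of stream
  int64_t bytes_read = 0;
};

// The proposed addition: a ReadNextAsync() variant whose future carries the
// byte count alongside the batch, instead of a separate bytes_read() call.
class ProgressCapableReader {
 public:
  virtual ~ProgressCapableReader() = default;
  virtual arrow::Future<ReadProgress> ReadNextWithProgressAsync() = 0;
};

// Example caller: compute the fraction of the input consumed by the time this
// batch (and everything before it) was decoded.
arrow::Future<> ReportNextBatchProgress(ProgressCapableReader* reader,
                                        int64_t total_bytes) {
  return reader->ReadNextWithProgressAsync().Then(
      [total_bytes](const ReadProgress& progress) -> arrow::Status {
        if (progress.batch == nullptr) {
          return arrow::Status::OK();  // end of stream
        }
        double fraction = static_cast<double>(progress.bytes_read) /
                          static_cast<double>(total_bytes);
        (void)fraction;  // e.g. feed a progress bar or log it
        return arrow::Status::OK();
      });
}
```

Because the byte count travels inside the future's value rather than being read from the reader afterwards, it cannot be skewed by however many blocks the readahead generator has already decoded in the background.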
########## File path: cpp/src/arrow/csv/reader.cc ########## @@ -640,264 +815,112 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { ///////////////////////////////////////////////////////////////////////// // Base class for streaming readers -class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader { +class StreamingReaderImpl : public ReaderMixin, + public csv::StreamingReader, + public std::enable_shared_from_this<StreamingReaderImpl> { public: - BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor, - std::shared_ptr<io::InputStream> input, + StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, convert_options, count_rows), - cpu_executor_(cpu_executor) {} - - virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0; - - std::shared_ptr<Schema> schema() const override { return schema_; } - - Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { - auto next_fut = ReadNextAsync(); - auto next_result = next_fut.result(); - return std::move(next_result).Value(batch); - } + bytes_decoded_(0) {} - protected: - // Make column decoders from conversion schema - Status MakeColumnDecoders() { - for (const auto& column : conversion_schema_.columns) { - std::shared_ptr<ColumnDecoder> decoder; - if (column.is_missing) { - ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(), - column.type, task_group_)); - } else if (column.type != nullptr) { - ARROW_ASSIGN_OR_RAISE( - decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index, - convert_options_, task_group_)); - } else { - ARROW_ASSIGN_OR_RAISE(decoder, - ColumnDecoder::Make(io_context_.pool(), column.index, - convert_options_, task_group_)); - } - column_decoders_.push_back(std::move(decoder)); - } - return Status::OK(); - } - - Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial, - const std::shared_ptr<Buffer>& completion, - const std::shared_ptr<Buffer>& block, - int64_t block_index, bool is_final) { - ARROW_ASSIGN_OR_RAISE(auto result, - Parse(partial, completion, block, block_index, is_final)); - RETURN_NOT_OK(ProcessData(result.parser, block_index)); - return result.parsed_bytes; - } - - // Trigger conversion of parsed block data - Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) { - for (auto& decoder : column_decoders_) { - decoder->Insert(block_index, parser); - } - return Status::OK(); - } - - Result<std::shared_ptr<RecordBatch>> DecodeNextBatch() { - DCHECK(!column_decoders_.empty()); - ArrayVector arrays; - arrays.reserve(column_decoders_.size()); - Status st; - for (auto& decoder : column_decoders_) { - auto maybe_array = decoder->NextChunk(); - if (!maybe_array.ok()) { - // If there's an error, still fetch results from other decoders to - // keep them in sync. 
- st &= maybe_array.status(); - } else { - arrays.push_back(*std::move(maybe_array)); - } - } - RETURN_NOT_OK(st); - DCHECK_EQ(arrays.size(), column_decoders_.size()); - const bool is_null = (arrays[0] == nullptr); -#ifndef NDEBUG - for (const auto& array : arrays) { - DCHECK_EQ(array == nullptr, is_null); - } -#endif - if (is_null) { - eof_ = true; - return nullptr; - } - - if (schema_ == nullptr) { - FieldVector fields(arrays.size()); - for (size_t i = 0; i < arrays.size(); ++i) { - fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type()); - } - schema_ = arrow::schema(std::move(fields)); - } - const auto n_rows = arrays[0]->length(); - return RecordBatch::Make(schema_, n_rows, std::move(arrays)); - } - - // Column decoders (in ConversionSchema order) - std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_; - std::shared_ptr<Schema> schema_; - std::shared_ptr<RecordBatch> pending_batch_; - AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_; - Executor* cpu_executor_; - bool eof_ = false; -}; - -///////////////////////////////////////////////////////////////////////// -// Serial StreamingReader implementation - -class SerialStreamingReader : public BaseStreamingReader, - public std::enable_shared_from_this<SerialStreamingReader> { - public: - using BaseStreamingReader::BaseStreamingReader; - - Future<std::shared_ptr<csv::StreamingReader>> Init() override { + Future<> Init(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); // TODO Consider exposing readahead as a read option (ARROW-12090) ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor())); - auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); + auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor); - buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it)); - task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token()); + auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it)); + int max_readahead = cpu_executor->GetCapacity(); auto self = shared_from_this(); - // Read schema from first batch - return ReadNextAsync(true).Then( - [self](const std::shared_ptr<RecordBatch>& first_batch) - -> Result<std::shared_ptr<csv::StreamingReader>> { - self->pending_batch_ = first_batch; - DCHECK_NE(self->schema_, nullptr); - return self; - }); - } - Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() { - auto maybe_batch = DecodeNextBatch(); - if (schema_ == nullptr && maybe_batch.ok()) { - schema_ = (*maybe_batch)->schema(); - } - return maybe_batch; + return buffer_generator().Then([self, buffer_generator, max_readahead]( + const std::shared_ptr<Buffer>& first_buffer) { + return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead); + }); } - Future<std::shared_ptr<RecordBatch>> DoReadNext( - std::shared_ptr<SerialStreamingReader> self) { - auto batch = std::move(pending_batch_); - if (batch != nullptr) { - return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch); - } + std::shared_ptr<Schema> schema() const override { return schema_; } - if (!source_eof_) { - return block_generator_() - .Then([self](const CSVBlock& maybe_block) -> Status { - if (!IsIterationEnd(maybe_block)) { - self->bytes_parsed_ += maybe_block.bytes_skipped; - self->last_block_index_ = maybe_block.block_index; - auto maybe_parsed = self->ParseAndInsert( - maybe_block.partial, 
maybe_block.completion, maybe_block.buffer, - maybe_block.block_index, maybe_block.is_final); - if (!maybe_parsed.ok()) { - // Parse error => bail out - self->eof_ = true; - return maybe_parsed.status(); - } - self->bytes_parsed_ += *maybe_parsed; - RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed)); - } else { - self->source_eof_ = true; - for (auto& decoder : self->column_decoders_) { - decoder->SetEOF(self->last_block_index_ + 1); - } - } - return Status::OK(); - }) - .Then([self]() -> Result<std::shared_ptr<RecordBatch>> { - return self->DecodeBatchAndUpdateSchema(); - }); - } - return Future<std::shared_ptr<RecordBatch>>::MakeFinished( - DecodeBatchAndUpdateSchema()); - } + int64_t bytes_read() const override { return bytes_decoded_.load(); } - Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty( - std::shared_ptr<SerialStreamingReader> self, bool internal_read) { - return DoReadNext(self).Then( - [self, internal_read](const std::shared_ptr<RecordBatch>& batch) { - if (batch != nullptr && batch->num_rows() == 0) { - return self->ReadNextSkippingEmpty(self, internal_read); - } - if (!internal_read) { - self->bytes_decoded_ += self->bytes_parsed_; - self->bytes_parsed_ = 0; - } - return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch); - }); + Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { + auto next_fut = ReadNextAsync(); + auto next_result = next_fut.result(); + return std::move(next_result).Value(batch); } Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override { - return ReadNextAsync(false); - }; - - int64_t bytes_read() const override { return bytes_decoded_; } + return record_batch_gen_(); + } protected: - Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) { - return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) { - if (first_buffer == nullptr) { - return Status::Invalid("Empty CSV file"); - } - auto own_first_buffer = first_buffer; - auto start = own_first_buffer->data(); - RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer)); - self->bytes_decoded_ = own_first_buffer->data() - start; - RETURN_NOT_OK(self->MakeColumnDecoders()); - - self->block_generator_ = SerialBlockReader::MakeAsyncIterator( - std::move(self->buffer_generator_), MakeChunker(self->parse_options_), - std::move(own_first_buffer), self->read_options_.skip_rows_after_names); - return Status::OK(); - }); + Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer, + AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator, + int max_readahead) { + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + + std::shared_ptr<Buffer> after_header; + ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, + ProcessHeader(first_buffer, &after_header)); + bytes_decoded_.fetch_add(header_bytes_consumed); + auto parser_op = + BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); + ARROW_ASSIGN_OR_RAISE(auto decoder_op, BlockDecodingOperator::Make( + io_context_, convert_options_, + conversion_schema_, &bytes_decoded_)); + auto block_gen = SerialBlockReader::MakeAsyncIterator( + std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), + read_options_.skip_rows_after_names); + auto parsed_block_gen = + MakeMappedGenerator<ParsedBlock>(std::move(block_gen), std::move(parser_op)); + auto rb_gen = MakeMappedGenerator<std::shared_ptr<RecordBatch>>( + std::move(parsed_block_gen), decoder_op); + auto self = shared_from_this(); + return 
rb_gen().Then( + [self, rb_gen, max_readahead](const std::shared_ptr<RecordBatch>& first_batch) { + return self->InitAfterFirstBatch(first_batch, std::move(rb_gen), max_readahead); + }); } - Future<std::shared_ptr<RecordBatch>> ReadNextAsync(bool internal_read) { - if (eof_) { - return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr); - } - if (io_context_.stop_token().IsStopRequested()) { - eof_ = true; - return io_context_.stop_token().Poll(); + Status InitAfterFirstBatch(const std::shared_ptr<RecordBatch>& first_batch, + AsyncGenerator<std::shared_ptr<RecordBatch>> batch_gen, + int max_readahead) { + schema_ = first_batch->schema(); + + AsyncGenerator<std::shared_ptr<RecordBatch>> readahead_gen; + if (read_options_.use_threads) { + readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead); + } else { + readahead_gen = std::move(batch_gen); } - auto self = shared_from_this(); - if (!block_generator_) { - return SetupReader(self).Then( - [self, internal_read]() -> Future<std::shared_ptr<RecordBatch>> { - return self->ReadNextSkippingEmpty(self, internal_read); - }, - [self](const Status& err) -> Result<std::shared_ptr<RecordBatch>> { - self->eof_ = true; - return err; - }); + + AsyncGenerator<std::shared_ptr<RecordBatch>> restarted_gen; + // Streaming reader should not emit empty record batches + if (first_batch->num_rows() > 0) { + restarted_gen = MakeGeneratorStartsWith({first_batch}, std::move(readahead_gen)); } else { - return self->ReadNextSkippingEmpty(self, internal_read); + restarted_gen = std::move(readahead_gen); } + record_batch_gen_ = + MakeCancellable(std::move(restarted_gen), io_context_.stop_token()); + return Status::OK(); } - bool source_eof_ = false; - int64_t last_block_index_ = 0; - AsyncGenerator<CSVBlock> block_generator_; - // bytes of data parsed but not yet decoded - int64_t bytes_parsed_ = 0; + std::shared_ptr<Schema> schema_; + AsyncGenerator<std::shared_ptr<RecordBatch>> record_batch_gen_; // bytes which have been decoded for caller - int64_t bytes_decoded_ = 0; -}; + std::atomic<int64_t> bytes_decoded_; +}; // namespace Review comment: I don't think this is the end of a namespace ########## File path: cpp/src/arrow/csv/reader.cc ########## @@ -640,264 +815,112 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { ///////////////////////////////////////////////////////////////////////// // Base class for streaming readers -class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader { +class StreamingReaderImpl : public ReaderMixin, + public csv::StreamingReader, + public std::enable_shared_from_this<StreamingReaderImpl> { public: - BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor, - std::shared_ptr<io::InputStream> input, + StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, convert_options, count_rows), - cpu_executor_(cpu_executor) {} - - virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0; - - std::shared_ptr<Schema> schema() const override { return schema_; } - - Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { - auto next_fut = ReadNextAsync(); - auto next_result = next_fut.result(); - return std::move(next_result).Value(batch); - } + bytes_decoded_(0) {} - protected: - // Make column decoders from conversion schema - 
Status MakeColumnDecoders() { - for (const auto& column : conversion_schema_.columns) { - std::shared_ptr<ColumnDecoder> decoder; - if (column.is_missing) { - ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(), - column.type, task_group_)); - } else if (column.type != nullptr) { - ARROW_ASSIGN_OR_RAISE( - decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index, - convert_options_, task_group_)); - } else { - ARROW_ASSIGN_OR_RAISE(decoder, - ColumnDecoder::Make(io_context_.pool(), column.index, - convert_options_, task_group_)); - } - column_decoders_.push_back(std::move(decoder)); - } - return Status::OK(); - } - - Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial, - const std::shared_ptr<Buffer>& completion, - const std::shared_ptr<Buffer>& block, - int64_t block_index, bool is_final) { - ARROW_ASSIGN_OR_RAISE(auto result, - Parse(partial, completion, block, block_index, is_final)); - RETURN_NOT_OK(ProcessData(result.parser, block_index)); - return result.parsed_bytes; - } - - // Trigger conversion of parsed block data - Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) { - for (auto& decoder : column_decoders_) { - decoder->Insert(block_index, parser); - } - return Status::OK(); - } - - Result<std::shared_ptr<RecordBatch>> DecodeNextBatch() { - DCHECK(!column_decoders_.empty()); - ArrayVector arrays; - arrays.reserve(column_decoders_.size()); - Status st; - for (auto& decoder : column_decoders_) { - auto maybe_array = decoder->NextChunk(); - if (!maybe_array.ok()) { - // If there's an error, still fetch results from other decoders to - // keep them in sync. - st &= maybe_array.status(); - } else { - arrays.push_back(*std::move(maybe_array)); - } - } - RETURN_NOT_OK(st); - DCHECK_EQ(arrays.size(), column_decoders_.size()); - const bool is_null = (arrays[0] == nullptr); -#ifndef NDEBUG - for (const auto& array : arrays) { - DCHECK_EQ(array == nullptr, is_null); - } -#endif - if (is_null) { - eof_ = true; - return nullptr; - } - - if (schema_ == nullptr) { - FieldVector fields(arrays.size()); - for (size_t i = 0; i < arrays.size(); ++i) { - fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type()); - } - schema_ = arrow::schema(std::move(fields)); - } - const auto n_rows = arrays[0]->length(); - return RecordBatch::Make(schema_, n_rows, std::move(arrays)); - } - - // Column decoders (in ConversionSchema order) - std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_; - std::shared_ptr<Schema> schema_; - std::shared_ptr<RecordBatch> pending_batch_; - AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_; - Executor* cpu_executor_; - bool eof_ = false; -}; - -///////////////////////////////////////////////////////////////////////// -// Serial StreamingReader implementation - -class SerialStreamingReader : public BaseStreamingReader, - public std::enable_shared_from_this<SerialStreamingReader> { - public: - using BaseStreamingReader::BaseStreamingReader; - - Future<std::shared_ptr<csv::StreamingReader>> Init() override { + Future<> Init(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); // TODO Consider exposing readahead as a read option (ARROW-12090) ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor())); - auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); + auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor); 
- buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it)); - task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token()); + auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it)); + int max_readahead = cpu_executor->GetCapacity(); auto self = shared_from_this(); - // Read schema from first batch - return ReadNextAsync(true).Then( - [self](const std::shared_ptr<RecordBatch>& first_batch) - -> Result<std::shared_ptr<csv::StreamingReader>> { - self->pending_batch_ = first_batch; - DCHECK_NE(self->schema_, nullptr); - return self; - }); - } - Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() { - auto maybe_batch = DecodeNextBatch(); - if (schema_ == nullptr && maybe_batch.ok()) { - schema_ = (*maybe_batch)->schema(); - } - return maybe_batch; + return buffer_generator().Then([self, buffer_generator, max_readahead]( + const std::shared_ptr<Buffer>& first_buffer) { + return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead); + }); } - Future<std::shared_ptr<RecordBatch>> DoReadNext( - std::shared_ptr<SerialStreamingReader> self) { - auto batch = std::move(pending_batch_); - if (batch != nullptr) { - return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch); - } + std::shared_ptr<Schema> schema() const override { return schema_; } - if (!source_eof_) { - return block_generator_() - .Then([self](const CSVBlock& maybe_block) -> Status { - if (!IsIterationEnd(maybe_block)) { - self->bytes_parsed_ += maybe_block.bytes_skipped; - self->last_block_index_ = maybe_block.block_index; - auto maybe_parsed = self->ParseAndInsert( - maybe_block.partial, maybe_block.completion, maybe_block.buffer, - maybe_block.block_index, maybe_block.is_final); - if (!maybe_parsed.ok()) { - // Parse error => bail out - self->eof_ = true; - return maybe_parsed.status(); - } - self->bytes_parsed_ += *maybe_parsed; - RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed)); - } else { - self->source_eof_ = true; - for (auto& decoder : self->column_decoders_) { - decoder->SetEOF(self->last_block_index_ + 1); - } - } - return Status::OK(); - }) - .Then([self]() -> Result<std::shared_ptr<RecordBatch>> { - return self->DecodeBatchAndUpdateSchema(); - }); - } - return Future<std::shared_ptr<RecordBatch>>::MakeFinished( - DecodeBatchAndUpdateSchema()); - } + int64_t bytes_read() const override { return bytes_decoded_.load(); } - Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty( - std::shared_ptr<SerialStreamingReader> self, bool internal_read) { - return DoReadNext(self).Then( - [self, internal_read](const std::shared_ptr<RecordBatch>& batch) { - if (batch != nullptr && batch->num_rows() == 0) { - return self->ReadNextSkippingEmpty(self, internal_read); - } - if (!internal_read) { - self->bytes_decoded_ += self->bytes_parsed_; - self->bytes_parsed_ = 0; - } - return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch); - }); + Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { + auto next_fut = ReadNextAsync(); + auto next_result = next_fut.result(); + return std::move(next_result).Value(batch); } Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override { - return ReadNextAsync(false); - }; - - int64_t bytes_read() const override { return bytes_decoded_; } + return record_batch_gen_(); + } protected: - Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) { - return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) { - if (first_buffer == nullptr) { - return 
Status::Invalid("Empty CSV file"); - } - auto own_first_buffer = first_buffer; - auto start = own_first_buffer->data(); - RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer)); - self->bytes_decoded_ = own_first_buffer->data() - start; - RETURN_NOT_OK(self->MakeColumnDecoders()); - - self->block_generator_ = SerialBlockReader::MakeAsyncIterator( - std::move(self->buffer_generator_), MakeChunker(self->parse_options_), - std::move(own_first_buffer), self->read_options_.skip_rows_after_names); - return Status::OK(); - }); + Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer, + AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator, + int max_readahead) { + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + + std::shared_ptr<Buffer> after_header; + ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, + ProcessHeader(first_buffer, &after_header)); + bytes_decoded_.fetch_add(header_bytes_consumed); + auto parser_op = + BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); + ARROW_ASSIGN_OR_RAISE(auto decoder_op, BlockDecodingOperator::Make( + io_context_, convert_options_, + conversion_schema_, &bytes_decoded_)); + auto block_gen = SerialBlockReader::MakeAsyncIterator( + std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), + read_options_.skip_rows_after_names); + auto parsed_block_gen = + MakeMappedGenerator<ParsedBlock>(std::move(block_gen), std::move(parser_op)); + auto rb_gen = MakeMappedGenerator<std::shared_ptr<RecordBatch>>( + std::move(parsed_block_gen), decoder_op); Review comment: Could use `std::move(decoder_op)` ########## File path: python/pyarrow/tests/test_csv.py ########## @@ -1507,8 +1488,8 @@ def test_stress_block_sizes(self): class TestSerialStreamingCSVRead(BaseTestStreamingCSVRead, unittest.TestCase): def open_csv(self, *args, **kwargs): - read_options = kwargs.setdefault('read_options', ReadOptions()) - read_options.use_threads = False + # read_options = kwargs.setdefault('read_options', ReadOptions()) + # read_options.use_threads = False Review comment: Is this still to test SerialStreamingCSV? Should there be two classes so that all test get run for serial and non serial? ########## File path: cpp/src/arrow/csv/reader.cc ########## @@ -546,6 +719,8 @@ class ReaderMixin { return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)}; } + friend class HeaderParsingOperator; Review comment: What is this class? ########## File path: cpp/src/arrow/csv/reader.cc ########## @@ -347,6 +348,175 @@ class ThreadedBlockReader : public BlockReader { } }; +struct ParsedBlock { + std::shared_ptr<BlockParser> parser; + int64_t block_index; + int64_t bytes_parsed_or_skipped; +}; + +} // namespace + +} // namespace csv + +template <> +struct IterationTraits<csv::ParsedBlock> { + static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; } + static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } +}; + +namespace csv { + +namespace { + +// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data. 
+// The parsed batch contains a list of offsets for each of the columns so that columns +// can be individually scanned +// +// This operator is not re-entrant +class BlockParsingOperator { + public: + BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options, + int num_csv_cols, bool count_rows) + : io_context_(io_context), + parse_options_(parse_options), + num_csv_cols_(num_csv_cols), + count_rows_(count_rows) {} + + Result<ParsedBlock> operator()(const CSVBlock& block) { + static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); + auto parser = std::make_shared<BlockParser>( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); + + std::shared_ptr<Buffer> straddling; + std::vector<util::string_view> views; + if (block.partial->size() != 0 || block.completion->size() != 0) { + if (block.partial->size() == 0) { + straddling = block.completion; + } else if (block.completion->size() == 0) { + straddling = block.partial; + } else { + ARROW_ASSIGN_OR_RAISE( + straddling, + ConcatenateBuffers({block.partial, block.completion}, io_context_.pool())); + } + views = {util::string_view(*straddling), util::string_view(*block.buffer)}; + } else { + views = {util::string_view(*block.buffer)}; + } + uint32_t parsed_size; + if (block.is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } + RETURN_NOT_OK(block.consume_bytes(parsed_size)); + return ParsedBlock{std::move(parser), block.block_index, + static_cast<int64_t>(parsed_size) + block.bytes_skipped}; + } + + private: + io::IOContext io_context_; + ParseOptions parse_options_; + int num_csv_cols_; + bool count_rows_; + int num_rows_seen_ = 0; +}; + +class BlockDecodingOperator { + public: + Future<std::shared_ptr<RecordBatch>> operator()(const ParsedBlock& block) { + DCHECK(!state_->column_decoders.empty()); + std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs; + for (auto& decoder : state_->column_decoders) { + decoded_array_futs.push_back(decoder->Decode(block.parser)); + } + auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; + auto decoded_arrays_fut = All(decoded_array_futs); + auto state = state_; + return decoded_arrays_fut.Then( + [state, bytes_parsed_or_skipped]( + const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays) + -> Result<std::shared_ptr<RecordBatch>> { + state->bytes_decoded_->fetch_add(bytes_parsed_or_skipped); + ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, + internal::UnwrapOrRaise(maybe_decoded_arrays)); + return state->DecodedArraysToBatch(decoded_arrays); + }); + } + + static Result<BlockDecodingOperator> Make(io::IOContext io_context, + ConvertOptions convert_options, + ConversionSchema conversion_schema, + std::atomic<int64_t>* bytes_decoded) { + BlockDecodingOperator op(std::move(io_context), std::move(convert_options), + std::move(conversion_schema), bytes_decoded); + RETURN_NOT_OK(op.state_->MakeColumnDecoders()); + return op; + } + + private: + BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema, + std::atomic<int64_t>* bytes_decoded) + : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options), + std::move(conversion_schema), bytes_decoded)) {} + + struct State { + State(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema, 
std::atomic<int64_t>* bytes_decoded) + : io_context(std::move(io_context)), + convert_options(std::move(convert_options)), + conversion_schema(std::move(conversion_schema)), + bytes_decoded_(bytes_decoded) {} + + Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch( + std::vector<std::shared_ptr<Array>>& arrays) { + if (schema == nullptr) { + FieldVector fields(arrays.size()); + for (size_t i = 0; i < arrays.size(); ++i) { + fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type()); + } + schema = arrow::schema(std::move(fields)); + } + const auto n_rows = arrays[0]->length(); + return RecordBatch::Make(schema, n_rows, std::move(arrays)); + } + + // Make column decoders from conversion schema + Status MakeColumnDecoders() { + for (const auto& column : conversion_schema.columns) { + std::shared_ptr<ColumnDecoder> decoder; + if (column.is_missing) { + ARROW_ASSIGN_OR_RAISE(decoder, + ColumnDecoder::MakeNull(io_context.pool(), column.type)); + } else if (column.type != nullptr) { + ARROW_ASSIGN_OR_RAISE( + decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index, + convert_options)); + } else { + ARROW_ASSIGN_OR_RAISE( + decoder, + ColumnDecoder::Make(io_context.pool(), column.index, convert_options)); + } + column_decoders.push_back(std::move(decoder)); + } + return Status::OK(); + } + + io::IOContext io_context; Review comment: This is only used by MakeColumnDecoders and probably could be passed in as an argument to just that method. ########## File path: cpp/src/arrow/csv/reader.cc ########## @@ -347,6 +348,175 @@ class ThreadedBlockReader : public BlockReader { } }; +struct ParsedBlock { + std::shared_ptr<BlockParser> parser; + int64_t block_index; + int64_t bytes_parsed_or_skipped; +}; + +} // namespace + +} // namespace csv + +template <> +struct IterationTraits<csv::ParsedBlock> { + static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; } + static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } +}; + +namespace csv { + +namespace { + +// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data. 
+// The parsed batch contains a list of offsets for each of the columns so that columns +// can be individually scanned +// +// This operator is not re-entrant +class BlockParsingOperator { + public: + BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options, + int num_csv_cols, bool count_rows) + : io_context_(io_context), + parse_options_(parse_options), + num_csv_cols_(num_csv_cols), + count_rows_(count_rows) {} + + Result<ParsedBlock> operator()(const CSVBlock& block) { + static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); + auto parser = std::make_shared<BlockParser>( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); + + std::shared_ptr<Buffer> straddling; + std::vector<util::string_view> views; + if (block.partial->size() != 0 || block.completion->size() != 0) { + if (block.partial->size() == 0) { + straddling = block.completion; + } else if (block.completion->size() == 0) { + straddling = block.partial; + } else { + ARROW_ASSIGN_OR_RAISE( + straddling, + ConcatenateBuffers({block.partial, block.completion}, io_context_.pool())); + } + views = {util::string_view(*straddling), util::string_view(*block.buffer)}; + } else { + views = {util::string_view(*block.buffer)}; + } + uint32_t parsed_size; + if (block.is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } + RETURN_NOT_OK(block.consume_bytes(parsed_size)); + return ParsedBlock{std::move(parser), block.block_index, + static_cast<int64_t>(parsed_size) + block.bytes_skipped}; + } + + private: + io::IOContext io_context_; + ParseOptions parse_options_; + int num_csv_cols_; + bool count_rows_; + int num_rows_seen_ = 0; +}; + +class BlockDecodingOperator { + public: + Future<std::shared_ptr<RecordBatch>> operator()(const ParsedBlock& block) { + DCHECK(!state_->column_decoders.empty()); + std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs; + for (auto& decoder : state_->column_decoders) { + decoded_array_futs.push_back(decoder->Decode(block.parser)); + } + auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; + auto decoded_arrays_fut = All(decoded_array_futs); + auto state = state_; + return decoded_arrays_fut.Then( + [state, bytes_parsed_or_skipped]( + const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays) + -> Result<std::shared_ptr<RecordBatch>> { + state->bytes_decoded_->fetch_add(bytes_parsed_or_skipped); + ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, + internal::UnwrapOrRaise(maybe_decoded_arrays)); + return state->DecodedArraysToBatch(decoded_arrays); + }); + } + + static Result<BlockDecodingOperator> Make(io::IOContext io_context, + ConvertOptions convert_options, + ConversionSchema conversion_schema, + std::atomic<int64_t>* bytes_decoded) { + BlockDecodingOperator op(std::move(io_context), std::move(convert_options), + std::move(conversion_schema), bytes_decoded); + RETURN_NOT_OK(op.state_->MakeColumnDecoders()); + return op; + } + + private: + BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema, + std::atomic<int64_t>* bytes_decoded) + : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options), + std::move(conversion_schema), bytes_decoded)) {} + + struct State { + State(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema, 
std::atomic<int64_t>* bytes_decoded) + : io_context(std::move(io_context)), + convert_options(std::move(convert_options)), + conversion_schema(std::move(conversion_schema)), + bytes_decoded_(bytes_decoded) {} + + Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch( + std::vector<std::shared_ptr<Array>>& arrays) { + if (schema == nullptr) { + FieldVector fields(arrays.size()); + for (size_t i = 0; i < arrays.size(); ++i) { + fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type()); + } + schema = arrow::schema(std::move(fields)); + } + const auto n_rows = arrays[0]->length(); + return RecordBatch::Make(schema, n_rows, std::move(arrays)); + } + + // Make column decoders from conversion schema + Status MakeColumnDecoders() { + for (const auto& column : conversion_schema.columns) { + std::shared_ptr<ColumnDecoder> decoder; + if (column.is_missing) { + ARROW_ASSIGN_OR_RAISE(decoder, + ColumnDecoder::MakeNull(io_context.pool(), column.type)); + } else if (column.type != nullptr) { + ARROW_ASSIGN_OR_RAISE( + decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index, + convert_options)); + } else { + ARROW_ASSIGN_OR_RAISE( + decoder, + ColumnDecoder::Make(io_context.pool(), column.index, convert_options)); + } + column_decoders.push_back(std::move(decoder)); + } + return Status::OK(); + } + + io::IOContext io_context; + ConvertOptions convert_options; + ConversionSchema conversion_schema; + std::vector<std::shared_ptr<ColumnDecoder>> column_decoders; + std::shared_ptr<Schema> schema; + std::atomic<int64_t>* bytes_decoded_; Review comment: Should the `_` suffix be removed here like the other struct members? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org