westonpace commented on a change in pull request #9644:
URL: https://github.com/apache/arrow/pull/9644#discussion_r601747060
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -672,101 +687,142 @@ class BaseStreamingReader : public ReaderMixin, public
csv::StreamingReader {
std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<RecordBatch> pending_batch_;
- Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+ AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+ Executor* cpu_executor_;
bool eof_ = false;
};
/////////////////////////////////////////////////////////////////////////
// Serial StreamingReader implementation
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+ public
std::enable_shared_from_this<SerialStreamingReader> {
public:
using BaseStreamingReader::BaseStreamingReader;
- Status Init() override {
+ Future<std::shared_ptr<csv::StreamingReader>> Init() override {
ARROW_ASSIGN_OR_RAISE(auto istream_it,
io::MakeInputStreamIterator(input_,
read_options_.block_size));
- // Since we're converting serially, no need to readahead more than one
block
- int32_t block_queue_size = 1;
- ARROW_ASSIGN_OR_RAISE(auto rh_it,
- MakeReadaheadIterator(std::move(istream_it),
block_queue_size));
- buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
- task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+ ARROW_ASSIGN_OR_RAISE(auto bg_it,
MakeBackgroundGenerator(std::move(istream_it),
+
io_context_.executor()));
+
+ auto rh_it = MakeSerialReadaheadGenerator(std::move(bg_it), 8);
Review comment:
I'll update it to be based on the CPU executor parallelism. This will
make it similar to the non-streaming reader. That's a good sane minimum value.
In some (rare?) situations you may benefit from a larger value. I've created
ARROW-12090 in case anyone wants to consider exposing it as an option.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]