westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r564750134
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -837,20 +885,121 @@ class ThreadedTableReader : public BaseTableReader {
}
protected:
- ThreadPool* thread_pool_;
+ Executor* thread_pool_;
+ Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+};
+
+class AsyncThreadedTableReader
+ : public BaseTableReader,
+ public std::enable_shared_from_this<AsyncThreadedTableReader> {
+ public:
+ using BaseTableReader::BaseTableReader;
+
+ AsyncThreadedTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+ const ReadOptions& read_options,
+ const ParseOptions& parse_options,
+ const ConvertOptions& convert_options, Executor* thread_pool)
+ : BaseTableReader(pool, input, read_options, parse_options, convert_options),
+ thread_pool_(thread_pool) {}
+
+ ~AsyncThreadedTableReader() override {
+ if (task_group_) {
+ // In case of error, make sure all pending tasks are finished before
+ // we start destroying BaseTableReader members
+ ARROW_UNUSED(task_group_->Finish());
+ }
+ }
+
+ Status Init() override {
+ ARROW_ASSIGN_OR_RAISE(auto istream_it,
+ io::MakeInputStreamIterator(input_, read_options_.block_size));
+
+ ARROW_ASSIGN_OR_RAISE(auto bg_it,
+ MakeBackgroundIterator(std::move(istream_it), thread_pool_));
+
+ int32_t block_queue_size = thread_pool_->GetCapacity();
+ auto rh_it = AddReadahead(bg_it, block_queue_size);
+ buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it));
+ return Status::OK();
+ }
+
+ Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }
+
+ Future<std::shared_ptr<Table>> ReadAsync() override {
+ task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);
Review comment:
Ben was interested in this as well. However, as it stands, there would be
no clear benefit in doing so, and it would require parallel async
implementations of the column builder code (which currently relies on a task
group being present). These parse and convert tasks do not block (except
perhaps very briefly on access to shared collections), so using a task group
here isn't detrimental.

If we want to remove TaskGroup in order to simplify down to a single
"scheduler-like" interface, we could do that. In that case, however, we should
modify the existing serial and threaded table readers as well, and I think it
would make sense to do that as a separate story.
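
To make the "these tasks do not block" point concrete, here is a rough, self-contained sketch. It does not use Arrow's API: `MiniTaskGroup` and `SumBlocks` are hypothetical names invented for illustration, and `std::async` stands in for the executor. Each task does CPU-only parsing and takes a lock only for a moment to update shared state, which is the property that makes a task group harmless inside an otherwise async reader.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <future>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task group. This is NOT arrow::internal::TaskGroup.
class MiniTaskGroup {
 public:
  // Schedule a short, CPU-bound task that returns true on success.
  template <typename Fn>
  void Append(Fn&& task) {
    pending_.push_back(std::async(std::launch::async, std::forward<Fn>(task)));
  }

  // Wait for every scheduled task and report whether all of them succeeded.
  bool Finish() {
    bool ok = true;
    for (auto& f : pending_) ok = f.get() && ok;
    pending_.clear();
    return ok;
  }

  // As in the destructor in the diff: never leave tasks running past teardown.
  ~MiniTaskGroup() { Finish(); }

 private:
  std::vector<std::future<bool>> pending_;
};

// Sums the integers in each comma-separated block, one task per block.
// Assumes blocks look like "1,2,3"; malformed input makes the call return false.
bool SumBlocks(const std::vector<std::string>& blocks, long* total) {
  std::mutex total_mutex;
  long sum = 0;
  MiniTaskGroup group;  // declared last so it is destroyed before the state above
  for (std::size_t i = 0; i < blocks.size(); ++i) {
    group.Append([&blocks, &total_mutex, &sum, i] {
      try {
        const std::string& block = blocks[i];
        long local = 0;
        std::size_t pos = 0;
        while (pos < block.size()) {  // "parse & convert": pure CPU work, no I/O
          std::size_t consumed = 0;
          local += std::stol(block.substr(pos), &consumed);
          pos += consumed + 1;  // step over the comma
        }
        // The only wait in the task: a very brief lock on shared state.
        std::lock_guard<std::mutex> lock(total_mutex);
        sum += local;
        return true;
      } catch (const std::exception&) {
        return false;  // std::stol rejected the input
      }
    });
  }
  const bool ok = group.Finish();
  *total = sum;
  return ok;
}
```

Calling `SumBlocks({"1,2,3", "10,20"}, &total)` runs one task per block and joins them in `Finish()`. Because no task waits on I/O or on another task, the join cannot starve the pool, which is the situation described above for the column builders.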