lidavidm commented on a change in pull request #10061:
URL: https://github.com/apache/arrow/pull/10061#discussion_r615874727
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1475,11 +1475,44 @@ static Result<AsyncGenerator<T>>
MakeBackgroundGenerator(
return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q,
q_restart);
}
-/// \see MakeGeneratorIterator
-template <typename T>
+template <typename GeneratorFactory,
+ typename Generator =
internal::call_traits::return_type<GeneratorFactory>,
+ typename FT = typename Generator::result_type,
+ typename T = typename FT::ValueType>
class GeneratorIterator {
public:
- explicit GeneratorIterator(AsyncGenerator<T> source) :
source_(std::move(source)) {}
+ explicit GeneratorIterator(GeneratorFactory factory)
+ : proxy_executor_(new internal::ProxyExecutor()) {
+ source_ = factory(proxy_executor_.get());
+ }
+
+ Result<T> Next() {
+ return internal::SerialExecutor::RunInSerialExecutor<T>(
+ [this](internal::Executor* executor) {
+ proxy_executor_->target = executor;
+ return source_();
+ });
+ }
+
+ private:
+ AsyncGenerator<T> source_;
+ std::unique_ptr<internal::ProxyExecutor> proxy_executor_;
+};
+
+template <typename GeneratorFactory,
+ typename Generator =
internal::call_traits::return_type<GeneratorFactory>,
+ typename FT = typename Generator::result_type,
+ typename T = typename FT::ValueType>
+Result<Iterator<T>> MakeGeneratorIterator(GeneratorFactory factory) {
Review comment:
It would be nice to explain the purpose of this class and whether it's
reentrant or not (well, the question is moot presumably because it's blocking).
This essentially 1) lets you parameterize a generator on an executor (which is
guaranteed to resolve to a SerialExecutor) and then 2) synchronously blocks on
the generator via the SerialExecutor, effectively running all work on the main
thread?
##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -1102,6 +1105,46 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
ASSERT_TRUE(IsIterationEnd(definitely_last));
}
+class GeneratorIteratorTestFixture : public GeneratorTestFixture {};
+
+TEST_P(GeneratorIteratorTestFixture, Basic) {
+ ASSERT_OK_AND_ASSIGN(
+ auto it, MakeGeneratorIterator(
+ [this](internal::Executor* executor) ->
AsyncGenerator<TestInt> {
+ return MakeSource({1, 2, 3}, executor);
+ }));
+
+ ASSERT_OK_AND_ASSIGN(auto actual, it.ToVector());
+ ASSERT_EQ(std::vector<TestInt>({1, 2, 3}), actual);
+}
+
+TEST_P(GeneratorIteratorTestFixture, Error) {
+ ASSERT_OK_AND_ASSIGN(
+ auto it, MakeGeneratorIterator(
+ [this](internal::Executor* executor) ->
AsyncGenerator<TestInt> {
+ return FailsAt(MakeSource({1, 2, 3}, executor), 1);
+ }));
+
+ ASSERT_RAISES(Invalid, it.ToVector());
+}
+
+TEST_P(GeneratorIteratorTestFixture, Transferred) {
+ ASSERT_OK_AND_ASSIGN(auto mock_io_executor, internal::ThreadPool::Make(1));
+ ASSERT_OK_AND_ASSIGN(
+ auto it,
+ MakeGeneratorIterator([this, &mock_io_executor](internal::Executor*
executor) {
+ auto source = MakeSource({1, 2, 3}, executor);
+ auto to_io = MakeTransferredGenerator(std::move(source),
mock_io_executor.get());
+ auto back = MakeTransferredGenerator(std::move(to_io), executor);
+ return back;
+ }));
+ ASSERT_OK_AND_ASSIGN(auto actual, it.ToVector());
Review comment:
Just to check my understanding, one call here (when IsSlow()) will
synchronously sleep on the CPU executor, attempt to transfer to the I/O
executor (which is a no-op as the future is already complete), then transfer
back to the CPU executor (also a no-op)?
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -152,6 +152,9 @@ class ARROW_DS_EXPORT ScanTask {
/// resulting from the Scan. Execution semantics are encapsulated in the
/// particular ScanTask implementation
virtual Result<RecordBatchIterator> Execute() = 0;
Review comment:
(Note: #10062 removes the ScanTask bindings from Python, leaving only the
C GLib bindings as an external user.)
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -152,6 +152,9 @@ class ARROW_DS_EXPORT ScanTask {
/// resulting from the Scan. Execution semantics are encapsulated in the
/// particular ScanTask implementation
virtual Result<RecordBatchIterator> Execute() = 0;
Review comment:
So to make sure, SafeExecute/SafeVisit are just a way to safely
integrate async readers while we still have SyncScanner?
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1489,17 +1522,23 @@ class GeneratorIterator {
/// \brief Converts an AsyncGenerator<T> to an Iterator<T> by blocking until
each future
/// is finished
+///
+/// If this underlying generator transfers to the CPU pool then this blocking
call will be
+/// considered "nested parallelism" and is not safe to call from a CPU pool
thread.
+///
+/// To avoid this you can use MakeGeneratorIterator
template <typename T>
-Result<Iterator<T>> MakeGeneratorIterator(AsyncGenerator<T> source) {
- return Iterator<T>(GeneratorIterator<T>(std::move(source)));
+Result<Iterator<T>> MakeSimpleGeneratorIterator(AsyncGenerator<T> source) {
+ return Iterator<T>(SimpleGeneratorIterator<T>(std::move(source)));
}
/// \brief Adds readahead to an iterator using a background thread.
///
/// Under the hood this is converting the iterator to a generator using
/// MakeBackgroundGenerator, adding readahead to the converted generator with
/// MakeReadaheadGenerator, and then converting back to an iterator using
-/// MakeGeneratorIterator.
+/// MakeSimpleGeneratorIterator (this is safe because we the generator never
Review comment:
```suggestion
/// MakeSimpleGeneratorIterator (this is safe because the generator never
```
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -154,9 +180,26 @@ class CsvScanTask : public ScanTask {
source_(fragment->source()) {}
Result<RecordBatchIterator> Execute() override {
- ARROW_ASSIGN_OR_RAISE(auto reader,
- OpenReader(source_, *format_, options(),
options()->pool));
- return IteratorFromReader(std::move(reader));
+ auto reader_fut = OpenReaderAsync(source_, *format_, options(),
+ internal::GetCpuThreadPool(),
options()->pool);
+ auto reader_gen = GeneratorFromReader(std::move(reader_fut));
+ return MakeSimpleGeneratorIterator(std::move(reader_gen));
+ }
+
+ Future<RecordBatchVector> SafeExecute(internal::Executor* executor) override
{
+ auto reader_fut =
+ OpenReaderAsync(source_, *format_, options(), executor,
options()->pool);
Review comment:
So `executor` here will be a SerialExecutor. Isn't that a behavior
change? Previously, `StreamingReader::Make` would use the CPU thread pool. Now
we're having it use the main thread.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]