lidavidm commented on a change in pull request #10061:
URL: https://github.com/apache/arrow/pull/10061#discussion_r615874727



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1475,11 +1475,44 @@ static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
   return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
 }
 
-/// \see MakeGeneratorIterator
-template <typename T>
+template <typename GeneratorFactory,
+          typename Generator = internal::call_traits::return_type<GeneratorFactory>,
+          typename FT = typename Generator::result_type,
+          typename T = typename FT::ValueType>
 class GeneratorIterator {
  public:
-  explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}
+  explicit GeneratorIterator(GeneratorFactory factory)
+      : proxy_executor_(new internal::ProxyExecutor()) {
+    source_ = factory(proxy_executor_.get());
+  }
+
+  Result<T> Next() {
+    return internal::SerialExecutor::RunInSerialExecutor<T>(
+        [this](internal::Executor* executor) {
+          proxy_executor_->target = executor;
+          return source_();
+        });
+  }
+
+ private:
+  AsyncGenerator<T> source_;
+  std::unique_ptr<internal::ProxyExecutor> proxy_executor_;
+};
+
+template <typename GeneratorFactory,
+          typename Generator = internal::call_traits::return_type<GeneratorFactory>,
+          typename FT = typename Generator::result_type,
+          typename T = typename FT::ValueType>
+Result<Iterator<T>> MakeGeneratorIterator(GeneratorFactory factory) {

Review comment:
       It would be nice to explain the purpose of this class and whether it's 
reentrant (though the question is presumably moot, since it blocks). As I 
understand it, this 1) lets you parameterize a generator on an executor (which 
is guaranteed to resolve to a SerialExecutor) and then 2) synchronously blocks 
on the generator via the SerialExecutor, effectively running all work on the 
main thread. Is that right?

##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -1102,6 +1105,46 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   ASSERT_TRUE(IsIterationEnd(definitely_last));
 }
 
+class GeneratorIteratorTestFixture : public GeneratorTestFixture {};
+
+TEST_P(GeneratorIteratorTestFixture, Basic) {
+  ASSERT_OK_AND_ASSIGN(
+      auto it, MakeGeneratorIterator(
+                   [this](internal::Executor* executor) -> AsyncGenerator<TestInt> {
+                     return MakeSource({1, 2, 3}, executor);
+                   }));
+
+  ASSERT_OK_AND_ASSIGN(auto actual, it.ToVector());
+  ASSERT_EQ(std::vector<TestInt>({1, 2, 3}), actual);
+}
+
+TEST_P(GeneratorIteratorTestFixture, Error) {
+  ASSERT_OK_AND_ASSIGN(
+      auto it, MakeGeneratorIterator(
+                   [this](internal::Executor* executor) -> AsyncGenerator<TestInt> {
+                     return FailsAt(MakeSource({1, 2, 3}, executor), 1);
+                   }));
+
+  ASSERT_RAISES(Invalid, it.ToVector());
+}
+
+TEST_P(GeneratorIteratorTestFixture, Transferred) {
+  ASSERT_OK_AND_ASSIGN(auto mock_io_executor, internal::ThreadPool::Make(1));
+  ASSERT_OK_AND_ASSIGN(
+      auto it,
+      MakeGeneratorIterator([this, &mock_io_executor](internal::Executor* executor) {
+        auto source = MakeSource({1, 2, 3}, executor);
+        auto to_io = MakeTransferredGenerator(std::move(source), mock_io_executor.get());
+        auto back = MakeTransferredGenerator(std::move(to_io), executor);
+        return back;
+      }));
+  ASSERT_OK_AND_ASSIGN(auto actual, it.ToVector());

Review comment:
       Just to check my understanding, one call here (when IsSlow()) will 
synchronously sleep on the CPU executor, attempt to transfer to the I/O 
executor (which is a no-op as the future is already complete), then transfer 
back to the CPU executor (also a no-op)?

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -152,6 +152,9 @@ class ARROW_DS_EXPORT ScanTask {
   /// resulting from the Scan. Execution semantics are encapsulated in the
   /// particular ScanTask implementation
   virtual Result<RecordBatchIterator> Execute() = 0;

Review comment:
       (Note: #10062 removes the ScanTask bindings from Python, leaving only 
the C GLib bindings.)

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -152,6 +152,9 @@ class ARROW_DS_EXPORT ScanTask {
   /// resulting from the Scan. Execution semantics are encapsulated in the
   /// particular ScanTask implementation
   virtual Result<RecordBatchIterator> Execute() = 0;

Review comment:
       Just to confirm: SafeExecute/SafeVisit are only a way to safely 
integrate async readers while we still have SyncScanner?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1489,17 +1522,23 @@ class GeneratorIterator {
 
 /// \brief Converts an AsyncGenerator<T> to an Iterator<T> by blocking until each future
 /// is finished
+///
+/// If this underlying generator transfers to the CPU pool then this blocking call will be
+/// considered "nested parallelism" and is not safe to call from a CPU pool thread.
+///
+/// To avoid this you can use MakeGeneratorIterator
 template <typename T>
-Result<Iterator<T>> MakeGeneratorIterator(AsyncGenerator<T> source) {
-  return Iterator<T>(GeneratorIterator<T>(std::move(source)));
+Result<Iterator<T>> MakeSimpleGeneratorIterator(AsyncGenerator<T> source) {
+  return Iterator<T>(SimpleGeneratorIterator<T>(std::move(source)));
 }
 
 /// \brief Adds readahead to an iterator using a background thread.
 ///
 /// Under the hood this is converting the iterator to a generator using
 /// MakeBackgroundGenerator, adding readahead to the converted generator with
 /// MakeReadaheadGenerator, and then converting back to an iterator using
-/// MakeGeneratorIterator.
+/// MakeSimpleGeneratorIterator (this is safe because we the generator never

Review comment:
       ```suggestion
   /// MakeSimpleGeneratorIterator (this is safe because the generator never
   ```

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -154,9 +180,26 @@ class CsvScanTask : public ScanTask {
         source_(fragment->source()) {}
 
   Result<RecordBatchIterator> Execute() override {
-    ARROW_ASSIGN_OR_RAISE(auto reader,
-                          OpenReader(source_, *format_, options(), options()->pool));
-    return IteratorFromReader(std::move(reader));
+    auto reader_fut = OpenReaderAsync(source_, *format_, options(),
+                                      internal::GetCpuThreadPool(), options()->pool);
+    auto reader_gen = GeneratorFromReader(std::move(reader_fut));
+    return MakeSimpleGeneratorIterator(std::move(reader_gen));
+  }
+
+  Future<RecordBatchVector> SafeExecute(internal::Executor* executor) override {
+    auto reader_fut =
+        OpenReaderAsync(source_, *format_, options(), executor, options()->pool);

Review comment:
       So `executor` here will be a SerialExecutor. Isn't that a behavior 
change? Previously, `StreamingReader::Make` would use the CPU thread pool. Now 
we're having it use the main thread.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to