westonpace commented on a change in pull request #9945:
URL: https://github.com/apache/arrow/pull/9945#discussion_r609982585



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1063,6 +1063,86 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+template <typename T>
+struct Enumerated {
+  util::optional<T> value;
+  int index;
+  bool last;
+};
+
+template <typename T>
+struct IterationTraits<Enumerated<T>> {
+  static Enumerated<T> End() { return Enumerated<T>{{}, -1, false}; }
+  static bool IsEnd(const Enumerated<T>& val) { return !val.value.has_value(); }
+};
+
+/// \see MakeEnumeratedGenerator
+template <typename T>
+class EnumeratingGenerator {
+ public:
+  EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
+      : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
+
+  Future<Enumerated<T>> operator()() {
+    if (state_->finished) {
+      return AsyncGeneratorEnd<Enumerated<T>>();
+    } else {
+      auto state = state_;
+      return state->source().Then([state](const T& next) {
+        auto finished = IsIterationEnd<T>(next);
+        auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};

Review comment:
   I start with `AsyncGenerator<Fragment>`, which I enumerate to get `AsyncGenerator<Enumerated<Fragment>>`.
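   To make the enumeration step concrete, here is a minimal synchronous sketch of the one-item-lookahead idea used by `EnumeratingGenerator` above: an item is only emitted once its successor (or the end of the stream) has been pulled, which is what lets `last` be filled in. `Generator`, `MakeEnumerating`, and `FromVector` are hypothetical names; `std::optional` stands in for Arrow's `util::optional` end marker, and nothing here goes through `Future`s.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <functional>
   #include <memory>
   #include <optional>
   #include <string>
   #include <utility>
   #include <vector>

   // Hypothetical, synchronous stand-in for Arrow's Enumerated<T>.
   template <typename T>
   struct Enumerated {
     T value;
     int index;
     bool last;
   };

   // Hypothetical synchronous generator: std::nullopt marks the end of the stream.
   template <typename T>
   using Generator = std::function<std::optional<T>()>;

   // Enumerates `source` with one item of lookahead: an item is only emitted once
   // its successor (or the end) has been pulled, so `last` can be filled in.
   template <typename T>
   Generator<Enumerated<T>> MakeEnumerating(Generator<T> source) {
     struct State {
       Generator<T> source;
       std::optional<T> prev;  // pulled but not yet emitted
       int prev_index = 0;
     };
     auto state = std::make_shared<State>();
     state->source = std::move(source);
     state->prev = state->source();  // prime the lookahead
     return [state]() -> std::optional<Enumerated<T>> {
       if (!state->prev) return std::nullopt;  // source exhausted
       std::optional<T> next = state->source();
       Enumerated<T> out{std::move(*state->prev), state->prev_index,
                         !next.has_value()};
       state->prev = std::move(next);
       ++state->prev_index;
       return out;
     };
   }

   // Hypothetical helper for the example: yields the elements of a vector.
   template <typename T>
   Generator<T> FromVector(std::vector<T> items) {
     auto data = std::make_shared<std::vector<T>>(std::move(items));
     auto pos = std::make_shared<std::size_t>(0);
     return [data, pos]() -> std::optional<T> {
       if (*pos >= data->size()) return std::nullopt;
       return (*data)[(*pos)++];
     };
   }
   ```

   Enumerating `{"a", "b", "c"}` this way yields indices 0, 1 and 2, with `last` set only on `"c"`.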
   
   Next, I map each `Enumerated<Fragment>`: I call `Scan` to get an `AsyncGenerator<RecordBatch>`, and I enumerate that to get an `AsyncGenerator<Enumerated<RecordBatch>>`. I then combine each enumerated batch with the `Enumerated<Fragment>` to get an `AsyncGenerator<EnumeratedRecordBatch>`, where `EnumeratedRecordBatch` is defined as...
   
   ```
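   struct EnumeratedRecordBatch {
     // The batch, its index within its fragment, and whether it is that fragment's last batch
     Enumerated<std::shared_ptr<RecordBatch>> record_batch;
     // The source fragment, its index among all fragments, and whether it is the last fragment
     Enumerated<std::shared_ptr<Fragment>> fragment;
   };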
   ```
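   A simplified sketch of the tagging step, assuming strings in place of `std::shared_ptr<RecordBatch>` and `std::shared_ptr<Fragment>`, and a plain vector in place of the scanned batch stream. `TagBatches` is a hypothetical helper, not part of Arrow: it pairs each batch's own enumeration with its fragment's.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <string>
   #include <vector>

   // Hypothetical simplified types: strings stand in for
   // std::shared_ptr<RecordBatch> and std::shared_ptr<Fragment>.
   template <typename T>
   struct Enumerated {
     T value;
     int index;
     bool last;
   };

   struct EnumeratedRecordBatch {
     Enumerated<std::string> record_batch;
     Enumerated<std::string> fragment;
   };

   // Hypothetical helper: tags every batch scanned from one fragment with its own
   // position within that fragment and with the fragment's enumeration.
   std::vector<EnumeratedRecordBatch> TagBatches(
       const Enumerated<std::string>& fragment,
       const std::vector<std::string>& batches) {
     std::vector<EnumeratedRecordBatch> out;
     for (std::size_t i = 0; i < batches.size(); ++i) {
       Enumerated<std::string> batch{batches[i], static_cast<int>(i),
                                     i + 1 == batches.size()};
       out.push_back({batch, fragment});
     }
     return out;
   }
   ```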
   
   These generators (one per fragment) are then merged, which takes me from `AsyncGenerator<AsyncGenerator<EnumeratedRecordBatch>>` to `AsyncGenerator<EnumeratedRecordBatch>`. That is exactly what I need (per ARROW-12288) to satisfy...
   
   ```
   virtual Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered();
   ```
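   Merging gives up ordering: a merged generator may pull from several per-fragment generators at once, so batches from different fragments can arrive interleaved. The hypothetical `MergeRoundRobin` below is only a deterministic stand-in for that timing-dependent behaviour; it interleaves already-materialized sources one item at a time and is not how Arrow's merged generator is implemented.

   ```cpp
   #include <cassert>
   #include <deque>
   #include <vector>

   // Hypothetical, deterministic stand-in for a merged generator: takes one item
   // from each non-empty source in turn until every source is drained, so items
   // from different sources end up mixed together.
   template <typename T>
   std::vector<T> MergeRoundRobin(std::vector<std::deque<T>> sources) {
     std::vector<T> out;
     bool any = true;
     while (any) {
       any = false;
       for (auto& source : sources) {
         if (!source.empty()) {
           out.push_back(source.front());
           source.pop_front();
           any = true;
         }
       }
     }
     return out;
   }
   ```

   With sources `{10, 11, 12}` and `{20}` this produces `{10, 20, 11, 12}`: the per-source order survives, but the global order does not, which is why each item needs to carry its enumeration.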
   
   If the caller wants the batches in order, I can then resequence that stream to get...
   
   ```
   struct TaggedRecordBatch {
     std::shared_ptr<RecordBatch> record_batch;
     std::shared_ptr<Fragment> fragment;
   };
   ```
   
   This is exposed through the method...
   
   ```
   virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
   ```
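   Resequencing works because every item carries both indices and both `last` flags. A minimal sketch, assuming strings stand in for batches and fragments (all names here, including `Resequencer`, are hypothetical and not Arrow's API): early arrivals are buffered in a map keyed by (fragment index, batch index) and released once every item before them has been emitted; a batch's `last` flag moves the cursor to the next fragment, and `last` set on both the batch and its fragment marks the end of the stream.

   ```cpp
   #include <cassert>
   #include <map>
   #include <string>
   #include <utility>
   #include <vector>

   // Hypothetical simplified types: strings stand in for record batches and
   // fragments; none of these names are Arrow's API.
   template <typename T>
   struct Enumerated {
     T value;
     int index;
     bool last;
   };

   struct EnumeratedRecordBatch {
     Enumerated<std::string> record_batch;
     Enumerated<std::string> fragment;
   };

   struct TaggedRecordBatch {
     std::string record_batch;
     std::string fragment;
   };

   // Restores (fragment index, batch index) order for items arriving in any order.
   class Resequencer {
    public:
     // Buffers one item and returns every item that can now be emitted in order.
     std::vector<TaggedRecordBatch> Push(EnumeratedRecordBatch item) {
       std::pair<int, int> key{item.fragment.index, item.record_batch.index};
       pending_.emplace(key, std::move(item));
       std::vector<TaggedRecordBatch> ready;
       auto it = pending_.find({next_fragment_, next_batch_});
       while (it != pending_.end()) {
         const EnumeratedRecordBatch& b = it->second;
         ready.push_back({b.record_batch.value, b.fragment.value});
         if (b.record_batch.last && b.fragment.last) done_ = true;
         if (b.record_batch.last) {  // fragment finished: move to the next one
           ++next_fragment_;
           next_batch_ = 0;
         } else {
           ++next_batch_;
         }
         pending_.erase(it);
         it = pending_.find({next_fragment_, next_batch_});
       }
       return ready;
     }

     // True once the last batch of the last fragment has been emitted.
     bool Done() const { return done_; }

    private:
     std::map<std::pair<int, int>, EnumeratedRecordBatch> pending_;
     int next_fragment_ = 0;
     int next_batch_ = 0;
     bool done_ = false;
   };
   ```

   One limitation of this sketch: a fragment that yields no batches never produces an item carrying its index, so the cursor would wait on it forever; a fuller version would presumably need some way to signal empty fragments.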




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
