westonpace commented on a change in pull request #9607:
URL: https://github.com/apache/arrow/pull/9607#discussion_r605535681



##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -105,60 +109,63 @@ Result<std::shared_ptr<ScannerBuilder>> 
Dataset::NewScan() {
   return NewScan(std::make_shared<ScanOptions>());
 }
 
-Result<FragmentIterator> Dataset::GetFragments() {
+Future<FragmentVector> Dataset::GetFragmentsAsync() {
   ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_));
-  return GetFragments(std::move(predicate));
+  return GetFragmentsAsync(std::move(predicate));
 }
 
-Result<FragmentIterator> Dataset::GetFragments(Expression predicate) {
+Future<FragmentVector> Dataset::GetFragmentsAsync(Expression predicate) {
   ARROW_ASSIGN_OR_RAISE(
       predicate, SimplifyWithGuarantee(std::move(predicate), 
partition_expression_));
   return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
-                                   : 
MakeEmptyIterator<std::shared_ptr<Fragment>>();
+                                   : FragmentVector{};
 }
 
-struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
-  explicit VectorRecordBatchGenerator(RecordBatchVector batches)
+struct VectorRecordBatchVectorFactory : 
InMemoryDataset::RecordBatchVectorFactory {
+  explicit VectorRecordBatchVectorFactory(RecordBatchVector batches)
       : batches_(std::move(batches)) {}
 
-  RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); 
}
+  Result<RecordBatchVector> Get() const final { return batches_; }
 
   RecordBatchVector batches_;
 };
 
 InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
                                  RecordBatchVector batches)
     : Dataset(std::move(schema)),
-      get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}
+      get_batches_(new VectorRecordBatchVectorFactory(std::move(batches))) {}
 
-struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
-  explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
+struct TableRecordBatchVectorFactory : 
InMemoryDataset::RecordBatchVectorFactory {
+  explicit TableRecordBatchVectorFactory(std::shared_ptr<Table> table)
       : table_(std::move(table)) {}
 
-  RecordBatchIterator Get() const final {
+  Result<RecordBatchVector> Get() const final {
     auto reader = std::make_shared<TableBatchReader>(*table_);
     auto table = table_;
-    return MakeFunctionIterator([reader, table] { return reader->Next(); });
+    auto iter = MakeFunctionIterator([reader, table] { return reader->Next(); 
});
+    return iter.ToVector();
   }
 
   std::shared_ptr<Table> table_;
 };
 
 InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
     : Dataset(table->schema()),
-      get_batches_(new TableRecordBatchGenerator(std::move(table))) {}
+      get_batches_(new TableRecordBatchVectorFactory(std::move(table))) {}
 
 Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
     std::shared_ptr<Schema> schema) const {
   RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
   return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
 }
 
-Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
+Future<FragmentVector> InMemoryDataset::GetFragmentsImpl(Expression) {
   auto schema = this->schema();
 
-  auto create_fragment =
-      [schema](std::shared_ptr<RecordBatch> batch) -> 
Result<std::shared_ptr<Fragment>> {
+  // FIXME Need auto here

Review comment:
       Cleaned up InMemoryDataset to get rid of the iterator.

##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -29,34 +29,40 @@
 #include "arrow/record_batch.h"
 #include "arrow/scalar.h"
 #include "arrow/type.h"
+#include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 namespace dataset {
 
 /// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
 /// flattened FragmentIterator.

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -29,34 +29,40 @@
 #include "arrow/record_batch.h"
 #include "arrow/scalar.h"
 #include "arrow/type.h"
+#include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 namespace dataset {
 
 /// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
 /// flattened FragmentIterator.
-inline Result<FragmentIterator> GetFragmentsFromDatasets(const DatasetVector& 
datasets,
-                                                         Expression predicate) 
{
-  // Iterator<Dataset>
-  auto datasets_it = MakeVectorIterator(datasets);
-
-  // Dataset -> Iterator<Fragment>
-  auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> 
Result<FragmentIterator> {
-    return dataset->GetFragments(predicate);
+inline Future<FragmentVector> GetFragmentsFromDatasets(const DatasetVector& 
datasets,
+                                                       Expression predicate) {
+  // Dataset -> Future<FragmentVector>
+  auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> 
Future<FragmentVector> {
+    return dataset->GetFragmentsAsync(predicate);
   };
 
-  // Iterator<Iterator<Fragment>>
-  auto fragments_it = MakeMaybeMapIterator(fn, std::move(datasets_it));
+  auto fragment_futures = internal::MapVector(fn, datasets);
 
-  // Iterator<Fragment>
-  return MakeFlattenIterator(std::move(fragments_it));
+  return All(fragment_futures)
+      .Then([](const std::vector<Result<FragmentVector>>& fragment_vecs)
+                -> Result<FragmentVector> {
+        ARROW_ASSIGN_OR_RAISE(auto unwrapped_vecs, 
internal::UnwrapOrRaise(fragment_vecs))
+        return internal::FlattenVectors(std::move(unwrapped_vecs));
+      });
 }
 
-inline RecordBatchIterator IteratorFromReader(
-    const std::shared_ptr<RecordBatchReader>& reader) {
-  return MakeFunctionIterator([reader] { return reader->Next(); });
+inline RecordBatchGenerator GeneratorFromReader(

Review comment:
       Good catch.  Removed.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -640,6 +666,40 @@ class SerialReadaheadGenerator {
   std::shared_ptr<State> state_;
 };
 
+template <typename T>
+class FutureFirstGenerator {
+ public:
+  explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
+      : state_(std::make_shared<State>(std::move(future))) {}
+
+  Future<T> operator()() {
+    if (state_->source_) {
+      return state_->source_();
+    } else {
+      auto state = state_;
+      return state_->future_.Then([state](const AsyncGenerator<T>& source) {
+        state->source_ = source;
+        return state->source_();
+      });
+    }
+  }
+
+ private:
+  struct State {
+    explicit State(Future<AsyncGenerator<T>> future) : future_(future), 
source_() {}
+
+    Future<AsyncGenerator<T>> future_;
+    AsyncGenerator<T> source_;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+template <typename T>

Review comment:
       Done.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -998,6 +1058,19 @@ class MergedGenerator {
   std::shared_ptr<State> state_;
 };
 
+template <typename T>

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to