westonpace commented on a change in pull request #9607:
URL: https://github.com/apache/arrow/pull/9607#discussion_r605163587
##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -105,60 +109,63 @@ Result<std::shared_ptr<ScannerBuilder>>
Dataset::NewScan() {
return NewScan(std::make_shared<ScanOptions>());
}
-Result<FragmentIterator> Dataset::GetFragments() {
+Future<FragmentVector> Dataset::GetFragmentsAsync() {
ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_));
- return GetFragments(std::move(predicate));
+ return GetFragmentsAsync(std::move(predicate));
}
-Result<FragmentIterator> Dataset::GetFragments(Expression predicate) {
+Future<FragmentVector> Dataset::GetFragmentsAsync(Expression predicate) {
ARROW_ASSIGN_OR_RAISE(
predicate, SimplifyWithGuarantee(std::move(predicate),
partition_expression_));
return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
- :
MakeEmptyIterator<std::shared_ptr<Fragment>>();
+ : FragmentVector{};
}
-struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
- explicit VectorRecordBatchGenerator(RecordBatchVector batches)
+struct VectorRecordBatchVectorFactory :
InMemoryDataset::RecordBatchVectorFactory {
+ explicit VectorRecordBatchVectorFactory(RecordBatchVector batches)
: batches_(std::move(batches)) {}
- RecordBatchIterator Get() const final { return MakeVectorIterator(batches_);
}
+ Result<RecordBatchVector> Get() const final { return batches_; }
RecordBatchVector batches_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
RecordBatchVector batches)
: Dataset(std::move(schema)),
- get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}
+ get_batches_(new VectorRecordBatchVectorFactory(std::move(batches))) {}
-struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
- explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
+struct TableRecordBatchVectorFactory :
InMemoryDataset::RecordBatchVectorFactory {
+ explicit TableRecordBatchVectorFactory(std::shared_ptr<Table> table)
: table_(std::move(table)) {}
- RecordBatchIterator Get() const final {
+ Result<RecordBatchVector> Get() const final {
auto reader = std::make_shared<TableBatchReader>(*table_);
auto table = table_;
- return MakeFunctionIterator([reader, table] { return reader->Next(); });
+ auto iter = MakeFunctionIterator([reader, table] { return reader->Next();
});
+ return iter.ToVector();
}
std::shared_ptr<Table> table_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
- get_batches_(new TableRecordBatchGenerator(std::move(table))) {}
+ get_batches_(new TableRecordBatchVectorFactory(std::move(table))) {}
Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
std::shared_ptr<Schema> schema) const {
RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
}
-Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
+Future<FragmentVector> InMemoryDataset::GetFragmentsImpl(Expression) {
auto schema = this->schema();
- auto create_fragment =
- [schema](std::shared_ptr<RecordBatch> batch) ->
Result<std::shared_ptr<Fragment>> {
+ // FIXME Need auto here
Review comment:
I wasn't entirely sure of the logic behind `get_batches_`, but it was
being treated as an iterator before, so I kept those semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]