lidavidm commented on a change in pull request #10060:
URL: https://github.com/apache/arrow/pull/10060#discussion_r626088532
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -385,6 +385,23 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
return MakeVectorIterator(std::move(tasks));
}
+util::optional<Future<int64_t>> ParquetFileFormat::CountRows(
+ const std::shared_ptr<FileFragment>& file, Expression predicate,
+ std::shared_ptr<ScanOptions> options) {
+ auto parquet_file = internal::checked_pointer_cast<ParquetFileFragment>(file);
+ if (FieldsInExpression(predicate).size() > 0) {
Review comment:
It should be doable. However, it'll require rethinking the API a bit:
currently, it assumes we know up front whether we can use the fast path. But to
push down the predicate, we may need to load the footer first, so the return type
will have to change from `optional<Future<int64_t>>` to `Future<optional<int64_t>>`.
I don't think that really penalizes the slow path, though: either the Future can be
constructed right away, or it does work that the scan needs to do later anyway
(namely, loading the footer).
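A minimal sketch of the two return shapes, using `std::optional` and `std::future` as stand-ins for Arrow's `util::optional` and `Future`. `FakeFragment`, `CountRowsUpFront`, and `CountRowsDeferred` are hypothetical names, and the footer load is only simulated; this is not how `ParquetFileFragment` is actually implemented.

```cpp
#include <cstdint>
#include <future>
#include <optional>

// Hypothetical stand-in for a Parquet fragment; not an Arrow type.
struct FakeFragment {
  bool predicate_has_fields;  // does the filter reference any columns?
  bool footer_can_answer;     // would footer statistics settle the predicate?
  int64_t num_rows;
};

// Current shape, optional<Future<int64_t>>: the fast-path decision is made
// synchronously, before any I/O, so any field-referencing predicate means
// "no fast path".
std::optional<std::future<int64_t>> CountRowsUpFront(const FakeFragment& f) {
  if (f.predicate_has_fields) return std::nullopt;
  std::promise<int64_t> done;
  done.set_value(f.num_rows);  // an already-completed future
  return done.get_future();
}

// Proposed shape, Future<optional<int64_t>>: the decision is made inside the
// continuation, after the (simulated) footer load. std::launch::deferred runs
// the lambda when get() is called; it stands in for an async continuation.
std::future<std::optional<int64_t>> CountRowsDeferred(const FakeFragment& f) {
  return std::async(std::launch::deferred, [f]() -> std::optional<int64_t> {
    // ...the footer would be loaded here...
    if (!f.predicate_has_fields || f.footer_can_answer) return f.num_rows;
    return std::nullopt;  // caller falls back to a full scan
  });
}
```

With the up-front shape, any predicate that references fields forces the slow path; with the deferred shape, a fragment whose footer statistics settle the predicate can still report a count, and otherwise the caller falls back to scanning.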
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -488,10 +507,37 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
std::move(enumerated_fragment_gen),
[scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
- return FragmentToBatches(scanner, fragment);
+ return FragmentToBatches(scanner, fragment, scanner->options());
});
}
+Result<AsyncGenerator<AsyncGenerator<util::optional<int64_t>>>> FragmentsToRowCount(
+ std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+ // Must use optional<int64_t> to avoid breaking the pipeline on empty batches
+ auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+ auto options = std::make_shared<ScanOptions>(*scanner->options());
+ RETURN_NOT_OK(SetProjection(options.get(), std::vector<std::string>()));
+ auto count_fragment_fn =
+ [scanner, options](const Enumerated<std::shared_ptr<Fragment>>& fragment)
+ -> Result<AsyncGenerator<util::optional<int64_t>>> {
+ auto count = fragment.value->CountRows(options->filter, options);
+ // Fast path
+ if (count.has_value()) {
+ return MakeSingleFutureGenerator(count.value().Then(
+ [](int64_t val) { return util::make_optional<int64_t>(val); }));
Review comment:
Unfortunately GCC isn't happy with this one:
```
In file included from /home/lidavidm/Code/upstream/arrow-9697/cpp/src/arrow/util/async_generator.h:25,
                 from /home/lidavidm/Code/upstream/arrow-9697/cpp/src/arrow/dataset/scanner.h:36,
                 from /home/lidavidm/Code/upstream/arrow-9697/cpp/src/arrow/dataset/scanner.cc:18:
/home/lidavidm/Code/upstream/arrow-9697/cpp/src/arrow/util/future.h: In substitution of 'template<class Signature> using ForSignature = arrow::detail::ContinueFuture::ForReturn<typename std::result_of<_Signature>::type> [with Signature = nonstd::optional_lite::optional<long int> (&(const long int&))(long int&&)]':
/home/lidavidm/Code/upstream/arrow-9697/cpp/src/arrow/util/future.h:491:13:   required from here
/home/lidavidm/Code/upstream/arrow-9697/cpp/src/arrow/util/future.h:67:9: error: no type named 'type' in 'class std::result_of<nonstd::optional_lite::optional<long int> (&(const long int&))(long int&&)>'
   67 |   using ForSignature = ForReturn<result_of_t<Signature>>;
      |         ^~~~~~~~~~~~
```
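The comment `// Must use optional<int64_t> to avoid breaking the pipeline on empty batches` in `FragmentsToRowCount` can be illustrated with a small sketch. This is one plausible reading, assuming the generators detect end-of-stream by comparing against a reserved sentinel value: if that sentinel were a plain `int64_t` such as `0`, a fragment with no rows would be indistinguishable from the end of the stream, whereas `optional<int64_t>` can reserve `nullopt` for the end and keep `0` as ordinary data. `Generator`, `FromVector`, `Collect`, `CollectRaw`, and `CollectWrapped` are hypothetical helpers, not Arrow APIs; the wrapping lambda mirrors the `Then` callback in the diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical pull-based generator: each call yields the next value, and a
// reserved sentinel value means "end of stream". Not Arrow's actual API.
template <typename T>
using Generator = std::function<T()>;

// Yields each element of `values` once, then the sentinel `end` forever.
template <typename T>
Generator<T> FromVector(std::vector<T> values, T end) {
  std::size_t i = 0;
  return [values, end, i]() mutable { return i < values.size() ? values[i++] : end; };
}

// Pulls values until the sentinel appears.
template <typename T>
std::vector<T> Collect(Generator<T> gen, const T& end) {
  std::vector<T> out;
  for (T v = gen(); !(v == end); v = gen()) out.push_back(v);
  return out;
}

// Plain int64_t with a default-constructed (0) sentinel: a fragment with zero
// rows looks like end-of-stream, so every later count is dropped.
std::vector<int64_t> CollectRaw(const std::vector<int64_t>& counts) {
  return Collect<int64_t>(FromVector<int64_t>(counts, int64_t{}), int64_t{});
}

// optional<int64_t> with a nullopt sentinel: a count of 0 is ordinary data.
std::vector<std::optional<int64_t>> CollectWrapped(const std::vector<int64_t>& counts) {
  auto wrap = [](int64_t val) { return std::optional<int64_t>(val); };
  std::vector<std::optional<int64_t>> wrapped;
  for (int64_t c : counts) wrapped.push_back(wrap(c));
  const std::optional<int64_t> end = std::nullopt;
  return Collect<std::optional<int64_t>>(FromVector(wrapped, end), end);
}
```

For per-fragment counts `{5, 0, 3}`, `CollectRaw` stops after `5`, while `CollectWrapped` returns all three counts, including the zero.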
--
This is an automated message from the Apache Git Service.