fsaintjacques commented on a change in pull request #7180: URL: https://github.com/apache/arrow/pull/7180#discussion_r427493814
########## File path: cpp/src/arrow/dataset/file_parquet.cc ########## @@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile( return ScanFile(source, std::move(options), std::move(context), {}); } +static inline std::vector<RowGroupInfo> FilterRowGroups( + std::vector<RowGroupInfo> row_groups, const Expression& predicate) { + // Keep the index of the last valid entry. + size_t idx = 0; + for (size_t i = 0; i < row_groups.size(); i++) { + const auto& info = row_groups[i]; + if (info.Satisfy(predicate)) { + row_groups[idx++] = info; + } + } + row_groups.resize(idx); + return row_groups; +} + +static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter( + std::vector<RowGroupInfo> row_groups, const Expression& predicate, + parquet::arrow::FileReader* reader) { + auto metadata = reader->parquet_reader()->metadata(); + auto manifest = reader->manifest(); + auto num_row_groups = metadata->num_row_groups(); + + // Augment a RowGroup with statistics if missing. + auto augment = [&](RowGroupInfo& info) { + auto id = info.id(); + if (!info.HasStatistics() && id < num_row_groups) { + auto row_group = metadata->RowGroup(info.id()); + info.set_num_rows(row_group->num_rows()); + info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest)); + } + }; + + if (row_groups.empty()) { + row_groups = RowGroupInfo::FromCount(num_row_groups); + } + + for (auto& row_group : row_groups) { + augment(row_group); + } + + return FilterRowGroups(std::move(row_groups), predicate); +} + Result<ScanTaskIterator> ParquetFileFormat::ScanFile( const FileSource& source, std::shared_ptr<ScanOptions> options, - std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const { - auto properties = MakeReaderProperties(*this, context->pool); - ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); + std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const { + // The following block is required to avoid 
any IO if all RowGroups are + // excluded due to prior statistics knowledge. + if (!row_groups.empty()) { + // Apply a pre-filtering if the user requested an explicit sub-set of + // row-groups. In the case where a RowGroup doesn't have statistics + // metadata, it will not be excluded. + row_groups = FilterRowGroups(row_groups, *options->filter); + if (row_groups.empty()) { + return MakeEmptyIterator<std::shared_ptr<ScanTask>>(); + } + } - for (int i : row_groups) { - if (i >= reader->metadata()->num_row_groups()) { - return Status::IndexError("trying to scan row group ", i, " but ", source.path(), - " only has ", reader->metadata()->num_row_groups(), + // Open the reader and pay the real IO cost. + ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get())); + + // Ensure RowGroups are indexing valid RowGroups before augmenting. + auto num_row_groups = reader->num_row_groups(); + for (const auto& row_group : row_groups) { + if (row_group.id() >= num_row_groups) { + return Status::IndexError("Trying to scan row group ", row_group.id(), " but ", + source.path(), " only has ", num_row_groups, " row groups"); } } - auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader); - return ParquetScanTaskIterator::Make(std::move(options), std::move(context), - std::move(reader), std::move(arrow_properties), - row_groups); + ARROW_ASSIGN_OR_RAISE(row_groups, + AugmentAndFilter(row_groups, *options->filter, reader.get())); + + if (row_groups.empty()) { + return MakeEmptyIterator<std::shared_ptr<ScanTask>>(); + } + + return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source, + std::move(reader), std::move(row_groups)); } Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment( FileSource source, std::shared_ptr<Expression> partition_expression, - std::vector<int> row_groups) { + std::vector<RowGroupInfo> row_groups) { return std::shared_ptr<FileFragment>( new 
ParquetFileFragment(std::move(source), shared_from_this(), std::move(partition_expression), std::move(row_groups))); } +Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment( + FileSource source, std::shared_ptr<Expression> partition_expression, + std::vector<int> row_groups) { + return std::shared_ptr<FileFragment>(new ParquetFileFragment( + std::move(source), shared_from_this(), std::move(partition_expression), + RowGroupInfo::FromIdentifiers(row_groups))); +} + Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment( FileSource source, std::shared_ptr<Expression> partition_expression) { return std::shared_ptr<FileFragment>(new ParquetFileFragment( std::move(source), shared_from_this(), std::move(partition_expression), {})); } -Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments( - const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) { - auto properties = MakeReaderProperties(*this); - ARROW_ASSIGN_OR_RAISE(auto reader, - OpenReader(fragment.source(), std::move(properties))); - - auto arrow_properties = - MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader); - auto metadata = reader->metadata(); +/// +/// RowGroupInfo +/// - auto row_groups = fragment.row_groups(); - if (row_groups.empty()) { - row_groups = internal::Iota(metadata->num_row_groups()); +std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) { + std::vector<RowGroupInfo> results; + results.reserve(ids.size()); + for (auto i : ids) { + results.emplace_back(i); } - FragmentVector fragments(row_groups.size()); - - RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), - std::move(filter), std::move(row_groups)); + return results; +} - for (int i = 0, row_group = skipper.Next(); - row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) { - ARROW_ASSIGN_OR_RAISE( - fragments[i++], - MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group})); +std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) { + std::vector<RowGroupInfo> result; + result.reserve(count); + for (int i = 0; i < count; i++) { + result.emplace_back(i); } + return result; +} - return MakeVectorIterator(std::move(fragments)); +bool RowGroupInfo::Satisfy(const Expression& predicate) const { + return !HasStatistics() || predicate.IsSatisfiableWith(statistics_); } +/// +/// ParquetFileFragment +/// + +ParquetFileFragment::ParquetFileFragment(FileSource source, + std::shared_ptr<FileFormat> format, + std::shared_ptr<Expression> partition_expression, + std::vector<RowGroupInfo> row_groups) + : FileFragment(std::move(source), std::move(format), std::move(partition_expression)), + row_groups_(std::move(row_groups)), + parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {} + Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context) { - return parquet_format().ScanFile(source_, std::move(options), std::move(context), - row_groups_); + return parquet_format_.ScanFile(source_, std::move(options), std::move(context), + row_groups_); +} + +Result<FragmentVector> ParquetFileFragment::SplitByRowGroup( + const std::shared_ptr<Expression>& predicate) { + ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_)); + ARROW_ASSIGN_OR_RAISE(auto row_groups, + AugmentAndFilter(row_groups_, *predicate, reader.get())); Review comment: Correct. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org