bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452490750
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource
source,
std::move(physical_schema)),
row_groups_(std::move(row_groups)),
parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
- has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+ has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+ physical_schema_ != nullptr) {}
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
- const std::shared_ptr<Expression>& predicate) {
- auto simplified_predicate = predicate->Assume(partition_expression());
- if (!simplified_predicate->IsSatisfiable()) {
- return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader*
reader) {
+ if (HasCompleteMetadata()) {
+ return Status::OK();
}
- std::vector<RowGroupInfo> row_groups;
- if (HasCompleteMetadata()) {
- row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
- } else {
+ if (reader == nullptr) {
ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
- ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_,
reader.get()));
- row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+ return EnsureCompleteMetadata(reader.get());
+ }
+
+ std::shared_ptr<Schema> schema;
+ RETURN_NOT_OK(reader->GetSchema(&schema));
+ if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+ return Status::Invalid("Fragment initialized with physical schema ",
+ *physical_schema_, " but ", source_.path(), " has
schema ",
+ *schema);
+ }
+ physical_schema_ = std::move(schema);
+
+ auto metadata = reader->parquet_reader()->metadata();
+ auto num_row_groups = metadata->num_row_groups();
+
+ if (row_groups_.empty()) {
+ row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+ }
+
+ for (RowGroupInfo& info : row_groups_) {
+ // Ensure RowGroups are indexing valid RowGroups before augmenting.
+ if (info.id() >= num_row_groups) {
+ return Status::IndexError("Trying to scan row group ", info.id(), " but
",
+ source_.path(), " only has ", num_row_groups,
+ " row groups");
+ }
+ }
+
+ for (RowGroupInfo& info : row_groups_) {
Review comment:
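If you prefer, I can iterate by index instead and assign to the vector element directly (e.g. `row_groups_[i] = ...`) rather than mutating it through the range-for reference.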
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org