bkietz commented on a change in pull request #7692:
URL: https://github.com/apache/arrow/pull/7692#discussion_r452486948



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource 
source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* 
reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, 
reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has 
schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       This one doesn't

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -508,36 +456,93 @@ ParquetFileFragment::ParquetFileFragment(FileSource 
source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* 
reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, 
reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has 
schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  auto metadata = reader->parquet_reader()->metadata();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but 
",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {

Review comment:
       Yes; in this loop, row group infos that have only partial statistics
are hydrated from the reader.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to