fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428715816



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +317,316 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& 
infos) {
+  return !infos.empty() &&
+         std::all_of(infos.cbegin(), infos.cend(),
+                     [](const RowGroupInfo& i) { return i.HasStatistics(); });
+}
+
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  auto filter = [&predicate](const RowGroupInfo& info) {
+    return !info.Satisfy(predicate);
+  };
+  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
+  row_groups.erase(end, row_groups.end());
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
+    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    if (!info.HasStatistics() && info.id() < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+  std::for_each(row_groups.begin(), row_groups.end(), augment);
+
+  return row_groups;
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (row_groups_are_complete) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metadata, it will not be excluded.
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
+
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  if (!row_groups_are_complete) {
+    ARROW_ASSIGN_OR_RAISE(row_groups,
+                          AugmentRowGroups(std::move(row_groups), 
reader.get()));
+    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  }
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
+///
+/// RowGroupInfo
+///
 
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
-
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)),
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  std::vector<RowGroupInfo> row_groups;
+  if (HasCompleteMetadata()) {

Review comment:
       @jorisvandenbossche this is now lazy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to