fsaintjacques commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428664320



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Compact in place: idx is the number of entries kept so far.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply pre-filtering if the user requested an explicit subset of
+    // row groups. A RowGroup that doesn't have statistics metadata will
+    // not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure the RowGroupInfo entries reference valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source,
+                                       std::move(reader), std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), {}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, partition_expression(),
+                                                       {std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& row_group) {
+  try {

Review comment:
       All parquet calls can throw; it's just simpler this way, otherwise I get variable-scope issues from wrapping things in multiple blocks.
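
       To illustrate the scope issue, here is a minimal hedged sketch (the actual body of `FileFromRowGroup` is truncated above, so this is an assumed shape rather than the PR's code) of what it would look like if only the throwing parquet calls were wrapped instead of the whole body:

```cpp
// Hypothetical alternative: wrap only the throwing parquet accessors.
// Any result needed afterwards must then be declared before the try block,
// which is the variable-scope awkwardness referred to above.
static Result<std::string> FileFromRowGroup(const std::string& base_path,
                                            const parquet::RowGroupMetaData& row_group) {
  std::string path;
  try {
    // Parquet metadata accessors may throw parquet::ParquetException.
    path = row_group.ColumnChunk(0)->file_path();
  } catch (const ::parquet::ParquetException& e) {
    return Status::Invalid("Could not infer file path from RowGroup metadata: ", e.what());
  }
  // `path` is only usable here because it was hoisted out of the try scope.
  return fs::internal::ConcatAbstractPath(base_path, path);
}
```

       Wrapping the entire body in a single try/catch avoids hoisting locals like `path` and keeps the error path in one place, at the cost of also covering code that cannot throw.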




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

