jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r427163007



##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public 
FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) 
const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = 
NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> 
statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd

Review comment:
       missing word at the end?

##########
File path: python/pyarrow/dataset.py
##########
@@ -443,6 +445,53 @@ def _union_dataset(children, schema=None, **kwargs):
     return UnionDataset(schema, children)
 
 
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+    """
+    Create a FileSystemDataset from a `_metadata` file created via
+    `pyarrrow.parquet.write_metadata`.
+
+    Parameters
+    ----------
+    metadata_path : path,
+        Path pointing to a single file parquet metadata file
+    schema : Schema, optional
+        Optionally provide the Schema for the Dataset, in which case it will
+        not be inferred from the source.
+    filesystem : FileSystem or URI string, default None
+        If a single path is given as source and filesystem is None, then the
+        filesystem will be inferred from the path.
+        If an URI string is passed, then a filesystem object is constructed
+        using the URI's optional path component as a directory prefix. See the
+        examples below.
+        Note that the URIs on Windows must follow 'file:///C:...' or
+        'file:/C:...' patterns.
+    format : ParquetFileFormat
+        An instance of a ParquetFileFormat if special options needs to be
+        passed.
+
+    Returns
+    -------
+    FileSystemDataset
+    """
+    from pyarrow.fs import LocalFileSystem
+
+    if not isinstance(metadata_path, str):
+        raise ValueError("metadata_path argument must be a string")

Review comment:
       ```suggestion
       metadata_path = _stringify_path(metadata_path)
   ```
   
   (then it also handles pathlib.Path objects, and will also raise an error if 
it is not a string / cannot be converted to a string)

##########
File path: cpp/src/parquet/metadata.h
##########
@@ -258,10 +258,26 @@ class PARQUET_EXPORT FileMetaData {
 
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
-  // Set file_path ColumnChunk fields to a particular value
+  /// \brief Set a path to all ColumnChunk for all RowGroups.
+  ///
+  /// Commonly used by systems (Dask, Spark) who generates an metadata-only
+  /// parquet file. The path are usually relative to said index file.
+  ///
+  /// \param[in] path to set.
   void set_file_path(const std::string& path);
 
-  // Merge row-group metadata from "other" FileMetaData object
+  /// \brief Merge row groups from another metadata file into this one.
+  ///
+  /// The schema of the input FileMetaData must be equal to the
+  /// schema of this object.
+  ///
+  /// This is used by systems who creates an aggregate metadata only file by

Review comment:
       ```suggestion
     /// This is used by systems who creates an aggregate metadata-only file by
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));

Review comment:
       Does this mean that using `SplitByRowGroup` currently always invokes IO? 
(In principle it could be filtered/split based on the already-read RowGroupInfo 
objects?)

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");

Review comment:
       Or reusing the phrase you used below as well: "Could not infer file 
paths from FileMetaData: .."

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -770,31 +806,43 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment in multiple fragments.
+
         Yield a Fragment wrapping each row group in this ParquetFileFragment.
-        Row groups will be excluded whose metadata contradicts the either the
-        filter provided on construction of this Fragment or the extra_filter
-        argument.
+        Row groups will be excluded whose metadata contradicts the optional
+        predicate.
+
+        Parameters
+        ----------
+        predicate : Expression, default None
+            Exclude RowGroups whose statistics contradicts the predicate.
+
+        Returns
+        -------
+        A generator of Fragment.
         """
         cdef:
-            CParquetFileFormat* c_format
-            CFragmentIterator c_fragments
-            shared_ptr[CExpression] c_extra_filter
+            vector[shared_ptr[CFragment]] c_fragments
+            shared_ptr[CExpression] c_predicate
+            shared_ptr[CFragment] c_fragment
 
         schema = self.physical_schema
-        c_extra_filter = _insert_implicit_casts(extra_filter, schema)
-        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
-        c_fragments = move(GetResultValue(c_format.GetRowGroupFragments(deref(
-            self.parquet_file_fragment), move(c_extra_filter))))
+        c_predicate = _insert_implicit_casts(predicate, schema)
+        with nogil:
+            c_fragments = move(GetResultValue(
+                self.parquet_file_fragment.SplitByRowGroup(move(c_predicate))))
 
-        for maybe_fragment in c_fragments:
-            yield Fragment.wrap(GetResultValue(move(maybe_fragment)))
+        for c_fragment in c_fragments:
+            yield Fragment.wrap(c_fragment)

Review comment:
       I am wondering, is it still necessary to have this yield (generator) 
instead of returning a list of fragments? By now, all the heavy work is 
already done, I think (the potential reading of parquet metadata and 
filtering, since `SplitByRowGroup` is not a lazy iterator on the C++ side).

##########
File path: python/pyarrow/dataset.py
##########
@@ -35,11 +35,13 @@
     Fragment,
     HivePartitioning,
     IpcFileFormat,
+    ParquetDatasetFactory,
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
+    RowGroupInfo,

Review comment:
       Maybe we should not expose this publicly? (is there a reason you would 
want to use this directly?)

##########
File path: cpp/src/parquet/metadata.h
##########
@@ -258,10 +258,26 @@ class PARQUET_EXPORT FileMetaData {
 
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
-  // Set file_path ColumnChunk fields to a particular value
+  /// \brief Set a path to all ColumnChunk for all RowGroups.
+  ///
+  /// Commonly used by systems (Dask, Spark) who generates an metadata-only
+  /// parquet file. The path are usually relative to said index file.

Review comment:
       ```suggestion
     /// parquet file. The path is usually relative to said index file.
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public 
FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) 
const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = 
NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> 
statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.

Review comment:
       On line 134 above, it seems that the default is -1 (and not 0) when the 
number of rows is not specified / not known?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");

Review comment:
       It might be useful to have a slightly longer / more explicit error message 
here. (I actually ran into this because an older version of dask was creating 
wrong metadata, which was fixed some time ago, and at first I didn't understand 
what it was about.)
   
   Something like: *"Invalid metadata file for constructing a Parquet dataset: 
the column chunks file path should be set, but got empty file path."*, or 
similar.

##########
File path: python/pyarrow/parquet.py
##########
@@ -1739,33 +1739,53 @@ def write_to_dataset(table, root_path, 
partition_cols=None,
             metadata_collector[-1].set_file_path(outfile)
 
 
-def write_metadata(schema, where, version='1.0',
-                   use_deprecated_int96_timestamps=False,
-                   coerce_timestamps=None):
+def write_metadata(schema, where, metadata_collector=None, **kwargs):
     """
-    Write metadata-only Parquet file from schema.
+    Write metadata-only Parquet file from schema. This can be used with
+    `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
+    files.
 
     Parameters
     ----------
     schema : pyarrow.Schema
     where: string or pyarrow.NativeFile
-    version : {"1.0", "2.0"}, default "1.0"
-        The Parquet format version, defaults to 1.0.
-    use_deprecated_int96_timestamps : bool, default False
-        Write nanosecond resolution timestamps to INT96 Parquet format.
-    coerce_timestamps : str, default None
-        Cast timestamps a particular resolution.
-        Valid values: {None, 'ms', 'us'}.
-    filesystem : FileSystem, default None
-        If nothing passed, paths assumed to be found in the local on-disk
-        filesystem.
+    metadata_collector:
+    **kwargs : dict,
+        Additional kwargs for ParquetWriter class. See docstring for
+        `ParquetWriter` for more information.
+
+    Examples
+    --------
+
+    Write a dataset and collect metadata information.
+
+    >>> metadata_collector=[]

Review comment:
       ```suggestion
       >>> metadata_collector = []
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to