bkietz commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428183533



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {

Review comment:
       Please restrict the scope of try blocks to wrap just the function calls 
which may throw.

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -315,20 +233,28 @@ class ParquetScanTaskIterator {
   }
 
   ParquetScanTaskIterator(std::shared_ptr<ScanOptions> options,
-                          std::shared_ptr<ScanContext> context,
-                          std::vector<int> column_projection, RowGroupSkipper 
skipper,
-                          std::unique_ptr<parquet::arrow::FileReader> reader)
+                          std::shared_ptr<ScanContext> context, FileSource 
source,
+                          std::unique_ptr<parquet::arrow::FileReader> reader,
+                          std::vector<int> column_projection,
+                          std::vector<RowGroupInfo> row_groups)
       : options_(std::move(options)),
         context_(std::move(context)),
+        source_(std::move(source)),
+        reader_(std::move(reader)),
         column_projection_(std::move(column_projection)),
-        skipper_(std::move(skipper)),
-        reader_(std::move(reader)) {}
+        row_groups_(std::move(row_groups)) {}
 
   std::shared_ptr<ScanOptions> options_;
   std::shared_ptr<ScanContext> context_;
-  std::vector<int> column_projection_;
-  RowGroupSkipper skipper_;
+
+  FileSource source_;

Review comment:
       What is this used for?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);

Review comment:
       ```suggestion
       row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());

Review comment:
       ```suggestion
         auto row_group = metadata->RowGroup(id);
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", 
path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", 
e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> 
paths_and_row_group_size;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, 
*row_group));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          paths_and_row_group_size.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        path_and_count.second.emplace_back(row_groups.size(), num_rows, stats);
+      }
+    }
+
+    std::vector<std::shared_ptr<FileFragment>> fragments;
+    for (const auto& elem : paths_and_row_group_size) {

Review comment:
       ```suggestion
       for (auto&& elem : path_to_row_group_info) {
   ```
   std::move doesn't forward to a move constructor if the argument is const

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> 
ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, 
metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) 
{
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};
+  for (const auto& field : fields) {
+    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, 
metadata));
   }
 
   return expressions.empty() ? scalar(true) : and_(expressions);
 }
 
-// Skip RowGroups with a filter and metadata
-class RowGroupSkipper {
- public:
-  static constexpr int kIterationDone = -1;
-
-  RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
-                  parquet::ArrowReaderProperties arrow_properties,
-                  std::shared_ptr<Expression> filter, std::vector<int> 
row_groups)
-      : metadata_(std::move(metadata)),
-        arrow_properties_(std::move(arrow_properties)),
-        filter_(std::move(filter)),
-        row_group_idx_(0),
-        row_groups_(std::move(row_groups)),
-        num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
-                                            : 
static_cast<int>(row_groups_.size())) {}
-
-  int Next() {
-    while (row_group_idx_ < num_row_groups_) {
-      const int row_group =
-          row_groups_.empty() ? row_group_idx_++ : 
row_groups_[row_group_idx_++];
-
-      const auto row_group_metadata = metadata_->RowGroup(row_group);
-
-      const int64_t num_rows = row_group_metadata->num_rows();
-      if (CanSkip(*row_group_metadata)) {
-        rows_skipped_ += num_rows;
-        continue;
-      }
-
-      return row_group;
-    }
-
-    return kIterationDone;
-  }
-
- private:
-  bool CanSkip(const parquet::RowGroupMetaData& metadata) const {
-    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, 
arrow_properties_);
-    // Errors with statistics are ignored and post-filtering will apply.
-    if (!maybe_stats_expr.ok()) {
-      return false;
-    }
-
-    auto stats_expr = maybe_stats_expr.ValueOrDie();
-    return !filter_->Assume(stats_expr)->IsSatisfiable();
-  }
-
-  std::shared_ptr<parquet::FileMetaData> metadata_;
-  parquet::ArrowReaderProperties arrow_properties_;
-  std::shared_ptr<Expression> filter_;
-  int row_group_idx_;
-  std::vector<int> row_groups_;
-  int num_row_groups_;
-  int64_t rows_skipped_;
-};
-
 class ParquetScanTaskIterator {
  public:
   static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,

Review comment:
       Nothing here can fail, so we can just make the constructor public

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metadata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", 
path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", 
e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> 
paths_and_row_group_size;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, 
*row_group));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          paths_and_row_group_size.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        path_and_count.second.emplace_back(row_groups.size(), num_rows, stats);
+      }

Review comment:
       You can avoid constructing the vector for failed insertion:
   ```suggestion
         auto it = path_to_row_group_info.insert({path, {}}).first;
         auto& row_groups = it->second;
         row_groups.emplace_back(row_groups.size(), num_rows, std::move(stats));
   ```

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -747,8 +747,7 @@ Status TypedIntegralStatisticsAsScalars(const Statistics& 
statistics,
       using CType = typename StatisticsType::T;
       return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
     default:
-      return Status::NotImplemented("Cannot extract statistics for type ",
-                                    logical_type->ToString());
+      return Status::NotImplemented("Cannot extract statistics for type ");

Review comment:
       Maybe this should return `optional<pair<shared_ptr<Scalar>, 
shared_ptr<Scalar>>>` instead, so that callers can decide whether a verbose 
error is appropriate? That can be a follow-up, in any case.

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> 
ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, 
metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) 
{
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};

Review comment:
       ```suggestion
     ExpressionVector expressions;
     expressions.reserve(fields.size());
   ```
   (otherwise the first `fields.size()` entries will be null)

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);

Review comment:
       The std algorithms (erase-remove idiom) could be used here:
   ```suggestion
     auto new_end = std::remove_if(row_groups.begin(), row_groups.end(), [&](const 
RowGroupInfo& info) {
       return !info.Satisfy(predicate);
     });
     row_groups.erase(new_end, row_groups.end());
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -163,124 +158,47 @@ static std::shared_ptr<Expression> 
ColumnChunkStatisticsAsExpression(
               less_equal(field_expr, scalar(max)));
 }
 
-static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, 
metadata));
+static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) 
{
+  const auto& fields = manifest.schema_fields;
+  ExpressionVector expressions{fields.size()};
+  for (const auto& field : fields) {
+    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, 
metadata));
   }
 
   return expressions.empty() ? scalar(true) : and_(expressions);
 }
 
-// Skip RowGroups with a filter and metadata
-class RowGroupSkipper {
- public:
-  static constexpr int kIterationDone = -1;
-
-  RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
-                  parquet::ArrowReaderProperties arrow_properties,
-                  std::shared_ptr<Expression> filter, std::vector<int> 
row_groups)
-      : metadata_(std::move(metadata)),
-        arrow_properties_(std::move(arrow_properties)),
-        filter_(std::move(filter)),
-        row_group_idx_(0),
-        row_groups_(std::move(row_groups)),
-        num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
-                                            : 
static_cast<int>(row_groups_.size())) {}
-
-  int Next() {
-    while (row_group_idx_ < num_row_groups_) {
-      const int row_group =
-          row_groups_.empty() ? row_group_idx_++ : 
row_groups_[row_group_idx_++];
-
-      const auto row_group_metadata = metadata_->RowGroup(row_group);
-
-      const int64_t num_rows = row_group_metadata->num_rows();
-      if (CanSkip(*row_group_metadata)) {
-        rows_skipped_ += num_rows;
-        continue;
-      }
-
-      return row_group;
-    }
-
-    return kIterationDone;
-  }
-
- private:
-  bool CanSkip(const parquet::RowGroupMetaData& metadata) const {
-    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, 
arrow_properties_);
-    // Errors with statistics are ignored and post-filtering will apply.
-    if (!maybe_stats_expr.ok()) {
-      return false;
-    }
-
-    auto stats_expr = maybe_stats_expr.ValueOrDie();
-    return !filter_->Assume(stats_expr)->IsSatisfiable();
-  }
-
-  std::shared_ptr<parquet::FileMetaData> metadata_;
-  parquet::ArrowReaderProperties arrow_properties_;
-  std::shared_ptr<Expression> filter_;
-  int row_group_idx_;
-  std::vector<int> row_groups_;
-  int num_row_groups_;
-  int64_t rows_skipped_;
-};
-
 class ParquetScanTaskIterator {
  public:
   static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
                                        std::shared_ptr<ScanContext> context,
-                                       
std::unique_ptr<parquet::ParquetFileReader> reader,
-                                       parquet::ArrowReaderProperties 
arrow_properties,
-                                       const std::vector<int>& row_groups) {
-    auto metadata = reader->metadata();
-
-    auto column_projection = InferColumnProjection(*metadata, 
arrow_properties, options);
-
-    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-    RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, 
std::move(reader),
-                                                   arrow_properties, 
&arrow_reader));
-
-    RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                            options->filter, row_groups);
-
-    return ScanTaskIterator(ParquetScanTaskIterator(
-        std::move(options), std::move(context), std::move(column_projection),
-        std::move(skipper), std::move(arrow_reader)));
+                                       FileSource source,
+                                       
std::unique_ptr<parquet::arrow::FileReader> reader,
+                                       std::vector<RowGroupInfo> row_groups) {
+    auto column_projection = InferColumnProjection(*reader, *options);
+    return static_cast<ScanTaskIterator>(ParquetScanTaskIterator(
+        std::move(options), std::move(context), std::move(source), 
std::move(reader),
+        std::move(column_projection), std::move(row_groups)));
   }
 
   Result<std::shared_ptr<ScanTask>> Next() {
-    auto row_group = skipper_.Next();
-
-    // Iteration is done.
-    if (row_group == RowGroupSkipper::kIterationDone) {
+    if (idx_ >= row_groups_.size()) {
       return nullptr;
     }
 
+    auto row_group = row_groups_[idx_++];
     return std::shared_ptr<ScanTask>(
         new ParquetScanTask(row_group, column_projection_, reader_, options_, 
context_));
   }
 
  private:
   // Compute the column projection out of an optional arrow::Schema
-  static std::vector<int> InferColumnProjection(
-      const parquet::FileMetaData& metadata,
-      const parquet::ArrowReaderProperties& arrow_properties,
-      const std::shared_ptr<ScanOptions>& options) {
-    auto maybe_manifest = GetSchemaManifest(metadata, arrow_properties);
-    if (!maybe_manifest.ok()) {
-      return internal::Iota(metadata.num_columns());
-    }
-    auto manifest = std::move(maybe_manifest).ValueOrDie();
-
+  static std::vector<int> InferColumnProjection(const 
parquet::arrow::FileReader& reader,
+                                                const ScanOptions& options) {
+    auto manifest = reader.manifest();
     // Checks if the field is needed in either the projection or the filter.
-    auto fields_name = options->MaterializedFields();
+    auto fields_name = options.MaterializedFields();

Review comment:
       ```suggestion
       auto field_names = options.MaterializedFields();
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metadata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));

Review comment:
       ```suggestion
                           AugmentAndFilter(std::move(row_groups), 
*options->filter, reader.get()));
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", 
path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", 
e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");

Review comment:
       ```suggestion
         return Status::Invalid("ParquetDatasetFactory must contain a schema with at least one column");
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public 
FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) 
const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = 
NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> 
statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls is deferred in the Scan() method.
+///
+/// The caller can provide an optional list of selected RowGroups to limit the
+/// number of scanned RowGroups, or control parallelism partitioning.

Review comment:
       "or control parallelism partitioning" is not very clear. Could you 
change this into an example use case for limiting the number of scanned 
RowGroups? (I think that matches your intent here.)

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {

Review comment:
       ```suggestion
     for (auto&& row_group : row_groups) {
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -770,31 +806,43 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment in multiple fragments.

Review comment:
       ```suggestion
           Split the fragment into multiple fragments.
   ```
   Could you replicate this comment in C++ as well?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", 
path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", 
e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> 
paths_and_row_group_size;

Review comment:
       ```suggestion
       std::unordered_map<std::string, std::vector<RowGroupInfo>> 
path_to_row_group_info;
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public 
FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) 
const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = 
NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> 
statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls is deferred in the Scan() method.
+///
+/// The caller can provide an optional list of selected RowGroups to limit the
+/// number of scanned RowGroups, or control parallelism partitioning.
+///
+/// It can also attach optional statistics with each RowGroups, providing
+/// pushdown predicate benefits before invoking any heavy IO. This can induce
+/// significant performance boost when scanning high latency file systems.
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
                                 std::shared_ptr<ScanContext> context) override;
 
-  /// \brief The row groups viewed by this Fragment. This may be empty which 
signifies all
-  /// row groups are selected.
-  const std::vector<int>& row_groups() const { return row_groups_; }
+  Result<FragmentVector> SplitByRowGroup(const std::shared_ptr<Expression>& 
predicate);
+
+  /// \brief Return the RowGroups selected by this fragment. An empty list
+  /// represents all RowGroups in the parquet file.
+  const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }
 
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       std::shared_ptr<Expression> partition_expression,
-                      std::vector<int> row_groups)
-      : FileFragment(std::move(source), std::move(format),
-                     std::move(partition_expression)),
-        row_groups_(std::move(row_groups)) {}
-
-  const ParquetFileFormat& parquet_format() const;
+                      std::vector<RowGroupInfo> row_groups);
 
-  std::vector<int> row_groups_;
+  std::vector<RowGroupInfo> row_groups_;
+  ParquetFileFormat& parquet_format_;
 
   friend class ParquetFileFormat;
 };
 
+/// \brief Create FileSystemDataset from custom `_metadata` cache file.
+///
+/// Dask and other systems will generate a cache metadata file by concatenating
+/// the RowGroupMetaData of multiple parquet files in a single parquet file.

Review comment:
       ```suggestion
   /// the RowGroupMetaData of multiple parquet files into a single (otherwise
   /// empty) parquet file.
   ```

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public 
FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) 
const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) 
override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = 
NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> 
statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemd

Review comment:
       ```suggestion
     /// This will return true if the RowGroup was not initialized with 
statistics
     /// (rather than silently reading metadata for a complete check).
   ```
   Probably overkill: this could return `optional<bool>`, with `nullopt` 
indicating that we have not performed a complete check?

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -380,77 +316,297 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector<RowGroupInfo> FilterRowGroups(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+    const auto& info = row_groups[i];
+    if (info.Satisfy(predicate)) {
+      row_groups[idx++] = info;
+    }
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result<std::vector<RowGroupInfo>> AugmentAndFilter(
+    std::vector<RowGroupInfo> row_groups, const Expression& predicate,
+    parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+    auto id = info.id();
+    if (!info.HasStatistics() && id < num_row_groups) {
+      auto row_group = metadata->RowGroup(info.id());
+      info.set_num_rows(row_group->num_rows());
+      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+    }
+  };
+
+  if (row_groups.empty()) {
+    row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+    augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+    // Apply a pre-filtering if the user requested an explicit sub-set of
+    // row-groups. In the case where a RowGroup doesn't have statistics
+    // metdata, it will not be excluded.
+    row_groups = FilterRowGroups(row_groups, *options->filter);
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
+  }
 
-  for (int i : row_groups) {
-    if (i >= reader->metadata()->num_row_groups()) {
-      return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-                                " only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+    if (row_group.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+                                source.path(), " only has ", num_row_groups,
                                 " row groups");
     }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-                                       std::move(reader), 
std::move(arrow_properties),
-                                       row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+                        AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+                                       std::move(reader), 
std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups) {
   return std::shared_ptr<FileFragment>(
       new ParquetFileFragment(std::move(source), shared_from_this(),
                               std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::vector<int> row_groups) {
+  return std::shared_ptr<FileFragment>(new ParquetFileFragment(
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
-  auto properties = MakeReaderProperties(*this);
-  ARROW_ASSIGN_OR_RAISE(auto reader,
-                        OpenReader(fragment.source(), std::move(properties)));
-
-  auto arrow_properties =
-      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, 
*reader);
-  auto metadata = reader->metadata();
+///
+/// RowGroupInfo
+///
 
-  auto row_groups = fragment.row_groups();
-  if (row_groups.empty()) {
-    row_groups = internal::Iota(metadata->num_row_groups());
+std::vector<RowGroupInfo> RowGroupInfo::FromIdentifiers(const std::vector<int> 
ids) {
+  std::vector<RowGroupInfo> results;
+  results.reserve(ids.size());
+  for (auto i : ids) {
+    results.emplace_back(i);
   }
-  FragmentVector fragments(row_groups.size());
-
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
-                          std::move(filter), std::move(row_groups));
+  return results;
+}
 
-  for (int i = 0, row_group = skipper.Next();
-       row_group != RowGroupSkipper::kIterationDone; row_group = 
skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(
-        fragments[i++],
-        MakeFragment(fragment.source(), fragment.partition_expression(), 
{row_group}));
+std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
+  std::vector<RowGroupInfo> result;
+  result.reserve(count);
+  for (int i = 0; i < count; i++) {
+    result.emplace_back(i);
   }
+  return result;
+}
 
-  return MakeVectorIterator(std::move(fragments));
+bool RowGroupInfo::Satisfy(const Expression& predicate) const {
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
 }
 
+///
+/// ParquetFileFragment
+///
+
+ParquetFileFragment::ParquetFileFragment(FileSource source,
+                                         std::shared_ptr<FileFormat> format,
+                                         std::shared_ptr<Expression> 
partition_expression,
+                                         std::vector<RowGroupInfo> row_groups)
+    : FileFragment(std::move(source), std::move(format), 
std::move(partition_expression)),
+      row_groups_(std::move(row_groups)),
+      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}
+
 Result<ScanTaskIterator> 
ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                                    
std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, std::move(options), 
std::move(context),
-                                   row_groups_);
+  return parquet_format_.ScanFile(source_, std::move(options), 
std::move(context),
+                                  row_groups_);
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+  ARROW_ASSIGN_OR_RAISE(auto row_groups,
+                        AugmentAndFilter(row_groups_, *predicate, 
reader.get()));
+
+  FragmentVector fragments;
+  for (auto row_group : row_groups) {
+    ARROW_ASSIGN_OR_RAISE(auto fragment,
+                          parquet_format_.MakeFragment(source_, 
partition_expression(),
+                                                       
{std::move(row_group)}));
+    fragments.push_back(std::move(fragment));
+  }
+
+  return fragments;
+}
+
+///
+/// ParquetDatasetFactory
+///
+
+ParquetDatasetFactory::ParquetDatasetFactory(
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    : filesystem_(std::move(filesystem)),
+      format_(std::move(format)),
+      metadata_(std::move(metadata)),
+      base_path_(std::move(base_path)) {}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
+  // directory of all parquet files is `dirname(metadata_path)`.
+  auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
+  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+}
+
+Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
+    const FileSource& metadata_source, const std::string& base_path,
+    std::shared_ptr<fs::FileSystem> filesystem,
+    std::shared_ptr<ParquetFileFormat> format) {
+  DCHECK_NE(filesystem, nullptr);
+  DCHECK_NE(format, nullptr);
+
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto metadata = reader->parquet_reader()->metadata();
+
+  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
+      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
+}
+
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  return std::vector<std::shared_ptr<Schema>>{schema};
+}
+
+static Result<std::string> FileFromRowGroup(const std::string& base_path,
+                                            const parquet::RowGroupMetaData& 
row_group) {
+  try {
+    auto n_columns = row_group.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("RowGroup must have a least one columns to 
extract path");
+    }
+
+    auto first_column = row_group.ColumnChunk(0);
+    auto path = first_column->file_path();
+    if (path == "") {
+      return Status::Invalid("Got empty file path");
+    }
+
+    for (int i = 1; i < n_columns; i++) {
+      auto column = row_group.ColumnChunk(i);
+      auto column_path = column->file_path();
+      if (column_path != path) {
+        return Status::Invalid("Path '", column_path, "' not equal to path '", 
path,
+                               ", for ColumnChunk at index ", i);
+      }
+    }
+
+    return fs::internal::ConcatAbstractPath(base_path, path);
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file path from RowGroup :", 
e.what());
+  }
 }
 
-const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
-  return internal::checked_cast<const ParquetFileFormat&>(*format_);
+Result<std::vector<std::shared_ptr<FileFragment>>>
+ParquetDatasetFactory::CollectParquetFragments(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    auto n_columns = metadata.num_columns();
+    if (n_columns == 0) {
+      return Status::Invalid("ParquetDatasetFactory at least one column");
+    }
+
+    std::unordered_map<std::string, std::vector<RowGroupInfo>> 
paths_and_row_group_size;
+
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, 
*row_group));
+      auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+      auto num_rows = row_group->num_rows();
+
+      // Insert the path, or increase the count of row groups. It will be
+      // assumed that the RowGroup of a file are ordered exactly like in
+      // the metadata file.
+      auto elem_and_inserted =
+          paths_and_row_group_size.insert({path, {{0, num_rows, stats}}});
+      if (!elem_and_inserted.second) {
+        auto& path_and_count = *elem_and_inserted.first;
+        auto& row_groups = path_and_count.second;
+        path_and_count.second.emplace_back(row_groups.size(), num_rows, stats);
+      }
+    }
+
+    std::vector<std::shared_ptr<FileFragment>> fragments;
+    for (const auto& elem : paths_and_row_group_size) {
+      ARROW_ASSIGN_OR_RAISE(auto fragment,
+                            format_->MakeFragment({std::move(elem.first), 
filesystem_},
+                                                  scalar(true), 
std::move(elem.second)));
+      fragments.push_back(std::move(fragment));
+    }
+
+    return fragments;
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file paths from FileMetaData:", 
e.what());
+  }
+}
+
+Result<std::shared_ptr<Dataset>> ParquetDatasetFactory::Finish(FinishOptions 
options) {
+  std::shared_ptr<Schema> schema = options.schema;
+  bool schema_missing = schema == nullptr;
+  if (schema_missing) {
+    ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options));
+  }
+
+  auto properties = MakeArrowReaderProperties(*format_, *metadata_);
+  ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*metadata_, 
properties));
+  return FileSystemDataset::Make(std::move(schema), scalar(true), format_, 
fragments);

Review comment:
       ```suggestion
     return FileSystemDataset::Make(std::move(schema), scalar(true), format_, 
std::move(fragments));
   ```

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector(
   return flattened;
 }
 
+struct TableAssemblyState {
+  /// Protecting mutating accesses to batches
+  std::mutex mutex{};
+  std::vector<RecordBatchVector> batches{};
+
+  void Emplace(RecordBatchVector b, size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= position) {
+      batches.resize(position + 1);
+    }
+    batches[position] = std::move(b);
+  }
+};
+
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
   ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
   auto task_group = scan_context_->TaskGroup();
 
-  // Protecting mutating accesses to batches
-  std::mutex mutex;
-  std::vector<RecordBatchVector> batches;
+  /// Wraps the state in a shared_ptr to ensure that a failing ScanTask don't
+  /// invalidate the concurrent running tasks because Finish() early returns
+  /// and the mutex/batches may got out of scope.

Review comment:
       ```suggestion
     /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't
     /// invalidate concurrently running tasks when Finish() early returns
     /// and the mutex/batches fall out of scope.
   ```
   nice catch

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -770,31 +806,43 @@ cdef class ParquetFileFragment(FileFragment):
 
     @property
     def row_groups(self):
-        row_groups = set(self.parquet_file_fragment.row_groups())
-        if len(row_groups) != 0:
-            return row_groups
-        return None
+        cdef:
+            vector[CRowGroupInfo] c_row_groups
+        c_row_groups = self.parquet_file_fragment.row_groups()
+        if c_row_groups.empty():
+            return None
+        return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
-    def get_row_group_fragments(self, Expression extra_filter=None):
+    def split_by_row_group(self, Expression predicate=None):
         """
+        Split the fragment in multiple fragments.
+
         Yield a Fragment wrapping each row group in this ParquetFileFragment.
-        Row groups will be excluded whose metadata contradicts the either the
-        filter provided on construction of this Fragment or the extra_filter
-        argument.
+        Row groups will be excluded whose metadata contradicts the optional
+        predicate.
+
+        Parameters
+        ----------
+        predicate : Expression, default None
+            Exclude RowGroups whose statistics contradicts the predicate.
+
+        Returns
+        -------
+        A generator of Fragment.
         """
         cdef:
-            CParquetFileFormat* c_format
-            CFragmentIterator c_fragments
-            shared_ptr[CExpression] c_extra_filter
+            vector[shared_ptr[CFragment]] c_fragments
+            shared_ptr[CExpression] c_predicate
+            shared_ptr[CFragment] c_fragment
 
         schema = self.physical_schema
-        c_extra_filter = _insert_implicit_casts(extra_filter, schema)
-        c_format = <CParquetFileFormat*> self.file_fragment.format().get()
-        c_fragments = move(GetResultValue(c_format.GetRowGroupFragments(deref(
-            self.parquet_file_fragment), move(c_extra_filter))))
+        c_predicate = _insert_implicit_casts(predicate, schema)
+        with nogil:
+            c_fragments = move(GetResultValue(
+                self.parquet_file_fragment.SplitByRowGroup(move(c_predicate))))
 
-        for maybe_fragment in c_fragments:
-            yield Fragment.wrap(GetResultValue(move(maybe_fragment)))
+        for c_fragment in c_fragments:
+            yield Fragment.wrap(c_fragment)

Review comment:
       agreed
   ```suggestion
           return [Fragment.wrap(c_fragment) for c_fragment in c_fragments]
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1446,6 +1494,47 @@ cdef class UnionDatasetFactory(DatasetFactory):
         self.union_factory = <CUnionDatasetFactory*> sp.get()
 
 
+cdef class ParquetDatasetFactory(DatasetFactory):
+    """
+    Create a ParquetDatasetFactory from a Parquet `_metadata` file.
+
+    Parameters
+    ----------
+    metadata_path: str

Review comment:
       ```suggestion
       metadata_path : str
   ```
   Not sure if the doccomment parser actually relies on that space

##########
File path: cpp/src/arrow/dataset/file_parquet.h
##########
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                     std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    const std::vector<int>& row_groups) const;
+                                    std::vector<RowGroupInfo> row_groups) const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<RowGroupInfo> row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
       std::vector<int> row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+
+  /// \brief Return a FileReader on the given source.
+  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
+  static std::vector<RowGroupInfo> FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<Expression> statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result<FragmentIterator> GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemed
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr<Expression> statistics_;
 };
 
+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls is deferred in the Scan() method.

Review comment:
       ```suggestion
   /// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to