This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new bfdfeda  ARROW-8943: [C++][Python][Dataset] Add partitioning support 
to ParquetDatasetFactory
bfdfeda is described below

commit bfdfeda43ae09b68af058ca4ac274e5619298b44
Author: François Saint-Jacques <[email protected]>
AuthorDate: Thu Jun 18 11:24:57 2020 -0400

    ARROW-8943: [C++][Python][Dataset] Add partitioning support to 
ParquetDatasetFactory
    
    Closes #7437 from fsaintjacques/ARROW-8943-parquet-dataset-partition
    
    Authored-by: François Saint-Jacques <[email protected]>
    Signed-off-by: François Saint-Jacques <[email protected]>
---
 cpp/examples/arrow/dataset-parquet-scan-example.cc |   4 +-
 cpp/src/arrow/dataset/discovery.cc                 |  36 +------
 cpp/src/arrow/dataset/discovery.h                  |   3 -
 cpp/src/arrow/dataset/file_parquet.cc              | 116 ++++++++++++++++-----
 cpp/src/arrow/dataset/file_parquet.h               |  46 +++++++-
 cpp/src/arrow/dataset/partition.cc                 |  25 +++++
 cpp/src/arrow/dataset/partition.h                  |  12 ++-
 cpp/src/arrow/dataset/partition_test.cc            |  25 ++++-
 python/pyarrow/_dataset.pyx                        |  89 +++++++++++++++-
 python/pyarrow/dataset.py                          |  21 +++-
 python/pyarrow/includes/libarrow_dataset.pxd       |  11 +-
 python/pyarrow/tests/test_dataset.py               |  18 ++--
 12 files changed, 320 insertions(+), 86 deletions(-)

diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc 
b/cpp/examples/arrow/dataset-parquet-scan-example.cc
index ed4b89d..3cdd298 100644
--- a/cpp/examples/arrow/dataset-parquet-scan-example.cc
+++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc
@@ -100,7 +100,9 @@ std::shared_ptr<ds::Dataset> GetDatasetFromDirectory(
 std::shared_ptr<ds::Dataset> GetParquetDatasetFromMetadata(
     std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> 
format,
     std::string metadata_path) {
-  auto factory = ds::ParquetDatasetFactory::Make(metadata_path, fs, 
format).ValueOrDie();
+  ds::ParquetFactoryOptions options;
+  auto factory =
+      ds::ParquetDatasetFactory::Make(metadata_path, fs, format, 
options).ValueOrDie();
   return factory->Finish().ValueOrDie();
 }
 
diff --git a/cpp/src/arrow/dataset/discovery.cc 
b/cpp/src/arrow/dataset/discovery.cc
index 295f07d..84e2502 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -110,12 +110,6 @@ FileSystemDatasetFactory::FileSystemDatasetFactory(
       format_(std::move(format)),
       options_(std::move(options)) {}
 
-util::optional<util::string_view> 
FileSystemDatasetFactory::RemovePartitionBaseDir(
-    util::string_view path) {
-  const util::string_view partition_base_dir{options_.partition_base_dir};
-  return fs::internal::RemoveAncestor(partition_base_dir, path);
-}
-
 Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
     std::shared_ptr<fs::FileSystem> filesystem, const 
std::vector<std::string>& paths,
     std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
@@ -186,23 +180,6 @@ Result<std::shared_ptr<DatasetFactory>> 
FileSystemDatasetFactory::Make(
               std::move(options));
 }
 
-Result<std::shared_ptr<Schema>> FileSystemDatasetFactory::PartitionSchema() {
-  if (auto partitioning = options_.partitioning.partitioning()) {
-    return partitioning->schema();
-  }
-
-  std::vector<std::string> relative_paths;
-  for (const auto& path : paths_) {
-    if (auto relative = RemovePartitionBaseDir(path)) {
-      auto relative_str = relative->to_string();
-      auto basename_filename = 
fs::internal::GetAbstractPathParent(relative_str);
-      relative_paths.push_back(basename_filename.first);
-    }
-  }
-
-  return options_.partitioning.factory()->Inspect(relative_paths);
-}
-
 Result<std::vector<std::shared_ptr<Schema>>> 
FileSystemDatasetFactory::InspectSchemas(
     InspectOptions options) {
   std::vector<std::shared_ptr<Schema>> schemas;
@@ -215,7 +192,9 @@ Result<std::vector<std::shared_ptr<Schema>>> 
FileSystemDatasetFactory::InspectSc
     schemas.push_back(schema);
   }
 
-  ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema());
+  ARROW_ASSIGN_OR_RAISE(auto partition_schema,
+                        options_.partitioning.GetOrInferSchema(
+                            StripPrefixAndFilename(paths_, 
options_.partition_base_dir)));
   schemas.push_back(partition_schema);
 
   return schemas;
@@ -245,13 +224,8 @@ Result<std::shared_ptr<Dataset>> 
FileSystemDatasetFactory::Finish(FinishOptions
 
   std::vector<std::shared_ptr<FileFragment>> fragments;
   for (const auto& path : paths_) {
-    std::shared_ptr<Expression> partition = scalar(true);
-    if (auto relative = RemovePartitionBaseDir(path)) {
-      auto relative_str = relative->to_string();
-      auto basename_filename = 
fs::internal::GetAbstractPathParent(relative_str);
-      ARROW_ASSIGN_OR_RAISE(partition, 
partitioning->Parse(basename_filename.first));
-    }
-
+    auto fixed_path = StripPrefixAndFilename(path, 
options_.partition_base_dir);
+    ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
     ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, 
partition));
     fragments.push_back(fragment);
   }
diff --git a/cpp/src/arrow/dataset/discovery.h 
b/cpp/src/arrow/dataset/discovery.h
index b59c6e5..634073f 100644
--- a/cpp/src/arrow/dataset/discovery.h
+++ b/cpp/src/arrow/dataset/discovery.h
@@ -233,9 +233,6 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public 
DatasetFactory {
   std::shared_ptr<fs::FileSystem> fs_;
   std::shared_ptr<FileFormat> format_;
   FileSystemFactoryOptions options_;
-
- private:
-  util::optional<util::string_view> RemovePartitionBaseDir(util::string_view 
path);
 };
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet.cc 
b/cpp/src/arrow/dataset/file_parquet.cc
index 0abe9d6..f4a3b00 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -500,44 +500,49 @@ Result<FragmentVector> 
ParquetFileFragment::SplitByRowGroup(
 
 ParquetDatasetFactory::ParquetDatasetFactory(
     std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
-    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+    std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path,
+    ParquetFactoryOptions options)
     : filesystem_(std::move(filesystem)),
       format_(std::move(format)),
       metadata_(std::move(metadata)),
-      base_path_(std::move(base_path)) {}
+      base_path_(std::move(base_path)),
+      options_(std::move(options)) {}
 
 Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
     const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::shared_ptr<ParquetFileFormat> format) {
+    std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options) {
   // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
   // directory of all parquet files is `dirname(metadata_path)`.
   auto dirname = 
arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
-  return Make({metadata_path, filesystem}, dirname, filesystem, format);
+  return Make({metadata_path, filesystem}, dirname, filesystem, 
std::move(format),
+              std::move(options));
 }
 
 Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
     const FileSource& metadata_source, const std::string& base_path,
-    std::shared_ptr<fs::FileSystem> filesystem,
-    std::shared_ptr<ParquetFileFormat> format) {
+    std::shared_ptr<fs::FileSystem> filesystem, 
std::shared_ptr<ParquetFileFormat> format,
+    ParquetFactoryOptions options) {
   DCHECK_NE(filesystem, nullptr);
   DCHECK_NE(format, nullptr);
 
+  // By automatically setting the options base_dir to the metadata's base_path,
+  // we provide a better experience for users providing a Partitioning that is
+  // relative to the base_dir instead of the full path.
+  if (options.partition_base_dir.empty()) {
+    options.partition_base_dir = base_path;
+  }
+
   ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
   auto metadata = reader->parquet_reader()->metadata();
 
-  return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
-      std::move(filesystem), std::move(format), std::move(metadata), 
base_path));
-}
-
-Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
-    InspectOptions options) {
-  std::shared_ptr<Schema> schema;
-  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
-  return std::vector<std::shared_ptr<Schema>>{schema};
+  return std::shared_ptr<DatasetFactory>(
+      new ParquetDatasetFactory(std::move(filesystem), std::move(format),
+                                std::move(metadata), base_path, 
std::move(options)));
 }
 
-static Result<std::string> FileFromRowGroup(const std::string& base_path,
-                                            const parquet::RowGroupMetaData& 
row_group) {
+static inline Result<std::string> FileFromRowGroup(
+    fs::FileSystem* filesystem, const std::string& base_path,
+    const parquet::RowGroupMetaData& row_group) {
   try {
     auto n_columns = row_group.num_columns();
     if (n_columns == 0) {
@@ -565,17 +570,43 @@ static Result<std::string> FileFromRowGroup(const 
std::string& base_path,
       }
     }
 
-    return fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, 
path});
+    path = fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, 
path});
+    // Normalizing path is required for Windows.
+    return filesystem->NormalizePath(std::move(path));
   } catch (const ::parquet::ParquetException& e) {
     return Status::Invalid("Extracting file path from RowGroup failed. Parquet 
threw:",
                            e.what());
   }
 }
 
+Result<std::vector<std::string>> ParquetDatasetFactory::CollectPaths(
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  try {
+    std::unordered_set<std::string> unique_paths;
+    ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
+
+    for (int i = 0; i < metadata.num_row_groups(); i++) {
+      auto row_group = metadata.RowGroup(i);
+      ARROW_ASSIGN_OR_RAISE(auto path,
+                            FileFromRowGroup(filesystem_.get(), base_path_, 
*row_group));
+      unique_paths.emplace(std::move(path));
+    }
+
+    std::vector<std::string> paths;
+    for (const auto& path : unique_paths) {
+      paths.emplace_back(path);
+    }
+    return paths;
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::Invalid("Could not infer file paths from FileMetaData:", 
e.what());
+  }
+}
+
 Result<std::vector<std::shared_ptr<FileFragment>>>
 ParquetDatasetFactory::CollectParquetFragments(
     const parquet::FileMetaData& metadata,
-    const parquet::ArrowReaderProperties& properties) {
+    const parquet::ArrowReaderProperties& properties, const Partitioning& 
partitioning) {
   try {
     auto n_columns = metadata.num_columns();
     if (n_columns == 0) {
@@ -584,14 +615,12 @@ ParquetDatasetFactory::CollectParquetFragments(
     }
 
     std::unordered_map<std::string, std::vector<RowGroupInfo>> 
path_to_row_group_infos;
-
     ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
 
     for (int i = 0; i < metadata.num_row_groups(); i++) {
       auto row_group = metadata.RowGroup(i);
-      ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, 
*row_group));
-      // Normalizing path is required for Windows.
-      ARROW_ASSIGN_OR_RAISE(path, filesystem_->NormalizePath(std::move(path)));
+      ARROW_ASSIGN_OR_RAISE(auto path,
+                            FileFromRowGroup(filesystem_.get(), base_path_, 
*row_group));
       auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
       auto num_rows = row_group->num_rows();
 
@@ -611,9 +640,13 @@ ParquetDatasetFactory::CollectParquetFragments(
     std::vector<std::shared_ptr<FileFragment>> fragments;
     fragments.reserve(path_to_row_group_infos.size());
     for (auto&& elem : path_to_row_group_infos) {
-      ARROW_ASSIGN_OR_RAISE(auto fragment,
-                            format_->MakeFragment({std::move(elem.first), 
filesystem_},
-                                                  scalar(true), 
std::move(elem.second)));
+      const auto& path = elem.first;
+      auto partition =
+          partitioning.Parse(StripPrefixAndFilename(path, 
options_.partition_base_dir))
+              .ValueOr(scalar(true));
+      ARROW_ASSIGN_OR_RAISE(
+          auto fragment, format_->MakeFragment({path, filesystem_}, 
std::move(partition),
+                                               std::move(elem.second)));
       fragments.push_back(std::move(fragment));
     }
 
@@ -623,6 +656,28 @@ ParquetDatasetFactory::CollectParquetFragments(
   }
 }
 
+Result<std::vector<std::shared_ptr<Schema>>> 
ParquetDatasetFactory::InspectSchemas(
+    InspectOptions options) {
+  std::vector<std::shared_ptr<Schema>> schemas;
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), 
&schema));
+  schemas.push_back(std::move(schema));
+
+  if (options_.partitioning.factory() != nullptr) {
+    // Gather paths found in RowGroups' ColumnChunks.
+    auto properties = MakeArrowReaderProperties(*format_, *metadata_);
+    ARROW_ASSIGN_OR_RAISE(auto paths, CollectPaths(*metadata_, properties));
+
+    ARROW_ASSIGN_OR_RAISE(auto partition_schema,
+                          
options_.partitioning.GetOrInferSchema(StripPrefixAndFilename(
+                              paths, options_.partition_base_dir)));
+    schemas.push_back(std::move(partition_schema));
+  }
+
+  return schemas;
+}
+
 Result<std::shared_ptr<Dataset>> ParquetDatasetFactory::Finish(FinishOptions 
options) {
   std::shared_ptr<Schema> schema = options.schema;
   bool schema_missing = schema == nullptr;
@@ -630,8 +685,15 @@ Result<std::shared_ptr<Dataset>> 
ParquetDatasetFactory::Finish(FinishOptions opt
     ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options));
   }
 
+  std::shared_ptr<Partitioning> partitioning = 
options_.partitioning.partitioning();
+  if (partitioning == nullptr) {
+    auto factory = options_.partitioning.factory();
+    ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema));
+  }
+
   auto properties = MakeArrowReaderProperties(*format_, *metadata_);
-  ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*metadata_, 
properties));
+  ARROW_ASSIGN_OR_RAISE(auto fragments,
+                        CollectParquetFragments(*metadata_, properties, 
*partitioning));
   return FileSystemDataset::Make(std::move(schema), scalar(true), format_,
                                  std::move(fragments));
 }
diff --git a/cpp/src/arrow/dataset/file_parquet.h 
b/cpp/src/arrow/dataset/file_parquet.h
index 6ac8a1b..f5fba7e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -215,6 +215,34 @@ class ARROW_DS_EXPORT ParquetFileFragment : public 
FileFragment {
   friend class ParquetFileFormat;
 };
 
+struct ParquetFactoryOptions {
+  // Either an explicit Partitioning or a PartitioningFactory to discover one.
+  //
+  // If a factory is provided, it will be used to infer a schema for partition 
fields
+  // based on file and directory paths then construct a Partitioning. The 
default
+  // is a Partitioning which will yield no partition information.
+  //
+  // The (explicit or discovered) partitioning will be applied to discovered 
files
+  // and the resulting partition information embedded in the Dataset.
+  PartitioningOrFactory partitioning{Partitioning::Default()};
+
+  // For the purposes of applying the partitioning, paths will be stripped
+  // of the partition_base_dir. Files not matching the partition_base_dir
+  // prefix will be skipped for partition discovery. The ignored files will 
still
+  // be part of the Dataset, but will not have partition information.
+  //
+  // Example:
+  // partition_base_dir = "/dataset";
+  //
+  // - "/dataset/US/sales.csv" -> "US" will be given to the 
partitioning
+  //
+  // - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
+  //
+  // This is useful for partitioning which parses directory when ordering
+  // is important, e.g. DirectoryPartitioning.
+  std::string partition_base_dir;
+};
+
 /// \brief Create FileSystemDataset from custom `_metadata` cache file.
 ///
 /// Dask and other systems will generate a cache metadata file by concatenating
@@ -234,9 +262,10 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public 
DatasetFactory {
   /// \param[in] metadata_path path of the metadata parquet file
   /// \param[in] filesystem from which to open/read the path
   /// \param[in] format to read the file with.
+  /// \param[in] options see ParquetFactoryOptions
   static Result<std::shared_ptr<DatasetFactory>> Make(
       const std::string& metadata_path, std::shared_ptr<fs::FileSystem> 
filesystem,
-      std::shared_ptr<ParquetFileFormat> format);
+      std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions 
options);
 
   /// \brief Create a ParquetDatasetFactory from a metadata source.
   ///
@@ -248,10 +277,11 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public 
DatasetFactory {
   /// \param[in] base_path used as the prefix of every parquet files referenced
   /// \param[in] filesystem from which to read the files referenced.
   /// \param[in] format to read the file with.
+  /// \param[in] options see ParquetFactoryOptions
   static Result<std::shared_ptr<DatasetFactory>> Make(
       const FileSource& metadata, const std::string& base_path,
       std::shared_ptr<fs::FileSystem> filesystem,
-      std::shared_ptr<ParquetFileFormat> format);
+      std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions 
options);
 
   Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
       InspectOptions options) override;
@@ -262,17 +292,25 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public 
DatasetFactory {
   ParquetDatasetFactory(std::shared_ptr<fs::FileSystem> fs,
                         std::shared_ptr<ParquetFileFormat> format,
                         std::shared_ptr<parquet::FileMetaData> metadata,
-                        std::string base_path);
+                        std::string base_path, ParquetFactoryOptions options);
 
   std::shared_ptr<fs::FileSystem> filesystem_;
   std::shared_ptr<ParquetFileFormat> format_;
   std::shared_ptr<parquet::FileMetaData> metadata_;
   std::string base_path_;
+  ParquetFactoryOptions options_;
+  FragmentVector fragments_;
 
  private:
-  Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
+  Result<std::vector<std::string>> CollectPaths(
       const parquet::FileMetaData& metadata,
       const parquet::ArrowReaderProperties& properties);
+
+  Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
+      const parquet::FileMetaData& metadata,
+      const parquet::ArrowReaderProperties& properties, const Partitioning& 
partitioning);
+
+  Result<std::shared_ptr<Schema>> PartitionSchema();
 };
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/partition.cc 
b/cpp/src/arrow/dataset/partition.cc
index 2e44075..f2d8488 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -598,5 +598,30 @@ std::shared_ptr<PartitioningFactory> 
HivePartitioning::MakeFactory() {
   return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory());
 }
 
+std::string StripPrefixAndFilename(const std::string& path, const std::string& 
prefix) {
+  auto maybe_base_less = fs::internal::RemoveAncestor(prefix, path);
+  auto base_less = maybe_base_less ? maybe_base_less->to_string() : path;
+  auto basename_filename = fs::internal::GetAbstractPathParent(base_less);
+  return basename_filename.first;
+}
+
+std::vector<std::string> StripPrefixAndFilename(const 
std::vector<std::string>& paths,
+                                                const std::string& prefix) {
+  std::vector<std::string> result;
+  for (const auto& path : paths) {
+    result.emplace_back(StripPrefixAndFilename(path, prefix));
+  }
+  return result;
+}
+
+Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
+    const std::vector<std::string>& paths) {
+  if (auto part = partitioning()) {
+    return part->schema();
+  }
+
+  return factory()->Inspect(paths);
+}
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/partition.h 
b/cpp/src/arrow/dataset/partition.h
index 560669a..56a8db9 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -254,7 +254,15 @@ class ARROW_DS_EXPORT FunctionPartitioning : public 
Partitioning {
   std::string name_;
 };
 
-// TODO(bkietz) use RE2 and named groups to provide RegexpPartitioning
+/// \brief Remove a prefix and the filename of a path.
+///
+/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") -> 
"year=2019"`
+ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
+                                                   const std::string& prefix);
+
+/// \brief Vector version of StripPrefixAndFilename.
+ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
+    const std::vector<std::string>& paths, const std::string& prefix);
 
 /// \brief Either a Partitioning or a PartitioningFactory
 class ARROW_DS_EXPORT PartitioningOrFactory {
@@ -289,6 +297,8 @@ class ARROW_DS_EXPORT PartitioningOrFactory {
     return NULLPTR;
   }
 
+  Result<std::shared_ptr<Schema>> GetOrInferSchema(const 
std::vector<std::string>& paths);
+
  private:
   util::variant<std::shared_ptr<PartitioningFactory>, 
std::shared_ptr<Partitioning>>
       variant_;
diff --git a/cpp/src/arrow/dataset/partition_test.cc 
b/cpp/src/arrow/dataset/partition_test.cc
index ceb6a11..fad81cc 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/dataset/partition.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
 #include <cstdint>
 #include <map>
 #include <memory>
@@ -22,11 +27,7 @@
 #include <string>
 #include <vector>
 
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
 #include "arrow/dataset/file_base.h"
-#include "arrow/dataset/partition.h"
 #include "arrow/dataset/test_util.h"
 #include "arrow/filesystem/localfs.h"
 #include "arrow/filesystem/path_util.h"
@@ -471,5 +472,21 @@ TEST_F(TestPartitioningWritePlan, Errors) {
                          "b"_ == "forever alone"));
 }
 
+TEST(TestStripPrefixAndFilename, Basic) {
+  ASSERT_EQ(StripPrefixAndFilename("", ""), "");
+  ASSERT_EQ(StripPrefixAndFilename("a.csv", ""), "");
+  ASSERT_EQ(StripPrefixAndFilename("a/b.csv", ""), "a");
+  ASSERT_EQ(StripPrefixAndFilename("/a/b/c.csv", "/a"), "b");
+  ASSERT_EQ(StripPrefixAndFilename("/a/b/c/d.csv", "/a"), "b/c");
+  ASSERT_EQ(StripPrefixAndFilename("/a/b/c.csv", "/a/b"), "");
+
+  std::vector<std::string> input{"/data/year=2019/file.parquet",
+                                 "/data/year=2019/month=12/file.parquet",
+                                 
"/data/year=2019/month=12/day=01/file.parquet"};
+  EXPECT_THAT(StripPrefixAndFilename(input, "/data"),
+              testing::ElementsAre("year=2019", "year=2019/month=12",
+                                   "year=2019/month=12/day=01"));
+}
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index ae8f19b..39e66b1 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1378,6 +1378,9 @@ cdef class FileSystemFactoryOptions:
         partition_base_dir prefix will be skipped for partitioning discovery.
         The ignored files will still be part of the Dataset, but will not
         have partition information.
+    partitioning : Partitioning, PartitioningFactory, optional
+        Apply the Partitioning to every discovered Fragment. See
+        Partitioning or PartitioningFactory documentation.
     exclude_invalid_files : bool, optional (default True)
         If True, invalid files will be excluded (file format specific check).
         This will incur IO for each files in a serial and single threaded
@@ -1567,6 +1570,82 @@ cdef class UnionDatasetFactory(DatasetFactory):
         self.union_factory = <CUnionDatasetFactory*> sp.get()
 
 
+cdef class ParquetFactoryOptions:
+    """
+    Influences the discovery of parquet dataset.
+
+    Parameters
+    ----------
+    partition_base_dir : str, optional
+        For the purposes of applying the partitioning, paths will be
+        stripped of the partition_base_dir. Files not matching the
+        partition_base_dir prefix will be skipped for partitioning discovery.
+        The ignored files will still be part of the Dataset, but will not
+        have partition information.
+    partitioning : Partitioning, PartitioningFactory, optional
+        The partitioning scheme applied to fragments, see ``Partitioning``.
+    """
+
+    cdef:
+        CParquetFactoryOptions options
+
+    __slots__ = ()  # avoid mistakenly creating attributes
+
+    def __init__(self, partition_base_dir=None, partitioning=None):
+        if isinstance(partitioning, PartitioningFactory):
+            self.partitioning_factory = partitioning
+        elif isinstance(partitioning, Partitioning):
+            self.partitioning = partitioning
+
+        if partition_base_dir is not None:
+            self.partition_base_dir = partition_base_dir
+
+    cdef inline CParquetFactoryOptions unwrap(self):
+        return self.options
+
+    @property
+    def partitioning(self):
+        """Partitioning to apply to discovered files.
+
+        NOTE: setting this property will overwrite partitioning_factory.
+        """
+        c_partitioning = self.options.partitioning.partitioning()
+        if c_partitioning.get() == nullptr:
+            return None
+        return Partitioning.wrap(c_partitioning)
+
+    @partitioning.setter
+    def partitioning(self, Partitioning value):
+        self.options.partitioning = (<Partitioning> value).unwrap()
+
+    @property
+    def partitioning_factory(self):
+        """PartitioningFactory to apply to discovered files and
+        discover a Partitioning.
+
+        NOTE: setting this property will overwrite partitioning.
+        """
+        c_factory = self.options.partitioning.factory()
+        if c_factory.get() == nullptr:
+            return None
+        return PartitioningFactory.wrap(c_factory)
+
+    @partitioning_factory.setter
+    def partitioning_factory(self, PartitioningFactory value):
+        self.options.partitioning = (<PartitioningFactory> value).unwrap()
+
+    @property
+    def partition_base_dir(self):
+        """
+        Base directory to strip paths before applying the partitioning.
+        """
+        return frombytes(self.options.partition_base_dir)
+
+    @partition_base_dir.setter
+    def partition_base_dir(self, value):
+        self.options.partition_base_dir = tobytes(value)
+
+
 cdef class ParquetDatasetFactory(DatasetFactory):
     """
     Create a ParquetDatasetFactory from a Parquet `_metadata` file.
@@ -1581,26 +1660,32 @@ cdef class ParquetDatasetFactory(DatasetFactory):
         files.
     format : ParquetFileFormat
         Parquet format options.
+    options : ParquetFactoryOptions, optional
+        Various flags influencing the discovery of filesystem paths.
     """
 
     cdef:
         CParquetDatasetFactory* parquet_factory
 
     def __init__(self, metadata_path, FileSystem filesystem not None,
-                 FileFormat format not None):
+                 FileFormat format not None,
+                 ParquetFactoryOptions options=None):
         cdef:
             c_string path
             shared_ptr[CFileSystem] c_filesystem
             shared_ptr[CParquetFileFormat] c_format
             CResult[shared_ptr[CDatasetFactory]] result
+            CParquetFactoryOptions c_options
 
         c_path = tobytes(metadata_path)
         c_filesystem = filesystem.unwrap()
         c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
             format.unwrap())
+        options = options or ParquetFactoryOptions()
+        c_options = options.unwrap()
 
         result = CParquetDatasetFactory.MakeFromMetaDataPath(
-            c_path, c_filesystem, c_format)
+            c_path, c_filesystem, c_format, c_options)
         self.init(GetResultValue(result))
 
     cdef init(self, shared_ptr[CDatasetFactory]& sp):
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 8c58250..405caf6 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -36,6 +36,7 @@ from pyarrow._dataset import (  # noqa
     HivePartitioning,
     IpcFileFormat,
     ParquetDatasetFactory,
+    ParquetFactoryOptions,
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetReadOptions,
@@ -445,7 +446,8 @@ def _union_dataset(children, schema=None, **kwargs):
     return UnionDataset(schema, children)
 
 
-def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None,
+                    partitioning=None, partition_base_dir=None):
     """
     Create a FileSystemDataset from a `_metadata` file created via
     `pyarrrow.parquet.write_metadata`.
@@ -468,6 +470,16 @@ def parquet_dataset(metadata_path, schema=None, 
filesystem=None, format=None):
     format : ParquetFileFormat
         An instance of a ParquetFileFormat if special options needs to be
         passed.
+    partitioning : Partitioning, PartitioningFactory, str, list of str
+        The partitioning scheme specified with the ``partitioning()``
+        function. A flavor string can be used as shortcut, and with a list of
+        field names a DirectoryPartitioning will be inferred.
+    partition_base_dir : str, optional
+        For the purposes of applying the partitioning, paths will be
+        stripped of the partition_base_dir. Files not matching the
+        partition_base_dir prefix will be skipped for partitioning discovery.
+        The ignored files will still be part of the Dataset, but will not
+        have partition information.
 
     Returns
     -------
@@ -486,8 +498,13 @@ def parquet_dataset(metadata_path, schema=None, 
filesystem=None, format=None):
         filesystem, _ = _ensure_filesystem(filesystem)
 
     metadata_path = _normalize_path(filesystem, _stringify_path(metadata_path))
+    options = ParquetFactoryOptions(
+        partition_base_dir=partition_base_dir,
+        partitioning=_ensure_partitioning(partitioning)
+    )
 
-    factory = ParquetDatasetFactory(metadata_path, filesystem, format)
+    factory = ParquetDatasetFactory(
+        metadata_path, filesystem, format, options=options)
     return factory.finish(schema)
 
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index e84ff78..f80fdb9 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -326,13 +326,19 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
             CFileSystemFactoryOptions options
         )
 
+    cdef cppclass CParquetFactoryOptions \
+            "arrow::dataset::ParquetFactoryOptions":
+        CPartitioningOrFactory partitioning
+        c_string partition_base_dir
+
     cdef cppclass CParquetDatasetFactory \
             "arrow::dataset::ParquetDatasetFactory"(CDatasetFactory):
         @staticmethod
         CResult[shared_ptr[CDatasetFactory]] MakeFromMetaDataPath "Make"(
             const c_string& metadata_path,
             shared_ptr[CFileSystem] filesystem,
-            shared_ptr[CParquetFileFormat] format
+            shared_ptr[CParquetFileFormat] format,
+            CParquetFactoryOptions options
         )
 
         @staticmethod
@@ -340,5 +346,6 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
             const CFileSource& metadata_path,
             const c_string& base_path,
             shared_ptr[CFileSystem] filesystem,
-            shared_ptr[CParquetFileFormat] format
+            shared_ptr[CParquetFileFormat] format,
+            CParquetFactoryOptions options
         )
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index 4150c1b..9fbde57 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1560,9 +1560,10 @@ def _create_metadata_file(root_path):
 def _create_parquet_dataset_partitioned(root_path):
     import pyarrow.parquet as pq
 
-    table = pa.table({
-        'f1': range(20), 'f2': np.random.randn(20),
-        'part': np.repeat(['a', 'b'], 10)}
+    table = pa.table([
+        pa.array(range(20)), pa.array(np.random.randn(20)),
+        pa.array(np.repeat(['a', 'b'], 10))],
+        names=["f1", "f2", "part"]
     )
     pq.write_to_dataset(table, str(root_path), partition_cols=['part'])
     return _create_metadata_file(root_path), table
@@ -1571,19 +1572,18 @@ def _create_parquet_dataset_partitioned(root_path):
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_parquet_dataset_factory_partitioned(tempdir):
-    # TODO support for specifying partitioning scheme
-
     root_path = tempdir / "test_parquet_dataset_factory_partitioned"
     metadata_path, table = _create_parquet_dataset_partitioned(root_path)
 
-    dataset = ds.parquet_dataset(metadata_path)
-    # TODO partition column not yet included
-    # assert dataset.schema.equals(table.schema)
+    partitioning = ds.partitioning(flavor="hive")
+    dataset = ds.parquet_dataset(metadata_path, partitioning=partitioning)
+
+    assert dataset.schema.equals(table.schema)
     assert len(dataset.files) == 2
     result = dataset.to_table()
     assert result.num_rows == 20
 
     # the partitioned dataset does not preserve order
     result = result.to_pandas().sort_values("f1").reset_index(drop=True)
-    expected = table.to_pandas().drop(columns=["part"])
+    expected = table.to_pandas()
     pd.testing.assert_frame_equal(result, expected)

Reply via email to