This is an automated email from the ASF dual-hosted git repository.
fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new bfdfeda ARROW-8943: [C++][Python][Dataset] Add partitioning support to ParquetDatasetFactory
bfdfeda is described below
commit bfdfeda43ae09b68af058ca4ac274e5619298b44
Author: François Saint-Jacques <[email protected]>
AuthorDate: Thu Jun 18 11:24:57 2020 -0400
ARROW-8943: [C++][Python][Dataset] Add partitioning support to ParquetDatasetFactory
Closes #7437 from fsaintjacques/ARROW-8943-parquet-dataset-partition
Authored-by: François Saint-Jacques <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>
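
A minimal sketch of the new user-facing API (the /data paths and hive layout below are illustrative assumptions, not part of this commit):

    import pyarrow.dataset as ds

    # `_metadata` as written by e.g. pyarrow.parquet.write_metadata or Dask.
    # The new `partitioning` argument lets the factory parse partition
    # fields such as part=a / part=b out of the row groups' file paths.
    dataset = ds.parquet_dataset(
        "/data/_metadata",
        partitioning=ds.partitioning(flavor="hive"),
    )
    print(dataset.schema)  # now includes the discovered partition fields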
---
cpp/examples/arrow/dataset-parquet-scan-example.cc | 4 +-
cpp/src/arrow/dataset/discovery.cc | 36 +------
cpp/src/arrow/dataset/discovery.h | 3 -
cpp/src/arrow/dataset/file_parquet.cc | 116 ++++++++++++++++-----
cpp/src/arrow/dataset/file_parquet.h | 46 +++++++-
cpp/src/arrow/dataset/partition.cc | 25 +++++
cpp/src/arrow/dataset/partition.h | 12 ++-
cpp/src/arrow/dataset/partition_test.cc | 25 ++++-
python/pyarrow/_dataset.pyx | 89 +++++++++++++++-
python/pyarrow/dataset.py | 21 +++-
python/pyarrow/includes/libarrow_dataset.pxd | 11 +-
python/pyarrow/tests/test_dataset.py | 18 ++--
12 files changed, 320 insertions(+), 86 deletions(-)
diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc b/cpp/examples/arrow/dataset-parquet-scan-example.cc
index ed4b89d..3cdd298 100644
--- a/cpp/examples/arrow/dataset-parquet-scan-example.cc
+++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc
@@ -100,7 +100,9 @@ std::shared_ptr<ds::Dataset> GetDatasetFromDirectory(
std::shared_ptr<ds::Dataset> GetParquetDatasetFromMetadata(
std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format,
std::string metadata_path) {
- auto factory = ds::ParquetDatasetFactory::Make(metadata_path, fs, format).ValueOrDie();
+ ds::ParquetFactoryOptions options;
+ auto factory =
+ ds::ParquetDatasetFactory::Make(metadata_path, fs, format, options).ValueOrDie();
return factory->Finish().ValueOrDie();
}
diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc
index 295f07d..84e2502 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -110,12 +110,6 @@ FileSystemDatasetFactory::FileSystemDatasetFactory(
format_(std::move(format)),
options_(std::move(options)) {}
-util::optional<util::string_view> FileSystemDatasetFactory::RemovePartitionBaseDir(
- util::string_view path) {
- const util::string_view partition_base_dir{options_.partition_base_dir};
- return fs::internal::RemoveAncestor(partition_base_dir, path);
-}
-
Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
@@ -186,23 +180,6 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::move(options));
}
-Result<std::shared_ptr<Schema>> FileSystemDatasetFactory::PartitionSchema() {
- if (auto partitioning = options_.partitioning.partitioning()) {
- return partitioning->schema();
- }
-
- std::vector<std::string> relative_paths;
- for (const auto& path : paths_) {
- if (auto relative = RemovePartitionBaseDir(path)) {
- auto relative_str = relative->to_string();
- auto basename_filename = fs::internal::GetAbstractPathParent(relative_str);
- relative_paths.push_back(basename_filename.first);
- }
- }
-
- return options_.partitioning.factory()->Inspect(relative_paths);
-}
-
Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSchemas(
InspectOptions options) {
std::vector<std::shared_ptr<Schema>> schemas;
@@ -215,7 +192,9 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSc
schemas.push_back(schema);
}
- ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema());
+ ARROW_ASSIGN_OR_RAISE(auto partition_schema,
+ options_.partitioning.GetOrInferSchema(
+ StripPrefixAndFilename(paths_, options_.partition_base_dir)));
schemas.push_back(partition_schema);
return schemas;
@@ -245,13 +224,8 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
std::vector<std::shared_ptr<FileFragment>> fragments;
for (const auto& path : paths_) {
- std::shared_ptr<Expression> partition = scalar(true);
- if (auto relative = RemovePartitionBaseDir(path)) {
- auto relative_str = relative->to_string();
- auto basename_filename = fs::internal::GetAbstractPathParent(relative_str);
- ARROW_ASSIGN_OR_RAISE(partition, partitioning->Parse(basename_filename.first));
- }
-
+ auto fixed_path = StripPrefixAndFilename(path, options_.partition_base_dir);
+ ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition));
fragments.push_back(fragment);
}
diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h
index b59c6e5..634073f 100644
--- a/cpp/src/arrow/dataset/discovery.h
+++ b/cpp/src/arrow/dataset/discovery.h
@@ -233,9 +233,6 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
std::shared_ptr<fs::FileSystem> fs_;
std::shared_ptr<FileFormat> format_;
FileSystemFactoryOptions options_;
-
- private:
- util::optional<util::string_view> RemovePartitionBaseDir(util::string_view path);
};
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 0abe9d6..f4a3b00 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -500,44 +500,49 @@ Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
ParquetDatasetFactory::ParquetDatasetFactory(
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<ParquetFileFormat> format,
- std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path)
+ std::shared_ptr<parquet::FileMetaData> metadata, std::string base_path,
+ ParquetFactoryOptions options)
: filesystem_(std::move(filesystem)),
format_(std::move(format)),
metadata_(std::move(metadata)),
- base_path_(std::move(base_path)) {}
+ base_path_(std::move(base_path)),
+ options_(std::move(options)) {}
Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
- std::shared_ptr<ParquetFileFormat> format) {
+ std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options) {
// Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base
// directory of all parquet files is `dirname(metadata_path)`.
auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first;
- return Make({metadata_path, filesystem}, dirname, filesystem, format);
+ return Make({metadata_path, filesystem}, dirname, filesystem, std::move(format),
+ std::move(options));
}
Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
const FileSource& metadata_source, const std::string& base_path,
- std::shared_ptr<fs::FileSystem> filesystem,
- std::shared_ptr<ParquetFileFormat> format) {
+ std::shared_ptr<fs::FileSystem> filesystem, std::shared_ptr<ParquetFileFormat> format,
+ ParquetFactoryOptions options) {
DCHECK_NE(filesystem, nullptr);
DCHECK_NE(format, nullptr);
+ // By automatically setting the options base_dir to the metadata's base_path,
+ // we provide a better experience for users who provide a Partitioning that is
+ // relative to the base_dir instead of the full path.
+ if (options.partition_base_dir.empty()) {
+ options.partition_base_dir = base_path;
+ }
+
ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
auto metadata = reader->parquet_reader()->metadata();
- return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
- std::move(filesystem), std::move(format), std::move(metadata), base_path));
-}
-
-Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
- InspectOptions options) {
- std::shared_ptr<Schema> schema;
- RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
- return std::vector<std::shared_ptr<Schema>>{schema};
+ return std::shared_ptr<DatasetFactory>(
+ new ParquetDatasetFactory(std::move(filesystem), std::move(format),
+ std::move(metadata), base_path, std::move(options)));
}
-static Result<std::string> FileFromRowGroup(const std::string& base_path,
- const parquet::RowGroupMetaData& row_group) {
+static inline Result<std::string> FileFromRowGroup(
+ fs::FileSystem* filesystem, const std::string& base_path,
+ const parquet::RowGroupMetaData& row_group) {
try {
auto n_columns = row_group.num_columns();
if (n_columns == 0) {
@@ -565,17 +570,43 @@ static Result<std::string> FileFromRowGroup(const std::string& base_path,
}
}
- return fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, path});
+ path = fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, path});
+ // Normalizing path is required for Windows.
+ return filesystem->NormalizePath(std::move(path));
} catch (const ::parquet::ParquetException& e) {
return Status::Invalid("Extracting file path from RowGroup failed. Parquet
threw:",
e.what());
}
}
+Result<std::vector<std::string>> ParquetDatasetFactory::CollectPaths(
+ const parquet::FileMetaData& metadata,
+ const parquet::ArrowReaderProperties& properties) {
+ try {
+ std::unordered_set<std::string> unique_paths;
+ ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
+
+ for (int i = 0; i < metadata.num_row_groups(); i++) {
+ auto row_group = metadata.RowGroup(i);
+ ARROW_ASSIGN_OR_RAISE(auto path,
+ FileFromRowGroup(filesystem_.get(), base_path_, *row_group));
+ unique_paths.emplace(std::move(path));
+ }
+
+ std::vector<std::string> paths;
+ for (const auto& path : unique_paths) {
+ paths.emplace_back(path);
+ }
+ return paths;
+ } catch (const ::parquet::ParquetException& e) {
+ return Status::Invalid("Could not infer file paths from FileMetaData:",
e.what());
+ }
+}
+
Result<std::vector<std::shared_ptr<FileFragment>>>
ParquetDatasetFactory::CollectParquetFragments(
const parquet::FileMetaData& metadata,
- const parquet::ArrowReaderProperties& properties) {
+ const parquet::ArrowReaderProperties& properties, const Partitioning& partitioning) {
try {
auto n_columns = metadata.num_columns();
if (n_columns == 0) {
@@ -584,14 +615,12 @@ ParquetDatasetFactory::CollectParquetFragments(
}
std::unordered_map<std::string, std::vector<RowGroupInfo>> path_to_row_group_infos;
-
ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
for (int i = 0; i < metadata.num_row_groups(); i++) {
auto row_group = metadata.RowGroup(i);
- ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, *row_group));
- // Normalizing path is required for Windows.
- ARROW_ASSIGN_OR_RAISE(path, filesystem_->NormalizePath(std::move(path)));
+ ARROW_ASSIGN_OR_RAISE(auto path,
+ FileFromRowGroup(filesystem_.get(), base_path_, *row_group));
auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
auto num_rows = row_group->num_rows();
@@ -611,9 +640,13 @@ ParquetDatasetFactory::CollectParquetFragments(
std::vector<std::shared_ptr<FileFragment>> fragments;
fragments.reserve(path_to_row_group_infos.size());
for (auto&& elem : path_to_row_group_infos) {
- ARROW_ASSIGN_OR_RAISE(auto fragment,
- format_->MakeFragment({std::move(elem.first), filesystem_},
- scalar(true), std::move(elem.second)));
+ const auto& path = elem.first;
+ auto partition =
+ partitioning.Parse(StripPrefixAndFilename(path, options_.partition_base_dir))
+ .ValueOr(scalar(true));
+ ARROW_ASSIGN_OR_RAISE(
+ auto fragment, format_->MakeFragment({path, filesystem_}, std::move(partition),
+ std::move(elem.second)));
fragments.push_back(std::move(fragment));
}
@@ -623,6 +656,28 @@ ParquetDatasetFactory::CollectParquetFragments(
}
}
+Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
+ InspectOptions options) {
+ std::vector<std::shared_ptr<Schema>> schemas;
+
+ std::shared_ptr<Schema> schema;
+ RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
+ schemas.push_back(std::move(schema));
+
+ if (options_.partitioning.factory() != nullptr) {
+ // Gather paths found in RowGroups' ColumnChunks.
+ auto properties = MakeArrowReaderProperties(*format_, *metadata_);
+ ARROW_ASSIGN_OR_RAISE(auto paths, CollectPaths(*metadata_, properties));
+
+ ARROW_ASSIGN_OR_RAISE(auto partition_schema,
+ options_.partitioning.GetOrInferSchema(StripPrefixAndFilename(
+ paths, options_.partition_base_dir)));
+ schemas.push_back(std::move(partition_schema));
+ }
+
+ return schemas;
+}
+
Result<std::shared_ptr<Dataset>> ParquetDatasetFactory::Finish(FinishOptions options) {
std::shared_ptr<Schema> schema = options.schema;
bool schema_missing = schema == nullptr;
@@ -630,8 +685,15 @@ Result<std::shared_ptr<Dataset>> ParquetDatasetFactory::Finish(FinishOptions opt
ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options));
}
+ std::shared_ptr<Partitioning> partitioning = options_.partitioning.partitioning();
+ if (partitioning == nullptr) {
+ auto factory = options_.partitioning.factory();
+ ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema));
+ }
+
auto properties = MakeArrowReaderProperties(*format_, *metadata_);
- ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*metadata_, properties));
+ ARROW_ASSIGN_OR_RAISE(auto fragments,
+ CollectParquetFragments(*metadata_, properties, *partitioning));
return FileSystemDataset::Make(std::move(schema), scalar(true), format_,
std::move(fragments));
}
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 6ac8a1b..f5fba7e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -215,6 +215,34 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
friend class ParquetFileFormat;
};
+struct ParquetFactoryOptions {
+ // Either an explicit Partitioning or a PartitioningFactory to discover one.
+ //
+ // If a factory is provided, it will be used to infer a schema for partition fields
+ // based on file and directory paths, and then construct a Partitioning. The default
+ // is a Partitioning which will yield no partition information.
+ //
+ // The (explicit or discovered) partitioning will be applied to discovered files
+ // and the resulting partition information embedded in the Dataset.
+ PartitioningOrFactory partitioning{Partitioning::Default()};
+
+ // For the purposes of applying the partitioning, paths will be stripped
+ // of the partition_base_dir. Files not matching the partition_base_dir
+ // prefix will be skipped for partition discovery. The ignored files will still
+ // be part of the Dataset, but will not have partition information.
+ //
+ // Example:
+ // partition_base_dir = "/dataset";
+ //
+ // - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the
partitioning
+ //
+ // - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
+ //
+ // This is useful for a partitioning that parses directory names, where
+ // ordering is important, e.g. DirectoryPartitioning.
+ std::string partition_base_dir;
+};
+
/// \brief Create FileSystemDataset from custom `_metadata` cache file.
///
/// Dask and other systems will generate a cache metadata file by concatenating
@@ -234,9 +262,10 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
/// \param[in] metadata_path path of the metadata parquet file
/// \param[in] filesystem from which to open/read the path
/// \param[in] format to read the file with.
+ /// \param[in] options see ParquetFactoryOptions
static Result<std::shared_ptr<DatasetFactory>> Make(
const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
- std::shared_ptr<ParquetFileFormat> format);
+ std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);
/// \brief Create a ParquetDatasetFactory from a metadata source.
///
@@ -248,10 +277,11 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
/// \param[in] base_path used as the prefix of every parquet file referenced
/// \param[in] filesystem from which to read the files referenced.
/// \param[in] format to read the file with.
+ /// \param[in] options see ParquetFactoryOptions
static Result<std::shared_ptr<DatasetFactory>> Make(
const FileSource& metadata, const std::string& base_path,
std::shared_ptr<fs::FileSystem> filesystem,
- std::shared_ptr<ParquetFileFormat> format);
+ std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;
@@ -262,17 +292,25 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
ParquetDatasetFactory(std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<ParquetFileFormat> format,
std::shared_ptr<parquet::FileMetaData> metadata,
- std::string base_path);
+ std::string base_path, ParquetFactoryOptions options);
std::shared_ptr<fs::FileSystem> filesystem_;
std::shared_ptr<ParquetFileFormat> format_;
std::shared_ptr<parquet::FileMetaData> metadata_;
std::string base_path_;
+ ParquetFactoryOptions options_;
+ FragmentVector fragments_;
private:
- Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
+ Result<std::vector<std::string>> CollectPaths(
const parquet::FileMetaData& metadata,
const parquet::ArrowReaderProperties& properties);
+
+ Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
+ const parquet::FileMetaData& metadata,
+ const parquet::ArrowReaderProperties& properties, const Partitioning& partitioning);
+
+ Result<std::shared_ptr<Schema>> PartitionSchema();
};
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 2e44075..f2d8488 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -598,5 +598,30 @@ std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory() {
return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory());
}
+std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
+ auto maybe_base_less = fs::internal::RemoveAncestor(prefix, path);
+ auto base_less = maybe_base_less ? maybe_base_less->to_string() : path;
+ auto basename_filename = fs::internal::GetAbstractPathParent(base_less);
+ return basename_filename.first;
+}
+
+std::vector<std::string> StripPrefixAndFilename(const std::vector<std::string>& paths,
+ const std::string& prefix) {
+ std::vector<std::string> result;
+ for (const auto& path : paths) {
+ result.emplace_back(StripPrefixAndFilename(path, prefix));
+ }
+ return result;
+}
+
+Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
+ const std::vector<std::string>& paths) {
+ if (auto part = partitioning()) {
+ return part->schema();
+ }
+
+ return factory()->Inspect(paths);
+}
+
} // namespace dataset
} // namespace arrow
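
For reference, the semantics of the new StripPrefixAndFilename helper can be mimicked in a few lines of Python (an illustrative re-implementation, not a pyarrow API):

    import posixpath

    def strip_prefix_and_filename(path, prefix):
        # Drop the prefix when it is an ancestor of path, then drop the
        # filename, keeping only the partition-relevant directories.
        if prefix and path.startswith(prefix + "/"):
            path = path[len(prefix) + 1:]
        return posixpath.dirname(path)

    assert strip_prefix_and_filename("/data/year=2019/c.txt", "/data") == "year=2019"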
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 560669a..56a8db9 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -254,7 +254,15 @@ class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
std::string name_;
};
-// TODO(bkietz) use RE2 and named groups to provide RegexpPartitioning
+/// \brief Remove a prefix and the filename of a path.
+///
+/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") -> "year=2019"`
+ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
+ const std::string& prefix);
+
+/// \brief Vector version of StripPrefixAndFilename.
+ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
+ const std::vector<std::string>& paths, const std::string& prefix);
/// \brief Either a Partitioning or a PartitioningFactory
class ARROW_DS_EXPORT PartitioningOrFactory {
@@ -289,6 +297,8 @@ class ARROW_DS_EXPORT PartitioningOrFactory {
return NULLPTR;
}
+ Result<std::shared_ptr<Schema>> GetOrInferSchema(const std::vector<std::string>& paths);
+
private:
util::variant<std::shared_ptr<PartitioningFactory>, std::shared_ptr<Partitioning>>
variant_;
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index ceb6a11..fad81cc 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include "arrow/dataset/partition.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
#include <cstdint>
#include <map>
#include <memory>
@@ -22,11 +27,7 @@
#include <string>
#include <vector>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
#include "arrow/dataset/file_base.h"
-#include "arrow/dataset/partition.h"
#include "arrow/dataset/test_util.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
@@ -471,5 +472,21 @@ TEST_F(TestPartitioningWritePlan, Errors) {
"b"_ == "forever alone"));
}
+TEST(TestStripPrefixAndFilename, Basic) {
+ ASSERT_EQ(StripPrefixAndFilename("", ""), "");
+ ASSERT_EQ(StripPrefixAndFilename("a.csv", ""), "");
+ ASSERT_EQ(StripPrefixAndFilename("a/b.csv", ""), "a");
+ ASSERT_EQ(StripPrefixAndFilename("/a/b/c.csv", "/a"), "b");
+ ASSERT_EQ(StripPrefixAndFilename("/a/b/c/d.csv", "/a"), "b/c");
+ ASSERT_EQ(StripPrefixAndFilename("/a/b/c.csv", "/a/b"), "");
+
+ std::vector<std::string> input{"/data/year=2019/file.parquet",
+ "/data/year=2019/month=12/file.parquet",
+ "/data/year=2019/month=12/day=01/file.parquet"};
+ EXPECT_THAT(StripPrefixAndFilename(input, "/data"),
+ testing::ElementsAre("year=2019", "year=2019/month=12",
+ "year=2019/month=12/day=01"));
+}
+
} // namespace dataset
} // namespace arrow
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index ae8f19b..39e66b1 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1378,6 +1378,9 @@ cdef class FileSystemFactoryOptions:
partition_base_dir prefix will be skipped for partitioning discovery.
The ignored files will still be part of the Dataset, but will not
have partition information.
+ partitioning : Partitioning or PartitioningFactory, optional
+ Apply the Partitioning to every discovered Fragment. See Partitioning or
+ PartitioningFactory documentation.
exclude_invalid_files : bool, optional (default True)
If True, invalid files will be excluded (file format specific check).
This will incur IO for each files in a serial and single threaded
@@ -1567,6 +1570,82 @@ cdef class UnionDatasetFactory(DatasetFactory):
self.union_factory = <CUnionDatasetFactory*> sp.get()
+cdef class ParquetFactoryOptions:
+ """
+ Influences the discovery of a parquet dataset.
+
+ Parameters
+ ----------
+ partition_base_dir : str, optional
+ For the purposes of applying the partitioning, paths will be
+ stripped of the partition_base_dir. Files not matching the
+ partition_base_dir prefix will be skipped for partitioning discovery.
+ The ignored files will still be part of the Dataset, but will not
+ have partition information.
+ partitioning : Partitioning, PartitioningFactory, optional
+ The partitioning scheme applied to fragments, see ``Partitioning``.
+ """
+
+ cdef:
+ CParquetFactoryOptions options
+
+ __slots__ = ()  # avoid mistakenly creating attributes
+
+ def __init__(self, partition_base_dir=None, partitioning=None):
+ if isinstance(partitioning, PartitioningFactory):
+ self.partitioning_factory = partitioning
+ elif isinstance(partitioning, Partitioning):
+ self.partitioning = partitioning
+
+ if partition_base_dir is not None:
+ self.partition_base_dir = partition_base_dir
+
+ cdef inline CParquetFactoryOptions unwrap(self):
+ return self.options
+
+ @property
+ def partitioning(self):
+ """Partitioning to apply to discovered files.
+
+ NOTE: setting this property will overwrite partitioning_factory.
+ """
+ c_partitioning = self.options.partitioning.partitioning()
+ if c_partitioning.get() == nullptr:
+ return None
+ return Partitioning.wrap(c_partitioning)
+
+ @partitioning.setter
+ def partitioning(self, Partitioning value):
+ self.options.partitioning = (<Partitioning> value).unwrap()
+
+ @property
+ def partitioning_factory(self):
+ """PartitioningFactory to apply to discovered files and
+ discover a Partitioning.
+
+ NOTE: setting this property will overwrite partitioning.
+ """
+ c_factory = self.options.partitioning.factory()
+ if c_factory.get() == nullptr:
+ return None
+ return PartitioningFactory.wrap(c_factory)
+
+ @partitioning_factory.setter
+ def partitioning_factory(self, PartitioningFactory value):
+ self.options.partitioning = (<PartitioningFactory> value).unwrap()
+
+ @property
+ def partition_base_dir(self):
+ """
+ Base directory to strip paths before applying the partitioning.
+ """
+ return frombytes(self.options.partition_base_dir)
+
+ @partition_base_dir.setter
+ def partition_base_dir(self, value):
+ self.options.partition_base_dir = tobytes(value)
+
+
cdef class ParquetDatasetFactory(DatasetFactory):
"""
Create a ParquetDatasetFactory from a Parquet `_metadata` file.
@@ -1581,26 +1660,32 @@ cdef class ParquetDatasetFactory(DatasetFactory):
files.
format : ParquetFileFormat
Parquet format options.
+ options : ParquetFactoryOptions, optional
+ Various flags influencing the discovery of filesystem paths.
"""
cdef:
CParquetDatasetFactory* parquet_factory
def __init__(self, metadata_path, FileSystem filesystem not None,
- FileFormat format not None):
+ FileFormat format not None,
+ ParquetFactoryOptions options=None):
cdef:
c_string path
shared_ptr[CFileSystem] c_filesystem
shared_ptr[CParquetFileFormat] c_format
CResult[shared_ptr[CDatasetFactory]] result
+ CParquetFactoryOptions c_options
c_path = tobytes(metadata_path)
c_filesystem = filesystem.unwrap()
c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
format.unwrap())
+ options = options or ParquetFactoryOptions()
+ c_options = options.unwrap()
result = CParquetDatasetFactory.MakeFromMetaDataPath(
- c_path, c_filesystem, c_format)
+ c_path, c_filesystem, c_format, c_options)
self.init(GetResultValue(result))
cdef init(self, shared_ptr[CDatasetFactory]& sp):
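
The lower-level factory can be driven directly with the new options class. A minimal sketch, assuming a hive-partitioned layout under a hypothetical /data directory containing a `_metadata` file:

    import pyarrow.dataset as ds
    import pyarrow.fs as fs

    options = ds.ParquetFactoryOptions(
        partition_base_dir="/data",
        partitioning=ds.HivePartitioning.discover(),
    )
    factory = ds.ParquetDatasetFactory(
        "/data/_metadata", fs.LocalFileSystem(), ds.ParquetFileFormat(),
        options=options,
    )
    dataset = factory.finish()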
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 8c58250..405caf6 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -36,6 +36,7 @@ from pyarrow._dataset import ( # noqa
HivePartitioning,
IpcFileFormat,
ParquetDatasetFactory,
+ ParquetFactoryOptions,
ParquetFileFormat,
ParquetFileFragment,
ParquetReadOptions,
@@ -445,7 +446,8 @@ def _union_dataset(children, schema=None, **kwargs):
return UnionDataset(schema, children)
-def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None,
+ partitioning=None, partition_base_dir=None):
"""
Create a FileSystemDataset from a `_metadata` file created via
`pyarrow.parquet.write_metadata`.
@@ -468,6 +470,16 @@ def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
format : ParquetFileFormat
An instance of a ParquetFileFormat if special options needs to be
passed.
+ partitioning : Partitioning, PartitioningFactory, str, list of str
+ The partitioning scheme specified with the ``partitioning()``
+ function. A flavor string can be used as a shortcut, and with a list of
+ field names a DirectoryPartitioning will be inferred.
+ partition_base_dir : str, optional
+ For the purposes of applying the partitioning, paths will be
+ stripped of the partition_base_dir. Files not matching the
+ partition_base_dir prefix will be skipped for partitioning discovery.
+ The ignored files will still be part of the Dataset, but will not
+ have partition information.
Returns
-------
@@ -486,8 +498,13 @@ def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
filesystem, _ = _ensure_filesystem(filesystem)
metadata_path = _normalize_path(filesystem, _stringify_path(metadata_path))
+ options = ParquetFactoryOptions(
+ partition_base_dir=partition_base_dir,
+ partitioning=_ensure_partitioning(partitioning)
+ )
- factory = ParquetDatasetFactory(metadata_path, filesystem, format)
+ factory = ParquetDatasetFactory(
+ metadata_path, filesystem, format, options=options)
return factory.finish(schema)
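
End to end, the convenience function now accepts both new keywords. A short usage sketch (paths assumed for illustration):

    import pyarrow.dataset as ds

    dataset = ds.parquet_dataset(
        "/data/_metadata",
        partitioning=ds.partitioning(flavor="hive"),
        partition_base_dir="/data",
    )
    table = dataset.to_table()  # includes the partition columns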
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index e84ff78..f80fdb9 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -326,13 +326,19 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CFileSystemFactoryOptions options
)
+ cdef cppclass CParquetFactoryOptions \
+ "arrow::dataset::ParquetFactoryOptions":
+ CPartitioningOrFactory partitioning
+ c_string partition_base_dir
+
cdef cppclass CParquetDatasetFactory \
"arrow::dataset::ParquetDatasetFactory"(CDatasetFactory):
@staticmethod
CResult[shared_ptr[CDatasetFactory]] MakeFromMetaDataPath "Make"(
const c_string& metadata_path,
shared_ptr[CFileSystem] filesystem,
- shared_ptr[CParquetFileFormat] format
+ shared_ptr[CParquetFileFormat] format,
+ CParquetFactoryOptions options
)
@staticmethod
@@ -340,5 +346,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
const CFileSource& metadata_path,
const c_string& base_path,
shared_ptr[CFileSystem] filesystem,
- shared_ptr[CParquetFileFormat] format
+ shared_ptr[CParquetFileFormat] format,
+ CParquetFactoryOptions options
)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 4150c1b..9fbde57 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1560,9 +1560,10 @@ def _create_metadata_file(root_path):
def _create_parquet_dataset_partitioned(root_path):
import pyarrow.parquet as pq
- table = pa.table({
- 'f1': range(20), 'f2': np.random.randn(20),
- 'part': np.repeat(['a', 'b'], 10)}
+ table = pa.table([
+ pa.array(range(20)), pa.array(np.random.randn(20)),
+ pa.array(np.repeat(['a', 'b'], 10))],
+ names=["f1", "f2", "part"]
)
pq.write_to_dataset(table, str(root_path), partition_cols=['part'])
return _create_metadata_file(root_path), table
@@ -1571,19 +1572,18 @@ def _create_parquet_dataset_partitioned(root_path):
@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_partitioned(tempdir):
- # TODO support for specifying partitioning scheme
-
root_path = tempdir / "test_parquet_dataset_factory_partitioned"
metadata_path, table = _create_parquet_dataset_partitioned(root_path)
- dataset = ds.parquet_dataset(metadata_path)
- # TODO partition column not yet included
- # assert dataset.schema.equals(table.schema)
+ partitioning = ds.partitioning(flavor="hive")
+ dataset = ds.parquet_dataset(metadata_path, partitioning=partitioning)
+
+ assert dataset.schema.equals(table.schema)
assert len(dataset.files) == 2
result = dataset.to_table()
assert result.num_rows == 20
# the partitioned dataset does not preserve order
result = result.to_pandas().sort_values("f1").reset_index(drop=True)
- expected = table.to_pandas().drop(columns=["part"])
+ expected = table.to_pandas()
pd.testing.assert_frame_equal(result, expected)
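
For completeness, a quick interactive check of the new behavior, continuing from the test's `metadata_path` and its 10/10 part=a/part=b split (a sketch, not part of the test suite):

    import pyarrow.dataset as ds

    dataset = ds.parquet_dataset(
        metadata_path, partitioning=ds.partitioning(flavor="hive"))
    # Filtering on the discovered partition column should keep one side
    # of the split, i.e. 10 rows.
    n = dataset.to_table(filter=ds.field("part") == "a").num_rows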