westonpace commented on a change in pull request #12530:
URL: https://github.com/apache/arrow/pull/12530#discussion_r821004375
##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -249,6 +249,16 @@ class DatasetWriterDirectoryQueue : public
util::AsyncDestroyable {
write_options_(write_options),
writer_state_(writer_state) {}
+ DatasetWriterDirectoryQueue(std::string directory, std::string prefix,
Review comment:
This should replace the existing constructor. This class is only used
internally (i.e. it isn't exposed via dataset_writer.h) and is constructed in
exactly one place, so we only need to support one constructor.
##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -342,11 +357,12 @@ class DatasetWriterDirectoryQueue : public
util::AsyncDestroyable {
Make(util::AsyncTaskGroup* task_group,
const FileSystemDatasetWriteOptions& write_options,
DatasetWriterState* writer_state, std::shared_ptr<Schema> schema,
- std::string dir) {
+ std::string directory, std::string prefix) {
auto dir_queue = util::MakeUniqueAsync<DatasetWriterDirectoryQueue>(
- std::move(dir), std::move(schema), write_options, writer_state);
+ std::move(directory), std::move(prefix), std::move(schema),
write_options,
+ writer_state);
RETURN_NOT_OK(task_group->AddTask(dir_queue->on_closed()));
- dir_queue->PrepareDirectory();
+ dir_queue->PrepareDirectory(prefix);
Review comment:
By this point you have already called `std::move` on `prefix`, so that
variable is in a moved-from state and should not be read. It can appear to work
in some cases, depending on how the standard library implements the small
string optimization, but the standard only guarantees a valid but unspecified
value after a move, so this could lead to errors, especially with a large
string.
Also, since we've already stored it in `prefix_` we can just use `prefix_`
directly and `PrepareDirectory` doesn't need to take any arguments.
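To illustrate, here is a minimal standalone sketch of the pattern. The
`DirectoryQueue` class and `MakeAndPrepare` function are hypothetical stand-ins
(not the Arrow code); the point is only that the queue reads its own stored
members, so nothing touches the moved-from arguments:

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for DatasetWriterDirectoryQueue (not the Arrow class).
class DirectoryQueue {
 public:
  DirectoryQueue(std::string directory, std::string prefix)
      : directory_(std::move(directory)), prefix_(std::move(prefix)) {}

  // Takes no arguments: the prefix was already stored at construction time.
  std::string PreparedPath() const { return directory_ + "/" + prefix_; }

 private:
  std::string directory_;
  std::string prefix_;
};

// Hypothetical stand-in for Make(): moves its arguments into the queue.
std::string MakeAndPrepare(std::string directory, std::string prefix) {
  DirectoryQueue queue(std::move(directory), std::move(prefix));
  // `directory` and `prefix` are moved-from here, so only the members are read.
  return queue.PreparedPath();
}
```

With this shape the result does not depend on the length of the prefix.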
##########
File path: cpp/src/arrow/dataset/partition_test.cc
##########
@@ -61,12 +61,12 @@ class TestPartitioning : public ::testing::Test {
// formatted partition expressions are bound to the schema of the dataset
being
Review comment:
This method should be updated to:
```
void AssertFormat(compute::Expression expr, const std::string&
expected_directory, const std::string& expected_prefix = "")
```
Then it should check both `first` and `second`. Finally, we should have at
least one test case for filename partitioning that uses `AssertFormat`.
##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -561,6 +624,73 @@ class DirectoryPartitioningFactory : public
KeyValuePartitioningFactory {
std::vector<std::string> field_names_;
};
+class FilenamePartitioningFactory : public KeyValuePartitioningFactory {
+ public:
+ FilenamePartitioningFactory(std::vector<std::string> field_names,
+ PartitioningFactoryOptions options)
+ : KeyValuePartitioningFactory(options),
field_names_(std::move(field_names)) {
+ Reset();
+ util::InitializeUTF8();
+ }
+
+ std::string type_name() const override { return "filename"; }
+
+ Result<std::shared_ptr<Schema>> Inspect(
+ const std::vector<std::string>& paths) override {
+ for (auto path : paths) {
Review comment:
```suggestion
for (const auto& path : paths) {
```
##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -326,9 +336,14 @@ class DatasetWriterDirectoryQueue : public
util::AsyncDestroyable {
uint64_t rows_written() const { return rows_written_; }
- void PrepareDirectory() {
- init_future_ =
DeferNotOk(write_options_.filesystem->io_context().executor()->Submit(
- [this]() { return write_options_.filesystem->CreateDir(directory_);
}));
+ void PrepareDirectory(std::string& prefix) {
+ if (prefix.empty()) {
Review comment:
```suggestion
if (!directory_.empty()) {
```
At the moment `!directory_.empty() == prefix.empty()`, so the original check
works, but in the future a custom partitioning could well set both the prefix
and the directory (e.g. the user may want to partition directories by year but
files by month).
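A rough sketch of why the check should look at the directory. `FakeQueue` is a
hypothetical stand-in that records the directories it would create instead of
calling a real filesystem, and the year/month values are made up:

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for the directory queue; not the Arrow class.
struct FakeQueue {
  std::string directory_;
  std::string prefix_;
  std::vector<std::string> created_dirs;

  void PrepareDirectory() {
    // Decide based on the directory itself, not on whether a prefix is set.
    if (!directory_.empty()) {
      created_dirs.push_back(directory_);
    }
  }
};
```

With `directory_ = "2022"` and `prefix_ = "03_"`, a check on `prefix.empty()`
would skip creating the directory, while the check above still creates it.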
##########
File path: cpp/src/arrow/filesystem/path_util.h
##########
@@ -55,6 +60,11 @@ Status ValidateAbstractPathParts(const
std::vector<std::string>& parts);
ARROW_EXPORT
std::string ConcatAbstractPath(const std::string& base, const std::string&
stem);
+// Append a non-empty stem to an abstract path with a filename prefix.
+ARROW_EXPORT
+std::string ConcatAbstractPath(const std::string& base, const std::string&
prefix,
Review comment:
I think this method is a little too single purpose. This file is in the
`filesystem` module and is meant to contain general purpose utilities that are
common for anyone working with files. There is no concept of "partitions" or
"prefixes" in the domain of this module. Perhaps you could instead create a
more general version:
```
std::string ConcatAbstractPaths(const std::vector<std::string>& parts);
```
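Something along these lines might work as a starting point. This is only a
sketch: skipping empty parts and not normalizing stray slashes are my
assumptions, not settled behavior, and `kSep` is redefined locally so the
snippet is standalone:

```cpp
#include <string>
#include <vector>

constexpr char kSep = '/';  // mirrors the separator declared in path_util.h

// Sketch only: joins the non-empty parts with kSep. How empty parts and
// leading/trailing slashes should be treated is an assumption.
std::string ConcatAbstractPaths(const std::vector<std::string>& parts) {
  std::string out;
  for (const auto& part : parts) {
    if (part.empty()) continue;      // skip empty components
    if (!out.empty()) out += kSep;   // separator only between components
    out += part;
  }
  return out;
}
```

The partitioning code could then build the prefix on its own and pass the
resulting components in, keeping "prefix" out of the filesystem module.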
##########
File path: cpp/src/arrow/filesystem/path_util.h
##########
@@ -38,6 +39,10 @@ constexpr char kSep = '/';
ARROW_EXPORT
std::vector<std::string> SplitAbstractPath(const std::string& s);
+// Split a filename into its individual partitions.
+ARROW_EXPORT
+std::vector<std::string> SplitFilename(const std::string& s);
Review comment:
Similar to some of the comments I made below, this method should either
move to `partitioning.h` or be made more generic. Users of `path_util.h` won't
be aware that we are splitting on `_`, and even if we document it better they
might wonder why such a method exists, since its purpose is specific to
partitioning.
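If you go the generic route, a sketch could look like the following. `SplitOn`
is a hypothetical name, and dropping empty segments is an assumption on my
part:

```cpp
#include <string>
#include <vector>

// Hypothetical generic helper: splits `s` on `sep`, dropping empty segments.
std::vector<std::string> SplitOn(const std::string& s, char sep) {
  std::vector<std::string> parts;
  std::string::size_type start = 0;
  while (start <= s.size()) {
    auto end = s.find(sep, start);
    if (end == std::string::npos) end = s.size();
    if (end > start) parts.push_back(s.substr(start, end - start));
    start = end + 1;  // continue after the separator
  }
  return parts;
}
```

The partitioning code would then call it with `'_'`, so the choice of
separator stays in the module that owns it.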
##########
File path: cpp/src/arrow/filesystem/path_util.cc
##########
@@ -33,6 +33,7 @@ namespace internal {
std::vector<std::string> SplitAbstractPath(const std::string& path) {
std::vector<std::string> parts;
auto v = util::string_view(path);
+
Review comment:
```suggestion
```
##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -468,13 +485,15 @@ class DatasetWriter::DatasetWriterImpl : public
util::AsyncDestroyable {
}
Future<> DoWriteRecordBatch(std::shared_ptr<RecordBatch> batch,
- const std::string& directory) {
+ const std::string& directory, const std::string&
prefix) {
ARROW_ASSIGN_OR_RAISE(
auto dir_queue_itr,
::arrow::internal::GetOrInsertGenerated(
- &directory_queues_, directory, [this, &batch](const std::string&
dir) {
- return DatasetWriterDirectoryQueue::Make(
- &task_group_, write_options_, &writer_state_,
batch->schema(), dir);
+ &directory_queues_, directory + prefix,
+ [this, &batch, &directory, &prefix](const std::string& dir) {
Review comment:
```suggestion
[this, &batch, &directory, &prefix](const std::string& key) {
```
A small change that may help future readers understand why we need to
capture `directory` instead of using `dir`.
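A simplified sketch of the situation. `Queue`, `GetOrInsert` and `QueueFor` are
hypothetical stand-ins for the real queue class and for
`GetOrInsertGenerated`, whose actual signature differs:

```cpp
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the directory queue.
struct Queue {
  std::string directory;
  std::string prefix;
};

// Hypothetical get-or-insert helper: the generator receives the map key.
Queue& GetOrInsert(std::unordered_map<std::string, Queue>& queues,
                   const std::string& key,
                   const std::function<Queue(const std::string&)>& make) {
  auto it = queues.find(key);
  if (it == queues.end()) it = queues.emplace(key, make(key)).first;
  return it->second;
}

Queue& QueueFor(std::unordered_map<std::string, Queue>& queues,
                const std::string& directory, const std::string& prefix) {
  return GetOrInsert(queues, directory + prefix,
                     [&directory, &prefix](const std::string& /*key*/) {
                       // `key` is directory + prefix glued together; it cannot
                       // be split back reliably, so use the captured values.
                       return Queue{directory, prefix};
                     });
}
```

Naming the parameter `key` makes it obvious that it is not the directory.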
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1592,6 +1593,145 @@ cdef class HivePartitioning(Partitioning):
res.append(pyarrow_wrap_array(arr))
return res
+cdef class FilenamePartitioning(Partitioning):
Review comment:
We should add some Python tests which cover this logic. Hopefully we
can adapt some of the existing partitioning tests pretty easily.
##########
File path: cpp/src/arrow/filesystem/path_util.h
##########
@@ -113,6 +123,27 @@ std::string JoinAbstractPath(const StringRange& range) {
return JoinAbstractPath(range.begin(), range.end());
}
+// Join the components of filename partitions
+template <class StringIt>
+std::string JoinFilenamePartitions(StringIt it, StringIt end) {
Review comment:
Same concern here. This header is meant to be generic, so we can't
refer to things like "partitions" and `_`, which are concepts that only make
sense within the domain of `partitioning.h`.
Perhaps move this method inside of `partitioning.cc`? Or you could make a
generic `JoinPaths(const StringRange& range, const std::string& sep)` but I
wouldn't expect it to put `sep` at the end of the path.
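Roughly what I have in mind, as a sketch. The `JoinFilenamePartitions` wrapper
below is hypothetical and assumes the filename prefix is meant to end with a
trailing `_`; it would live with the partitioning code, not in this header:

```cpp
#include <string>
#include <vector>

// Generic join: puts `sep` between elements, never at the end.
template <class StringRange>
std::string JoinPaths(const StringRange& range, const std::string& sep) {
  std::string out;
  bool first = true;
  for (const auto& part : range) {
    if (!first) out += sep;
    out += part;
    first = false;
  }
  return out;
}

// Hypothetical partitioning-side wrapper that adds the trailing separator.
std::string JoinFilenamePartitions(const std::vector<std::string>& parts) {
  return parts.empty() ? std::string() : JoinPaths(parts, "_") + "_";
}
```

This keeps the trailing separator a partitioning decision rather than a
surprise in a general purpose utility.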
##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -206,6 +206,14 @@ TEST_F(DatasetWriterTestFixture, Basic) {
AssertCreatedData({{"testdir/chunk-0.arrow", 0, 100}});
}
+TEST_F(DatasetWriterTestFixture, BasicFilePrefix) {
Review comment:
Can you add a test that uses both a directory and a prefix? We don't
have any partitioning schemes (yet) that can generate this sort of thing but it
should be perfectly valid as far as this class is concerned.