westonpace commented on a change in pull request #12530:
URL: https://github.com/apache/arrow/pull/12530#discussion_r821004375



##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -249,6 +249,16 @@ class DatasetWriterDirectoryQueue : public 
util::AsyncDestroyable {
         write_options_(write_options),
         writer_state_(writer_state) {}
 
+  DatasetWriterDirectoryQueue(std::string directory, std::string prefix,

Review comment:
       This should replace the existing constructor.  This class is only used 
internally (e.g. it isn't exposed via dataset_writer.h) and so it only ever 
gets constructed in exactly one place so we only need to support one type of 
constructor.

##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -342,11 +357,12 @@ class DatasetWriterDirectoryQueue : public 
util::AsyncDestroyable {
   Make(util::AsyncTaskGroup* task_group,
        const FileSystemDatasetWriteOptions& write_options,
        DatasetWriterState* writer_state, std::shared_ptr<Schema> schema,
-       std::string dir) {
+       std::string directory, std::string prefix) {
     auto dir_queue = util::MakeUniqueAsync<DatasetWriterDirectoryQueue>(
-        std::move(dir), std::move(schema), write_options, writer_state);
+        std::move(directory), std::move(prefix), std::move(schema), 
write_options,
+        writer_state);
     RETURN_NOT_OK(task_group->AddTask(dir_queue->on_closed()));
-    dir_queue->PrepareDirectory();
+    dir_queue->PrepareDirectory(prefix);

Review comment:
       By this point you have already called `std::move` on `prefix` so that 
variable is no longer valid.  You get away with it when the prefix is small 
because C++'s small string optimization means that a move on a small string is 
a no-op.  However, with a large string, this could lead to errors.
   
   Also, since we've already stored it in `prefix_` we can just use `prefix_` 
directly and `PrepareDirectory` doesn't need to take any arguments.

##########
File path: cpp/src/arrow/dataset/partition_test.cc
##########
@@ -61,12 +61,12 @@ class TestPartitioning : public ::testing::Test {
     // formatted partition expressions are bound to the schema of the dataset 
being

Review comment:
       This method should be updated to:
   
   ```
   void AssertFormat(compute::Expression expr, const std::string& 
expected_directory, const std::string& expected_prefix  = "")
   ```
   
   Then it should check both `first` and `second`.  Finally, we should have at 
least one test case for filename partitioning that uses `AssertFormat`.

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -561,6 +624,73 @@ class DirectoryPartitioningFactory : public 
KeyValuePartitioningFactory {
   std::vector<std::string> field_names_;
 };
 
+class FilenamePartitioningFactory : public KeyValuePartitioningFactory {
+ public:
+  FilenamePartitioningFactory(std::vector<std::string> field_names,
+                              PartitioningFactoryOptions options)
+      : KeyValuePartitioningFactory(options), 
field_names_(std::move(field_names)) {
+    Reset();
+    util::InitializeUTF8();
+  }
+
+  std::string type_name() const override { return "filename"; }
+
+  Result<std::shared_ptr<Schema>> Inspect(
+      const std::vector<std::string>& paths) override {
+    for (auto path : paths) {

Review comment:
       ```suggestion
       for (const auto& path : paths) {
   ```

##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -326,9 +336,14 @@ class DatasetWriterDirectoryQueue : public 
util::AsyncDestroyable {
 
   uint64_t rows_written() const { return rows_written_; }
 
-  void PrepareDirectory() {
-    init_future_ = 
DeferNotOk(write_options_.filesystem->io_context().executor()->Submit(
-        [this]() { return write_options_.filesystem->CreateDir(directory_); 
}));
+  void PrepareDirectory(std::string& prefix) {
+    if (prefix.empty()) {

Review comment:
       ```suggestion
       if (!directory_.empty()) {
   ```
   At the moment, `!directory_.empty() == prefix.empty()` so this works but in 
the future I think it would be very possible for some custom partitioning to 
manipulate both the prefix and the directory (e.g. maybe the user wants to 
partition directories by year but files by month).

##########
File path: cpp/src/arrow/filesystem/path_util.h
##########
@@ -55,6 +60,11 @@ Status ValidateAbstractPathParts(const 
std::vector<std::string>& parts);
 ARROW_EXPORT
 std::string ConcatAbstractPath(const std::string& base, const std::string& 
stem);
 
+// Append a non-empty stem to an abstract path with a filename prefix.
+ARROW_EXPORT
+std::string ConcatAbstractPath(const std::string& base, const std::string& 
prefix,

Review comment:
       I think this method is a little too single purpose.  This file is in the 
`filesystem` module and is meant to contain general purpose utilities that are 
common for anyone working with files.  There is no concept of "partitions" or 
"prefixes" in the domain of this module.  Perhaps you could instead create a 
more general version:
   
   ```
   std::string ConcatAbstractPaths(const std::vector<std::string>& parts);
   ```

##########
File path: cpp/src/arrow/filesystem/path_util.h
##########
@@ -38,6 +39,10 @@ constexpr char kSep = '/';
 ARROW_EXPORT
 std::vector<std::string> SplitAbstractPath(const std::string& s);
 
+// Split a filename into its individual partitions.
+ARROW_EXPORT
+std::vector<std::string> SplitFilename(const std::string& s);

Review comment:
       Similar to some of the comments I made below, this method should either 
move to `partitioning.h` or be made more generic.  Users that are using 
`path_util.h` aren't going to be aware of the fact that we are splitting on `_` 
and even if we document it better they might be confused why such a method 
would exist since its purpose is very unique to partitioning.

##########
File path: cpp/src/arrow/filesystem/path_util.cc
##########
@@ -33,6 +33,7 @@ namespace internal {
 std::vector<std::string> SplitAbstractPath(const std::string& path) {
   std::vector<std::string> parts;
   auto v = util::string_view(path);
+

Review comment:
       ```suggestion
   ```

##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -468,13 +485,15 @@ class DatasetWriter::DatasetWriterImpl : public 
util::AsyncDestroyable {
   }
 
   Future<> DoWriteRecordBatch(std::shared_ptr<RecordBatch> batch,
-                              const std::string& directory) {
+                              const std::string& directory, const std::string& 
prefix) {
     ARROW_ASSIGN_OR_RAISE(
         auto dir_queue_itr,
         ::arrow::internal::GetOrInsertGenerated(
-            &directory_queues_, directory, [this, &batch](const std::string& 
dir) {
-              return DatasetWriterDirectoryQueue::Make(
-                  &task_group_, write_options_, &writer_state_, 
batch->schema(), dir);
+            &directory_queues_, directory + prefix,
+            [this, &batch, &directory, &prefix](const std::string& dir) {

Review comment:
       ```suggestion
               [this, &batch, &directory, &prefix](const std::string& key) {
   ```
   A small change that may help future readers understand why we need to 
capture `directory` instead of using `dir`.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1592,6 +1593,145 @@ cdef class HivePartitioning(Partitioning):
             res.append(pyarrow_wrap_array(arr))
         return res
 
+cdef class FilenamePartitioning(Partitioning):

Review comment:
       We should add some python tests which cover this logic.  Hopefully we 
can adapt some of the existing partitioning tests pretty easily.

##########
File path: cpp/src/arrow/filesystem/path_util.h
##########
@@ -113,6 +123,27 @@ std::string JoinAbstractPath(const StringRange& range) {
   return JoinAbstractPath(range.begin(), range.end());
 }
 
+// Join the components of filename partitions
+template <class StringIt>
+std::string JoinFilenamePartitions(StringIt it, StringIt end) {

Review comment:
       Same concern here.  This class is meant to be more generic so we can't 
refer to things like "partitions" and `_` which are concepts that only make 
sense within the domain of `partitioning.h`.
   
   Perhaps move this method inside of `partitioning.cc`?  Or you could make a 
generic `JoinPaths(const StringRange& range, const std::string& sep)` but I 
wouldn't expect it to put `sep` at the end of the path.

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -206,6 +206,14 @@ TEST_F(DatasetWriterTestFixture, Basic) {
   AssertCreatedData({{"testdir/chunk-0.arrow", 0, 100}});
 }
 
+TEST_F(DatasetWriterTestFixture, BasicFilePrefix) {

Review comment:
       Can you add a test that uses both a directory and a prefix?  We don't 
have any partitioning schemes (yet) that can generate this sort of thing but it 
should be perfectly valid as far as this class is concerned.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to