This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 88fd523759 GH-34565: [C++] Teach dataset_writer to accept custom 
filename functor (#34984)
88fd523759 is described below

commit 88fd5237591c6eb521a8eea1bbe1eb1507b61c3d
Author: Haocheng Liu <[email protected]>
AuthorDate: Wed Apr 12 12:59:21 2023 -0400

    GH-34565: [C++] Teach dataset_writer to accept custom filename functor 
(#34984)
    
    ### Rationale for this change
    
    The existing basename_template can only substitute a monotonically
    increasing integer into new filenames. When custom filenames are needed
    (left padding, hash codes), downstream users must rename the files in a
    post-processing step.
    
    ### What changes are included in this PR?
    
    A new functor is added to FileSystemDatasetWriteOptions, which allows users
    to customize the names of the files written by dataset_writer.
    
    ### Are these changes tested?
    
    Yes. Unit tests are added for normal and ill-formed lambdas.
    
    ### Are there any user-facing changes?
    
    Yes. It allows users to customize output file names.
    * Closes: #34565
    
    Authored-by: Haocheng Liu <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/dataset/dataset_writer.cc      | 18 +++++++++++++++--
 cpp/src/arrow/dataset/dataset_writer_test.cc | 29 ++++++++++++++++++++++++++++
 cpp/src/arrow/dataset/file_base.h            |  6 ++++++
 3 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc 
b/cpp/src/arrow/dataset/dataset_writer.cc
index b1ebb660d6..686c7712fe 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <mutex>
 #include <unordered_map>
+#include <unordered_set>
 
 #include "arrow/filesystem/path_util.h"
 #include "arrow/record_batch.h"
@@ -308,11 +309,22 @@ class DatasetWriterDirectoryQueue {
   }
 
   Result<std::string> GetNextFilename() {
-    auto basename = 
::arrow::internal::Replace(write_options_.basename_template,
-                                               kIntegerToken, 
ToChars(file_counter_++));
+    std::optional<std::string> basename;
+    if (write_options_.basename_template_functor == nullptr) {  // no custom functor: substitute the plain counter
+      basename = ::arrow::internal::Replace(write_options_.basename_template,
+                                            kIntegerToken, 
ToChars(file_counter_++));
+    } else {  // custom functor: substitute its result in place of the counter
+      basename = ::arrow::internal::Replace(
+          write_options_.basename_template, kIntegerToken,
+          write_options_.basename_template_functor(file_counter_++));
+    }
     if (!basename) {
       return Status::Invalid("string interpolation of basename template 
failed");
     }
+    if (!used_filenames_.insert(*basename).second) {  // insert().second is false when the name is already in the set
+      return Status::Invalid("filename ", *basename,
+                             " is already used before. Check 
basename_template_functor");
+    }
     return fs::internal::ConcatAbstractPath(directory_, prefix_ + *basename);
   }
 
@@ -406,6 +418,7 @@ class DatasetWriterDirectoryQueue {
       latest_open_file_tasks_.reset();
       latest_open_file_ = nullptr;
     }
+    used_filenames_.clear();
     return Status::OK();
   }
 
@@ -418,6 +431,7 @@ class DatasetWriterDirectoryQueue {
   DatasetWriterState* writer_state_;
   Future<> init_future_;
   std::string current_filename_;
+  std::unordered_set<std::string> used_filenames_;
   DatasetWriterFileQueue* latest_open_file_ = nullptr;
   std::unique_ptr<util::ThrottledAsyncTaskScheduler> latest_open_file_tasks_;
   uint64_t rows_written_ = 0;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc 
b/cpp/src/arrow/dataset/dataset_writer_test.cc
index d7d058558f..d2480cd482 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -276,6 +276,35 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
                      {"testdir/chunk-3.arrow", 30, 5}});
 }
 
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
+  // The functor left-pads the counter with zeros to a minimum width of four characters
+  write_options_.max_rows_per_group = 10;
+  write_options_.max_rows_per_file = 10;
+  write_options_.basename_template_functor = [](int v) {
+    size_t n_zero = 4;
+    return std::string(n_zero - std::min(n_zero, std::to_string(v).length()), 
'0') +
+           std::to_string(v);
+  };
+  auto dataset_writer = MakeDatasetWriter();
+  dataset_writer->WriteRecordBatch(MakeBatch(25), "");  // 25 rows with max_rows_per_file = 10: three files expected below
+  EndWriterChecked(dataset_writer.get());
+  AssertCreatedData({{"testdir/chunk-0000.arrow", 0, 10},
+                     {"testdir/chunk-0001.arrow", 10, 10},
+                     {"testdir/chunk-0002.arrow", 20, 5}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithBrokenFunctor) {
+  // Rewriting an existing file will error out: the functor always returns "SAME"
+  write_options_.max_rows_per_group = 10;
+  write_options_.max_rows_per_file = 10;
+  write_options_.basename_template_functor = [](int v) { return "SAME"; };
+  auto dataset_writer = MakeDatasetWriter();
+  dataset_writer->WriteRecordBatch(MakeBatch(25), "");
+  dataset_writer->Finish();
+  test_done_with_tasks_.MarkFinished();
+  ASSERT_FINISHES_AND_RAISES(Invalid, scheduler_finished_);
+}
+
 TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
   write_options_.max_rows_per_file = 10;
   write_options_.max_rows_per_group = 10;
diff --git a/cpp/src/arrow/dataset/file_base.h 
b/cpp/src/arrow/dataset/file_base.h
index ca28cb1e4a..788a1bb432 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -404,6 +404,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// {i} will be replaced by an auto incremented integer.
   std::string basename_template;
 
+  /// A functor which will be applied on an incremented counter.  The result 
will be
+  /// inserted into the basename_template in place of {i}.
+  ///
+  /// This can be used, for example, to left-pad the file counter.
+  std::function<std::string(int)> basename_template_functor;
+
   /// If greater than 0 then this will limit the maximum number of files that 
can be left
   /// open. If an attempt is made to open too many files then the least 
recently used file
   /// will be closed.  If this setting is set too low you may end up 
fragmenting your data

Reply via email to