This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 88fd523759 GH-34565: [C++] Teach dataset_writer to accept custom filename functor (#34984)
88fd523759 is described below
commit 88fd5237591c6eb521a8eea1bbe1eb1507b61c3d
Author: Haocheng Liu <[email protected]>
AuthorDate: Wed Apr 12 12:59:21 2023 -0400
GH-34565: [C++] Teach dataset_writer to accept custom filename functor (#34984)
### Rationale for this change
The existing basename_template only substitutes a monotonically increasing integer
into new filenames. When custom filenames are needed (e.g. left padding or hash codes),
downstream users must rename the files in a post-processing step.
### What changes are included in this PR?
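A new functor, basename_template_functor, is added to FileSystemDatasetWriteOptions.
It is applied to the file counter, and its result replaces {i} in basename_template,
which lets users customize the names produced by dataset_writer. The writer now also
rejects a filename that was already produced in the same directory.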
### Are these changes tested?
Yes. Unit tests are added for a well-formed functor (zero-padded counter) and an ill-formed one that always returns the same name.
### Are there any user-facing changes?
Yes. It allows users to customize output file names.
* Closes: #34565
Authored-by: Haocheng Liu <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/dataset/dataset_writer.cc | 18 +++++++++++++++--
cpp/src/arrow/dataset/dataset_writer_test.cc | 29 ++++++++++++++++++++++++++++
cpp/src/arrow/dataset/file_base.h | 6 ++++++
3 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index b1ebb660d6..686c7712fe 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <mutex>
#include <unordered_map>
+#include <unordered_set>
#include "arrow/filesystem/path_util.h"
#include "arrow/record_batch.h"
@@ -308,11 +309,22 @@ class DatasetWriterDirectoryQueue {
}
Result<std::string> GetNextFilename() {
- auto basename = ::arrow::internal::Replace(write_options_.basename_template,
- kIntegerToken, ToChars(file_counter_++));
+ std::optional<std::string> basename;
+ if (write_options_.basename_template_functor == nullptr) {
+ basename = ::arrow::internal::Replace(write_options_.basename_template,
+ kIntegerToken, ToChars(file_counter_++));
+ } else {
+ basename = ::arrow::internal::Replace(
+ write_options_.basename_template, kIntegerToken,
+ write_options_.basename_template_functor(file_counter_++));
+ }
if (!basename) {
return Status::Invalid("string interpolation of basename template failed");
}
+ if (!used_filenames_.insert(*basename).second) {
+ return Status::Invalid("filename ", *basename,
+ " is already used before. Check basename_template_functor");
+ }
return fs::internal::ConcatAbstractPath(directory_, prefix_ + *basename);
}
@@ -406,6 +418,7 @@ class DatasetWriterDirectoryQueue {
latest_open_file_tasks_.reset();
latest_open_file_ = nullptr;
}
+ used_filenames_.clear();
return Status::OK();
}
@@ -418,6 +431,7 @@ class DatasetWriterDirectoryQueue {
DatasetWriterState* writer_state_;
Future<> init_future_;
std::string current_filename_;
+ std::unordered_set<std::string> used_filenames_;
DatasetWriterFileQueue* latest_open_file_ = nullptr;
std::unique_ptr<util::ThrottledAsyncTaskScheduler> latest_open_file_tasks_;
uint64_t rows_written_ = 0;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc
index d7d058558f..d2480cd482 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -276,6 +276,35 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
{"testdir/chunk-3.arrow", 30, 5}});
}
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
+ // Left padding with up to four zeros
+ write_options_.max_rows_per_group = 10;
+ write_options_.max_rows_per_file = 10;
+ write_options_.basename_template_functor = [](int v) {
+ size_t n_zero = 4;
+ return std::string(n_zero - std::min(n_zero, std::to_string(v).length()), '0') +
+ std::to_string(v);
+ };
+ auto dataset_writer = MakeDatasetWriter();
+ dataset_writer->WriteRecordBatch(MakeBatch(25), "");
+ EndWriterChecked(dataset_writer.get());
+ AssertCreatedData({{"testdir/chunk-0000.arrow", 0, 10},
+ {"testdir/chunk-0001.arrow", 10, 10},
+ {"testdir/chunk-0002.arrow", 20, 5}});
+}
+
+
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithBrokenFunctor) {
+ // A functor returning a constant name would overwrite an existing file; expect Invalid
+ write_options_.max_rows_per_group = 10;
+ write_options_.max_rows_per_file = 10;
+ write_options_.basename_template_functor = [](int v) { return "SAME"; };
+ auto dataset_writer = MakeDatasetWriter();
+ dataset_writer->WriteRecordBatch(MakeBatch(25), "");
+ dataset_writer->Finish();
+ test_done_with_tasks_.MarkFinished();
+ ASSERT_FINISHES_AND_RAISES(Invalid, scheduler_finished_);
+}
+
TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
write_options_.max_rows_per_file = 10;
write_options_.max_rows_per_group = 10;
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index ca28cb1e4a..788a1bb432 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -404,6 +404,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
/// {i} will be replaced by an auto incremented integer.
std::string basename_template;
+ /// A functor which will be applied on an incremented counter. The result will be
+ /// inserted into the basename_template in place of {i}.
+ ///
+ /// This can be used, for example, to left-pad the file counter.
+ std::function<std::string(int)> basename_template_functor;
+
/// If greater than 0 then this will limit the maximum number of files that
can be left
/// open. If an attempt is made to open too many files then the least
recently used file
/// will be closed. If this setting is set too low you may end up
fragmenting your data