This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new f7286a9530 GH-38884: [C++] DatasetWriter release
rows_in_flight_throttle when allocate writing failed (#38885)
f7286a9530 is described below
commit f7286a95304a848033d96817f704ace0765c0ec7
Author: mwish <[email protected]>
AuthorDate: Thu Dec 7 16:21:14 2023 +0800
GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when allocate
writing failed (#38885)
### Rationale for this change
When the file-queue is full or a write fails, the
`DatasetWriterImpl::DoWriteRecordBatch` might fail; however, the resources
are not released.
### What changes are included in this PR?
When file-queue is full or cannot open file, release the `row` resources.
### Are these changes tested?
yes
### Are there any user-facing changes?
no
* Closes: #38884
Authored-by: mwish <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/dataset/dataset_writer.cc | 15 ++++++++++++--
cpp/src/arrow/dataset/dataset_writer_test.cc | 31 ++++++++++++++++++++++++++--
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc
b/cpp/src/arrow/dataset/dataset_writer.cc
index a2096d691b..ae9fb36484 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -87,7 +87,7 @@ class Throttle {
private:
Future<> backpressure_ = Future<>::MakeFinished();
- uint64_t max_value_;
+ const uint64_t max_value_;
uint64_t in_waiting_ = 0;
uint64_t current_value_ = 0;
std::mutex mutex_;
@@ -621,11 +621,21 @@ class DatasetWriter::DatasetWriterImpl {
backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
+
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
RETURN_NOT_OK(TryCloseLargestFile());
break;
}
}
- RETURN_NOT_OK(dir_queue->StartWrite(next_chunk));
+ auto s = dir_queue->StartWrite(next_chunk);
+ if (!s.ok()) {
+ // If `StartWrite` succeeded, it will Release the
+ // `rows_in_flight_throttle` when the write task is finished.
+ //
+ // `open_files_throttle` will be handed by
`DatasetWriterDirectoryQueue`
+ // so we don't need to release it here.
+ writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
+ return s;
+ }
batch = std::move(remainder);
if (batch) {
RETURN_NOT_OK(dir_queue->FinishCurrentFile());
@@ -647,6 +657,7 @@ class DatasetWriter::DatasetWriterImpl {
DatasetWriterState writer_state_;
std::function<void()> pause_callback_;
std::function<void()> resume_callback_;
+ // Map from directory + prefix to the queue for that directory
std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
directory_queues_;
std::mutex mutex_;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc
b/cpp/src/arrow/dataset/dataset_writer_test.cc
index c76e79d79b..e62e779f71 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -189,7 +189,8 @@ class DatasetWriterTestFixture : public testing::Test {
}
}
- void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
+ void AssertCreatedData(const std::vector<ExpectedFile>& expected_files,
+ bool check_num_record_batches = true) {
counter_ = 0;
for (const auto& expected_file : expected_files) {
std::optional<MockFileInfo> written_file =
FindFile(expected_file.filename);
@@ -197,7 +198,9 @@ class DatasetWriterTestFixture : public testing::Test {
int num_batches = 0;
AssertBatchesEqual(*MakeBatch(expected_file.start,
expected_file.num_rows),
*ReadAsBatch(written_file->data, &num_batches));
- ASSERT_EQ(expected_file.num_record_batches, num_batches);
+ if (check_num_record_batches) {
+ ASSERT_EQ(expected_file.num_record_batches, num_batches);
+ }
}
}
@@ -277,6 +280,30 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
{"testdir/chunk-3.arrow", 30, 5}});
}
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) {
+ // GH-38884: This test is to make sure that the writer can handle
+ // throttle resources in `WriteRecordBatch`.
+
+ constexpr auto kFileSizeLimit = static_cast<uint64_t>(10);
+ write_options_.max_rows_per_file = kFileSizeLimit;
+ write_options_.max_rows_per_group = kFileSizeLimit;
+ write_options_.max_open_files = 2;
+ write_options_.min_rows_per_group = kFileSizeLimit - 1;
+ auto dataset_writer = MakeDatasetWriter(/*max_rows=*/kFileSizeLimit);
+ for (int i = 0; i < 20; ++i) {
+ dataset_writer->WriteRecordBatch(MakeBatch(kFileSizeLimit * 5), "");
+ }
+ EndWriterChecked(dataset_writer.get());
+ std::vector<ExpectedFile> expected_files;
+ for (int i = 0; i < 100; ++i) {
+ expected_files.emplace_back("testdir/chunk-" + std::to_string(i) +
".arrow",
+ kFileSizeLimit * i, kFileSizeLimit);
+ }
+ // Not checking the number of record batches because file may contain the
+ // zero-length record batch.
+ AssertCreatedData(expected_files, /*check_num_record_batches=*/false);
+}
+
TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
// Left padding with up to four zeros
write_options_.max_rows_per_group = 10;