This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 596a60fa00 GH-47124: [C++][Dataset] Fix DatasetWriter deadlock on
concurrent WriteRecordBatch (#47129)
596a60fa00 is described below
commit 596a60fa00d9d88bbe562602543474e8faf9ea5a
Author: gitmodimo <[email protected]>
AuthorDate: Wed Jul 30 16:40:36 2025 +0200
GH-47124: [C++][Dataset] Fix DatasetWriter deadlock on concurrent WriteRecordBatch (#47129)
### Rationale for this change
The throttle is accessed twice: once in `Acquire` and again through the returned future. As a result, `current_value_` may not be increased because the throttle is being applied, yet shortly afterwards the returned future may become finished, so the caller proceeds without the acquisition ever being accounted for. That leads to the issue described in #47124:
https://github.com/apache/arrow/blob/c8fe26898ce49c58514f511be58afddce176826b/cpp/src/arrow/dataset/dataset_writer.cc#L682-L684
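To make that window concrete, here is a minimal, self-contained sketch of the two-step pattern (not the Arrow code: `MiniFuture` and `OldThrottle` are hypothetical stand-ins for `arrow::Future<>` and the writer's `Throttle`):

```cpp
#include <cstdint>
#include <memory>
#include <mutex>

// Stand-in for arrow::Future<>: just a shared "finished" flag.
struct MiniFuture {
  std::shared_ptr<bool> finished = std::make_shared<bool>(false);
  bool is_finished() const { return *finished; }
  void MarkFinished() { *finished = true; }
  static MiniFuture MakeFinished() {
    MiniFuture f;
    *f.finished = true;
    return f;
  }
};

class OldThrottle {
 public:
  explicit OldThrottle(uint64_t max_value) : max_value_(max_value) {}

  // Old contract: always hand back a future; the caller decides whether it is
  // throttled by checking is_finished() *after* the lock has been dropped.
  MiniFuture Acquire(uint64_t values) {
    std::lock_guard<std::mutex> lg(mutex_);
    if (current_value_ >= max_value_) {
      in_waiting_ = values;
      backpressure_ = MiniFuture{};  // unfinished; current_value_ NOT bumped
    } else {
      current_value_ += values;
    }
    return backpressure_;
  }

  void Release(uint64_t values) {
    std::lock_guard<std::mutex> lg(mutex_);
    current_value_ -= values;
    if (in_waiting_ > 0 && current_value_ < max_value_) {
      in_waiting_ = 0;
      backpressure_.MarkFinished();  // can run right after Acquire() returns
    }
  }

 private:
  std::mutex mutex_;
  const uint64_t max_value_;
  uint64_t current_value_ = 0;
  uint64_t in_waiting_ = 0;
  MiniFuture backpressure_ = MiniFuture::MakeFinished();
};

int main() {
  OldThrottle throttle(/*max_value=*/5);
  throttle.Acquire(5);                            // fills the throttle
  MiniFuture backpressure = throttle.Acquire(3);  // throttled: rows not accounted for
  throttle.Release(5);                            // another writer releases here and
                                                  // finishes the backpressure future
  if (!backpressure.is_finished()) {
    // The caller only backs off if the future is still pending...
    return 1;
  }
  // ...otherwise it proceeds as if its 3 rows had been acquired, even though
  // current_value_ was never increased for them. The writer and the throttle
  // now disagree about the rows in flight, which is how the hang reported in
  // GH-47124 can arise.
  return 0;
}
```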
### What changes are included in this PR?
Change the throttle API to return an optional (akin to [ThrottledAsyncTaskScheduler::Throttle](https://github.com/gitmodimo/arrow/blob/3ebe7ee1828793d0a619bcd773eb4d990ccb6b3c/cpp/src/arrow/util/async_util.h#L243)) and prevent the race.
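Applied to the stand-in above (sketch only, needs `<optional>`; the real change is in the diff below), `Acquire` would look like this: `std::nullopt` means the acquisition was accounted for under the lock, while an engaged optional means "back off, wait, then retry":

```cpp
// Hypothetical replacement for OldThrottle::Acquire from the sketch above.
std::optional<MiniFuture> Acquire(uint64_t values) {
  std::lock_guard<std::mutex> lg(mutex_);
  if (current_value_ >= max_value_) {
    in_waiting_ = values;
    backpressure_ = MiniFuture{};
    return backpressure_;      // caller must wait, then retry the acquisition
  }
  current_value_ += values;    // decision and accounting happen atomically
  return std::nullopt;         // caller may proceed immediately
}
```

The caller then branches once on whether the optional holds a value; there is no second `is_finished()` check left to race against.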
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
* GitHub Issue: #47124
Lead-authored-by: Rafał Hibner <[email protected]>
Co-authored-by: gitmodimo <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
cpp/src/arrow/dataset/dataset_writer.cc | 18 +++++++------
cpp/src/arrow/dataset/dataset_writer_test.cc | 38 +++++++++++++++++++++++++++-
2 files changed, 47 insertions(+), 9 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index a7807e30a6..f5104efb70 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -53,7 +53,7 @@ class Throttle {
bool Unthrottled() const { return max_value_ <= 0; }
- Future<> Acquire(uint64_t values) {
+ std::optional<Future<>> Acquire(uint64_t values) {
if (Unthrottled()) {
return Future<>::MakeFinished();
}
@@ -61,10 +61,11 @@ class Throttle {
if (current_value_ >= max_value_) {
in_waiting_ = values;
backpressure_ = Future<>::Make();
- } else {
- current_value_ += values;
+ return backpressure_;
}
- return backpressure_;
+ current_value_ += values;
+ DCHECK(backpressure_.is_finished());
+ return std::nullopt;
}
void Release(uint64_t values) {
@@ -662,7 +663,7 @@ class DatasetWriter::DatasetWriterImpl {
directory, prefix);
}));
std::shared_ptr<DatasetWriterDirectoryQueue> dir_queue = dir_queue_itr->second;
- Future<> backpressure;
+ std::optional<Future<>> backpressure;
while (batch) {
// Keep opening new files until batch is done.
std::shared_ptr<RecordBatch> remainder;
@@ -681,13 +682,13 @@ class DatasetWriter::DatasetWriterImpl {
}
backpressure =
writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows());
- if (!backpressure.is_finished()) {
+ if (backpressure) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
break;
}
if (will_open_file) {
backpressure = writer_state_->open_files_throttle.Acquire(1);
- if (!backpressure.is_finished()) {
+ if (backpressure) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
RETURN_NOT_OK(TryCloseLargestFile());
@@ -711,7 +712,8 @@ class DatasetWriter::DatasetWriterImpl {
}
if (batch) {
- return backpressure.Then([this, batch, directory, prefix] {
+ DCHECK(backpressure);
+ return backpressure->Then([this, batch, directory, prefix] {
return DoWriteRecordBatch(batch, directory, prefix);
});
}
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc
index cbc3d25fbb..2f34c21aec 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -20,6 +20,7 @@
#include <chrono>
#include <mutex>
#include <optional>
+#include <thread>
#include <vector>
#include "arrow/array/builder_primitive.h"
@@ -232,7 +233,7 @@ class DatasetWriterTestFixture : public testing::Test {
util::AsyncTaskScheduler* scheduler_;
Future<> scheduler_finished_;
FileSystemDatasetWriteOptions write_options_;
- bool paused_{false};
+ std::atomic_bool paused_{false};
uint64_t counter_ = 0;
};
@@ -275,6 +276,41 @@ TEST_F(DatasetWriterTestFixture, BatchGreaterThanMaxRowsQueued) {
ASSERT_EQ(paused_, false);
}
+TEST_F(DatasetWriterTestFixture, BatchWriteConcurrent) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
+ auto dataset_writer = MakeDatasetWriter(/*max_rows=*/5);
+
+ for (int threads = 1; threads < 5; threads++) {
+ for (int iter = 2; iter <= 256; iter *= 2) {
+ for (int batch = 2; batch <= 64; batch *= 2) {
+ std::vector<std::thread> workers;
+ for (int i = 0; i < threads; ++i) {
+ workers.push_back(std::thread([&, i = i]() {
+ for (int j = 0; j < iter; ++j) {
+ while (paused_) {
+ SleepABit();
+ }
+ dataset_writer->WriteRecordBatch(MakeBatch(0, batch + i + 10 * j), "");
+ }
+ }));
+ }
+ for (std::thread& t : workers) {
+ if (t.joinable()) {
+ t.join();
+ }
+ while (paused_) {
+ SleepABit();
+ }
+ }
+ }
+ }
+ }
+ EndWriterChecked(dataset_writer.get());
+ ASSERT_EQ(paused_, false);
+}
+
TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
write_options_.max_rows_per_file = 10;
write_options_.max_rows_per_group = 10;