This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new e3d6b9beac GH-37796: [C++][Acero] Fix race condition caused by straggling input in the as-of-join node (#37839)
e3d6b9beac is described below
commit e3d6b9beac6924fd6914940eb74b248a5167f05b
Author: Jeremy Aguilon <[email protected]>
AuthorDate: Tue Oct 24 09:22:01 2023 -0400
GH-37796: [C++][Acero] Fix race condition caused by straggling input in the as-of-join node (#37839)
### Rationale for this change
While asof-joining some large Parquet datasets with many row groups, I ran into a deadlock, which I described in https://github.com/apache/arrow/issues/37796. Copy-pasting below for convenience:
1. The left hand side of the asofjoin completes and is matched with the right hand tables, so `InputFinished` proceeds as [expected](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1323). So far so good.
2. The right hand table(s) of the join are a huge dataset scan. They're
still streaming and can legally still call `AsofJoinNode::InputReceived` all
they want ([doc
ref](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero8ExecNode13InputReceivedEP8ExecNode9ExecBatch))
3. Each input batch is blindly pushed to the `InputState`s, which in turn
defer to `BackpressureHandler`s to decide whether to pause inputs. ([code
pointer](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1689))
4. If enough batches come in right after `EndFromProcessThread` is called,
then we might exceed the
[high_threshold](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L575)
and tell the input node to pause via the
[BackpressureController](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L540)
5. At this point, the process thread has stopped for the asofjoiner, so the right hand table(s) won't be dequeued, meaning `BackpressureController::Resume()` will never be called. This causes a [deadlock](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero19BackpressureControl5PauseEv).
TL;DR: this is caused by a straggling input node being paused by backpressure _after_ the process thread has ended. Since every `PauseInput` needs a corresponding `ResumeInput` to exit gracefully, we deadlock.
Turns out this is fairly easy to reproduce with small tables, if you make a
slow input node composed of 1-row record batches with a synthetic delay.
My solution is to:
1. Create a `ForceShutdown` hook that puts the input nodes in a resumed state and, for good measure, calls `StopProducing`.
2. Also for good measure, if batches arrive after the process thread exits, we short-circuit and return OK. This is because `InputReceived` can be called an arbitrary number of times after `StopProducing`, so it makes sense not to enqueue useless batches.
### Are these changes tested?
Yes, I added a delay to the batches of one of the already-existing asofjoin backpressure tests. Checking out `main`, we get a timeout failure. With my changes, it passes.
I considered a more deterministic test, but I struggled to create callbacks in a way that wasn't invasive to the asof implementation. The idea of using delays was inspired by things I saw in `source_node_test.cc`.
### Are there any user-facing changes?
No
* Closes: #37796
Lead-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/acero/asof_join_node.cc | 45 ++++++++++++++++++++----
cpp/src/arrow/acero/asof_join_node_test.cc | 56 +++++++++++++++++++-----------
2 files changed, 74 insertions(+), 27 deletions(-)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index 23c07b8acb..d19d2db299 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -549,15 +549,16 @@ class BackpressureController : public BackpressureControl {
class BackpressureHandler {
private:
-  BackpressureHandler(size_t low_threshold, size_t high_threshold,
+  BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
                      std::unique_ptr<BackpressureControl> backpressure_control)
-      : low_threshold_(low_threshold),
+      : input_(input),
+        low_threshold_(low_threshold),
        high_threshold_(high_threshold),
        backpressure_control_(std::move(backpressure_control)) {}
public:
static Result<BackpressureHandler> Make(
- size_t low_threshold, size_t high_threshold,
+ ExecNode* input, size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl> backpressure_control) {
if (low_threshold >= high_threshold) {
return Status::Invalid("low threshold (", low_threshold,
@@ -566,7 +567,7 @@ class BackpressureHandler {
if (backpressure_control == NULLPTR) {
return Status::Invalid("null backpressure control parameter");
}
- BackpressureHandler backpressure_handler(low_threshold, high_threshold,
+    BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
std::move(backpressure_control));
return std::move(backpressure_handler);
}
@@ -579,7 +580,16 @@ class BackpressureHandler {
}
}
+  Status ForceShutdown() {
+    // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
+    // Since acero's executor won't terminate if any one node is paused, we need to
+    // force resume the node before stopping production.
+    backpressure_control_->Resume();
+    return input_->StopProducing();
+  }
+
private:
+ ExecNode* input_;
size_t low_threshold_;
size_t high_threshold_;
std::unique_ptr<BackpressureControl> backpressure_control_;
@@ -629,6 +639,8 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
return ConcurrentQueue<T>::TryPopUnlocked();
}
+ Status ForceShutdown() { return handler_.ForceShutdown(); }
+
private:
BackpressureHandler handler_;
};
@@ -672,9 +684,9 @@ class InputState {
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
-    ARROW_ASSIGN_OR_RAISE(auto handler,
-                          BackpressureHandler::Make(low_threshold, high_threshold,
-                                                    std::move(backpressure_control)));
+    ARROW_ASSIGN_OR_RAISE(
+        auto handler, BackpressureHandler::Make(asof_input, low_threshold, high_threshold,
+                                                std::move(backpressure_control)));
    return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
                                        key_hasher, asof_node, std::move(handler), schema,
                                        time_col_index, key_col_index);
@@ -930,6 +942,12 @@ class InputState {
total_batches_ = n;
}
+  Status ForceShutdown() {
+    // Force the upstream input node to unpause. Necessary to avoid deadlock when we
+    // terminate the process thread.
+    return queue_.ForceShutdown();
+  }
+
private:
// Pending record batches. The latest is the front. Batches cannot be empty.
BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue_;
@@ -1323,6 +1341,9 @@ class AsofJoinNode : public ExecNode {
if (st.ok()) {
st = output_->InputFinished(this, batches_produced_);
}
+ for (const auto& s : state_) {
+ st &= s->ForceShutdown();
+ }
}));
}
@@ -1679,6 +1700,15 @@ class AsofJoinNode : public ExecNode {
const Ordering& ordering() const override { return ordering_; }
Status InputReceived(ExecNode* input, ExecBatch batch) override {
+    // InputReceived may be called after execution was finished. Pushing it to the
+    // InputState is unnecessary since we're done (and anyway may cause the
+    // BackPressureController to pause the input, causing a deadlock), so drop it.
+ if (process_task_.is_finished()) {
+ DEBUG_SYNC(this, "Input received while done. Short circuiting.",
+ DEBUG_MANIP(std::endl));
+ return Status::OK();
+ }
+
// Get the input
ARROW_DCHECK(std_has(inputs_, input));
size_t k = std_find(inputs_, input) - inputs_.begin();
@@ -1687,6 +1717,7 @@ class AsofJoinNode : public ExecNode {
auto rb = *batch.ToRecordBatch(input->output_schema());
    DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl),
               rb->ToString(), DEBUG_MANIP(std::endl));
+
ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
process_.Push(true);
return Status::OK();
diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc
index 96c00e6a4b..df3172b2a0 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -1424,7 +1424,8 @@ AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
}
template <typename BatchesMaker>
-void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
+void TestBackpressure(BatchesMaker maker, int batch_size, int num_l_batches,
+ int num_r0_batches, int num_r1_batches, bool slow_r0) {
auto l_schema =
      schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
@@ -1432,16 +1433,17 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
auto r1_schema =
      schema({field("time", int32()), field("key", int32()), field("r1_value", int32())});
-  auto make_shift = [&maker, num_batches, batch_size](
-                        const std::shared_ptr<Schema>& schema, int shift) {
+  auto make_shift = [&maker, batch_size](int num_batches,
+                                         const std::shared_ptr<Schema>& schema,
+                                         int shift) {
     return maker({[](int row) -> int64_t { return row; },
                   [num_batches](int row) -> int64_t { return row / num_batches; },
                   [shift](int row) -> int64_t { return row * 10 + shift; }},
                  schema, num_batches, batch_size);
-  ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0));
-  ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
-  ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
+  ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(num_l_batches, l_schema, 0));
+  ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(num_r0_batches, r0_schema, 1));
+  ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(num_r1_batches, r1_schema, 2));
BackpressureCountingNode::Register();
RegisterTestNodes(); // for GatedNode
@@ -1449,6 +1451,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
struct BackpressureSourceConfig {
std::string name_prefix;
bool is_gated;
+ bool is_delayed;
std::shared_ptr<Schema> schema;
decltype(l_batches) batches;
@@ -1463,9 +1466,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
// Two ungated and one gated
std::vector<BackpressureSourceConfig> source_configs = {
- {"0", false, l_schema, l_batches},
- {"1", true, r0_schema, r0_batches},
- {"2", false, r1_schema, r1_batches},
+ {"0", false, false, l_schema, l_batches},
+ {"1", true, slow_r0, r0_schema, r0_batches},
+ {"2", false, false, r1_schema, r1_batches},
};
std::vector<BackpressureCounters> bp_counters(source_configs.size());
@@ -1474,9 +1477,16 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];
-
-    src_decls.emplace_back("source",
-                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    if (config.is_delayed) {
+      src_decls.emplace_back(
+          "source",
+          SourceNodeOptions(config.schema, MakeDelayedGen(config.batches, "slow_source",
+                                                          /*delay_sec=*/0.5,
+                                                          /*noisy=*/false)));
+    } else {
+      src_decls.emplace_back("source",
+                             SourceNodeOptions(config.schema, GetGen(config.batches)));
+    }
bp_options.push_back(
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
std::shared_ptr<ExecNodeOptions> options = bp_options.back();
@@ -1486,11 +1496,12 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
if (config.is_gated) {
bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl},
gate_options};
}
- bp_decls.push_back(bp_decl);
+ bp_decls.emplace_back(bp_decl);
}
-  Declaration asofjoin = {"asofjoin", bp_decls,
-                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+ auto opts = GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0);
+
+ Declaration asofjoin = {"asofjoin", bp_decls, opts};
ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
internal::ThreadPool::Make(1));
@@ -1512,14 +1523,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
return true;
};
- BusyWait(10.0, has_bp_been_applied);
+ BusyWait(60.0, has_bp_been_applied);
ASSERT_TRUE(has_bp_been_applied());
gate.ReleaseAllBatches();
ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);
-  // One of the inputs is gated. The other two will eventually be resumed by the asof
-  // join node
+  // One of the inputs is gated and was released. The other two will eventually be resumed
+  // by the asof join node
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& counters = bp_counters[i];
if (!source_configs[i].is_gated) {
@@ -1529,7 +1540,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
}
TEST(AsofJoinTest, BackpressureWithBatches) {
-  return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
+ // Give the first right hand table a delay to stress test race conditions
+  return TestBackpressure(MakeIntegerBatches, /*batch_size=*/1, /*num_l_batches=*/20,
+                          /*num_r0_batches=*/50, /*num_r1_batches=*/20, /*slow_r0=*/true);
}
template <typename BatchesMaker>
@@ -1595,7 +1608,10 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
GTEST_SKIP() << "Skipping - see GH-36331";
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
- return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
+  return TestBackpressure(MakeIntegerBatchGenForTest, /*batch_size=*/batch_size,
+                          /*num_l_batches=*/num_batches, /*num_r0_batches=*/num_batches,
+                          /*num_r1_batches=*/num_batches, /*slow_r0=*/false);
}
} // namespace acero