This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new cf10b4066e GH-44526: [C++][Acero] Fix crash when thread in asof_join is not running (#44584)
cf10b4066e is described below
commit cf10b4066e3c5a9c0bee103d351203a87beb61aa
Author: mroz45 <[email protected]>
AuthorDate: Mon Nov 11 17:07:23 2024 +0100
GH-44526: [C++][Acero] Fix crash when thread in asof_join is not running (#44584)
### What changes are included in this PR?
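This PR adds a `joinable()` check before joining `process_thread_` in the `AsofJoinNode` destructor. When the plan is invalid and the asof_join node never starts, `process_thread_` is not joinable. Calling `join()` on a non-joinable `std::thread` throws `std::system_error`, and because that exception escapes the destructor, the process terminates. The check skips the join in that case.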
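The failure mode and the guard can be reproduced with the standard library alone. The sketch below is a simplified, hypothetical stand-in: `Worker`, `Start()`, `JoinOnUnstartedThreadThrows()` and `Demo()` are illustrative names, not Arrow APIs, and the class only loosely mirrors how the asof_join node owns `process_thread_`. A default-constructed `std::thread` is not joinable, so `join()` on it throws `std::system_error`; since destructors are implicitly `noexcept`, letting that exception escape `~Worker()` would call `std::terminate`.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <system_error>
#include <thread>

// Hypothetical stand-in for a node that owns a background thread which is
// launched only when the node is started. Not Arrow code.
class Worker {
 public:
  // Launches the background thread (analogous to starting the node).
  void Start() {
    thread_ = std::thread([this] {
      while (!stop_.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
    });
  }

  ~Worker() {
    // Ask the background thread to exit, loosely analogous to PushProcess(false).
    stop_.store(true);
    // A thread that was never launched is not joinable. Calling join() on it
    // would throw std::system_error, and an exception escaping this implicitly
    // noexcept destructor calls std::terminate. Guard the join, as the patch does.
    if (thread_.joinable()) {
      thread_.join();
    }
  }

 private:
  std::atomic<bool> stop_{false};
  std::thread thread_;
};

// Returns true if joining a never-started std::thread throws std::system_error.
bool JoinOnUnstartedThreadThrows() {
  std::thread never_started;  // default-constructed: joinable() == false
  try {
    never_started.join();
  } catch (const std::system_error&) {
    return true;
  }
  return false;
}

// Exercises both destructor paths and checks the unguarded behavior.
void Demo() {
  {
    Worker started;
    started.Start();
  }  // thread launched, then joined in ~Worker
  {
    Worker never_started;
  }  // thread never launched, so join() is skipped
  assert(JoinOnUnstartedThreadThrows());
}
```

Calling `Demo()` from `main` exercises both paths. Removing the `joinable()` check from `~Worker()` makes the second scope in `Demo()` terminate the process instead of returning.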
* GitHub Issue: #44526
---------
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/acero/asof_join_node.cc | 4 +++-
cpp/src/arrow/acero/asof_join_node_test.cc | 31 ++++++++++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index a5a80c8805..92e404b207 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -1149,7 +1149,9 @@ class AsofJoinNode : public ExecNode {
   virtual ~AsofJoinNode() {
 #ifdef ARROW_ENABLE_THREADING
     PushProcess(false);
-    process_thread_.join();
+    if (process_thread_.joinable()) {
+      process_thread_.join();
+    }
 #endif
   }
diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc
index 2983888183..64d41ccb1a 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -1736,5 +1736,36 @@ TEST(AsofJoinTest, RhsEmptinessRaceEmptyBy) {
   AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
 }

+// Reproduction of GH-44526: Provoke destruction of not started asofjoin node by providing
+// a sink that fails on creation
+TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) {
+  auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
+  auto right_batch =
+      ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])");
+
+  Declaration left{"exec_batch_source",
+                   ExecBatchSourceNodeOptions(schema({field("on", int64())}),
+                                              {std::move(left_batch)})};
+  Declaration right{
+      "exec_batch_source",
+      ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}),
+                                 {std::move(right_batch)})};
+  AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
+  Declaration asof_join{
+      "asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
+
+  // Setting invalid arguments, such as nullptr in generator or schema in SinkNodeOptions,
+  // causes the execution plan to terminate before the asofjoin node is started.
+  arrow::acero::SinkNodeOptions sink_node_options{/*generator=*/nullptr,
+                                                  /*schema=*/nullptr};
+  auto sink = Declaration::Sequence({asof_join, {"sink", sink_node_options}});
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr(
+          "`generator` is a required SinkNode option and cannot be null"),
+      DeclarationToStatus(std::move(sink)));
+}
+
 }  // namespace acero
 }  // namespace arrow