kou commented on PR #41125:
URL: https://github.com/apache/arrow/pull/41125#issuecomment-2067800016
Could you use our PR template instead of removing it entirely next time?
Can we avoid using `process_`/`process_thread_`/`process_task_` entirely when
threading is disabled? Something like:
```diff
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index 48cc83dd3d..649ea32bc7 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -948,7 +948,7 @@ class AsofJoinNode : public ExecNode {
return true;
}
- Result<std::shared_ptr<RecordBatch>> ProcessInner() {
+ Status ProcessInner() {
DCHECK(!state_.empty());
auto& lhs = *state_.at(0);
@@ -992,10 +992,18 @@ class AsofJoinNode : public ExecNode {
// Emit the batch
if (dst.empty()) {
- return NULLPTR;
+ return Status::OK();
} else {
ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize());
- return out.has_value() ? out.value() : NULLPTR;
+ if (!out.has_value()) {
+ return Status::OK();
+ }
+ auto out_rb = out.value();
+ ExecBatch out_b(*out_rb);
+ out_b.index = batches_produced_++;
+ DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
+ out_rb->ToString(), DEBUG_MANIP(std::endl));
+ return output_->InputReceived(this, std::move(out_b));
}
}
@@ -1006,6 +1014,7 @@ class AsofJoinNode : public ExecNode {
~Defer() noexcept { callable(); }
};
+#ifdef ARROW_ENABLE_THREADING
void EndFromProcessThread(Status st = Status::OK()) {
// We must spawn a new task to transfer off the process thread when
// marking this finished. Otherwise there is a chance that doing so could
@@ -1039,21 +1048,10 @@ class AsofJoinNode : public ExecNode {
// Process batches while we have data
for (;;) {
- Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
-
- if (result.ok()) {
- auto out_rb = *result;
- if (!out_rb) break;
- ExecBatch out_b(*out_rb);
- out_b.index = batches_produced_++;
- DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
- out_rb->ToString(), DEBUG_MANIP(std::endl));
- Status st = output_->InputReceived(this, std::move(out_b));
- if (!st.ok()) {
- EndFromProcessThread(std::move(st));
- }
- } else {
- EndFromProcessThread(result.status());
+ Status st = ProcessInner();
+ const auto ok = st.ok();
+ EndFromProcessThread(std::move(st));
+ if (!ok) {
return false;
}
}
@@ -1085,6 +1083,7 @@ class AsofJoinNode : public ExecNode {
}
static void ProcessThreadWrapper(AsofJoinNode* node) { node->ProcessThread(); }
+#endif
public:
AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
@@ -1116,8 +1115,10 @@ class AsofJoinNode : public ExecNode {
}
virtual ~AsofJoinNode() {
+#ifdef ARROW_ENABLE_THREADING
process_.Push(false); // poison pill
process_thread_.join();
+#endif
}
const std::vector<col_index_t>& indices_of_on_key() { return indices_of_on_key_; }
@@ -1375,7 +1376,18 @@ class AsofJoinNode : public ExecNode {
const char* kind_name() const override { return "AsofJoinNode"; }
const Ordering& ordering() const override { return ordering_; }
+ Status PushProcess() {
+#ifdef ARROW_ENABLE_THREADING
+ process_.Push(true);
+#else
+ ARROW_RETURN_NOT_OK(ProcessInner());
+ ARROW_RETURN_NOT_OK(output_->InputFinished(this, batches_produced_));
+#endif
+ return Status::OK();
+ }
+
Status InputReceived(ExecNode* input, ExecBatch batch) override {
+#ifdef ARROW_ENABLE_THREADING
// InputReceived may be called after execution was finished. Pushing it to the
// InputState is unnecessary since we're done (and anyway may cause the
// BackPressureController to pause the input, causing a deadlock), so drop it.
@@ -1384,6 +1396,7 @@ class AsofJoinNode : public ExecNode {
DEBUG_MANIP(std::endl));
return Status::OK();
}
+#endif
// Get the input
ARROW_DCHECK(std_has(inputs_, input));
@@ -1395,7 +1408,7 @@ class AsofJoinNode : public ExecNode {
rb->ToString(), DEBUG_MANIP(std::endl));
ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
- process_.Push(true);
+ ARROW_RETURN_NOT_OK(PushProcess());
return Status::OK();
}
@@ -1410,15 +1423,12 @@ class AsofJoinNode : public ExecNode {
// The reason for this is that there are cases at the end of a table where we don't
// know whether the RHS of the join is up-to-date until we know that the table is
// finished.
- process_.Push(true);
+ ARROW_RETURN_NOT_OK(PushProcess());
return Status::OK();
}
Status StartProducing() override {
-#ifndef ARROW_ENABLE_THREADING
- return Status::NotImplemented("ASOF join requires threading enabled");
-#endif
-
+#ifdef ARROW_ENABLE_THREADING
ARROW_ASSIGN_OR_RAISE(process_task_, plan_->query_context()->BeginExternalTask(
    "AsofJoinNode::ProcessThread"));
if (!process_task_.is_valid()) {
@@ -1426,6 +1436,7 @@ class AsofJoinNode : public ExecNode {
return Status::OK();
}
process_thread_ = std::thread(&AsofJoinNode::ProcessThreadWrapper, this);
+#endif
return Status::OK();
}
@@ -1433,8 +1444,10 @@ class AsofJoinNode : public ExecNode {
void ResumeProducing(ExecNode* output, int32_t counter) override {}
Status StopProducingImpl() override {
+#ifdef ARROW_ENABLE_THREADING
process_.Clear();
process_.Push(false);
+#endif
return Status::OK();
}
@@ -1464,12 +1477,14 @@ class AsofJoinNode : public ExecNode {
// Backpressure counter common to all inputs
std::atomic<int32_t> backpressure_counter_;
+#ifdef ARROW_ENABLE_THREADING
// Queue for triggering processing of a given input
// (a false value is a poison pill)
ConcurrentQueue<bool> process_;
// Worker thread
std::thread process_thread_;
Future<> process_task_;
+#endif
// In-progress batches produced
int batches_produced_ = 0;
@@ -1496,9 +1511,13 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector inputs,
debug_os_(join_options.debug_opts ? join_options.debug_opts->os : nullptr),
debug_mutex_(join_options.debug_opts ? join_options.debug_opts->mutex : nullptr),
#endif
#endif
- backpressure_counter_(1),
+ backpressure_counter_(1)
+#ifdef ARROW_ENABLE_THREADING
+ ,
process_(),
- process_thread_() {
+ process_thread_()
+#endif
+ {
for (auto& key_hasher : key_hashers_) {
key_hasher->node_ = this;
}
```
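If it helps to see the two build modes side by side, here is a minimal, self-contained sketch of the control flow the diff above suggests. The names (`FakeAsofNode`, `DoProcess`) are hypothetical stand-ins, not the Arrow API; only the `ARROW_ENABLE_THREADING` split mirrors the suggestion: with threading enabled the node just enqueues a token for the worker thread, without it `PushProcess()` runs the processing inline on the caller and reports the produced-batch count right away.

```cpp
// Hypothetical illustration only; not the Arrow API.
#include <iostream>
#include <queue>

// #define ARROW_ENABLE_THREADING  // toggle to compare both build modes

class FakeAsofNode {
 public:
  // Stands in for ProcessInner(): after the change it emits batches itself
  // and only reports success/failure.
  bool DoProcess() {
    std::cout << "produce batch " << batches_produced_++ << "\n";
    return true;
  }

  // Mirrors the proposed PushProcess(): enqueue work for the dedicated
  // process thread when threading is enabled, otherwise process inline.
  bool PushProcess() {
#ifdef ARROW_ENABLE_THREADING
    process_.push(true);  // consumed later by the worker thread
#else
    if (!DoProcess()) return false;
    std::cout << "InputFinished(" << batches_produced_ << ")\n";
#endif
    return true;
  }

 private:
#ifdef ARROW_ENABLE_THREADING
  // Stand-in for the ConcurrentQueue<bool>/std::thread members, which the
  // diff compiles only when threading is enabled.
  std::queue<bool> process_;
#endif
  int batches_produced_ = 0;
};

int main() {
  FakeAsofNode node;
  node.PushProcess();  // roughly what InputReceived()/InputFinished() would trigger
  return 0;
}
```

The call sites stay the same in both modes; only the queue push is swapped for an inline call.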