kou commented on PR #41125:
URL: https://github.com/apache/arrow/pull/41125#issuecomment-2067800016

   Could you use our PR template instead of removing it entirely next time?
   
   Can we avoid using `process_`/`process_thread_`/`process_task_` entirely when threading is disabled? Something like:
   
   ```diff
   
   diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
   index 48cc83dd3d..649ea32bc7 100644
   --- a/cpp/src/arrow/acero/asof_join_node.cc
   +++ b/cpp/src/arrow/acero/asof_join_node.cc
   @@ -948,7 +948,7 @@ class AsofJoinNode : public ExecNode {
        return true;
      }
    
   -  Result<std::shared_ptr<RecordBatch>> ProcessInner() {
   +  Status ProcessInner() {
        DCHECK(!state_.empty());
        auto& lhs = *state_.at(0);
    
   @@ -992,10 +992,18 @@ class AsofJoinNode : public ExecNode {
    
        // Emit the batch
        if (dst.empty()) {
   -      return NULLPTR;
   +      return Status::OK();
        } else {
          ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize());
   -      return out.has_value() ? out.value() : NULLPTR;
   +      if (!out.has_value()) {
   +        return Status::OK();
   +      }
   +      auto out_rb = out.value();
   +      ExecBatch out_b(*out_rb);
   +      out_b.index = batches_produced_++;
   +      DEBUG_SYNC(this, "produce batch ", out_b.index, ":", 
DEBUG_MANIP(std::endl),
   +                 out_rb->ToString(), DEBUG_MANIP(std::endl));
   +      return output_->InputReceived(this, std::move(out_b));
        }
      }
    
   @@ -1006,6 +1014,7 @@ class AsofJoinNode : public ExecNode {
        ~Defer() noexcept { callable(); }
      };
    
   +#ifdef ARROW_ENABLE_THREADING
      void EndFromProcessThread(Status st = Status::OK()) {
        // We must spawn a new task to transfer off the process thread when
        // marking this finished.  Otherwise there is a chance that doing so 
could
   @@ -1039,21 +1048,10 @@ class AsofJoinNode : public ExecNode {
    
        // Process batches while we have data
        for (;;) {
   -      Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
   -
   -      if (result.ok()) {
   -        auto out_rb = *result;
   -        if (!out_rb) break;
   -        ExecBatch out_b(*out_rb);
   -        out_b.index = batches_produced_++;
   -        DEBUG_SYNC(this, "produce batch ", out_b.index, ":", 
DEBUG_MANIP(std::endl),
   -                   out_rb->ToString(), DEBUG_MANIP(std::endl));
   -        Status st = output_->InputReceived(this, std::move(out_b));
   -        if (!st.ok()) {
   -          EndFromProcessThread(std::move(st));
   -        }
   -      } else {
   -        EndFromProcessThread(result.status());
   +      Status st = ProcessInner();
   +      const auto ok = st.ok();
   +      EndFromProcessThread(std::move(st));
   +      if (!ok) {
            return false;
          }
        }
   @@ -1085,6 +1083,7 @@ class AsofJoinNode : public ExecNode {
      }
    
      static void ProcessThreadWrapper(AsofJoinNode* node) { 
node->ProcessThread(); }
   +#endif
    
     public:
      AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> 
input_labels,
   @@ -1116,8 +1115,10 @@ class AsofJoinNode : public ExecNode {
      }
    
      virtual ~AsofJoinNode() {
   +#ifdef ARROW_ENABLE_THREADING
        process_.Push(false);  // poison pill
        process_thread_.join();
   +#endif
      }
    
      const std::vector<col_index_t>& indices_of_on_key() { return 
indices_of_on_key_; }
   @@ -1375,7 +1376,18 @@ class AsofJoinNode : public ExecNode {
      const char* kind_name() const override { return "AsofJoinNode"; }
      const Ordering& ordering() const override { return ordering_; }
    
   +  Status PushProcess() {
   +#ifdef ARROW_ENABLE_THREADING
   +    process_.Push(true);
   +#else
   +    ARROW_RETURN_NOT_OK(ProcessInner());
   +    ARROW_RETURN_NOT_OK(output_->InputFinished(this, batches_produced_));
   +#endif
   +    return Status::OK();
   +  }
   +
      Status InputReceived(ExecNode* input, ExecBatch batch) override {
   +#ifdef ARROW_ENABLE_THREADING
        // InputReceived may be called after execution was finished. Pushing it 
to the
        // InputState is unnecessary since we're done (and anyway may cause the
        // BackPressureController to pause the input, causing a deadlock), so 
drop it.
   @@ -1384,6 +1396,7 @@ class AsofJoinNode : public ExecNode {
                     DEBUG_MANIP(std::endl));
          return Status::OK();
        }
   +#endif
    
        // Get the input
        ARROW_DCHECK(std_has(inputs_, input));
   @@ -1395,7 +1408,7 @@ class AsofJoinNode : public ExecNode {
                   rb->ToString(), DEBUG_MANIP(std::endl));
    
        ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
   -    process_.Push(true);
   +    ARROW_RETURN_NOT_OK(PushProcess());
        return Status::OK();
      }
    
   @@ -1410,15 +1423,12 @@ class AsofJoinNode : public ExecNode {
        // The reason for this is that there are cases at the end of a table 
where we don't
        // know whether the RHS of the join is up-to-date until we know that 
the table is
        // finished.
   -    process_.Push(true);
   +    ARROW_RETURN_NOT_OK(PushProcess());
        return Status::OK();
      }
    
      Status StartProducing() override {
   -#ifndef ARROW_ENABLE_THREADING
   -    return Status::NotImplemented("ASOF join requires threading enabled");
   -#endif
   -
   +#ifdef ARROW_ENABLE_THREADING
        ARROW_ASSIGN_OR_RAISE(process_task_, 
plan_->query_context()->BeginExternalTask(
                                                 
"AsofJoinNode::ProcessThread"));
        if (!process_task_.is_valid()) {
   @@ -1426,6 +1436,7 @@ class AsofJoinNode : public ExecNode {
          return Status::OK();
        }
        process_thread_ = std::thread(&AsofJoinNode::ProcessThreadWrapper, 
this);
   +#endif
        return Status::OK();
      }
    
   @@ -1433,8 +1444,10 @@ class AsofJoinNode : public ExecNode {
      void ResumeProducing(ExecNode* output, int32_t counter) override {}
    
      Status StopProducingImpl() override {
   +#ifdef ARROW_ENABLE_THREADING
        process_.Clear();
        process_.Push(false);
   +#endif
        return Status::OK();
      }
    
   @@ -1464,12 +1477,14 @@ class AsofJoinNode : public ExecNode {
    
      // Backpressure counter common to all inputs
      std::atomic<int32_t> backpressure_counter_;
   +#ifdef ARROW_ENABLE_THREADING
      // Queue for triggering processing of a given input
      // (a false value is a poison pill)
      ConcurrentQueue<bool> process_;
      // Worker thread
      std::thread process_thread_;
      Future<> process_task_;
   +#endif
    
      // In-progress batches produced
      int batches_produced_ = 0;
   @@ -1496,9 +1511,13 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector 
inputs,
          debug_os_(join_options.debug_opts ? join_options.debug_opts->os : 
nullptr),
          debug_mutex_(join_options.debug_opts ? join_options.debug_opts->mutex 
: nullptr),
    #endif
   -      backpressure_counter_(1),
   +      backpressure_counter_(1)
   +#ifdef ARROW_ENABLE_THREADING
   +      ,
          process_(),
   -      process_thread_() {
   +      process_thread_()
   +#endif
   +      {
      for (auto& key_hasher : key_hashers_) {
        key_hasher->node_ = this;
      }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to