This is an automated email from the ASF dual-hosted git repository. kou pushed a commit to branch maint-10.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit e79870d46817fc6d52aad690570e2e457b8d9b92 Author: rtpsw <[email protected]> AuthorDate: Mon Nov 14 18:40:05 2022 +0200 ARROW-18310: [C++] Use atomic backpressure counter (#14622) See https://issues.apache.org/jira/browse/ARROW-18310 Authored-by: Yaron Gvili <[email protected]> Signed-off-by: Weston Pace <[email protected]> --- cpp/src/arrow/compute/exec/sink_node.cc | 3 ++- cpp/src/arrow/compute/exec/source_node.cc | 3 ++- cpp/src/arrow/dataset/file_base.cc | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 96a34bff43..1f518ef75d 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -16,6 +16,7 @@ // specific language governing permissions and limitations // under the License. +#include <atomic> #include <mutex> #include <optional> @@ -386,7 +387,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { AtomicCounter input_counter_; std::shared_ptr<SinkNodeConsumer> consumer_; std::vector<std::string> names_; - int32_t backpressure_counter_ = 0; + std::atomic<int32_t> backpressure_counter_ = 0; }; static Result<ExecNode*> MakeTableConsumingSinkNode( compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs, diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 1d51a5c1d2..e0534b1b39 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include <atomic> #include <mutex> #include <optional> @@ -216,7 +217,7 @@ struct SourceNode : ExecNode { private: std::mutex mutex_; - int32_t backpressure_counter_{0}; + std::atomic<int32_t> backpressure_counter_{0}; Future<> backpressure_future_ = Future<>::MakeFinished(); bool stop_requested_{false}; bool started_ = false; diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index bd19c99a52..10b9e82d5c 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -20,6 +20,7 @@ #include <arrow/compute/exec/exec_plan.h> #include <algorithm> +#include <atomic> #include <memory> #include <unordered_map> #include <variant> @@ -582,7 +583,7 @@ class TeeNode : public compute::MapNode { // only returns an unfinished future when it needs backpressure. Using a serial // scheduler here ensures we pause while we wait for backpressure to clear std::shared_ptr<util::AsyncTaskScheduler> serial_scheduler_; - int32_t backpressure_counter_ = 0; + std::atomic<int32_t> backpressure_counter_ = 0; }; } // namespace
