westonpace commented on code in PR #12228:
URL: https://github.com/apache/arrow/pull/12228#discussion_r855743738
##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -46,31 +46,81 @@ using internal::checked_cast;
namespace compute {
namespace {
+class BackpressureResevoir : public BackpressureMonitor {
+ public:
+ BackpressureResevoir(uint64_t resume_if_below, uint64_t pause_if_above)
+ : bytes_used_(0),
+ state_change_counter_(0),
+ resume_if_below_(resume_if_below),
+ pause_if_above_(pause_if_above) {}
+
+ uint64_t bytes_in_use() const override { return bytes_used_; }
+ bool is_paused() const override { return state_change_counter_ % 2 == 1; }
+ bool enabled() const { return pause_if_above_ > 0; }
+
+ int32_t RecordProduced(uint64_t num_bytes) {
Review Comment:
Which return value are you referring to? The return value from
`RecordProduced` is important as it's the signal we send through the plan:
```
auto state_change = backpressure_queue_.RecordProduced(bytes_used);
if (state_change >= 0) {
EVENT(span_, "Backpressure applied", {{"backpressure.counter",
state_change}});
inputs_[0]->PauseProducing(this, state_change);
}
```
...and then later in the source node it shows up as `counter`...
```
void PauseProducing(ExecNode* output, int32_t counter) override {
std::lock_guard<std::mutex> lg(mutex_);
if (counter <= backpressure_counter_) {
return;
}
```
##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -46,31 +46,81 @@ using internal::checked_cast;
namespace compute {
namespace {
+class BackpressureResevoir : public BackpressureMonitor {
+ public:
+ BackpressureResevoir(uint64_t resume_if_below, uint64_t pause_if_above)
+ : bytes_used_(0),
+ state_change_counter_(0),
+ resume_if_below_(resume_if_below),
+ pause_if_above_(pause_if_above) {}
+
+ uint64_t bytes_in_use() const override { return bytes_used_; }
+ bool is_paused() const override { return state_change_counter_ % 2 == 1; }
+ bool enabled() const { return pause_if_above_ > 0; }
+
+ int32_t RecordProduced(uint64_t num_bytes) {
Review Comment:
Which return value are you referring to? The return value from
`RecordProduced` is important as it's the signal we send through the plan:
```
auto state_change = backpressure_queue_.RecordProduced(bytes_used);
if (state_change >= 0) {
EVENT(span_, "Backpressure applied", {{"backpressure.counter",
state_change}});
inputs_[0]->PauseProducing(this, state_change);
}
```
...and then later in the source node it shows up as `counter`...
```
void PauseProducing(ExecNode* output, int32_t counter) override {
std::lock_guard<std::mutex> lg(mutex_);
if (counter <= backpressure_counter_) {
return;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]