westonpace commented on code in PR #12228:
URL: https://github.com/apache/arrow/pull/12228#discussion_r855748314


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public 
ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  virtual uint64_t bytes_in_use() const = 0;
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+  /// \brief Create default options that perform no backpressure
+  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+  /// \brief Create options that will perform backpressure
+  ///
+  /// \param resume_if_below The producer should resume producing if the 
backpressure
+  ///                        queue has fewer than resume_if_below items.
+  /// \param pause_if_above The producer should pause producing if the 
backpressure
+  ///                       queue has more than pause_if_above items
+  BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+  static BackpressureOptions DefaultBackpressure() {
+    return BackpressureOptions(kDefaultBackpressureLowBytes,
+                               kDefaultBackpressureHighBytes);
+  }
+
+  inline bool should_apply_backpressure() const { return pause_if_above > 0; }
+
+  uint64_t resume_if_below;
+  uint64_t pause_if_above;
+};
+
 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
 ///
 /// Emitted batches will not be ordered.
 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
  public:
-  explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* 
generator,
-                           util::BackpressureOptions backpressure = {})
-      : generator(generator), backpressure(std::move(backpressure)) {}
-
+  explicit SinkNodeOptions(
+      std::function<Future<util::optional<ExecBatch>>()>* generator,
+      BackpressureOptions backpressure = {},
+      std::shared_ptr<BackpressureMonitor>* backpressure_monitor = NULLPTR)

Review Comment:
   Actually, since the generator is now capturing `this`, I rescind my previous 
statement.  We _could_ change the generator to also capture both the generator 
AND the backpressure reservoir (so it doesn't have to capture `this`), but I 
think that starts to complicate things.  I think we want to say:
   
   `All inputs and outputs to a plan become invalid when the plan is destroyed`
   
   This is a slight change in behavior, so I added a guard to the 
AsyncGenerator; it should now be obvious to callers if the generator outlives 
the plan.
   
   With this change I went ahead and changed `backpressure_monitor` to 
`BackpressureMonitor**`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to