westonpace commented on code in PR #12228:
URL: https://github.com/apache/arrow/pull/12228#discussion_r855744214


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {

Review Comment:
   Thanks.  Added.



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  virtual uint64_t bytes_in_use() const = 0;
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+  /// \brief Create default options that perform no backpressure
+  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+  /// \brief Create options that will perform backpressure
+  ///
+  /// \param resume_if_below The producer should resume producing if the backpressure
+  ///                        queue has fewer than resume_if_below items.
+  /// \param pause_if_above The producer should pause producing if the backpressure
+  ///                       queue has more than pause_if_above items
+  BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+  static BackpressureOptions DefaultBackpressure() {
+    return BackpressureOptions(kDefaultBackpressureLowBytes,
+                               kDefaultBackpressureHighBytes);
+  }
+
+  inline bool should_apply_backpressure() const { return pause_if_above > 0; }
+
+  uint64_t resume_if_below;
+  uint64_t pause_if_above;
+};
+
 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
 ///
 /// Emitted batches will not be ordered.
 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
  public:
-  explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
-                           util::BackpressureOptions backpressure = {})
-      : generator(generator), backpressure(std::move(backpressure)) {}
-
+  explicit SinkNodeOptions(
+      std::function<Future<util::optional<ExecBatch>>()>* generator,
+      BackpressureOptions backpressure = {},
+      std::shared_ptr<BackpressureMonitor>* backpressure_monitor = NULLPTR)
+      : generator(generator),
+        backpressure(std::move(backpressure)),
+        backpressure_monitor(backpressure_monitor) {}
+
+  /// \brief A pointer to a generator of batches.
+  ///
+  /// This will be set when the node is added to the plan and should be used to consume
+  /// data from the plan.  If this function is not called frequently enough then the sink
+  /// node will start to accumulate data and may apply backpressure.
   std::function<Future<util::optional<ExecBatch>>()>* generator;
-  util::BackpressureOptions backpressure;
+  /// \brief Options to control when to apply backpressure
+  ///
+  /// This is optional, the default is to never apply backpressure.  If the plan is not
+  /// consumed quickly enough the system may eventually run out of memory.
+  BackpressureOptions backpressure;
+  /// \brief A pointer to a backpressure monitor
+  ///
+  /// This will be set when the node is added to the plan.  This can be used to inspect
+  /// the amount of data currently queued in the sink node.  This is an optional utility
+  /// and backpressure can be applied even if this is not used.
+  std::shared_ptr<BackpressureMonitor>* backpressure_monitor;
+};
+
+/// \brief Control used by a SinkNodeConsumer to pause & resume
+///
+/// Callers should ensure that they do not call Pause and Resume simultaneously and they
+/// should sequence things so that a call to Pause() is always followed by an eventual
+/// call to Resume()
+class BackpressureControl {

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to