westonpace commented on code in PR #12228:
URL: https://github.com/apache/arrow/pull/12228#discussion_r855738848
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public
ExecNodeOptions {
std::vector<FieldRef> keys;
};
+/// Default pause threshold used by BackpressureOptions::DefaultBackpressure(),
+/// passed as pause_if_above: 1 GiB.
+constexpr int32_t kDefaultBackpressureHighBytes = 1024 * 1024 * 1024;
+/// Default resume threshold used by BackpressureOptions::DefaultBackpressure(),
+/// passed as resume_if_below: 256 MiB.
+constexpr int32_t kDefaultBackpressureLowBytes = 256 * 1024 * 1024;
+
+/// \brief Read-only view of the state of a backpressure-controlled component
+/// (for example, the queue that backs a sink node).
+///
+/// Marked ARROW_EXPORT like the other public types in this header: it is a
+/// polymorphic interface, so its vtable/typeinfo must be visible across
+/// shared-library boundaries.
+class ARROW_EXPORT BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  /// \brief Number of bytes the monitored component currently reports as in use
+  virtual uint64_t bytes_in_use() const = 0;
+  /// \brief Whether the producer is currently paused due to backpressure
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+ /// \brief Create default options that perform no backpressure
+ BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+ /// \brief Create options that will perform backpressure
+ ///
+ /// \param resume_if_below The producer should resume producing if the
backpressure
+ /// queue has fewer than resume_if_below items.
+ /// \param pause_if_above The producer should pause producing if the
backpressure
+ /// queue has more than pause_if_above items
+ BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+ : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+ static BackpressureOptions DefaultBackpressure() {
+ return BackpressureOptions(kDefaultBackpressureLowBytes,
+ kDefaultBackpressureHighBytes);
+ }
+
+ inline bool should_apply_backpressure() const { return pause_if_above > 0; }
+
+ uint64_t resume_if_below;
+ uint64_t pause_if_above;
+};
+
/// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
///
/// Emitted batches will not be ordered.
class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
public:
- explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>*
generator,
- util::BackpressureOptions backpressure = {})
- : generator(generator), backpressure(std::move(backpressure)) {}
-
+ explicit SinkNodeOptions(
+ std::function<Future<util::optional<ExecBatch>>()>* generator,
+ BackpressureOptions backpressure = {},
+ std::shared_ptr<BackpressureMonitor>* backpressure_monitor = NULLPTR)
Review Comment:
It's an out-parameter, and we want the caller to share ownership of the
monitor. I was a little torn on this, as we could use a raw pointer, but then it
wouldn't be valid to call any of the monitor methods after the plan is
destroyed. I think that is probably OK in general, but `generator` _will_
actually keep the queue alive beyond the plan (`std::function` is essentially a
shared pointer to its target), so I wanted the monitor's lifetime to match it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]