lidavidm commented on a change in pull request #11285:
URL: https://github.com/apache/arrow/pull/11285#discussion_r720262544



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -593,20 +593,23 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
   ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
   AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
 
+  util::BackpressureOptions backpressure =
+      util::MakeBackpressureOptions(kDefaultBackpressureLow, kDefaultBackpressureHigh);

Review comment:
       We should probably expose these in scan options somewhere, no?
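
       One possible shape for this, as a standalone sketch rather than actual Arrow code: every name below (SketchScanOptions, SketchBackpressureOptions, MakeBackpressureFromScanOptions) and both default values are hypothetical placeholders. The idea is only that the thresholds live on the options object with defaults, and the scanner builds its backpressure settings from there instead of from constants.

```cpp
#include <cassert>
#include <cstdint>

// Placeholder defaults; the real kDefaultBackpressureLow/High values
// are not shown in this diff, so these numbers are assumptions.
constexpr uint32_t kSketchBackpressureLow = 32;
constexpr uint32_t kSketchBackpressureHigh = 64;

// Hypothetical mirror of the two thresholds used by PushGenerator::State.
struct SketchBackpressureOptions {
  uint32_t resume_if_below;
  uint32_t pause_if_above;
};

// Hypothetical scan options carrying user-tunable thresholds.
struct SketchScanOptions {
  uint32_t backpressure_low = kSketchBackpressureLow;
  uint32_t backpressure_high = kSketchBackpressureHigh;
};

// Builds backpressure settings from the options instead of from constants.
inline SketchBackpressureOptions MakeBackpressureFromScanOptions(
    const SketchScanOptions& options) {
  // The resume threshold should not exceed the pause threshold.
  assert(options.backpressure_low <= options.backpressure_high);
  return SketchBackpressureOptions{options.backpressure_low,
                                   options.backpressure_high};
}
```

       A caller that never touches the two fields would get the defaults, so existing behavior would not change.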

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1645,6 +1667,47 @@ AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token
   return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
 }
 
+template <typename T>
+struct PauseableGenerator {
+ public:
+  PauseableGenerator(AsyncGenerator<T> source, std::shared_ptr<util::AsyncToggle> toggle)
+      : state_(std::make_shared<PauseableGeneratorState>(std::move(source),
+                                                         std::move(toggle))) {}
+
+  Future<T> operator()() { return (*state_)(); }
+
+ private:
+  struct PauseableGeneratorState
+      : public std::enable_shared_from_this<PauseableGeneratorState> {
+    PauseableGeneratorState(AsyncGenerator<T> source,
+                            std::shared_ptr<util::AsyncToggle> toggle)
+        : source_(std::move(source)), toggle_(std::move(toggle)) {}
+
+    Future<T> operator()() {
+      std::shared_ptr<PauseableGeneratorState> self = this->shared_from_this();
+      return toggle_->WhenOpen().Then([self] {

Review comment:
       Below there's a comment noting that later calls to Open may finish before earlier calls. Doesn't that mean this generator can be inadvertently reordered?
   
   (Also, this relies on Future running its callbacks in order, right?)
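
   To make the concern concrete, here is a single-threaded toy model, not Arrow's AsyncToggle or Future: SketchToggle and ReadWhilePaused are hypothetical names, and the `fifo` flag is an invented knob that controls the order in which waiting callbacks run once the toggle opens. Each read pulls from the source only when its callback fires, so the callback order decides which read receives which item.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for an async toggle.
class SketchToggle {
 public:
  explicit SketchToggle(bool fifo) : fifo_(fifo) {}

  void Close() { open_ = false; }

  // Opens the toggle and runs every queued waiter, either in
  // registration order (fifo) or in reverse order.
  void Open() {
    open_ = true;
    std::vector<std::function<void()>> waiters;
    waiters.swap(waiters_);
    if (fifo_) {
      for (auto it = waiters.begin(); it != waiters.end(); ++it) (*it)();
    } else {
      for (auto it = waiters.rbegin(); it != waiters.rend(); ++it) (*it)();
    }
  }

  // Runs the callback now if open, otherwise queues it until Open().
  void WhenOpen(std::function<void()> callback) {
    if (open_) {
      callback();
    } else {
      waiters_.push_back(std::move(callback));
    }
  }

 private:
  bool fifo_;
  bool open_ = true;
  std::vector<std::function<void()>> waiters_;
};

// Issues n reads while the toggle is closed, then opens it.
// received[i] is the source item that the i-th read ended up with.
inline std::vector<int> ReadWhilePaused(bool fifo, int n) {
  SketchToggle toggle(fifo);
  int next_item = 0;  // the "source": hands out 0, 1, 2, ...
  std::vector<int> received(static_cast<std::size_t>(n), -1);
  toggle.Close();
  for (int i = 0; i < n; ++i) {
    toggle.WhenOpen([&received, &next_item, i] {
      received[static_cast<std::size_t>(i)] = next_item++;
    });
  }
  toggle.Open();
  return received;
}
```

   In this model, ReadWhilePaused(true, 3) yields {0, 1, 2}, while ReadWhilePaused(false, 3) yields {2, 1, 0}: the first read gets the last item. Whether the real generator is exposed to this depends on the ordering guarantees of WhenOpen and Future callbacks, which this sketch does not establish.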

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -785,6 +786,24 @@ class ReadaheadGenerator {
 template <typename T>
 class PushGenerator {
   struct State {
+    explicit State(util::BackpressureOptions backpressure)
+        : backpressure(std::move(backpressure)) {}
+
+    void OpenBackpressureIfFree() {
+      if (backpressure.toggle && !backpressure.toggle->IsOpen() &&
+          result_q.size() < backpressure.resume_if_below) {
+        backpressure.toggle->Open();
+      }
+    }
+
+    void CloseBackpressureIfFull() {
+      if (backpressure.toggle && backpressure.toggle->IsOpen() &&
+          result_q.size() > backpressure.pause_if_above) {
+        backpressure.toggle->Close();

Review comment:
       Is there any value in checking IsOpen here, since Close performs the same check internally?
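
       A rough sketch of what I mean, using a hypothetical IdempotentToggle rather than the real AsyncToggle (the mutex, the transition counter, and CloseIfFull are all assumptions made for illustration). If Close already returns early when the toggle is closed, the caller only needs the queue-size condition:

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical toggle whose Close() already checks the current state.
class IdempotentToggle {
 public:
  bool IsOpen() {
    std::lock_guard<std::mutex> lock(mutex_);
    return open_;
  }

  void Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!open_) return;  // already closed: nothing to do
    open_ = false;
    ++close_transitions_;
  }

  // Number of real open-to-closed transitions, for illustration only.
  int close_transitions() {
    std::lock_guard<std::mutex> lock(mutex_);
    return close_transitions_;
  }

 private:
  std::mutex mutex_;
  bool open_ = true;
  int close_transitions_ = 0;
};

// Caller without the outer IsOpen() check: only the size condition remains.
inline void CloseIfFull(IdempotentToggle& toggle, std::size_t queue_size,
                        std::size_t pause_if_above) {
  if (queue_size > pause_if_above) {
    toggle.Close();
  }
}
```

       Calling CloseIfFull repeatedly on a full queue still closes the toggle only once in this sketch. The outer check would then only matter if it avoids some cost inside Close, which I can't tell from this diff.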



