westonpace commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r583932908



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                    FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState {
+  explicit ToBatchesState(size_t n_tasks)
+      : batches(n_tasks), task_drained(n_tasks, false) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= i_task) {
+      batches.resize(i_task + 1);
+      task_drained.resize(i_task + 1);
+    }
+    batches[i_task].push_back(std::move(b));
+  }
+
+  Status Finish(size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    task_drained[position] = true;
+    return Status::OK();
+  }
+
+  std::shared_ptr<RecordBatch> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {

Review comment:
       This seems a little off.  It looks like you're saying "If the current scan task's queue is still being filled then wait up to 1ms for it to receive a new item."  But what happens when that 1ms expires?  Also, you're ignoring the return value of `wait_for`, and the `std::condition_variable` is a temporary that nothing ever notifies, so the wait can only ever time out.
   
   For example, let's pretend that there are two scan tasks:
   
   Scan task 1 RB 1 arrives at 10ms
   Scan task 1 RB 2 arrives at 20ms
   Scan task 2 RB 1 arrives at 30ms
   
   The consumer grabs batches very quickly.  It comes in at timestamp 15ms, calls `Pop`, and gets scan task 1's RB 1 out of `batches[0]`.  On the next `Pop`, `batches[0]` is empty and `task_drained[0] == false`, so the wait sits in this loop for about 1ms and then breaks out with the predicate still false.  At that point `pop_cursor` is still 0, which is less than `batches.size()`, so instead of returning null it will call `front()` on an empty deque.
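   
   Something like this would be more deterministic (untested sketch; the `ready` member condition variable is my invention, notified by `Push`/`Finish` so `Pop` never has to rely on a timeout):
   
```cpp
// Sketch only (needs <condition_variable>): make the condition variable a
// member so producers can wake the consumer, and wait without a timeout so
// the predicate's result is never discarded.
std::condition_variable ready;  // hypothetical member of ToBatchesState

void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
  {
    std::lock_guard<std::mutex> lock(mutex);
    if (batches.size() <= i_task) {
      batches.resize(i_task + 1);
      task_drained.resize(i_task + 1);
    }
    batches[i_task].push_back(std::move(b));
  }
  ready.notify_one();
}

Status Finish(size_t position) {
  {
    std::lock_guard<std::mutex> lock(mutex);
    task_drained[position] = true;
  }
  ready.notify_one();
  return Status::OK();
}

std::shared_ptr<RecordBatch> Pop() {
  std::unique_lock<std::mutex> lock(mutex);
  ready.wait(lock, [this] {
    while (pop_cursor < batches.size()) {
      if (!batches[pop_cursor].empty()) return true;  // batch available
      if (!task_drained[pop_cursor]) return false;    // keep waiting
      ++pop_cursor;                                   // task exhausted, advance
    }
    return true;  // all scan tasks drained
  });
  if (pop_cursor == batches.size()) return nullptr;
  auto batch = std::move(batches[pop_cursor].front());
  batches[pop_cursor].pop_front();
  return batch;
}
```
   
   That way `Pop` blocks until there is genuinely a batch to return (or everything is drained), instead of polling on a 1ms timer.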

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                    FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState {
+  explicit ToBatchesState(size_t n_tasks)
+      : batches(n_tasks), task_drained(n_tasks, false) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= i_task) {
+      batches.resize(i_task + 1);
+      task_drained.resize(i_task + 1);
+    }
+    batches[i_task].push_back(std::move(b));
+  }
+
+  Status Finish(size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    task_drained[position] = true;
+    return Status::OK();
+  }
+
+  std::shared_ptr<RecordBatch> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {
+      while (pop_cursor < batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!batches[pop_cursor].empty()) return true;
+
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        ++pop_cursor;
+      }
+      // all scan tasks drained, terminate
+      return true;
+    });
+
+    if (pop_cursor == batches.size()) return nullptr;
+
+    auto batch = std::move(batches[pop_cursor].front());
+    batches[pop_cursor].pop_front();
+    return batch;
+  }
+};
+
+Result<RecordBatchIterator> Scanner::ToBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_vector, scan_task_it.ToVector());
+
+  auto task_group = scan_context_->TaskGroup();
+  auto state = std::make_shared<ToBatchesState>(scan_task_vector.size());
+
+  size_t scan_task_id = 0;
+  for (auto scan_task : scan_task_vector) {
+    auto id = scan_task_id++;
+    task_group->Append([state, id, scan_task] {

Review comment:
       Again, what I'm working on will work around this, so maybe don't stress about it at the moment, but there's no back-pressure here.  If the batch consumer is not fast enough and the dataset is larger than RAM, the system will run out of RAM.
   
   It's a bit odd because you are fixing ARROW-11800 here (though without back-pressure) and then I'll be breaking it again with my implementation (the first pass of my impl will have back-pressure and parse loaded buffers a bit more serially).
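   
   For reference, the minimal shape of back-pressure here might be a cap on each per-task queue, with `Push` stalling until the consumer makes room.  This is only illustrative (`kMaxQueuedBatches` and `space_available` are made-up names, and blocking producer threads from the task group's pool has its own deadlock hazards):
   
```cpp
// Illustrative sketch: bound each scan task's queue so producers stall when
// the consumer falls behind. space_available would be notified from Pop()
// after a batch is removed; kMaxQueuedBatches is an arbitrary cap.
static constexpr size_t kMaxQueuedBatches = 4;
std::condition_variable space_available;  // hypothetical member

void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
  std::unique_lock<std::mutex> lock(mutex);
  // Producer waits until its queue has room again.
  space_available.wait(lock, [this, i_task] {
    return batches[i_task].size() < kMaxQueuedBatches;
  });
  batches[i_task].push_back(std::move(b));
  // ... then notify the consumer that a batch is ready, as in the
  // condition-variable sketch above ...
}
```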

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                    FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState {
+  explicit ToBatchesState(size_t n_tasks)
+      : batches(n_tasks), task_drained(n_tasks, false) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= i_task) {
+      batches.resize(i_task + 1);
+      task_drained.resize(i_task + 1);
+    }
+    batches[i_task].push_back(std::move(b));
+  }
+
+  Status Finish(size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    task_drained[position] = true;
+    return Status::OK();
+  }
+
+  std::shared_ptr<RecordBatch> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {
+      while (pop_cursor < batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!batches[pop_cursor].empty()) return true;
+
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        ++pop_cursor;
+      }
+      // all scan tasks drained, terminate
+      return true;
+    });
+
+    if (pop_cursor == batches.size()) return nullptr;
+
+    auto batch = std::move(batches[pop_cursor].front());
+    batches[pop_cursor].pop_front();
+    return batch;
+  }
+};
+
+Result<RecordBatchIterator> Scanner::ToBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_vector, scan_task_it.ToVector());

Review comment:
       I'll be replacing this with something better, so I don't know how much we need to worry, but this is not ideal.  For example, with Parquet, this would fetch metadata for every file in the scan before starting to read any individual file, which introduces more latency than necessary.
   
   Also, I'm not sure how this will interact with Parquet preloading.
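   
   For comparison, a lazier shape would pull tasks off the iterator as they arrive rather than up front (rough sketch; it glosses over how `ToBatchesState` would then learn the total task count for its termination check, since `Push` resizing on demand only covers tasks it has already seen):
   
```cpp
// Rough sketch: consume the ScanTaskIterator lazily instead of materializing
// every task (and, for Parquet, every file's metadata) before reading starts.
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
auto state = std::make_shared<ToBatchesState>(/*n_tasks=*/0);
size_t scan_task_id = 0;
RETURN_NOT_OK(scan_task_it.Visit([&](std::shared_ptr<ScanTask> scan_task) {
  auto id = scan_task_id++;
  task_group->Append([state, id, scan_task] {
    // ... same per-task body as in this PR ...
  });
  return Status::OK();
}));
```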

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -163,12 +164,20 @@ class ARROW_DS_EXPORT Scanner {
   /// in a concurrent fashion and outlive the iterator.
   Result<ScanTaskIterator> Scan();
 
+  /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple
+  /// threads are used, the visitor will be invoked from those threads and is
+  /// responsible for any synchronization.
+  Status Scan(std::function<Status(std::shared_ptr<RecordBatch>)> visitor);

Review comment:
       Why both `Scan(visitor)` and `ToBatches`?  Couldn't you just do `ToBatches().Visit(visitor)`?
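   
   i.e., something like this sketch (with the caveat that it would invoke the visitor serially from the consuming thread, rather than from the scan threads as the doc comment above promises):
   
```cpp
// Sketch: the visitor overload as a thin wrapper over ToBatches().
Status Scanner::Scan(std::function<Status(std::shared_ptr<RecordBatch>)> visitor) {
  ARROW_ASSIGN_OR_RAISE(auto batch_it, ToBatches());
  return batch_it.Visit(std::move(visitor));
}
```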




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

