This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cd0a67834 ARROW-16515: [C++] Adding a Close method to RecordBatchReader (#13205)
6cd0a67834 is described below

commit 6cd0a67834fd7d0e1c0c6c04b2aa860da1d0c6fd
Author: Vibhatha Lakmal Abeykoon <[email protected]>
AuthorDate: Thu Jun 9 15:55:32 2022 +0530

    ARROW-16515: [C++] Adding a Close method to RecordBatchReader (#13205)
    
    Draft PR
    
    - [x] Adding `Close` method for `RecordBatchReader`
    
    Lead-authored-by: Vibhatha Abeykoon <[email protected]>
    Co-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/compute/exec/exec_plan.cc | 10 ++++++++++
 cpp/src/arrow/dataset/scanner.cc        |  9 +++++++++
 cpp/src/arrow/record_batch.cc           |  7 +++++++
 cpp/src/arrow/record_batch.h            |  5 ++++-
 4 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index bb197f8db8..bbceb3151d 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -476,6 +476,16 @@ std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
       return Status::OK();
     }
 
+    Status Close() override {
+      // Drain the generator: read and discard batches until ReadNext reports
+      // end-of-stream by yielding a null batch.
+      std::shared_ptr<RecordBatch> batch;
+      do {
+        RETURN_NOT_OK(ReadNext(&batch));
+      } while (batch != nullptr);
+      return Status::OK();
+    }
+
     MemoryPool* pool_;
     std::shared_ptr<Schema> schema_;
     Iterator<util::optional<ExecBatch>> iterator_;
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 3c1230251b..02f658181c 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -85,6 +85,15 @@ class ScannerRecordBatchReader : public RecordBatchReader {
     return Status::OK();
   }
 
+  Status Close() override {
+    // Consume every remaining batch; a null batch signals end-of-stream.
+    std::shared_ptr<RecordBatch> next;
+    do {
+      RETURN_NOT_OK(ReadNext(&next));
+    } while (next != nullptr);
+    return Status::OK();
+  }
+
  private:
   std::shared_ptr<Schema> schema_;
   TaggedRecordBatchIterator delegate_;
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index bb2e702727..6210eb405d 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -390,4 +390,11 @@ Result<std::shared_ptr<RecordBatchReader>> RecordBatchReader::Make(
   return std::make_shared<SimpleRecordBatchReader>(std::move(batches), schema);
 }
 
+RecordBatchReader::~RecordBatchReader() {
+  auto st = RecordBatchReader::Close();  // dtor: a subclass Close() is never reached
+  if (!st.ok()) {
+    ARROW_LOG(WARNING) << "Implicitly called RecordBatchReader::Close failed: " << st;
+  }
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 60aa9ad9c9..b80c36d78c 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -220,7 +220,7 @@ class ARROW_EXPORT RecordBatchReader {
  public:
   using ValueType = std::shared_ptr<RecordBatch>;
 
-  virtual ~RecordBatchReader() = default;
+  virtual ~RecordBatchReader();  // Calls the base Close(); failures are only logged
 
   /// \return the shared schema of the record batches in the stream
   virtual std::shared_ptr<Schema> schema() const = 0;
@@ -243,6 +243,9 @@ class ARROW_EXPORT RecordBatchReader {
     return batch;
   }
 
+  /// \brief Finalize the reader. The base implementation is a no-op returning OK.
+  virtual Status Close() { return Status::OK(); }
+
   class RecordBatchReaderIterator {
    public:
     using iterator_category = std::input_iterator_tag;

Reply via email to