This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6cd0a67834 ARROW-16515: [C++] Adding a Close method to
RecordBatchReader (#13205)
6cd0a67834 is described below
commit 6cd0a67834fd7d0e1c0c6c04b2aa860da1d0c6fd
Author: Vibhatha Lakmal Abeykoon <[email protected]>
AuthorDate: Thu Jun 9 15:55:32 2022 +0530
ARROW-16515: [C++] Adding a Close method to RecordBatchReader (#13205)
Draft PR
- [x] Adding `Close` method for `RecordBatchReader`
Lead-authored-by: Vibhatha Abeykoon <[email protected]>
Co-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/compute/exec/exec_plan.cc | 10 ++++++++++
cpp/src/arrow/dataset/scanner.cc | 9 +++++++++
cpp/src/arrow/record_batch.cc | 7 +++++++
cpp/src/arrow/record_batch.h | 5 ++++-
4 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index bb197f8db8..bbceb3151d 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -476,6 +476,16 @@ std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
return Status::OK();
}
+ /// \brief Drain the underlying generator.
+ ///
+ /// Calls ReadNext() repeatedly until it yields a null batch (end of stream).
+ /// \return the first non-OK Status reported by ReadNext(), otherwise OK
+ Status Close() override {
+   std::shared_ptr<RecordBatch> batch;
+   // At least one read is always issued, so a single loop covers both the
+   // "already exhausted" and the "batches remaining" cases.
+   do {
+     RETURN_NOT_OK(ReadNext(&batch));
+   } while (batch != nullptr);
+   return Status::OK();
+ }
+
MemoryPool* pool_;
std::shared_ptr<Schema> schema_;
Iterator<util::optional<ExecBatch>> iterator_;
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 3c1230251b..02f658181c 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -85,6 +85,15 @@ class ScannerRecordBatchReader : public RecordBatchReader {
return Status::OK();
}
+ /// \brief Consume every remaining batch from the scan.
+ ///
+ /// Reading stops at the first null batch (end of stream) or the first error.
+ /// \return the failing Status from ReadNext(), or OK once the stream is exhausted
+ Status Close() override {
+   std::shared_ptr<RecordBatch> next;
+   do {
+     RETURN_NOT_OK(ReadNext(&next));
+   } while (next != nullptr);
+   return Status::OK();
+ }
+
private:
std::shared_ptr<Schema> schema_;
TaggedRecordBatchIterator delegate_;
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index bb2e702727..6210eb405d 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -390,4 +390,11 @@ Result<std::shared_ptr<RecordBatchReader>> RecordBatchReader::Make(
return std::make_shared<SimpleRecordBatchReader>(std::move(batches), schema);
}
+// Destructor: makes a best-effort Close() call and logs a failure as a warning,
+// since a destructor has no way to hand a Status back to the caller.
+//
+// NOTE(review): a virtual call made from a base-class destructor is dispatched to
+// the base-class implementation, because the derived part of the object has
+// already been destroyed by the time this body runs. Close() overrides in
+// subclasses are therefore NOT reached from here; a reader that needs its own
+// Close() logic to run on destruction has to invoke it from its own destructor.
+RecordBatchReader::~RecordBatchReader() {
+  auto st = this->Close();
+  if (!st.ok()) {
+    ARROW_LOG(WARNING) << "Implicitly called RecordBatchReader::Close failed: " << st;
+  }
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 60aa9ad9c9..b80c36d78c 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -220,7 +220,7 @@ class ARROW_EXPORT RecordBatchReader {
public:
using ValueType = std::shared_ptr<RecordBatch>;
- virtual ~RecordBatchReader() = default;
+ virtual ~RecordBatchReader();
/// \return the shared schema of the record batches in the stream
virtual std::shared_ptr<Schema> schema() const = 0;
@@ -243,6 +243,9 @@ class ARROW_EXPORT RecordBatchReader {
return batch;
}
+ /// \brief Finalize the reader; this default implementation does nothing and returns OK
+ virtual Status Close() { return Status::OK(); }
+
class RecordBatchReaderIterator {
public:
using iterator_category = std::input_iterator_tag;