pitrou commented on a change in pull request #11946:
URL: https://github.com/apache/arrow/pull/11946#discussion_r778168422
##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -234,6 +234,53 @@ class ARROW_EXPORT RecordBatchReader {
return batch;
}
+ class RecordBatchReaderIterator {
+ public:
+ RecordBatchReaderIterator() : batch_(RecordBatchEnd()) {}
+
+ explicit RecordBatchReaderIterator(RecordBatchReader* reader)
+ : batch_(RecordBatchEnd()), reader_(reader) {
+ Next();
+ }
+
+ bool operator!=(const RecordBatchReaderIterator& other) const {
+ return batch_ != other.batch_;
+ }
+
+ Result<std::shared_ptr<RecordBatch>> operator*() {
+ ARROW_RETURN_NOT_OK(batch_.status());
+
+ auto batch = std::move(batch_);
+ batch_ = RecordBatchEnd();
Review comment:
It is rather weird for `operator*` to change the stored batch. That
means if I use this operator twice, I will get two different results; that
sounds rather undesirable.
##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -234,6 +234,53 @@ class ARROW_EXPORT RecordBatchReader {
return batch;
}
+ class RecordBatchReaderIterator {
+ public:
+ RecordBatchReaderIterator() : batch_(RecordBatchEnd()) {}
+
+ explicit RecordBatchReaderIterator(RecordBatchReader* reader)
+ : batch_(RecordBatchEnd()), reader_(reader) {
+ Next();
+ }
+
+ bool operator!=(const RecordBatchReaderIterator& other) const {
+ return batch_ != other.batch_;
+ }
+
+ Result<std::shared_ptr<RecordBatch>> operator*() {
+ ARROW_RETURN_NOT_OK(batch_.status());
+
+ auto batch = std::move(batch_);
+ batch_ = RecordBatchEnd();
+ return batch;
+ }
+
+ RecordBatchReaderIterator& operator++() {
+ Next();
+ return *this;
+ }
+
+ private:
+ std::shared_ptr<RecordBatch> RecordBatchEnd() {
+ return std::shared_ptr<RecordBatch>(NULLPTR);
+ }
+
+ void Next() {
+ if (!batch_.ok()) {
+ batch_ = RecordBatchEnd();
Review comment:
This means that if an error occurs, the iterator is ended afterwards.
I'm not sure why we should do that?
##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -234,6 +234,53 @@ class ARROW_EXPORT RecordBatchReader {
return batch;
}
+ class RecordBatchReaderIterator {
+ public:
Review comment:
It would be nicer to satisfy the [input iterator
requirement](https://en.cppreference.com/w/cpp/named_req/InputIterator), i.e.
add type definitions (`value_type`, `reference`) as well as equality and
post-increment operators.
##########
File path: cpp/src/arrow/record_batch_test.cc
##########
@@ -332,4 +332,72 @@ TEST_F(TestRecordBatch, MakeEmpty) {
ASSERT_EQ(empty->num_rows(), 0);
}
+class TestRecordBatchReader : public TestBase {};
+
+TEST_F(TestRecordBatchReader, RangeForLoop) {
+ const int length = 10;
+
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", uint8());
+ auto f2 = field("f2", int16());
+
+ auto metadata = key_value_metadata({"foo"}, {"bar"});
+
+ std::vector<std::shared_ptr<Field>> fields = {f0, f1, f2};
+ auto schema = ::arrow::schema({f0, f1, f2});
+ auto schema2 = ::arrow::schema({f0, f1});
+ auto schema3 = ::arrow::schema({f0, f1, f2}, metadata);
+
+ auto a0 = MakeRandomArray<Int32Array>(length);
+ auto a1 = MakeRandomArray<UInt8Array>(length);
+ auto a2 = MakeRandomArray<Int16Array>(length);
+
+ auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+ auto b2 = RecordBatch::Make(schema3, length, {a0, a1, a2});
+ auto b3 = RecordBatch::Make(schema2, length, {a0, a1});
+ auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1});
+
+ std::vector<std::shared_ptr<RecordBatch>> batches = {b1, b2, b3, b4};
+
+ ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches));
+
+ int64_t i = 0;
+ for (auto maybe_batch : *reader) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ AssertBatchesEqual(*batch, *batches[i++]);
+ }
+}
+
+TEST_F(TestRecordBatchReader, BeginEndForLoop) {
+ const int length = 10;
+
+ auto field1 = field("f1", int32());
+ auto field2 = field("f2", uint8());
+ auto field3 = field("f3", int16());
Review comment:
Can you factor out (in the fixture class above) and reuse the test data
for these two tests?
##########
File path: cpp/src/arrow/record_batch_test.cc
##########
@@ -332,4 +332,72 @@ TEST_F(TestRecordBatch, MakeEmpty) {
ASSERT_EQ(empty->num_rows(), 0);
}
+class TestRecordBatchReader : public TestBase {};
+
+TEST_F(TestRecordBatchReader, RangeForLoop) {
+ const int length = 10;
+
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", uint8());
+ auto f2 = field("f2", int16());
+
+ auto metadata = key_value_metadata({"foo"}, {"bar"});
+
+ std::vector<std::shared_ptr<Field>> fields = {f0, f1, f2};
+ auto schema = ::arrow::schema({f0, f1, f2});
+ auto schema2 = ::arrow::schema({f0, f1});
+ auto schema3 = ::arrow::schema({f0, f1, f2}, metadata);
+
+ auto a0 = MakeRandomArray<Int32Array>(length);
+ auto a1 = MakeRandomArray<UInt8Array>(length);
+ auto a2 = MakeRandomArray<Int16Array>(length);
+
+ auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+ auto b2 = RecordBatch::Make(schema3, length, {a0, a1, a2});
+ auto b3 = RecordBatch::Make(schema2, length, {a0, a1});
+ auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1});
+
+ std::vector<std::shared_ptr<RecordBatch>> batches = {b1, b2, b3, b4};
+
+ ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches));
+
+ int64_t i = 0;
+ for (auto maybe_batch : *reader) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ AssertBatchesEqual(*batch, *batches[i++]);
+ }
+}
Review comment:
After the loop, check that all batches were seen: `ASSERT_EQ(i,
static_cast<int64_t>(batches.size()))`
##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -234,6 +234,53 @@ class ARROW_EXPORT RecordBatchReader {
return batch;
}
+ class RecordBatchReaderIterator {
+ public:
+ RecordBatchReaderIterator() : batch_(RecordBatchEnd()) {}
+
+ explicit RecordBatchReaderIterator(RecordBatchReader* reader)
+ : batch_(RecordBatchEnd()), reader_(reader) {
+ Next();
+ }
+
+ bool operator!=(const RecordBatchReaderIterator& other) const {
+ return batch_ != other.batch_;
+ }
+
+ Result<std::shared_ptr<RecordBatch>> operator*() {
+ ARROW_RETURN_NOT_OK(batch_.status());
+
+ auto batch = std::move(batch_);
+ batch_ = RecordBatchEnd();
+ return batch;
+ }
+
+ RecordBatchReaderIterator& operator++() {
+ Next();
+ return *this;
+ }
+
+ private:
+ std::shared_ptr<RecordBatch> RecordBatchEnd() {
+ return std::shared_ptr<RecordBatch>(NULLPTR);
+ }
+
+ void Next() {
+ if (!batch_.ok()) {
+ batch_ = RecordBatchEnd();
+ return;
+ }
+ batch_ = reader_->Next();
Review comment:
What if `reader_` is null, for example if this is called on
`RecordBatchReader::end()`?
##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -234,6 +234,53 @@ class ARROW_EXPORT RecordBatchReader {
return batch;
}
+ class RecordBatchReaderIterator {
+ public:
+ RecordBatchReaderIterator() : batch_(RecordBatchEnd()) {}
Review comment:
Do you leave `reader_` undefined here?
##########
File path: cpp/src/arrow/record_batch_test.cc
##########
@@ -332,4 +332,72 @@ TEST_F(TestRecordBatch, MakeEmpty) {
ASSERT_EQ(empty->num_rows(), 0);
}
+class TestRecordBatchReader : public TestBase {};
+
+TEST_F(TestRecordBatchReader, RangeForLoop) {
+ const int length = 10;
+
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", uint8());
+ auto f2 = field("f2", int16());
+
+ auto metadata = key_value_metadata({"foo"}, {"bar"});
+
+ std::vector<std::shared_ptr<Field>> fields = {f0, f1, f2};
+ auto schema = ::arrow::schema({f0, f1, f2});
+ auto schema2 = ::arrow::schema({f0, f1});
+ auto schema3 = ::arrow::schema({f0, f1, f2}, metadata);
+
+ auto a0 = MakeRandomArray<Int32Array>(length);
+ auto a1 = MakeRandomArray<UInt8Array>(length);
+ auto a2 = MakeRandomArray<Int16Array>(length);
+
+ auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+ auto b2 = RecordBatch::Make(schema3, length, {a0, a1, a2});
+ auto b3 = RecordBatch::Make(schema2, length, {a0, a1});
+ auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1});
+
+ std::vector<std::shared_ptr<RecordBatch>> batches = {b1, b2, b3, b4};
+
+ ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches));
+
+ int64_t i = 0;
+ for (auto maybe_batch : *reader) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ AssertBatchesEqual(*batch, *batches[i++]);
Review comment:
To avoid crashes or undefined behaviour, first `ASSERT_LT(i,
static_cast<int64_t>(batches.size()))`.
##########
File path: cpp/src/arrow/record_batch_test.cc
##########
@@ -332,4 +332,72 @@ TEST_F(TestRecordBatch, MakeEmpty) {
ASSERT_EQ(empty->num_rows(), 0);
}
+class TestRecordBatchReader : public TestBase {};
+
+TEST_F(TestRecordBatchReader, RangeForLoop) {
+ const int length = 10;
+
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", uint8());
+ auto f2 = field("f2", int16());
+
+ auto metadata = key_value_metadata({"foo"}, {"bar"});
+
+ std::vector<std::shared_ptr<Field>> fields = {f0, f1, f2};
+ auto schema = ::arrow::schema({f0, f1, f2});
+ auto schema2 = ::arrow::schema({f0, f1});
+ auto schema3 = ::arrow::schema({f0, f1, f2}, metadata);
Review comment:
Hmm, it does not really make sense to iterate over record batches with
different schemas. You should simplify this by using the same schema.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]