westonpace commented on a change in pull request #10568:
URL: https://github.com/apache/arrow/pull/10568#discussion_r665828607
##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -227,37 +289,47 @@ TEST(StreamingReaderTest, BytesRead) {
auto read_options = ReadOptions::Defaults();
read_options.block_size = 20;
+ read_options.use_threads = false;
ASSERT_OK_AND_ASSIGN(
auto streaming_reader,
StreamingReader::Make(io::default_io_context(), input, read_options,
ParseOptions::Defaults(),
ConvertOptions::Defaults()));
std::shared_ptr<RecordBatch> batch;
- int64_t bytes = 6; // Size of header
+ int64_t bytes = 18; // Size of header and first batch
do {
ASSERT_EQ(bytes, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
bytes += 12; // Add size of each row
- } while (batch);
+ } while (bytes <= 42);
+ ASSERT_EQ(42, streaming_reader->bytes_read());
+ // Should be able to read past the end without bumping bytes_read
+ ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_EQ(42, streaming_reader->bytes_read());
+ ASSERT_EQ(batch.get(), nullptr);
}
// Interaction of skip_rows and bytes_read()
{
auto input = std::make_shared<io::BufferReader>(table_buffer);
auto read_options = ReadOptions::Defaults();
- read_options.skip_rows = 2;
+ read_options.skip_rows = 1;
+ read_options.block_size = 32;
ASSERT_OK_AND_ASSIGN(
auto streaming_reader,
StreamingReader::Make(io::default_io_context(), input, read_options,
ParseOptions::Defaults(),
ConvertOptions::Defaults()));
std::shared_ptr<RecordBatch> batch;
- // first two rows and third row as header
+ // Skip the actual header (6 bytes) and then treat first row as header (12 bytes)
+ // and then streaming reader reads in first batch (12 bytes)
ASSERT_EQ(30, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_NE(batch.get(), nullptr);
ASSERT_EQ(42, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
+ ASSERT_NE(batch.get(), nullptr);
+ ASSERT_EQ(42, streaming_reader->bytes_read());
+ ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_EQ(batch.get(), nullptr);
Review comment:
I moved the increment of `bytes_decoded_` to be after the readahead. So
now...
* `bytes_decoded_` will not be incremented until the reader asks for the batch
* The header bytes and the bytes skipped before the header are still marked as read after `Make`
(I think this is fair as they have been "consumed" by this point)
* Bytes skipped after the header are marked consumed after the first batch
is delivered
I think this is close enough to what you are after.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]