pitrou commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r642908613
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -829,6 +829,9 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
   return out;
 }
+template <>
+inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
Review comment:
Hmm... can you explain what the point of this is? Also, why isn't the
constructor defined directly at its declaration point in the class?
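For reference, an untested sketch of what an in-class definition might look like; it would need some constraint so the constructor only participates for `Future<>` (the `enable_if` shape below is a guess, not necessarily how the class guards its other Empty-only members):
```cpp
template <typename T>
class Future {
 public:
  // ...
  // Hypothetical: implicit construction from Status, enabled only when
  // T == internal::Empty (i.e. for Future<>), delegating to the Result
  // constructor just like the out-of-class definition above.
  template <typename U = T,
            typename std::enable_if<std::is_same<U, internal::Empty>::value,
                                    int>::type = 0>
  Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
};
```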
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -289,21 +358,94 @@ class SerializedFile : public ParquetFileReader::Contents {
           "Parquet magic bytes not found in footer. Either the file is corrupted or this "
           "is not a parquet file.");
     }
+    // Both encrypted/unencrypted footers have the same footer length check.
+    uint32_t metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
+        reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
+        kFooterSize);
+    if (metadata_len > source_size_ - kFooterSize) {
+      throw ParquetInvalidOrCorruptedFileException(
+          "Parquet file size is ", source_size_,
+          " bytes, smaller than the size reported by the footer (", metadata_len,
+          " bytes)");
+    }
+    return metadata_len;
+  }
+
+  // Does not throw.
+  ::arrow::Future<> ParseMetaDataAsync() {
+    int64_t footer_read_size;
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    footer_read_size = GetFooterReadSize();
+    END_PARQUET_CATCH_EXCEPTIONS
+    // Assumes this is kept alive externally
+    return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
+        .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+                  -> ::arrow::Future<> {
+          uint32_t metadata_len;
+          BEGIN_PARQUET_CATCH_EXCEPTIONS
+          metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
+          END_PARQUET_CATCH_EXCEPTIONS
+          int64_t metadata_start = source_size_ - kFooterSize - metadata_len;
+
+          std::shared_ptr<::arrow::Buffer> metadata_buffer;
+          if (footer_read_size >= (metadata_len + kFooterSize)) {
+            metadata_buffer =
+                SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize,
+                            metadata_len);
+            return ParseMaybeEncryptedMetaDataAsync(footer_buffer,
+                                                    std::move(metadata_buffer),
+                                                    footer_read_size, metadata_len);
+          }
+          return source_->ReadAsync(metadata_start, metadata_len)
+              .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+                return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer,
+                                                        footer_read_size, metadata_len);
+              });
+        });
+  }
-    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
+  // Continuation
+  ::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
+      std::shared_ptr<::arrow::Buffer> footer_buffer,
+      std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
+      uint32_t metadata_len) {
+    // Parse the footer depending on encryption type
+    const bool is_encrypted_footer =
+        memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0;
+    if (is_encrypted_footer) {
       // Encrypted file with Encrypted footer.
-      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
-      return;
+      std::pair<int64_t, uint32_t> read_size;
+      BEGIN_PARQUET_CATCH_EXCEPTIONS
+      read_size =
+          ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len);
+      END_PARQUET_CATCH_EXCEPTIONS
+      // Read the actual footer
+      int64_t metadata_start = read_size.first;
+      metadata_len = read_size.second;
+      return source_->ReadAsync(metadata_start, metadata_len)
+          .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+            // Continue and read the file footer
+            return ParseMetaDataAsync(std::move(footer_buffer), metadata_buffer,
+                                      footer_read_size, metadata_len,
+                                      is_encrypted_footer);
+          });
     }
+    return ParseMetaDataAsync(std::move(footer_buffer), std::move(metadata_buffer),
+                              footer_read_size, metadata_len, is_encrypted_footer);
+  }
-    // No encryption or encryption with plaintext footer mode.
-    std::shared_ptr<Buffer> metadata_buffer;
-    uint32_t metadata_len, read_metadata_len;
-    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
-                                 &metadata_len, &read_metadata_len);
-
+  // Continuation
+  ::arrow::Status ParseMetaDataAsync(std::shared_ptr<::arrow::Buffer> footer_buffer,
Review comment:
This is not async, rename this function? For example
`ParseMetadataBuffer`.
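i.e. something with a shape like this (sketch only, parameter list abbreviated):
```cpp
// Synchronous continuation: parse a metadata buffer that has already been read.
::arrow::Status ParseMetadataBuffer(std::shared_ptr<::arrow::Buffer> metadata_buffer,
                                    uint32_t metadata_len, bool is_encrypted_footer);
```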
##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -21,6 +21,8 @@
#include <memory>
#include <vector>
+#include "arrow/util/async_generator.h"
Review comment:
This is a heavy include, can we avoid adding it?
##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -2331,6 +2330,63 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
ASSERT_EQ(actual_batch->num_rows(), num_rows);
}
+TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
+ ArrowReaderProperties properties = default_arrow_reader_properties();
+ const int num_rows = 1024;
+ const int row_group_size = 512;
+ const int num_columns = 2;
+
+ std::shared_ptr<Table> table;
+ ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
+
+ std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+ std::shared_ptr<FileReader> reader;
+ {
+ std::unique_ptr<FileReader> unique_reader;
+ FileReaderBuilder builder;
+ ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+ ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
+ reader = std::move(unique_reader);
+ }
+
+ auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch,
+ int num_columns, int num_rows) {
+ ASSERT_NE(batch, nullptr);
+ ASSERT_EQ(batch->num_columns(), num_columns);
+ ASSERT_EQ(batch->num_rows(), num_rows);
Review comment:
This doesn't seem to actually check the contents read from file. Could
you do that?
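For example, something like this inside the lambda (sketch; assumes the generator yields row groups in order, `batch_index` is a hypothetical counter for which batch is being checked, and the lambda would need to capture `table` and `row_group_size`):
```cpp
// Hypothetical content check: wrap the batch in a single-batch table and
// compare it against the corresponding slice of the table that was written.
ASSERT_OK_AND_ASSIGN(auto batch_table, ::arrow::Table::FromRecordBatches({batch}));
::arrow::AssertTablesEqual(*table->Slice(batch_index * row_group_size, num_rows),
                           *batch_table, /*same_chunk_layout=*/false);
```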
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -563,6 +692,28 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
   return Open(std::move(source), props, std::move(metadata));
 }
+::arrow::Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
+    std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
+    std::shared_ptr<FileMetaData> metadata) {
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+  auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata));
+ // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
Review comment:
I don't understand this comment. Where is the workaround?
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -264,23 +264,92 @@ class SerializedFile : public ParquetFileReader::Contents {
       }
     }
     PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
-    return cached_source_->Wait();
   }
+  ::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
+                                 const std::vector<int>& column_indices) const {
+    if (!cached_source_) {
+      return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered");
+    }
+    std::vector<::arrow::io::ReadRange> ranges;
+    for (int row : row_groups) {
+      for (int col : column_indices) {
+        ranges.push_back(
+            ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
+      }
+    }
+    return cached_source_->WaitFor(ranges);
+  }
+
+  // Metadata/footer parsing. Divided up to separate sync/async paths, and to use
+  // exceptions for error handling (with the async path converting to Future/Status).
+
void ParseMetaData() {
Review comment:
Can we express this in terms of `ParseMetadataAsync`? I would rather not
have duplicate code paths for this.
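Something like this might do it (untested sketch; assumes blocking on the future is acceptable here, since the sync path already performs blocking reads):
```cpp
void ParseMetaData() {
  // Delegate to the async implementation and block on its result, so the
  // footer-parsing logic lives in a single code path.
  PARQUET_THROW_NOT_OK(ParseMetaDataAsync().status());
}
```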
##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -175,6 +177,19 @@ class PARQUET_EXPORT FileReader {
       const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
       std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+  /// \brief Return a generator of record batches.
+  ///
+  /// The FileReader must outlive the generator, so this requires that you pass in a
+  /// shared_ptr.
+  ///
+  /// \returns error Result if either row_group_indices or column_indices contains an
+  ///     invalid index
+  virtual ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+  GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+                          const std::vector<int> row_group_indices,
+                          const std::vector<int> column_indices,
+                          ::arrow::internal::Executor* executor = NULLPTR) = 0;
Review comment:
Should explain whether the executor is meant for IO or CPU work.
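e.g. something along these lines (my guess at the intended semantics, adjust to what the implementation actually does):
```cpp
/// \param executor if set, the executor on which CPU-bound work (decoding
///     record batches) is scheduled; I/O is still dispatched via the source's
///     IOContext
```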
##########
File path: cpp/src/parquet/encryption/test_encryption_util.cc
##########
@@ -334,7 +335,10 @@ void FileDecryptor::DecryptFile(
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
}
-  auto file_reader = parquet::ParquetFileReader::OpenFile(file, false, reader_properties);
+  std::shared_ptr<::arrow::io::RandomAccessFile> source;
+  PARQUET_ASSIGN_OR_THROW(
+      source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));
Review comment:
For the record, why did you need to change this?
##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -21,6 +21,8 @@
#include <memory>
#include <vector>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/optional.h"
Review comment:
Is this being used?
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -325,10 +469,9 @@ class SerializedFile : public ParquetFileReader::Contents {
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
-  void ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
-                                    int64_t footer_read_size,
-                                    std::shared_ptr<Buffer>* metadata_buffer,
-                                    uint32_t* metadata_len, uint32_t* read_metadata_len);
+ // \return The true length of the metadata
Review comment:
Length in bytes?
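i.e. perhaps:
```cpp
// \return The true length of the metadata, in bytes
```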
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +980,102 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
return Status::OK();
}
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch generators (where each sub-generator is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+ using RecordBatchGenerator =
+ ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+ explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
+ ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+ : arrow_reader_(std::move(arrow_reader)),
+ executor_(executor),
+ row_groups_(std::move(row_groups)),
+ column_indices_(std::move(column_indices)),
+ index_(0) {}
+
+ ::arrow::Future<RecordBatchGenerator> operator()() {
+ if (index_ >= row_groups_.size()) {
+ return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
+ }
+ int row_group = row_groups_[index_++];
+ std::vector<int> column_indices = column_indices_;
+ auto reader = arrow_reader_;
+ if (!reader->properties().pre_buffer()) {
+ return SubmitRead(executor_, reader, row_group, column_indices);
+ }
+    auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+ if (executor_) ready = executor_->Transfer(ready);
+ return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
+ return ReadOneRowGroup(reader, row_group, column_indices);
+ });
+ }
+
+ private:
+ // Synchronous fallback for when pre-buffer isn't enabled.
+ //
+  // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
+  // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
+  // async I/O without forcing readahead.
+  static ::arrow::Future<RecordBatchGenerator> SubmitRead(
+      ::arrow::internal::Executor* executor, std::shared_ptr<FileReaderImpl> self,
+ const int row_group, const std::vector<int>& column_indices) {
+ if (!executor) {
+ return Future<RecordBatchGenerator>::MakeFinished(
+ ReadOneRowGroup(self, row_group, column_indices));
+ }
+ // If we have an executor, then force transfer (even if I/O was complete)
+ return ::arrow::DeferNotOk(
+ executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+ }
+
+ static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
+ std::shared_ptr<FileReaderImpl> self, const int row_group,
+ const std::vector<int>& column_indices) {
+ std::shared_ptr<::arrow::Table> table;
+ // Skips bound checks/pre-buffering, since we've done that already
+ RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
+ auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+ ::arrow::RecordBatchVector batches;
+ while (true) {
+ std::shared_ptr<::arrow::RecordBatch> batch;
+ RETURN_NOT_OK(table_reader->ReadNext(&batch));
+ if (!batch) {
+ break;
+ }
+ batches.push_back(batch);
+ }
+ return ::arrow::MakeVectorGenerator(std::move(batches));
+ }
+
+ std::shared_ptr<FileReaderImpl> arrow_reader_;
+ ::arrow::internal::Executor* executor_;
+ std::vector<int> row_groups_;
+ std::vector<int> column_indices_;
+ size_t index_;
+};
+
+::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+                                        const std::vector<int> row_group_indices,
+                                        const std::vector<int> column_indices,
+                                        ::arrow::internal::Executor* executor) {
+  RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
+  if (reader_properties_.pre_buffer()) {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
+ reader_properties_.cache_options());
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
::arrow::AsyncGenerator<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
Review comment:
Use `auto`?
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +980,102 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
return Status::OK();
}
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch generators (where each sub-generator is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+ using RecordBatchGenerator =
+ ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+ explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
+ ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+ : arrow_reader_(std::move(arrow_reader)),
+ executor_(executor),
+ row_groups_(std::move(row_groups)),
+ column_indices_(std::move(column_indices)),
+ index_(0) {}
+
+ ::arrow::Future<RecordBatchGenerator> operator()() {
+ if (index_ >= row_groups_.size()) {
+ return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
+ }
+ int row_group = row_groups_[index_++];
+ std::vector<int> column_indices = column_indices_;
+ auto reader = arrow_reader_;
+ if (!reader->properties().pre_buffer()) {
+ return SubmitRead(executor_, reader, row_group, column_indices);
+ }
+    auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+ if (executor_) ready = executor_->Transfer(ready);
+ return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
Review comment:
Can you open a JIRA to reuse @westonpace's work to always transfer the future? (we don't want CPU-heavy Parquet decoding to happen on an IO thread)
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -289,21 +358,94 @@ class SerializedFile : public ParquetFileReader::Contents {
           "Parquet magic bytes not found in footer. Either the file is corrupted or this "
           "is not a parquet file.");
     }
+    // Both encrypted/unencrypted footers have the same footer length check.
+    uint32_t metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
+        reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
+        kFooterSize);
+    if (metadata_len > source_size_ - kFooterSize) {
+      throw ParquetInvalidOrCorruptedFileException(
+          "Parquet file size is ", source_size_,
+          " bytes, smaller than the size reported by the footer (", metadata_len,
+          " bytes)");
+    }
+    return metadata_len;
+  }
+
+  // Does not throw.
+  ::arrow::Future<> ParseMetaDataAsync() {
+    int64_t footer_read_size;
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    footer_read_size = GetFooterReadSize();
+    END_PARQUET_CATCH_EXCEPTIONS
+    // Assumes this is kept alive externally
+    return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
+        .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+                  -> ::arrow::Future<> {
+          uint32_t metadata_len;
+          BEGIN_PARQUET_CATCH_EXCEPTIONS
+          metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
+          END_PARQUET_CATCH_EXCEPTIONS
+          int64_t metadata_start = source_size_ - kFooterSize - metadata_len;
+
+          std::shared_ptr<::arrow::Buffer> metadata_buffer;
+          if (footer_read_size >= (metadata_len + kFooterSize)) {
+            metadata_buffer =
+                SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize,
+                            metadata_len);
+            return ParseMaybeEncryptedMetaDataAsync(footer_buffer,
+                                                    std::move(metadata_buffer),
+                                                    footer_read_size, metadata_len);
+          }
+          return source_->ReadAsync(metadata_start, metadata_len)
+              .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+                return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer,
+                                                        footer_read_size, metadata_len);
+              });
+        });
+  }
-    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
+  // Continuation
+  ::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
+      std::shared_ptr<::arrow::Buffer> footer_buffer,
+      std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
+      uint32_t metadata_len) {
+    // Parse the footer depending on encryption type
+    const bool is_encrypted_footer =
+        memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0;
+    if (is_encrypted_footer) {
       // Encrypted file with Encrypted footer.
-      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
-      return;
+      std::pair<int64_t, uint32_t> read_size;
+      BEGIN_PARQUET_CATCH_EXCEPTIONS
+      read_size =
+          ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len);
+      END_PARQUET_CATCH_EXCEPTIONS
+      // Read the actual footer
+      int64_t metadata_start = read_size.first;
+      metadata_len = read_size.second;
+      return source_->ReadAsync(metadata_start, metadata_len)
+          .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+            // Continue and read the file footer
+            return ParseMetaDataAsync(std::move(footer_buffer), metadata_buffer,
+                                      footer_read_size, metadata_len,
+                                      is_encrypted_footer);
+          });
     }
+    return ParseMetaDataAsync(std::move(footer_buffer), std::move(metadata_buffer),
+                              footer_read_size, metadata_len, is_encrypted_footer);
+  }
-    // No encryption or encryption with plaintext footer mode.
-    std::shared_ptr<Buffer> metadata_buffer;
-    uint32_t metadata_len, read_metadata_len;
-    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
-                                 &metadata_len, &read_metadata_len);
-
+  // Continuation
+  ::arrow::Status ParseMetaDataAsync(std::shared_ptr<::arrow::Buffer> footer_buffer,
+                                     std::shared_ptr<::arrow::Buffer> metadata_buffer,
+                                     int64_t footer_read_size, uint32_t metadata_len,
+                                     const bool is_encrypted_footer) {
Review comment:
`footer_buffer` and `footer_read_size` don't seem used here?
##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -2331,6 +2330,63 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
ASSERT_EQ(actual_batch->num_rows(), num_rows);
}
+TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
+ ArrowReaderProperties properties = default_arrow_reader_properties();
+ const int num_rows = 1024;
+ const int row_group_size = 512;
+ const int num_columns = 2;
+
+ std::shared_ptr<Table> table;
+ ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
+
+ std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+ std::shared_ptr<FileReader> reader;
+ {
+ std::unique_ptr<FileReader> unique_reader;
+ FileReaderBuilder builder;
+ ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+ ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
+ reader = std::move(unique_reader);
+ }
+
+ auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch,
+ int num_columns, int num_rows) {
+ ASSERT_NE(batch, nullptr);
+ ASSERT_EQ(batch->num_columns(), num_columns);
+ ASSERT_EQ(batch->num_rows(), num_rows);
+ };
+ {
+ ASSERT_OK_AND_ASSIGN(auto batch_generator,
+                         reader->GetRecordBatchGenerator(reader, {0, 1}, {0, 1}));
+ auto fut1 = batch_generator();
+ auto fut2 = batch_generator();
+ auto fut3 = batch_generator();
+ ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result());
+ ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result());
+ ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result());
+ ASSERT_EQ(batch3, nullptr);
+ check_batches(batch1, num_columns, row_group_size);
+ check_batches(batch2, num_columns, row_group_size);
+ }
+ {
+ // No columns case
+ ASSERT_OK_AND_ASSIGN(auto batch_generator,
+ reader->GetRecordBatchGenerator(reader, {0, 1}, {}));
Review comment:
So you always have to pass columns explicitly?