This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 54f8fe3770 PARQUET-2225: [C++][Parquet] Allow reading dense with
RecordReader (#17877)
54f8fe3770 is described below
commit 54f8fe377099ab9904e9858aa26c63caa199e3df
Author: Fatemah Panahi <[email protected]>
AuthorDate: Fri Mar 3 11:28:39 2023 -0800
PARQUET-2225: [C++][Parquet] Allow reading dense with RecordReader (#17877)
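Currently ReadRecords reads nullable values spaced, leaving a slot in the values
buffer for every null. Some readers need the values read dense, and reading spaced
is less efficient than reading dense. This change adds a read_dense_for_nullable
option to RecordReader::Make so that callers can read dense.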
Lead-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Fatemah Panahi <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
---
cpp/src/parquet/column_reader.cc | 233 ++++++---
cpp/src/parquet/column_reader.h | 37 +-
cpp/src/parquet/column_reader_benchmark.cc | 112 ++++-
cpp/src/parquet/column_reader_test.cc | 734 ++++++++++++++++++++++-------
4 files changed, 855 insertions(+), 261 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 7e650596a6..cf1316bceb 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -340,13 +340,13 @@ class SerializedPageReader : public PageReader {
void SerializedPageReader::InitDecryption() {
// Prepare the AAD for quick update later.
if (crypto_ctx_.data_decryptor != nullptr) {
- DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());
+ ARROW_DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());
data_page_aad_ = encryption::CreateModuleAad(
crypto_ctx_.data_decryptor->file_aad(), encryption::kDataPage,
crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal,
kNonPageOrdinal);
}
if (crypto_ctx_.meta_decryptor != nullptr) {
- DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty());
+ ARROW_DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty());
data_page_header_aad_ = encryption::CreateModuleAad(
crypto_ctx_.meta_decryptor->file_aad(), encryption::kDataPageHeader,
crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal,
kNonPageOrdinal);
@@ -355,7 +355,7 @@ void SerializedPageReader::InitDecryption() {
void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>&
decryptor,
int8_t module_type, std::string*
page_aad) {
- DCHECK(decryptor != nullptr);
+ ARROW_DCHECK(decryptor != nullptr);
if (crypto_ctx_.start_decrypt_with_dictionary_page) {
std::string aad = encryption::CreateModuleAad(
decryptor->file_aad(), module_type, crypto_ctx_.row_group_ordinal,
@@ -768,7 +768,7 @@ class ColumnReaderImplBase {
new_dictionary_ = true;
current_decoder_ = decoders_[encoding].get();
- DCHECK(current_decoder_);
+ ARROW_DCHECK(current_decoder_);
}
// Initialize repetition and definition level decoders on the next data page.
@@ -868,7 +868,7 @@ class ColumnReaderImplBase {
auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
- DCHECK(it->second.get() != nullptr);
+ ARROW_DCHECK(it->second.get() != nullptr);
current_decoder_ = it->second.get();
} else {
switch (encoding) {
@@ -1326,11 +1326,12 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
public:
using T = typename DType::c_type;
using BASE = TypedColumnReaderImpl<DType>;
- TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
MemoryPool* pool)
+ TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
MemoryPool* pool,
+ bool read_dense_for_nullable)
// Pager must be set using SetPageReader.
: BASE(descr, /* pager = */ nullptr, pool) {
leaf_info_ = leaf_info;
- nullable_values_ = leaf_info.HasNullableValues();
+ nullable_values_ = leaf_info_.HasNullableValues();
at_record_start_ = true;
values_written_ = 0;
null_count_ = 0;
@@ -1338,6 +1339,7 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
levels_written_ = 0;
levels_position_ = 0;
levels_capacity_ = 0;
+ read_dense_for_nullable_ = read_dense_for_nullable;
uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY);
if (uses_values_) {
@@ -1666,7 +1668,7 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
}
std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
- if (leaf_info_.HasNullableValues()) {
+ if (nullable_values()) {
auto result = valid_bits_;
PARQUET_THROW_NOT_OK(result->Resize(bit_util::BytesForBits(values_written_),
/*shrink_to_fit=*/true));
@@ -1690,7 +1692,7 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
const int16_t* def_levels = this->def_levels() + levels_position_;
const int16_t* rep_levels = this->rep_levels() + levels_position_;
- DCHECK_GT(this->max_rep_level_, 0);
+ ARROW_DCHECK_GT(this->max_rep_level_, 0);
// Count logical records and number of values to read
while (levels_position_ < levels_written_) {
@@ -1780,7 +1782,7 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
}
values_capacity_ = new_values_capacity;
}
- if (leaf_info_.HasNullableValues()) {
+ if (nullable_values() && !read_dense_for_nullable_) {
int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
if (valid_bits_->size() < valid_bytes_new) {
int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
@@ -1834,7 +1836,100 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
CheckNumberDecoded(num_decoded, values_to_read);
}
- // Return number of logical records read
+ // Reads repeated records and returns number of records read. Fills in
+ // values_to_read and null_count.
+ int64_t ReadRepeatedRecords(int64_t num_records, int64_t* values_to_read,
+ int64_t* null_count) {
+ const int64_t start_levels_position = levels_position_;
+ // Note that repeated records may be required or nullable. If they have
+ // an optional parent in the path, they will be nullable, otherwise,
+ // they are required. We use leaf_info_->HasNullableValues() that looks
+ // at repeated_ancestor_def_level to determine if it is required or
+ // nullable. Even if they are required, we may have to read ahead and
+ // delimit the records to get the right number of values and they will
+ // have associated levels.
+ int64_t records_read = DelimitRecords(num_records, values_to_read);
+ if (!nullable_values() || read_dense_for_nullable_) {
+ ReadValuesDense(*values_to_read);
+ // null_count is always 0 for required.
+ ARROW_DCHECK_EQ(*null_count, 0);
+ } else {
+ ReadSpacedForOptionalOrRepeated(start_levels_position, values_to_read,
null_count);
+ }
+ return records_read;
+ }
+
+ // Reads optional records and returns number of records read. Fills in
+ // values_to_read and null_count.
+ int64_t ReadOptionalRecords(int64_t num_records, int64_t* values_to_read,
+ int64_t* null_count) {
+ const int64_t start_levels_position = levels_position_;
+ // No repetition levels, skip delimiting logic. Each level represents a
+ // null or not null entry
+ int64_t records_read =
+ std::min<int64_t>(levels_written_ - levels_position_, num_records);
+ // This is advanced by DelimitRecords for the repeated field case above.
+ levels_position_ += records_read;
+
+ // Optional fields are always nullable.
+ if (read_dense_for_nullable_) {
+ ReadDenseForOptional(start_levels_position, values_to_read);
+ // We don't need to update null_count when reading dense. It should be
+ // already set to 0.
+ ARROW_DCHECK_EQ(*null_count, 0);
+ } else {
+ ReadSpacedForOptionalOrRepeated(start_levels_position, values_to_read,
null_count);
+ }
+ return records_read;
+ }
+
+ // Reads required records and returns number of records read. Fills in
+ // values_to_read.
+ int64_t ReadRequiredRecords(int64_t num_records, int64_t* values_to_read) {
+ *values_to_read = num_records;
+ ReadValuesDense(*values_to_read);
+ return num_records;
+ }
+
+ // Reads dense for optional records. First it figures out how many values to
+ // read.
+ void ReadDenseForOptional(int64_t start_levels_position, int64_t*
values_to_read) {
+ // levels_position_ must already be incremented based on number of records
+ // read.
+ ARROW_DCHECK_GE(levels_position_, start_levels_position);
+
+ // When reading dense we need to figure out number of values to read.
+ const int16_t* def_levels = this->def_levels();
+ for (int64_t i = start_levels_position; i < levels_position_; ++i) {
+ if (def_levels[i] == this->max_def_level_) {
+ ++(*values_to_read);
+ }
+ }
+ ReadValuesDense(*values_to_read);
+ }
+
+ // Reads spaced for optional or repeated fields.
+ void ReadSpacedForOptionalOrRepeated(int64_t start_levels_position,
+ int64_t* values_to_read, int64_t*
null_count) {
+ // levels_position_ must already be incremented based on number of records
+ // read.
+ ARROW_DCHECK_GE(levels_position_, start_levels_position);
+ ValidityBitmapInputOutput validity_io;
+ validity_io.values_read_upper_bound = levels_position_ -
start_levels_position;
+ validity_io.valid_bits = valid_bits_->mutable_data();
+ validity_io.valid_bits_offset = values_written_;
+
+ DefLevelsToBitmap(def_levels() + start_levels_position,
+ levels_position_ - start_levels_position, leaf_info_,
&validity_io);
+ *values_to_read = validity_io.values_read - validity_io.null_count;
+ *null_count = validity_io.null_count;
+ ARROW_DCHECK_GE(*values_to_read, 0);
+ ARROW_DCHECK_GE(*null_count, 0);
+ ReadValuesSpaced(validity_io.values_read, *null_count);
+ }
+
+ // Return number of logical records read.
+ // Updates levels_position_, values_written_, and null_count_.
int64_t ReadRecordData(int64_t num_records) {
// Conservative upper bound
const int64_t possible_num_values =
@@ -1843,49 +1938,45 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
const int64_t start_levels_position = levels_position_;
- int64_t values_to_read = 0;
+ // To be updated by the function calls below for each of the repetition
+ // types.
int64_t records_read = 0;
+ int64_t values_to_read = 0;
+ int64_t null_count = 0;
if (this->max_rep_level_ > 0) {
- records_read = DelimitRecords(num_records, &values_to_read);
+ // Repeated fields may be nullable or not.
+ // This call updates levels_position_.
+ records_read = ReadRepeatedRecords(num_records, &values_to_read,
&null_count);
} else if (this->max_def_level_ > 0) {
- // No repetition levels, skip delimiting logic. Each level represents a
- // null or not null entry
- records_read = std::min<int64_t>(levels_written_ - levels_position_,
num_records);
-
- // This is advanced by DelimitRecords, which we skipped
- levels_position_ += records_read;
+ // Non-repeated optional values are always nullable.
+ // This call updates levels_position_.
+ ARROW_DCHECK(nullable_values());
+ records_read = ReadOptionalRecords(num_records, &values_to_read,
&null_count);
} else {
- records_read = values_to_read = num_records;
+ ARROW_DCHECK(!nullable_values());
+ records_read = ReadRequiredRecords(num_records, &values_to_read);
+ // We don't need to update null_count, since it is 0.
}
- int64_t null_count = 0;
- if (leaf_info_.HasNullableValues()) {
- ValidityBitmapInputOutput validity_io;
- validity_io.values_read_upper_bound = levels_position_ -
start_levels_position;
- validity_io.valid_bits = valid_bits_->mutable_data();
- validity_io.valid_bits_offset = values_written_;
-
- DefLevelsToBitmap(def_levels() + start_levels_position,
- levels_position_ - start_levels_position, leaf_info_,
- &validity_io);
- values_to_read = validity_io.values_read - validity_io.null_count;
- null_count = validity_io.null_count;
- DCHECK_GE(values_to_read, 0);
- ReadValuesSpaced(validity_io.values_read, null_count);
+ ARROW_DCHECK_GE(records_read, 0);
+ ARROW_DCHECK_GE(values_to_read, 0);
+ ARROW_DCHECK_GE(null_count, 0);
+
+ if (read_dense_for_nullable_) {
+ values_written_ += values_to_read;
+ ARROW_DCHECK_EQ(null_count, 0);
} else {
- DCHECK_GE(values_to_read, 0);
- ReadValuesDense(values_to_read);
+ values_written_ += values_to_read + null_count;
+ null_count_ += null_count;
}
- if (this->leaf_info_.def_level > 0) {
+ // Total values, including null spaces, if any
+ if (this->max_def_level_ > 0) {
// Optional, repeated, or some mix thereof
this->ConsumeBufferedValues(levels_position_ - start_levels_position);
} else {
// Flat, non-repeated
this->ConsumeBufferedValues(values_to_read);
}
- // Total values, including null spaces, if any
- values_written_ += values_to_read + null_count;
- null_count_ += null_count;
return records_read;
}
@@ -1941,9 +2032,10 @@ class FLBARecordReader : public
TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
public:
FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
- ::arrow::MemoryPool* pool)
- : TypedRecordReader<FLBAType>(descr, leaf_info, pool), builder_(nullptr)
{
- DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
+ ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
+ : TypedRecordReader<FLBAType>(descr, leaf_info, pool,
read_dense_for_nullable),
+ builder_(nullptr) {
+ ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
int byte_width = descr_->type_length();
std::shared_ptr<::arrow::DataType> type =
::arrow::fixed_size_binary(byte_width);
builder_ = std::make_unique<::arrow::FixedSizeBinaryBuilder>(type,
this->pool_);
@@ -1975,7 +2067,7 @@ class FLBARecordReader : public
TypedRecordReader<FLBAType>,
int64_t num_decoded = this->current_decoder_->DecodeSpaced(
values, static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits, valid_bits_offset);
- DCHECK_EQ(num_decoded, values_to_read);
+ ARROW_DCHECK_EQ(num_decoded, values_to_read);
for (int64_t i = 0; i < num_decoded; i++) {
if (::arrow::bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
@@ -1995,9 +2087,10 @@ class ByteArrayChunkedRecordReader : public
TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
public:
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo
leaf_info,
- ::arrow::MemoryPool* pool)
- : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
- DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+ ::arrow::MemoryPool* pool, bool
read_dense_for_nullable)
+ : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool,
+ read_dense_for_nullable) {
+ ARROW_DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
accumulator_.builder = std::make_unique<::arrow::BinaryBuilder>(pool);
}
@@ -2036,8 +2129,9 @@ class ByteArrayDictionaryRecordReader : public
TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
public:
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo
leaf_info,
- ::arrow::MemoryPool* pool)
- : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool),
builder_(pool) {
+ ::arrow::MemoryPool* pool, bool
read_dense_for_nullable)
+ : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool,
read_dense_for_nullable),
+ builder_(pool) {
this->read_dictionary_ = true;
}
@@ -2103,7 +2197,7 @@ class ByteArrayDictionaryRecordReader : public
TypedRecordReader<ByteArrayType>,
/// Flush values since they have been copied into the builder
ResetValues();
}
- DCHECK_EQ(num_decoded, values_to_read - null_count);
+ ARROW_DCHECK_EQ(num_decoded, values_to_read - null_count);
}
private:
@@ -2126,11 +2220,14 @@ void TypedRecordReader<FLBAType>::DebugPrintState() {}
std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const
ColumnDescriptor* descr,
LevelInfo leaf_info,
::arrow::MemoryPool*
pool,
- bool read_dictionary) {
+ bool read_dictionary,
+ bool
read_dense_for_nullable) {
if (read_dictionary) {
- return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info,
pool);
+ return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
} else {
- return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info,
pool);
+ return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
}
}
@@ -2138,24 +2235,34 @@ std::shared_ptr<RecordReader>
MakeByteArrayRecordReader(const ColumnDescriptor*
std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
LevelInfo leaf_info,
MemoryPool* pool,
- const bool read_dictionary) {
+ bool read_dictionary,
+ bool read_dense_for_nullable)
{
switch (descr->physical_type()) {
case Type::BOOLEAN:
- return std::make_shared<TypedRecordReader<BooleanType>>(descr,
leaf_info, pool);
+ return std::make_shared<TypedRecordReader<BooleanType>>(descr,
leaf_info, pool,
+
read_dense_for_nullable);
case Type::INT32:
- return std::make_shared<TypedRecordReader<Int32Type>>(descr, leaf_info,
pool);
+ return std::make_shared<TypedRecordReader<Int32Type>>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
case Type::INT64:
- return std::make_shared<TypedRecordReader<Int64Type>>(descr, leaf_info,
pool);
+ return std::make_shared<TypedRecordReader<Int64Type>>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
case Type::INT96:
- return std::make_shared<TypedRecordReader<Int96Type>>(descr, leaf_info,
pool);
+ return std::make_shared<TypedRecordReader<Int96Type>>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
case Type::FLOAT:
- return std::make_shared<TypedRecordReader<FloatType>>(descr, leaf_info,
pool);
+ return std::make_shared<TypedRecordReader<FloatType>>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
case Type::DOUBLE:
- return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info,
pool);
- case Type::BYTE_ARRAY:
- return MakeByteArrayRecordReader(descr, leaf_info, pool,
read_dictionary);
+ return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info,
pool,
+
read_dense_for_nullable);
+ case Type::BYTE_ARRAY: {
+ return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary,
+ read_dense_for_nullable);
+ }
case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FLBARecordReader>(descr, leaf_info, pool);
+ return std::make_shared<FLBARecordReader>(descr, leaf_info, pool,
+ read_dense_for_nullable);
default: {
// PARQUET-1481: This can occur if the file is corrupt
std::stringstream ss;
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index a9f6e217b1..0e6c091502 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -308,16 +308,24 @@ namespace internal {
/// \since 1.3.0
class PARQUET_EXPORT RecordReader {
public:
+ /// \brief Creates a record reader.
+ /// @param descr Column descriptor
+ /// @param leaf_info Level info, used to determine if a column is nullable
or not
+ /// @param pool Memory pool to use for buffering values and rep/def levels
+ /// @param read_dictionary True if reading directly as Arrow
dictionary-encoded
+ /// @param read_dense_for_nullable True if reading dense and not leaving
space for null
+ /// values
static std::shared_ptr<RecordReader> Make(
const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
- const bool read_dictionary = false);
+ bool read_dictionary = false, bool read_dense_for_nullable = false);
virtual ~RecordReader() = default;
/// \brief Attempt to read indicated number of records from column chunk
/// Note that for repeated fields, a record may have more than one value
- /// and all of them are read.
+ /// and all of them are read. If read_dense_for_nullable() it will
+ /// not leave any space for null values. Otherwise, it will read spaced.
/// \return number of records read
virtual int64_t ReadRecords(int64_t num_records) = 0;
@@ -332,6 +340,7 @@ class PARQUET_EXPORT RecordReader {
/// \brief Clear consumed values and repetition/definition levels as the
/// result of calling ReadRecords
+ /// For FLBA and ByteArray types, call GetBuilderChunks() to reset them.
virtual void Reset() = 0;
/// \brief Transfer filled values buffer to caller. A new one will be
@@ -367,10 +376,15 @@ class PARQUET_EXPORT RecordReader {
}
/// \brief Decoded values, including nulls, if any
+ /// FLBA and ByteArray types do not use this array and read into their own
+ /// builders.
uint8_t* values() const { return values_->mutable_data(); }
- /// \brief Number of values written including nulls (if any)
- /// There is no read-ahead/buffering for values.
+ /// \brief Number of values written, including space left for nulls if any.
+ /// If this Reader was constructed with read_dense_for_nullable(), there is
no space for
+ /// nulls and null_count() will be 0. There is no read-ahead/buffering for
values. For
+ /// FLBA and ByteArray types this value reflects the values written with the
last
+ /// ReadRecords call since those readers will reset the values after each
call.
int64_t values_written() const { return values_written_; }
/// \brief Number of definition / repetition levels (from those that have
@@ -383,7 +397,9 @@ class PARQUET_EXPORT RecordReader {
/// the record boundaries.
int64_t levels_written() const { return levels_written_; }
- /// \brief Number of nulls in the leaf that we have read so far.
+ /// \brief Number of nulls in the leaf that we have read so far into the
+ /// values vector. This is only valid when !read_dense_for_nullable(). When
+ /// read_dense_for_nullable() it will always be 0.
int64_t null_count() const { return null_count_; }
/// \brief True if the leaf values are nullable
@@ -392,8 +408,12 @@ class PARQUET_EXPORT RecordReader {
/// \brief True if reading directly as Arrow dictionary-encoded
bool read_dictionary() const { return read_dictionary_; }
+ /// \brief True if reading dense for nullable columns.
+ bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
+
protected:
- /// \brief Indicates if we can have nullable values.
+ /// \brief Indicates if we can have nullable values. Note that repeated
fields
+ /// may or may not be nullable.
bool nullable_values_;
bool at_record_start_;
@@ -413,7 +433,7 @@ class PARQUET_EXPORT RecordReader {
int64_t null_count_;
/// \brief Each bit corresponds to one element in 'values_' and specifies if
it
- /// is null or not null.
+ /// is null or not null. Not set if read_dense_for_nullable_ is true.
std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
/// \brief Buffer for definition levels. May contain more levels than
@@ -437,6 +457,9 @@ class PARQUET_EXPORT RecordReader {
int64_t levels_capacity_;
bool read_dictionary_ = false;
+ // If true, we will not leave any space for the null values in the values_
+ // vector.
+ bool read_dense_for_nullable_ = false;
};
class BinaryRecordReader : virtual public RecordReader {
diff --git a/cpp/src/parquet/column_reader_benchmark.cc
b/cpp/src/parquet/column_reader_benchmark.cc
index 00af976851..49b2317ede 100644
--- a/cpp/src/parquet/column_reader_benchmark.cc
+++ b/cpp/src/parquet/column_reader_benchmark.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <type_traits>
#include "benchmark/benchmark.h"
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
@@ -26,6 +27,7 @@ namespace parquet {
using benchmark::DoNotOptimize;
using parquet::Repetition;
+using parquet::internal::RecordReader;
using parquet::test::MakePages;
using schema::NodePtr;
@@ -56,19 +58,35 @@ class BenchmarkHelper {
}
}
- Int32Reader* ResetReader() {
+ Int32Reader* ResetColumnReader() {
std::unique_ptr<PageReader> pager;
pager.reset(new test::MockPageReader(pages_));
column_reader_ = ColumnReader::Make(descr_.get(), std::move(pager));
return static_cast<Int32Reader*>(column_reader_.get());
}
+ RecordReader* ResetRecordReader(bool read_dense_for_nullable) {
+ std::unique_ptr<PageReader> pager;
+ pager.reset(new test::MockPageReader(pages_));
+ internal::LevelInfo level_info;
+ level_info.def_level = descr_->max_definition_level();
+ level_info.rep_level = descr_->max_repetition_level();
+ record_reader_ = internal::RecordReader::Make(
+ descr_.get(), level_info, ::arrow::default_memory_pool(),
+ /*read_dictionary=*/false, read_dense_for_nullable);
+ record_reader_->SetPageReader(std::move(pager));
+ return record_reader_.get();
+ }
+
int64_t total_size() const { return total_size_; }
private:
std::vector<std::shared_ptr<Page>> pages_;
std::unique_ptr<ColumnDescriptor> descr_;
+ // Reader for column reader benchmarks.
std::shared_ptr<ColumnReader> column_reader_;
+ // Reader for record reader benchmarks.
+ std::shared_ptr<RecordReader> record_reader_;
int64_t total_size_ = 0;
};
@@ -83,7 +101,7 @@ static void ColumnReaderSkipInt32(::benchmark::State& state)
{
for (auto _ : state) {
state.PauseTiming();
- Int32Reader* reader = helper.ResetReader();
+ Int32Reader* reader = helper.ResetColumnReader();
int64_t values_count = -1;
state.ResumeTiming();
while (values_count != 0) {
@@ -109,7 +127,7 @@ static void ColumnReaderReadBatchInt32(::benchmark::State&
state) {
std::vector<int16_t> read_reps(batch_size, -1);
for (auto _ : state) {
state.PauseTiming();
- Int32Reader* reader = helper.ResetReader();
+ Int32Reader* reader = helper.ResetColumnReader();
int64_t values_count = -1;
state.ResumeTiming();
while (values_count != 0) {
@@ -123,35 +141,83 @@ static void
ColumnReaderReadBatchInt32(::benchmark::State& state) {
state.SetBytesProcessed(state.iterations() * helper.total_size());
}
+// Benchmarks ReadRecords for RecordReader with the following parameters in
order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read at each call.
+// - read_dense_for_nullable: sets reading dense or spaced.
+static void RecordReaderReadRecords(::benchmark::State& state) {
+ const auto repetition = static_cast<Repetition::type>(state.range(0));
+ const auto batch_size = static_cast<int64_t>(state.range(1));
+ const bool read_dense_for_nullable = state.range(2);
+
+ BenchmarkHelper helper(repetition, /*num_pages=*/16,
/*levels_per_page=*/80000);
+
+ // Vectors to read the values into.
+ for (auto _ : state) {
+ state.PauseTiming();
+ RecordReader* reader = helper.ResetRecordReader(read_dense_for_nullable);
+ int64_t records_read = -1;
+ state.ResumeTiming();
+ while (records_read != 0) {
+ DoNotOptimize(records_read = reader->ReadRecords(batch_size));
+ reader->Reset();
+ }
+ }
+
+ state.SetBytesProcessed(state.iterations() * helper.total_size());
+}
+
+// Benchmarks SkipRecords for RecordReader with the following parameters in
order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read at each call.
+static void RecordReaderSkipRecords(::benchmark::State& state) {
+ const auto repetition = static_cast<Repetition::type>(state.range(0));
+ const auto batch_size = static_cast<int64_t>(state.range(1));
+
+ BenchmarkHelper helper(repetition, /*num_pages=*/16,
/*levels_per_page=*/80000);
+
+ // Vectors to read the values into.
+ for (auto _ : state) {
+ state.PauseTiming();
+ // read_dense_for_nullable should not matter for skip.
+ RecordReader* reader =
helper.ResetRecordReader(/*read_dense_for_nullable=*/false);
+ int64_t records_skipped = -1;
+ state.ResumeTiming();
+ while (records_skipped != 0) {
+ DoNotOptimize(records_skipped = reader->SkipRecords(batch_size));
+ reader->Reset();
+ }
+ }
+
+ state.SetBytesProcessed(state.iterations() * helper.total_size());
+}
+
BENCHMARK(ColumnReaderSkipInt32)
->ArgNames({"Repetition", "BatchSize"})
- ->Args({0, 100})
->Args({0, 1000})
- ->Args({0, 10000})
- ->Args({0, 100000})
- ->Args({1, 100})
->Args({1, 1000})
- ->Args({1, 10000})
- ->Args({1, 100000})
- ->Args({2, 100})
- ->Args({2, 1000})
- ->Args({2, 10000})
- ->Args({2, 100000});
+ ->Args({2, 1000});
BENCHMARK(ColumnReaderReadBatchInt32)
->ArgNames({"Repetition", "BatchSize"})
- ->Args({0, 100})
->Args({0, 1000})
- ->Args({0, 10000})
- ->Args({0, 100000})
- ->Args({1, 100})
->Args({1, 1000})
- ->Args({1, 10000})
- ->Args({1, 100000})
- ->Args({2, 100})
- ->Args({2, 1000})
- ->Args({2, 10000})
- ->Args({2, 100000});
+ ->Args({2, 1000});
+
+BENCHMARK(RecordReaderSkipRecords)
+ ->ArgNames({"Repetition", "BatchSize"})
+ ->Args({0, 1000})
+ ->Args({1, 1000})
+ ->Args({2, 1000});
+
+BENCHMARK(RecordReaderReadRecords)
+ ->ArgNames({"Repetition", "BatchSize", "ReadDense"})
+ ->Args({0, 1000, true})
+ ->Args({0, 1000, false})
+ ->Args({1, 1000, true})
+ ->Args({1, 1000, false})
+ ->Args({2, 1000, true})
+ ->Args({2, 1000, false});
} // namespace benchmark
} // namespace parquet
diff --git a/cpp/src/parquet/column_reader_test.cc
b/cpp/src/parquet/column_reader_test.cc
index a8ce72165a..414d79b8fb 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -37,9 +37,13 @@
namespace parquet {
-using parquet::Repetition;
-using parquet::internal::BinaryRecordReader;
+using ParquetType = parquet::Type;
+
+using internal::BinaryRecordReader;
+using internal::LevelInfo;
+using schema::GroupNode;
using schema::NodePtr;
+using schema::PrimitiveNode;
using testing::ElementsAre;
namespace test {
@@ -644,20 +648,40 @@ TEST_F(TestPrimitiveReader,
TestNonDictionaryEncodedPagesWithExposeEncoding) {
pages_.clear();
}
-class RecordReaderTest : public ::testing::Test {
- public:
- const int32_t kNullValue = -1;
+namespace {
- void Init(int32_t max_def_level, int32_t max_rep_level, Repetition::type
repetition) {
- level_info_.def_level = max_def_level;
- level_info_.rep_level = max_rep_level;
- repetition_type_ = repetition;
+internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
+ internal::LevelInfo level_info;
+ level_info.def_level = descr->max_definition_level();
+ level_info.rep_level = descr->max_repetition_level();
+
+ int16_t min_spaced_def_level = descr->max_definition_level();
+ const ::parquet::schema::Node* node = descr->schema_node().get();
+ while (node != nullptr && !node->is_repeated()) {
+ if (node->is_optional()) {
+ min_spaced_def_level--;
+ }
+ node = node->parent();
+ }
+ level_info.repeated_ancestor_def_level = min_spaced_def_level;
+ return level_info;
+}
- NodePtr type = schema::Int32("b", repetition);
- descr_ = std::make_unique<ColumnDescriptor>(type, level_info_.def_level,
- level_info_.rep_level);
+} // namespace
- record_reader_ = internal::RecordReader::Make(descr_.get(), level_info_);
+using ReadDenseForNullable = bool;
+class RecordReaderPrimitiveTypeTest
+ : public ::testing::TestWithParam<ReadDenseForNullable> {
+ public:
+ const int32_t kNullValue = -1;
+
+ void Init(NodePtr column) {
+ NodePtr root = GroupNode::Make("root", Repetition::REQUIRED, {column});
+ schema_descriptor_.Init(root);
+ descr_ = schema_descriptor_.Column(0);
+ record_reader_ = internal::RecordReader::Make(descr_,
ComputeLevelInfo(descr_),
+
::arrow::default_memory_pool(),
+ /*read_dictionary=*/false,
GetParam());
}
void CheckReadValues(std::vector<int32_t> expected_values,
@@ -673,14 +697,14 @@ class RecordReaderTest : public ::testing::Test {
}
}
- if (repetition_type_ != Repetition::REQUIRED) {
+ if (descr_->max_definition_level() > 0) {
std::vector<int16_t> read_defs(
record_reader_->def_levels(),
record_reader_->def_levels() + record_reader_->levels_position());
ASSERT_TRUE(vector_equal(expected_defs, read_defs));
}
- if (repetition_type_ == Repetition::REPEATED) {
+ if (descr_->max_repetition_level() > 0) {
std::vector<int16_t> read_reps(
record_reader_->rep_levels(),
record_reader_->rep_levels() + record_reader_->levels_position());
@@ -697,28 +721,212 @@ class RecordReaderTest : public ::testing::Test {
}
protected:
+ SchemaDescriptor schema_descriptor_;
std::shared_ptr<internal::RecordReader> record_reader_;
- std::unique_ptr<ColumnDescriptor> descr_;
- internal::LevelInfo level_info_;
- Repetition::type repetition_type_;
+ const ColumnDescriptor* descr_;
};
-// Tests reading a repeated field using the RecordReader.
-TEST_F(RecordReaderTest, BasicReadRepeatedField) {
- Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+// Tests reading a required field. The expected results are the same for
+// reading dense and spaced.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadRequired) {
+ Init(schema::Int32("b", Repetition::REQUIRED));
+
+ // Records look like: {10, 20, 20, 30, 30, 30}
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+ std::vector<int16_t> def_levels = {};
+ std::vector<int16_t> rep_levels = {};
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_, values, /*num_values=*/static_cast<int>(def_levels.size()),
Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
rep_levels,
+ descr_->max_repetition_level());
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ // Read [10]
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+ CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{},
+ /*expected_reps=*/{});
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+
+ // Read 20, 20, 30, 30, 30
+ records_read = record_reader_->ReadRecords(/*num_records=*/10);
+ ASSERT_EQ(records_read, 5);
+ CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+ CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
+ /*expected_defs=*/{},
+ /*expected_reps=*/{});
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+}
+
+// Tests reading an optional field.
+// Use a max definition field > 1 to test both cases where parent is present or
+// parent is missing.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadOptional) {
+ NodePtr column = GroupNode::Make(
+ "a", Repetition::OPTIONAL,
+ {PrimitiveNode::Make("element", Repetition::OPTIONAL,
ParquetType::INT32)});
+ Init(column);
- // Records look like: {[10], null, [20, 20], null, [30, 30, 30], null}
+ // Records look like: {10, null, 20, 20, null, 30, 30, 30, null}
std::vector<std::shared_ptr<Page>> pages;
std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
- std::vector<int16_t> def_levels = {1, 0, 1, 1, 0, 1, 1, 1, 0};
- std::vector<int16_t> rep_levels = {0, 0, 0, 1, 0, 0, 1, 1, 0};
+ std::vector<int16_t> def_levels = {2, 0, 2, 2, 1, 2, 2, 2, 0};
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), values, /*num_values=*/static_cast<int>(def_levels.size()),
- Encoding::PLAIN,
+ descr_, values, /*num_values=*/static_cast<int>(def_levels.size()),
Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
- level_info_.rep_level);
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
/*rep_levels=*/{},
+ descr_->max_repetition_level());
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ // Read 10, null
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
+ ASSERT_EQ(records_read, 2);
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/9,
+ /*levels_position=*/2);
+ CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{2, 0},
+ /*expected_reps=*/{});
+ } else {
+ CheckState(/*values_written=*/2, /*null_count=*/1, /*levels_written=*/9,
+ /*levels_position=*/2);
+ CheckReadValues(/*expected_values=*/{10, kNullValue},
/*expected_defs=*/{2, 0},
+ /*expected_reps=*/{});
+ }
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
+ /*levels_position=*/0);
+
+ // Read 20, 20, null (parent present), 30, 30, 30
+ records_read = record_reader_->ReadRecords(/*num_records=*/6);
+ ASSERT_EQ(records_read, 6);
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/7,
+ /*levels_position=*/6);
+ CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
+ /*expected_defs=*/{2, 2, 1, 2, 2, 2},
+ /*expected_reps=*/{});
+ } else {
+ CheckState(/*values_written=*/6, /*null_count=*/1, /*levels_written=*/7,
+ /*levels_position=*/6);
+ CheckReadValues(/*expected_values=*/{20, 20, kNullValue, 30, 30, 30},
+ /*expected_defs=*/{2, 2, 1, 2, 2, 2},
+ /*expected_reps=*/{});
+ }
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+ /*levels_position=*/0);
+
+ // Read the last null value and read past the end.
+ records_read = record_reader_->ReadRecords(/*num_records=*/3);
+ ASSERT_EQ(records_read, 1);
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+ /*levels_position=*/1);
+ CheckReadValues(/*expected_values=*/{},
+ /*expected_defs=*/{0},
+ /*expected_reps=*/{});
+ } else {
+ CheckState(/*values_written=*/1, /*null_count=*/1, /*levels_written=*/1,
+ /*levels_position=*/1);
+ CheckReadValues(/*expected_values=*/{kNullValue},
+ /*expected_defs=*/{0},
+ /*expected_reps=*/{});
+ }
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+}
+
+// Tests reading a required repeated field. The results are the same for
reading
+// dense or spaced.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadRequiredRepeated) {
+ NodePtr column = GroupNode::Make(
+ "p", Repetition::REQUIRED,
+ {GroupNode::Make(
+ "list", Repetition::REPEATED,
+ {PrimitiveNode::Make("element", Repetition::REQUIRED,
ParquetType::INT32)})});
+ Init(column);
+
+ // Records look like: {[10], [20, 20], [30, 30, 30]}
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+ std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+ std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_, values, /*num_values=*/static_cast<int>(def_levels.size()),
Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
rep_levels,
+ descr_->max_repetition_level());
+ pages.push_back(std::move(page));
+ auto pager = std::make_unique<MockPageReader>(pages);
+ record_reader_->SetPageReader(std::move(pager));
+
+ // Read [10]
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+ ASSERT_EQ(records_read, 1);
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/6,
+ /*levels_position=*/1);
+ CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{1},
+ /*expected_reps=*/{0});
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/5,
+ /*levels_position=*/0);
+
+ // Read [20, 20], [30, 30, 30]
+ records_read = record_reader_->ReadRecords(/*num_records=*/3);
+ ASSERT_EQ(records_read, 2);
+ CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/5,
+ /*levels_position=*/5);
+ CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
+ /*expected_defs=*/{1, 1, 1, 1, 1},
+ /*expected_reps=*/{0, 1, 0, 1, 1});
+ record_reader_->Reset();
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+ /*levels_position=*/0);
+}
+
+// Tests reading a nullable repeated field. Tests reading null values at
+// different levels and reading an empty list.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadNullableRepeated) {
+ NodePtr column = GroupNode::Make(
+ "p", Repetition::OPTIONAL,
+ {GroupNode::Make(
+ "list", Repetition::REPEATED,
+ {PrimitiveNode::Make("element", Repetition::OPTIONAL,
ParquetType::INT32)})});
+ Init(column);
+
+ // Records look like: {[10], null, [20, 20], [], [30, 30, null, 30]}
+ // Some explanation regarding the behavior. When reading spaced, for an
empty list or
+ // for a top-level null, we do not leave a space and we do not count it
towards
+ // null_count. For a leaf-level null, we leave a space for it and we count
it towards
+ // null_count. When reading dense, null_count is always 0, and we do not
leave any space
+ // for values.
+ std::vector<std::shared_ptr<Page>> pages;
+ std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+ std::vector<int16_t> def_levels = {3, 0, 3, 3, 1, 3, 3, 2, 3};
+ std::vector<int16_t> rep_levels = {0, 0, 0, 1, 0, 0, 1, 1, 1};
+
+ std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+ descr_, values, /*num_values=*/static_cast<int>(def_levels.size()),
Encoding::PLAIN,
+ /*indices=*/{},
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
rep_levels,
+ descr_->max_repetition_level());
pages.push_back(std::move(page));
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
@@ -731,56 +939,82 @@ TEST_F(RecordReaderTest, BasicReadRepeatedField) {
- ASSERT_EQ(record_reader_->descr()->max_definition_level(), 1);
+ ASSERT_EQ(record_reader_->descr()->max_definition_level(), 3);
// Read [10], null
+ // No value slot is left for this top-level null in either dense or spaced mode.
records_read = record_reader_->ReadRecords(/*num_records=*/2);
ASSERT_EQ(records_read, 2);
- CheckState(/*values_written=*/2, /*null_count=*/1, /*levels_written=*/9,
- /*levels_position=*/2);
- CheckReadValues(/*expected_values=*/{10, kNullValue}, /*expected_defs=*/{1,
0},
- /*expected_reps=*/{0, 0});
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/9,
+ /*levels_position=*/2);
+ CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{3, 0},
+ /*expected_reps=*/{0, 0});
+ } else {
+ CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/9,
+ /*levels_position=*/2);
+ CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{3, 0},
+ /*expected_reps=*/{0, 0});
+ }
record_reader_->Reset();
CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
/*levels_position=*/0);
- // Read [20, 20], null, [30, 30, 30]
- records_read = record_reader_->ReadRecords(/*num_records=*/3);
- ASSERT_EQ(records_read, 3);
- CheckState(/*values_written=*/6, /*null_count=*/1, /*levels_written=*/7,
- /*levels_position=*/6);
- CheckReadValues(/*expected_values=*/{20, 20, kNullValue, 30, 30, 30},
- /*expected_defs=*/{1, 1, 0, 1, 1, 1},
- /*expected_reps=*/{0, 1, 0, 0, 1, 1});
+
+ // Read [20, 20], []
+ // We do not read any value for the empty list, and it is not counted towards
+ // null count even when reading spaced.
+ records_read = record_reader_->ReadRecords(/*num_records=*/2);
+ ASSERT_EQ(records_read, 2);
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/7,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, 20},
+ /*expected_defs=*/{3, 3, 1},
+ /*expected_reps=*/{0, 1, 0});
+ } else {
+ CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/7,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, 20},
+ /*expected_defs=*/{3, 3, 1},
+ /*expected_reps=*/{0, 1, 0});
+ }
record_reader_->Reset();
- CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/4,
/*levels_position=*/0);
// Test reading 0 records.
records_read = record_reader_->ReadRecords(/*num_records=*/0);
ASSERT_EQ(records_read, 0);
- // Read the last null value and read past the end.
- records_read = record_reader_->ReadRecords(/*num_records=*/3);
+ // Read the last record.
+ records_read = record_reader_->ReadRecords(/*num_records=*/1);
ASSERT_EQ(records_read, 1);
- CheckState(/*values_written=*/1, /*null_count=*/1, /*levels_written=*/1,
- /*levels_position=*/1);
- CheckReadValues(/*expected_values=*/{kNullValue},
- /*expected_defs=*/{0},
- /*expected_reps=*/{0});
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/4,
+ /*levels_position=*/4);
+ CheckReadValues(/*expected_values=*/{30, 30, 30},
+ /*expected_defs=*/{3, 3, 2, 3},
+ /*expected_reps=*/{0, 1, 1, 1});
+ } else {
+ CheckState(/*values_written=*/4, /*null_count=*/1, /*levels_written=*/4,
+ /*levels_position=*/4);
+ CheckReadValues(/*expected_values=*/{30, 30, kNullValue, 30},
+ /*expected_defs=*/{3, 3, 2, 3},
+ /*expected_reps=*/{0, 1, 1, 1});
+ }
record_reader_->Reset();
CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
/*levels_position=*/0);
}
// Test that we can skip required top level field.
-TEST_F(RecordReaderTest, SkipRequiredTopLevel) {
- Init(/*max_def_level=*/0, /*max_rep_level=*/0, Repetition::REQUIRED);
+TEST_P(RecordReaderPrimitiveTypeTest, SkipRequiredTopLevel) {
+ Init(schema::Int32("b", Repetition::REQUIRED));
std::vector<std::shared_ptr<Page>> pages;
std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
- Encoding::PLAIN,
+ descr_, values, /*num_values=*/static_cast<int>(values.size()),
Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{}, level_info_.def_level,
- /*rep_levels=*/{}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{}, descr_->max_definition_level(),
+ /*rep_levels=*/{}, descr_->max_repetition_level());
pages.push_back(std::move(page));
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
@@ -802,8 +1036,8 @@ TEST_F(RecordReaderTest, SkipRequiredTopLevel) {
}
// Skip an optional field. Intentionally included some null values.
-TEST_F(RecordReaderTest, SkipOptional) {
- Init(/*max_def_level=*/1, /*max_rep_level=*/0, Repetition::OPTIONAL);
+TEST_P(RecordReaderPrimitiveTypeTest, SkipOptional) {
+ Init(schema::Int32("b", Repetition::OPTIONAL));
// Records look like {null, 10, 20, 30, null, 40, 50, 60}
std::vector<std::shared_ptr<Page>> pages;
@@ -811,11 +1045,10 @@ TEST_F(RecordReaderTest, SkipOptional) {
std::vector<int16_t> def_levels = {0, 1, 1, 0, 1, 1, 1, 1};
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
- Encoding::PLAIN,
+ descr_, values, /*num_values=*/static_cast<int>(values.size()),
Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, def_levels, level_info_.def_level,
- /*rep_levels=*/{}, level_info_.rep_level);
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
+ /*rep_levels=*/{}, descr_->max_repetition_level());
pages.push_back(std::move(page));
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
@@ -834,27 +1067,33 @@ TEST_F(RecordReaderTest, SkipOptional) {
int64_t records_read = record_reader_->ReadRecords(/*num_records=*/3);
ASSERT_EQ(records_read, 3);
- // values_written() includes null values.
- // We had skipped 2 of the levels above. So there is only 6 left in total
to
- // read, and we read 3 of them here.
- CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/6,
- /*levels_position=*/3);
+ if (GetParam() == /*read_dense_for_nullable=*/true) {
+ // We had skipped 2 of the levels above. So there is only 6 left in
total to
+ // read, and we read 3 of them here.
+ CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/6,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, 30}, /*expected_defs=*/{1, 0,
1},
+ /*expected_reps=*/{});
+ } else {
+ CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/6,
+ /*levels_position=*/3);
+ CheckReadValues(/*expected_values=*/{20, kNullValue, 30},
+ /*expected_defs=*/{1, 0, 1},
+ /*expected_reps=*/{});
+ }
- // ReadRecords for optional fields uses ReadValuesSpaced, so there is a
- // placeholder for null.
- CheckReadValues(/*expected_values=*/{20, kNullValue, 30},
/*expected_defs=*/{1, 0, 1},
- /*expected_reps=*/{});
+ record_reader_->Reset();
}
{
// Skip {40, 50}.
int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2);
ASSERT_EQ(records_skipped, 2);
- CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/4,
- /*levels_position=*/3);
- CheckReadValues(/*expected_values=*/{20, kNullValue, 30},
/*expected_defs=*/{1, 0, 1},
+ CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+ /*levels_position=*/0);
+ CheckReadValues(/*expected_values=*/{}, /*expected_defs=*/{},
/*expected_reps=*/{});
- // Reset after a Skip.
+ // Try reset after a Skip.
record_reader_->Reset();
CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
/*levels_position=*/0);
@@ -880,8 +1119,8 @@ TEST_F(RecordReaderTest, SkipOptional) {
}
// Test skipping for repeated fields.
-TEST_F(RecordReaderTest, SkipRepeated) {
- Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+TEST_P(RecordReaderPrimitiveTypeTest, SkipRepeated) {
+ Init(schema::Int32("b", Repetition::REPEATED));
// Records look like {null, [20, 20, 20], null, [30, 30], [40]}
std::vector<std::shared_ptr<Page>> pages;
@@ -890,11 +1129,10 @@ TEST_F(RecordReaderTest, SkipRepeated) {
std::vector<int16_t> rep_levels = {0, 0, 1, 1, 0, 0, 1, 0};
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
- Encoding::PLAIN,
+ descr_, values, /*num_values=*/static_cast<int>(values.size()),
Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
- level_info_.rep_level);
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
rep_levels,
+ descr_->max_repetition_level());
pages.push_back(std::move(page));
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
@@ -965,8 +1203,8 @@ TEST_F(RecordReaderTest, SkipRepeated) {
// Tests that for repeated fields, we first consume what is in the buffer
// before reading more levels.
-TEST_F(RecordReaderTest, SkipRepeatedConsumeBufferFirst) {
- Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+TEST_P(RecordReaderPrimitiveTypeTest, SkipRepeatedConsumeBufferFirst) {
+ Init(schema::Int32("b", Repetition::REPEATED));
std::vector<std::shared_ptr<Page>> pages;
std::vector<int32_t> values(2048, 10);
@@ -974,11 +1212,10 @@ TEST_F(RecordReaderTest, SkipRepeatedConsumeBufferFirst)
{
std::vector<int16_t> rep_levels(2048, 0);
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), values, /*num_values=*/static_cast<int>(values.size()),
- Encoding::PLAIN,
+ descr_, values, /*num_values=*/static_cast<int>(values.size()),
Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
- level_info_.rep_level);
+ /*indices_size=*/0, def_levels, descr_->max_definition_level(),
rep_levels,
+ descr_->max_repetition_level());
pages.push_back(std::move(page));
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
@@ -1010,38 +1247,38 @@ TEST_F(RecordReaderTest,
SkipRepeatedConsumeBufferFirst) {
}
// Test reading when one record spans multiple pages for a repeated field.
-TEST_F(RecordReaderTest, ReadPartialRecord) {
- Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+TEST_P(RecordReaderPrimitiveTypeTest, ReadPartialRecord) {
+ Init(schema::Int32("b", Repetition::REPEATED));
std::vector<std::shared_ptr<Page>> pages;
// Page 1: {[10], [20, 20, 20 ... } continues to next page.
{
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), /*values=*/{10, 20, 20, 20}, /*num_values=*/4,
Encoding::PLAIN,
+ descr_, /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info_.def_level,
- /*rep_levels=*/{0, 0, 1, 1}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1},
descr_->max_definition_level(),
+ /*rep_levels=*/{0, 0, 1, 1}, descr_->max_repetition_level());
pages.push_back(std::move(page));
}
// Page 2: {... 20, 20, ...} continues from previous page and to next page.
{
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+ descr_, /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level,
- /*rep_levels=*/{1, 1}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{1, 1},
descr_->max_definition_level(),
+ /*rep_levels=*/{1, 1}, descr_->max_repetition_level());
pages.push_back(std::move(page));
}
// Page 3: { ... 20], [30]} continues from previous page.
{
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
+ descr_, /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level,
- /*rep_levels=*/{1, 0}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{1, 1},
descr_->max_definition_level(),
+ /*rep_levels=*/{1, 0}, descr_->max_repetition_level());
pages.push_back(std::move(page));
}
@@ -1084,38 +1321,38 @@ TEST_F(RecordReaderTest, ReadPartialRecord) {
// Test skipping for repeated fields for the case when one record spans
multiple
// pages.
-TEST_F(RecordReaderTest, SkipPartialRecord) {
- Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+TEST_P(RecordReaderPrimitiveTypeTest, SkipPartialRecord) {
+ Init(schema::Int32("b", Repetition::REPEATED));
std::vector<std::shared_ptr<Page>> pages;
// Page 1: {[10], [20, 20, 20 ... } continues to next page.
{
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), /*values=*/{10, 20, 20, 20}, /*num_values=*/4,
Encoding::PLAIN,
+ descr_, /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info_.def_level,
- /*rep_levels=*/{0, 0, 1, 1}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1},
descr_->max_definition_level(),
+ /*rep_levels=*/{0, 0, 1, 1}, descr_->max_repetition_level());
pages.push_back(std::move(page));
}
// Page 2: {... 20, 20, ...} continues from previous page and to next page.
{
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+ descr_, /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level,
- /*rep_levels=*/{1, 1}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{1, 1},
descr_->max_definition_level(),
+ /*rep_levels=*/{1, 1}, descr_->max_repetition_level());
pages.push_back(std::move(page));
}
// Page 3: { ... 20, [30]} continues from previous page.
{
std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
- descr_.get(), /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
+ descr_, /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
/*indices=*/{},
- /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level,
- /*rep_levels=*/{1, 0}, level_info_.rep_level);
+ /*indices_size=*/0, /*def_levels=*/{1, 1},
descr_->max_definition_level(),
+ /*rep_levels=*/{1, 0}, descr_->max_repetition_level());
pages.push_back(std::move(page));
}
@@ -1158,83 +1395,244 @@ TEST_F(RecordReaderTest, SkipPartialRecord) {
}
}
-// Test that SkipRecords works on ByteArrays. Specifically, this is testing
-// ReadAndThrowAwayValues for ByteArrays.
-TEST(RecordReaderByteArrayTest, SkipByteArray) {
- internal::LevelInfo level_info;
- level_info.def_level = 1;
- level_info.rep_level = 0;
+INSTANTIATE_TEST_SUITE_P(RecordReaderPrimitveTypeTests,
RecordReaderPrimitiveTypeTest,
+ ::testing::Values(/*read_dense_for_nullable=*/true,
false),
+ testing::PrintToStringParamName());
- // Must use REPEATED to exercise ReadAndThrowAwayValues for ByteArrays. It
- // does not do any buffering for Optional or Required fields as it calls
- // ResetValues after every read.
- NodePtr type = schema::ByteArray("b", Repetition::OPTIONAL);
- const ColumnDescriptor descr(type, level_info.def_level,
level_info.rep_level);
+// Parameterized test for FLBA record reader.
+class FLBARecordReaderTest : public ::testing::TestWithParam<bool> {
+ public:
+ bool read_dense_for_nullable() { return GetParam(); }
- std::vector<std::shared_ptr<Page>> pages;
- int levels_per_page = 90;
- int num_pages = 1;
+ void MakeRecordReader(int levels_per_page, int num_pages, int
FLBA_type_length) {
+ levels_per_page_ = levels_per_page;
+ FLBA_type_length_ = FLBA_type_length;
+ internal::LevelInfo level_info;
+ level_info.def_level = 1;
+ level_info.rep_level = 0;
+ NodePtr type = ::parquet::schema::PrimitiveNode::Make(
+ "b", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
ConvertedType::NONE,
+ FLBA_type_length_);
+ descr_ = std::make_unique<ColumnDescriptor>(type, level_info.def_level,
+ level_info.rep_level);
+ MakePages<FLBAType>(descr_.get(), num_pages, levels_per_page, def_levels_,
+ rep_levels_, values_, buffer_, pages_,
Encoding::PLAIN);
+ auto pager = std::make_unique<MockPageReader>(pages_);
+ record_reader_ = internal::RecordReader::Make(
+ descr_.get(), level_info, ::arrow::default_memory_pool(),
+ /*read_dictionary=*/false, read_dense_for_nullable());
+ record_reader_->SetPageReader(std::move(pager));
+ }
- std::vector<int16_t> def_levels;
- std::vector<int16_t> rep_levels;
- std::vector<ByteArray> values;
- std::vector<uint8_t> buffer;
+ // Returns expected values in row range.
+ // We need this since some values are null.
+ std::vector<std::string_view> expected_values(int start, int end) {
+ std::vector<std::string_view> result;
+ // Find out where in the values_ vector we start from.
+ size_t values_index = 0;
+ for (int i = 0; i < start; ++i) {
+ if (def_levels_[i] != 0) {
+ ++values_index;
+ }
+ }
- MakePages<ByteArrayType>(&descr, num_pages, levels_per_page, def_levels,
rep_levels,
- values, buffer, pages, Encoding::PLAIN);
+ for (int i = start; i < end; ++i) {
+ if (def_levels_[i] == 0) {
+ if (!read_dense_for_nullable()) {
+ result.emplace_back();
+ }
+ continue;
+ }
+ result.emplace_back(reinterpret_cast<const
char*>(values_[values_index].ptr),
+ FLBA_type_length_);
+ ++values_index;
+ }
+ return result;
+ }
- auto pager = std::make_unique<MockPageReader>(pages);
+ void CheckReadValues(int start, int end) {
+ auto binary_reader =
dynamic_cast<BinaryRecordReader*>(record_reader_.get());
+ ASSERT_NE(binary_reader, nullptr);
+ // Chunks are reset after this call.
+ ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks();
+ ASSERT_EQ(array_vector.size(), 1);
+ auto binary_array =
+ dynamic_cast<::arrow::FixedSizeBinaryArray*>(array_vector[0].get());
+
+ ASSERT_NE(binary_array, nullptr);
+ ASSERT_EQ(binary_array->length(), record_reader_->values_written());
+ if (read_dense_for_nullable()) {
+ ASSERT_EQ(binary_array->null_count(), 0);
+ ASSERT_EQ(record_reader_->null_count(), 0);
+ } else {
+ ASSERT_EQ(binary_array->null_count(), record_reader_->null_count());
+ }
- std::shared_ptr<internal::RecordReader> record_reader =
- internal::RecordReader::Make(&descr, level_info);
- record_reader->SetPageReader(std::move(pager));
+ std::vector<std::string_view> expected = expected_values(start, end);
+ for (size_t i = 0; i < expected.size(); ++i) {
+ if (def_levels_[i + start] == 0) {
+ ASSERT_EQ(!read_dense_for_nullable(), binary_array->IsNull(i));
+ } else {
+ ASSERT_EQ(expected[i].compare(binary_array->GetView(i)), 0);
+ ASSERT_FALSE(binary_array->IsNull(i));
+ }
+ }
+ }
+
+ protected:
+ std::shared_ptr<internal::RecordReader> record_reader_;
+
+ private:
+ int levels_per_page_;
+ int FLBA_type_length_;
+ std::vector<std::shared_ptr<Page>> pages_;
+ std::vector<int16_t> def_levels_;
+ std::vector<int16_t> rep_levels_;
+ std::vector<FixedLenByteArray> values_;
+ std::vector<uint8_t> buffer_;
+ std::unique_ptr<ColumnDescriptor> descr_;
+};
+
+// Similar to above, except for Byte arrays. FLBA and Byte arrays are
+// sufficiently different to warrant a separate class for readability.
+class ByteArrayRecordReaderTest : public ::testing::TestWithParam<bool> {
+ public:
+ bool read_dense_for_nullable() { return GetParam(); }
+
+ void MakeRecordReader(int levels_per_page, int num_pages) {
+ levels_per_page_ = levels_per_page;
+ internal::LevelInfo level_info;
+ level_info.def_level = 1;
+ level_info.rep_level = 0;
+ NodePtr type = schema::ByteArray("b", Repetition::OPTIONAL);
+ descr_ = std::make_unique<ColumnDescriptor>(type, level_info.def_level,
+ level_info.rep_level);
+ MakePages<ByteArrayType>(descr_.get(), num_pages, levels_per_page,
def_levels_,
+ rep_levels_, values_, buffer_, pages_,
Encoding::PLAIN);
+
+ auto pager = std::make_unique<MockPageReader>(pages_);
+
+ record_reader_ = internal::RecordReader::Make(
+ descr_.get(), level_info, ::arrow::default_memory_pool(),
+ /*read_dictionary=*/false, read_dense_for_nullable());
+ record_reader_->SetPageReader(std::move(pager));
+ }
+
+ // Returns expected values in row range.
+ // We need this since some values are null.
+ std::vector<std::string_view> expected_values(int start, int end) {
+ std::vector<std::string_view> result;
+ // Find out where in the values_ vector we start from.
+ size_t values_index = 0;
+ for (int i = 0; i < start; ++i) {
+ if (def_levels_[i] != 0) {
+ ++values_index;
+ }
+ }
+
+ for (int i = start; i < end; ++i) {
+ if (def_levels_[i] == 0) {
+ if (!read_dense_for_nullable()) {
+ result.emplace_back();
+ }
+ continue;
+ }
+ result.emplace_back(reinterpret_cast<const
char*>(values_[values_index].ptr),
+ values_[values_index].len);
+ ++values_index;
+ }
+ return result;
+ }
+
+ void CheckReadValues(int start, int end) {
+ auto binary_reader =
dynamic_cast<BinaryRecordReader*>(record_reader_.get());
+ ASSERT_NE(binary_reader, nullptr);
+ // Chunks are reset after this call.
+ ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks();
+ ASSERT_EQ(array_vector.size(), 1);
+ ::arrow::BinaryArray* binary_array =
+ dynamic_cast<::arrow::BinaryArray*>(array_vector[0].get());
+
+ ASSERT_NE(binary_array, nullptr);
+ ASSERT_EQ(binary_array->length(), record_reader_->values_written());
+ if (read_dense_for_nullable()) {
+ ASSERT_EQ(binary_array->null_count(), 0);
+ ASSERT_EQ(record_reader_->null_count(), 0);
+ } else {
+ ASSERT_EQ(binary_array->null_count(), record_reader_->null_count());
+ }
+
+ std::vector<std::string_view> expected = expected_values(start, end);
+ for (size_t i = 0; i < expected.size(); ++i) {
+ if (def_levels_[i + start] == 0) {
+ ASSERT_EQ(!read_dense_for_nullable(), binary_array->IsNull(i));
+ } else {
+ ASSERT_EQ(expected[i].compare(binary_array->GetView(i)), 0);
+ ASSERT_FALSE(binary_array->IsNull(i));
+ }
+ }
+ }
+
+ protected:
+ std::shared_ptr<internal::RecordReader> record_reader_;
+
+ private:
+ int levels_per_page_;
+ std::vector<std::shared_ptr<Page>> pages_;
+ std::vector<int16_t> def_levels_;
+ std::vector<int16_t> rep_levels_;
+ std::vector<ByteArray> values_;
+ std::vector<uint8_t> buffer_;
+ std::unique_ptr<ColumnDescriptor> descr_;
+};
+
+// Tests reading and skipping a ByteArray field.
+// The binary readers only differ in DecodeDense and DecodeSpaced functions, so
+// testing optional is sufficient for exercising those code paths.
+TEST_P(ByteArrayRecordReaderTest, ReadAndSkipOptional) {
+ // One page of 90 levels: read records [0, 30), skip [30, 60), then request
+ // 60 more records, of which only [60, 90) remain.
+ MakeRecordReader(/*levels_per_page=*/90, /*num_pages=*/1);
// Read one-third of the page.
- ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/30), 30);
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/30), 30);
+ CheckReadValues(0, 30);
+ // NOTE(review): Reset() presumably clears values_written()/null_count() so the
+ // next CheckReadValues() sees only the next batch -- confirm in RecordReader.
+ record_reader_->Reset();
// Skip 30 records.
- ASSERT_EQ(record_reader->SkipRecords(/*num_records=*/30), 30);
+ ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/30), 30);
// Read 60 more records. Only 30 will be read, since we read 30 and skipped 30,
// so only 30 is left.
- ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/60), 30);
-
- auto binary_reader = dynamic_cast<BinaryRecordReader*>(record_reader.get());
- ASSERT_NE(binary_reader, nullptr);
- // Chunks are reset after this call.
- ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks();
- ASSERT_EQ(array_vector.size(), 1);
- auto binary_array =
dynamic_cast<::arrow::BinaryArray*>(array_vector[0].get());
- ASSERT_NE(binary_array, nullptr);
- ASSERT_EQ(binary_array->length(), 60);
-
- // Our values above are not spaced, however, the RecordReader will
- // read spaced for nullable values.
- // Create spaced expected values.
- std::vector<std::string_view> expected_values;
- size_t values_index = 0;
- for (int i = 0; i < 90; ++i) {
- if (def_levels[i] == 0) {
- expected_values.emplace_back();
- continue;
- }
- expected_values.emplace_back(reinterpret_cast<const
char*>(values[values_index].ptr),
- values[values_index].len);
- ++values_index;
- }
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/60), 30);
+ // Levels [60, 90) correspond to the records returned by the read above.
+ CheckReadValues(60, 90);
+ record_reader_->Reset();
+}
- // Check that the expected values match the actual values.
- for (size_t i = 0; i < 30; ++i) {
- ASSERT_EQ(expected_values[i].compare(binary_array->GetView(i)), 0);
- ASSERT_EQ(def_levels[i] == 0, binary_array->IsNull(i));
- }
- // Repeat for the next range that we read.
- for (size_t i = 60; i < 90; ++i) {
- ASSERT_EQ(expected_values[i].compare(binary_array->GetView(i - 30)), 0);
- ASSERT_EQ(def_levels[i] == 0, binary_array->IsNull(i - 30));
- }
+// Tests reading and skipping an optional FLBA field.
+// The binary readers differ only in their DecodeDense and DecodeSpaced functions,
+// so testing an optional field is sufficient to exercise those code paths.
+TEST_P(FLBARecordReaderTest, ReadAndSkipOptional) {
+ // One page of 90 levels of 4-byte FLBA values: read records [0, 30), skip
+ // [30, 60), then request 60 more records, of which only [60, 90) remain.
+ MakeRecordReader(/*levels_per_page=*/90, /*num_pages=*/1,
/*FLBA_type_length=*/4);
+
+ // Read one-third of the page.
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/30), 30);
+ CheckReadValues(0, 30);
+ // NOTE(review): Reset() presumably clears the reader's per-batch state so the
+ // next CheckReadValues() sees only the next batch -- confirm in RecordReader.
+ record_reader_->Reset();
+
+ // Skip 30 records.
+ ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/30), 30);
+
+ // Read 60 more records. Only 30 will be read, since we read 30 and skipped 30,
+ // so only 30 is left.
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/60), 30);
+ // Levels [60, 90) correspond to the records returned by the read above.
+ CheckReadValues(60, 90);
+ record_reader_->Reset();
}
+// Each suite is instantiated over testing::Bool(). The fixtures presumably map
+// this parameter to read_dense_for_nullable() (TODO confirm), so that both the
+// spaced and the dense read paths are exercised.
+INSTANTIATE_TEST_SUITE_P(ByteArrayRecordReaderTests, ByteArrayRecordReaderTest,
+ testing::Bool());
+
+INSTANTIATE_TEST_SUITE_P(FLBARecordReaderTests, FLBARecordReaderTest,
+ testing::Bool());
+
// Test random combination of ReadRecords and SkipRecords.
+// The fixture is value-parameterized on Repetition::type (presumably the
+// repetition of the column under test -- confirm against its instantiation).
class RecordReaderStressTest : public
::testing::TestWithParam<Repetition::type> {};