This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new f93004f23f GH-40592: [C++][Parquet] Implement SizeStatistics (#40594)
f93004f23f is described below
commit f93004f23f7cb1a641abb805b10fb845c77bb23f
Author: Gang Wu <[email protected]>
AuthorDate: Wed Dec 18 16:48:23 2024 +0800
GH-40592: [C++][Parquet] Implement SizeStatistics (#40594)
### Rationale for this change
Parquet format 2.10.0 has introduced SizeStatistics. parquet-mr has also
implemented this: https://github.com/apache/parquet-mr/pull/1177. Now it is
time for parquet-cpp to pick up the ball.
### What changes are included in this PR?
Implement reading and writing size statistics for parquet-cpp.
### Are these changes tested?
Yes, a bunch of test cases have been added.
### Are there any user-facing changes?
Yes, Parquet users are now able to read and write size statistics.
* GitHub Issue: #40592
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/util/hashing.h | 8 +
cpp/src/parquet/CMakeLists.txt | 2 +
cpp/src/parquet/column_page.h | 22 ++-
cpp/src/parquet/column_writer.cc | 123 +++++++++++---
cpp/src/parquet/encoder.cc | 30 ++++
cpp/src/parquet/encoding.h | 5 +
cpp/src/parquet/encoding_test.cc | 28 ++++
cpp/src/parquet/metadata.cc | 23 +++
cpp/src/parquet/metadata.h | 16 +-
cpp/src/parquet/page_index.cc | 90 ++++++++++-
cpp/src/parquet/page_index.h | 36 +++--
cpp/src/parquet/page_index_benchmark.cc | 2 +-
cpp/src/parquet/page_index_test.cc | 116 +++++++++++--
cpp/src/parquet/properties.h | 37 ++++-
cpp/src/parquet/size_statistics.cc | 94 +++++++++++
cpp/src/parquet/size_statistics.h | 92 +++++++++++
cpp/src/parquet/size_statistics_test.cc | 279 ++++++++++++++++++++++++++++++++
cpp/src/parquet/thrift_internal.h | 20 +++
cpp/src/parquet/type_fwd.h | 15 ++
19 files changed, 967 insertions(+), 71 deletions(-)
diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h
index 4ead1a7283..52525a83aa 100644
--- a/cpp/src/arrow/util/hashing.h
+++ b/cpp/src/arrow/util/hashing.h
@@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable {
}
}
+ // Visit the stored value at a specific index in insertion order.
+ // The visitor function should have the signature `void(std::string_view)`
+ // or `void(const std::string_view&)`.
+ template <typename VisitFunc>
+ void VisitValue(int32_t idx, VisitFunc&& visit) const {
+ visit(binary_builder_.GetView(idx));
+ }
+
protected:
struct Payload {
int32_t memo_index;
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 9c28b749e4..0a9f92cebb 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -181,6 +181,7 @@ set(PARQUET_SRCS
printer.cc
properties.cc
schema.cc
+ size_statistics.cc
statistics.cc
stream_reader.cc
stream_writer.cc
@@ -373,6 +374,7 @@ add_parquet_test(internals-test
metadata_test.cc
page_index_test.cc
public_api_test.cc
+ size_statistics_test.cc
types_test.cc)
set_source_files_properties(public_api_test.cc PROPERTIES
SKIP_PRECOMPILE_HEADERS ON
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index b389ffd98e..111265a842 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -26,6 +26,7 @@
#include <optional>
#include <string>
+#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
@@ -69,20 +70,22 @@ class DataPage : public Page {
/// Currently it is only present from data pages created by ColumnWriter in
order
/// to collect page index.
std::optional<int64_t> first_row_index() const { return first_row_index_; }
+ const SizeStatistics& size_statistics() const { return size_statistics_; }
virtual ~DataPage() = default;
protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t
num_values,
Encoding::type encoding, int64_t uncompressed_size,
- EncodedStatistics statistics = EncodedStatistics(),
- std::optional<int64_t> first_row_index = std::nullopt)
+ EncodedStatistics statistics, std::optional<int64_t>
first_row_index,
+ SizeStatistics size_statistics)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
statistics_(std::move(statistics)),
- first_row_index_(std::move(first_row_index)) {}
+ first_row_index_(std::move(first_row_index)),
+ size_statistics_(std::move(size_statistics)) {}
int32_t num_values_;
Encoding::type encoding_;
@@ -90,6 +93,7 @@ class DataPage : public Page {
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
+ SizeStatistics size_statistics_;
};
class DataPageV1 : public DataPage {
@@ -98,9 +102,11 @@ class DataPageV1 : public DataPage {
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t
uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
- std::optional<int64_t> first_row_index = std::nullopt)
+ std::optional<int64_t> first_row_index = std::nullopt,
+ SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding,
uncompressed_size,
- std::move(statistics), std::move(first_row_index)),
+ std::move(statistics), std::move(first_row_index),
+ std::move(size_statistics)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}
@@ -120,9 +126,11 @@ class DataPageV2 : public DataPage {
int32_t definition_levels_byte_length, int32_t
repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
EncodedStatistics statistics = EncodedStatistics(),
- std::optional<int64_t> first_row_index = std::nullopt)
+ std::optional<int64_t> first_row_index = std::nullopt,
+ SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding,
uncompressed_size,
- std::move(statistics), std::move(first_row_index)),
+ std::move(statistics), std::move(first_row_index),
+ std::move(size_statistics)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index d3e0fdfa81..12cbcf20af 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -55,6 +55,7 @@
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
+#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
@@ -437,7 +438,7 @@ class SerializedPageWriter : public PageWriter {
/// Collect page index
if (column_index_builder_ != nullptr) {
- column_index_builder_->AddPage(page.statistics());
+ column_index_builder_->AddPage(page.statistics(),
page.size_statistics());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
@@ -451,8 +452,9 @@ class SerializedPageWriter : public PageWriter {
/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
- offset_index_builder_->AddPage(start_pos,
static_cast<int32_t>(compressed_size),
- *page.first_row_index());
+ offset_index_builder_->AddPage(
+ start_pos, static_cast<int32_t>(compressed_size),
*page.first_row_index(),
+ page.size_statistics().unencoded_byte_array_data_bytes);
}
total_uncompressed_size_ += uncompressed_size + header_size;
@@ -774,11 +776,17 @@ class ColumnWriterImpl {
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;
+ // A convenience struct to combine the encoded statistics and size statistics
+ struct StatisticsPair {
+ EncodedStatistics encoded_stats;
+ SizeStatistics size_stats;
+ };
+
// Plain-encoded statistics of the current page
- virtual EncodedStatistics GetPageStatistics() = 0;
+ virtual StatisticsPair GetPageStatistics() = 0;
// Plain-encoded statistics of the whole chunk
- virtual EncodedStatistics GetChunkStatistics() = 0;
+ virtual StatisticsPair GetChunkStatistics() = 0;
// Merges page statistics into chunk statistics, then resets the values
virtual void ResetPageStatistics() = 0;
@@ -981,8 +989,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t
definition_levels_rle_size,
PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
values,
uncompressed_data_->mutable_data());
-
- EncodedStatistics page_stats = GetPageStatistics();
+ auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
@@ -1006,13 +1013,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t
definition_levels_rle_size,
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE,
Encoding::RLE,
- uncompressed_size, std::move(page_stats), first_row_index);
+ uncompressed_size, std::move(page_stats), first_row_index,
+ std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE,
Encoding::RLE,
- uncompressed_size, std::move(page_stats), first_row_index);
+ uncompressed_size, std::move(page_stats), first_row_index,
+ std::move(page_size_stats));
WriteDataPage(page);
}
}
@@ -1039,7 +1048,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
compressed_values, combined->mutable_data());
- EncodedStatistics page_stats = GetPageStatistics();
+ auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
@@ -1062,14 +1071,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
combined->CopySlice(0, combined->size(),
allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length,
- rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
page_stats,
- first_row_index);
+ rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
+ std::move(page_stats), first_row_index, std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length,
uncompressed_size,
- pager_->has_compressor(), page_stats, first_row_index);
+ pager_->has_compressor(), std::move(page_stats),
first_row_index,
+ std::move(page_size_stats));
WriteDataPage(page);
}
}
@@ -1083,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() {
FlushBufferedDataPages();
- EncodedStatistics chunk_statistics = GetChunkStatistics();
+ auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
@@ -1092,6 +1102,9 @@ int64_t ColumnWriterImpl::Close() {
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
+ if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
+ metadata_->SetSizeStatistics(chunk_size_statistics);
+ }
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}
@@ -1217,6 +1230,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
}
+ if (properties->size_statistics_level() ==
SizeStatisticsLevel::ColumnChunk ||
+ properties->size_statistics_level() ==
SizeStatisticsLevel::PageAndColumnChunk) {
+ page_size_statistics_ = SizeStatistics::Make(descr_);
+ chunk_size_statistics_ = SizeStatistics::Make(descr_);
+ }
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());
@@ -1355,15 +1373,26 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
- EncodedStatistics GetPageStatistics() override {
- EncodedStatistics result;
- if (page_statistics_) result = page_statistics_->Encode();
+ StatisticsPair GetPageStatistics() override {
+ StatisticsPair result;
+ if (page_statistics_) {
+ result.encoded_stats = page_statistics_->Encode();
+ }
+ if (properties_->size_statistics_level() ==
SizeStatisticsLevel::PageAndColumnChunk) {
+ ARROW_DCHECK(page_size_statistics_ != nullptr);
+ result.size_stats = *page_size_statistics_;
+ }
return result;
}
- EncodedStatistics GetChunkStatistics() override {
- EncodedStatistics result;
- if (chunk_statistics_) result = chunk_statistics_->Encode();
+ StatisticsPair GetChunkStatistics() override {
+ StatisticsPair result;
+ if (chunk_statistics_) {
+ result.encoded_stats = chunk_statistics_->Encode();
+ }
+ if (chunk_size_statistics_) {
+ result.size_stats = *chunk_size_statistics_;
+ }
return result;
}
@@ -1372,6 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
}
+ if (page_size_statistics_ != nullptr) {
+ chunk_size_statistics_->Merge(*page_size_statistics_);
+ page_size_statistics_->Reset();
+ }
}
Type::type type() const override { return descr_->physical_type(); }
@@ -1425,6 +1458,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
DictEncoder<DType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
+ std::unique_ptr<SizeStatistics> page_size_statistics_;
+ std::shared_ptr<SizeStatistics> chunk_size_statistics_;
bool pages_change_on_record_boundaries_;
// If writing a sequence of ::arrow::DictionaryArray to the writer, we keep
the
@@ -1467,6 +1502,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
rows_written_ += num_values;
num_buffered_rows_ += num_values;
}
+
+ UpdateLevelHistogram(num_values, def_levels, rep_levels);
return values_to_write;
}
@@ -1558,6 +1595,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}
+
+ UpdateLevelHistogram(num_levels, def_levels, rep_levels);
+ }
+
+ void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) const {
+ if (page_size_statistics_ == nullptr) {
+ return;
+ }
+
+ auto add_levels = [](std::vector<int64_t>& level_histogram,
+ ::arrow::util::span<const int16_t> levels) {
+ for (int16_t level : levels) {
+ ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
+ ++level_histogram[level];
+ }
+ };
+
+ if (descr_->max_definition_level() > 0) {
+ add_levels(page_size_statistics_->definition_level_histogram,
+ {def_levels, static_cast<size_t>(num_levels)});
+ } else {
+ page_size_statistics_->definition_level_histogram[0] += num_levels;
+ }
+
+ if (descr_->max_repetition_level() > 0) {
+ add_levels(page_size_statistics_->repetition_level_histogram,
+ {rep_levels, static_cast<size_t>(num_levels)});
+ } else {
+ page_size_statistics_->repetition_level_histogram[0] += num_levels;
+ }
+ }
+
+ // Update the unencoded data bytes for ByteArray only per the specification.
+ void UpdateUnencodedDataBytes() const {
+ if constexpr (std::is_same_v<T, ByteArray>) {
+ if (page_size_statistics_ != nullptr) {
+ page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
+ current_encoder_->ReportUnencodedDataBytes());
+ }
+ }
}
void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
@@ -1611,6 +1689,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
+ UpdateUnencodedDataBytes();
}
/// \brief Write values with spaces and update page statistics accordingly.
@@ -1639,6 +1718,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
+ UpdateUnencodedDataBytes();
}
};
@@ -1739,6 +1819,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
+ // Update unencoded byte array data size to size statistics
+ UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count,
check_page);
value_offset += batch_num_spaced_values;
};
@@ -2219,6 +2301,7 @@ Status
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
+ UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size -
non_null,
check_page);
CheckDictionarySizeLimit();
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 89d5d44c52..f41eb9a191 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -79,6 +79,15 @@ class EncoderImpl : virtual public Encoder {
MemoryPool* memory_pool() const override { return pool_; }
+ int64_t ReportUnencodedDataBytes() override {
+ if (descr_->physical_type() != Type::BYTE_ARRAY) {
+ throw ParquetException("ReportUnencodedDataBytes is only supported for
BYTE_ARRAY");
+ }
+ int64_t bytes = unencoded_byte_array_data_bytes_;
+ unencoded_byte_array_data_bytes_ = 0;
+ return bytes;
+ }
+
protected:
// For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
const ColumnDescriptor* descr_;
@@ -87,6 +96,8 @@ class EncoderImpl : virtual public Encoder {
/// Type length from descr
const int type_length_;
+ /// Number of unencoded bytes written to the encoder. Used for ByteArray
type only.
+ int64_t unencoded_byte_array_data_bytes_ = 0;
};
// ----------------------------------------------------------------------
@@ -132,6 +143,7 @@ class PlainEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
sink_.UnsafeAppend(&length, sizeof(uint32_t));
sink_.UnsafeAppend(data, static_cast<int64_t>(length));
+ unencoded_byte_array_data_bytes_ += length;
}
void Put(const ByteArray& val) {
@@ -513,6 +525,18 @@ class DictEncoderImpl : public EncoderImpl, virtual public
DictEncoder<DType> {
static_cast<int32_t>(values[i + position]);
}
});
+
+ // Track unencoded bytes based on dictionary value type
+ if constexpr (std::is_same_v<DType, ByteArrayType>) {
+ // For ByteArray, need to look up actual lengths from dictionary
+ for (size_t idx =
+ buffer_position - static_cast<size_t>(data.length() -
data.null_count());
+ idx < buffer_position; ++idx) {
+ memo_table_.VisitValue(buffered_indices_[idx], [&](std::string_view
value) {
+ unencoded_byte_array_data_bytes_ += value.length();
+ });
+ }
+ }
}
void PutIndices(const ::arrow::Array& data) override {
@@ -656,6 +680,7 @@ inline void
DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(ptr, length, on_found, on_not_found,
&memo_index));
buffered_indices_.push_back(memo_index);
+ unencoded_byte_array_data_bytes_ += length;
}
template <>
@@ -1268,6 +1293,7 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl,
}
length_encoder_.Put({static_cast<int32_t>(view.length())}, 1);
PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length()));
+ unencoded_byte_array_data_bytes_ += view.size();
return Status::OK();
},
[]() { return Status::OK(); }));
@@ -1313,6 +1339,7 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int
num_values) {
for (int idx = 0; idx < num_values; idx++) {
sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
}
+ unencoded_byte_array_data_bytes_ += total_increment_size;
}
void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values,
@@ -1444,6 +1471,8 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual
public TypedEncoder<DT
// Convert to ByteArray, so it can be passed to the suffix_encoder_.
const ByteArray suffix(suffix_length, suffix_ptr);
suffixes[j] = suffix;
+
+ unencoded_byte_array_data_bytes_ += len;
}
suffix_encoder_.Put(suffixes.data(), batch_size);
prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
@@ -1488,6 +1517,7 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual
public TypedEncoder<DT
const ByteArray suffix(suffix_length, suffix_ptr);
suffix_encoder_.Put(&suffix, 1);
+ unencoded_byte_array_data_bytes_ += len;
return Status::OK();
},
[]() { return Status::OK(); }));
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 5717886f10..f2d64dfc80 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -159,6 +159,11 @@ class Encoder {
virtual void Put(const ::arrow::Array& values) = 0;
+ // Report the number of bytes written to the encoder since the last report.
+ // It only works for BYTE_ARRAY type and throw for other types.
+ // This call is not idempotent since it resets the internal counter.
+ virtual int64_t ReportUnencodedDataBytes() = 0;
+
virtual MemoryPool* memory_pool() const = 0;
};
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 78bf26587e..974e9ce622 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -178,6 +178,7 @@ class TestEncodingBase : public ::testing::Test {
void SetUp() {
descr_ = ExampleDescr<Type>();
type_length_ = descr_->type_length();
+ unencoded_byte_array_data_bytes_ = 0;
allocator_ = default_memory_pool();
}
@@ -197,6 +198,8 @@ class TestEncodingBase : public ::testing::Test {
draws_[nvalues * j + i] = draws_[i];
}
}
+
+ InitUnencodedByteArrayDataBytes();
}
virtual void CheckRoundtrip() = 0;
@@ -222,6 +225,16 @@ class TestEncodingBase : public ::testing::Test {
}
}
+ void InitUnencodedByteArrayDataBytes() {
+ // Calculate expected unencoded bytes based on type
+ if constexpr (std::is_same_v<Type, ByteArrayType>) {
+ unencoded_byte_array_data_bytes_ = 0;
+ for (int i = 0; i < num_values_; i++) {
+ unencoded_byte_array_data_bytes_ += draws_[i].len;
+ }
+ }
+ }
+
protected:
MemoryPool* allocator_;
@@ -235,6 +248,7 @@ class TestEncodingBase : public ::testing::Test {
std::shared_ptr<Buffer> encode_buffer_;
std::shared_ptr<ColumnDescriptor> descr_;
+ int64_t unencoded_byte_array_data_bytes_; // unencoded data size for dense
values
};
// Member variables are not visible to templated subclasses. Possibly figure
@@ -261,6 +275,10 @@ class TestPlainEncoding : public TestEncodingBase<Type> {
auto decoder = MakeTypedDecoder<Type>(Encoding::PLAIN, descr_.get());
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
+ if constexpr (std::is_same_v<Type, ByteArrayType>) {
+ ASSERT_EQ(encoder->ReportUnencodedDataBytes(),
+ this->unencoded_byte_array_data_bytes_);
+ }
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));
@@ -346,6 +364,10 @@ class TestDictionaryEncoding : public
TestEncodingBase<Type> {
AllocateBuffer(default_memory_pool(),
dict_traits->dict_encoded_size());
dict_traits->WriteDict(dict_buffer_->mutable_data());
std::shared_ptr<Buffer> indices = encoder->FlushValues();
+ if constexpr (std::is_same_v<Type, ByteArrayType>) {
+ ASSERT_EQ(encoder->ReportUnencodedDataBytes(),
+ this->unencoded_byte_array_data_bytes_);
+ }
auto base_spaced_encoder =
MakeEncoder(Type::type_num, Encoding::PLAIN, true, descr_.get());
@@ -1992,6 +2014,10 @@ class TestDeltaLengthByteArrayEncoding : public
TestEncodingBase<Type> {
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
+ if constexpr (std::is_same_v<Type, ByteArrayType>) {
+ ASSERT_EQ(encoder->ReportUnencodedDataBytes(),
+ this->unencoded_byte_array_data_bytes_);
+ }
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));
@@ -2296,6 +2322,8 @@ class TestDeltaByteArrayEncoding : public
TestDeltaLengthByteArrayEncoding<Type>
draws_[nvalues * j + i] = draws_[i];
}
}
+
+ TestEncodingBase<Type>::InitUnencodedByteArrayDataBytes();
}
Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; }
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 8f577be45b..f47c614219 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -37,6 +37,7 @@
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/schema_internal.h"
+#include "parquet/size_statistics.h"
#include "parquet/thrift_internal.h"
namespace parquet {
@@ -265,6 +266,11 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
LoadEnumSafe(&encoding_stats.encoding),
encoding_stats.count});
}
+ if (column_metadata_->__isset.size_statistics) {
+ size_statistics_ =
+
std::make_shared<SizeStatistics>(FromThrift(column_metadata_->size_statistics));
+ size_statistics_->Validate(descr_);
+ }
possible_stats_ = nullptr;
InitKeyValueMetadata();
}
@@ -308,6 +314,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
return is_stats_set() ? possible_stats_ : nullptr;
}
+ inline std::shared_ptr<SizeStatistics> size_statistics() const {
+ return size_statistics_;
+ }
+
inline Compression::type compression() const {
return LoadEnumSafe(&column_metadata_->codec);
}
@@ -396,6 +406,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+ std::shared_ptr<SizeStatistics> size_statistics_;
};
std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
@@ -439,6 +450,10 @@ std::shared_ptr<Statistics>
ColumnChunkMetaData::statistics() const {
bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set();
}
+std::shared_ptr<SizeStatistics> ColumnChunkMetaData::size_statistics() const {
+ return impl_->size_statistics();
+}
+
std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
return impl_->bloom_filter_offset();
}
@@ -1543,6 +1558,10 @@ class
ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_statistics(ToThrift(val));
}
+ void SetSizeStatistics(const SizeStatistics& size_stats) {
+ column_chunk_->meta_data.__set_size_statistics(ToThrift(size_stats));
+ }
+
void Finish(int64_t num_values, int64_t dictionary_page_offset,
int64_t index_page_offset, int64_t data_page_offset,
int64_t compressed_size, int64_t uncompressed_size, bool
has_dictionary,
@@ -1752,6 +1771,10 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const
EncodedStatistics& result)
impl_->SetStatistics(result);
}
+void ColumnChunkMetaDataBuilder::SetSizeStatistics(const SizeStatistics&
size_stats) {
+ impl_->SetSizeStatistics(size_stats);
+}
+
void ColumnChunkMetaDataBuilder::SetKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
impl_->SetKeyValueMetadata(std::move(key_value_metadata));
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index dc97d816da..9a3964f7d6 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -28,23 +28,9 @@
#include "parquet/encryption/type_fwd.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
namespace parquet {
-class ColumnDescriptor;
-class EncodedStatistics;
-class FileCryptoMetaData;
-class Statistics;
-class SchemaDescriptor;
-
-namespace schema {
-
-class ColumnPath;
-
-} // namespace schema
-
using KeyValueMetadata = ::arrow::KeyValueMetadata;
class PARQUET_EXPORT ApplicationVersion {
@@ -156,6 +142,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
std::shared_ptr<schema::ColumnPath> path_in_schema() const;
bool is_stats_set() const;
std::shared_ptr<Statistics> statistics() const;
+ std::shared_ptr<SizeStatistics> size_statistics() const;
Compression::type compression() const;
// Indicate if the ColumnChunk compression is supported by the current
@@ -451,6 +438,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// column metadata
void SetStatistics(const EncodedStatistics& stats);
+ void SetSizeStatistics(const SizeStatistics& size_stats);
void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata>
key_value_metadata);
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index afda4c6064..8cc819f10c 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -159,6 +159,22 @@ class TypedColumnIndexImpl : public
TypedColumnIndex<DType> {
const std::vector<T>& max_values() const override { return max_values_; }
+ bool has_definition_level_histograms() const override {
+ return column_index_.__isset.definition_level_histograms;
+ }
+
+ bool has_repetition_level_histograms() const override {
+ return column_index_.__isset.repetition_level_histograms;
+ }
+
+ const std::vector<int64_t>& definition_level_histograms() const override {
+ return column_index_.definition_level_histograms;
+ }
+
+ const std::vector<int64_t>& repetition_level_histograms() const override {
+ return column_index_.repetition_level_histograms;
+ }
+
private:
/// Wrapped thrift column index.
const format::ColumnIndex column_index_;
@@ -178,14 +194,22 @@ class OffsetIndexImpl : public OffsetIndex {
page_location.compressed_page_size,
page_location.first_row_index});
}
+ if (offset_index.__isset.unencoded_byte_array_data_bytes) {
+ unencoded_byte_array_data_bytes_ =
offset_index.unencoded_byte_array_data_bytes;
+ }
}
const std::vector<PageLocation>& page_locations() const override {
return page_locations_;
}
+ const std::vector<int64_t>& unencoded_byte_array_data_bytes() const override
{
+ return unencoded_byte_array_data_bytes_;
+ }
+
private:
std::vector<PageLocation> page_locations_;
+ std::vector<int64_t> unencoded_byte_array_data_bytes_;
};
class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader {
@@ -460,7 +484,8 @@ class ColumnIndexBuilderImpl final : public
ColumnIndexBuilder {
column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
}
- void AddPage(const EncodedStatistics& stats) override {
+ void AddPage(const EncodedStatistics& stats,
+ const SizeStatistics& size_stats) override {
if (state_ == BuilderState::kFinished) {
throw ParquetException("Cannot add page to finished
ColumnIndexBuilder.");
} else if (state_ == BuilderState::kDiscarded) {
@@ -493,6 +518,17 @@ class ColumnIndexBuilderImpl final : public
ColumnIndexBuilder {
column_index_.__isset.null_counts = false;
column_index_.null_counts.clear();
}
+
+ if (size_stats.is_set()) {
+ const auto& page_def_level_hist = size_stats.definition_level_histogram;
+ const auto& page_ref_level_hist = size_stats.repetition_level_histogram;
+ column_index_.definition_level_histograms.insert(
+ column_index_.definition_level_histograms.end(),
page_def_level_hist.cbegin(),
+ page_def_level_hist.cend());
+ column_index_.repetition_level_histograms.insert(
+ column_index_.repetition_level_histograms.end(),
page_ref_level_hist.cbegin(),
+ page_ref_level_hist.cend());
+ }
}
void Finish() override {
@@ -533,6 +569,29 @@ class ColumnIndexBuilderImpl final : public
ColumnIndexBuilder {
/// Decide the boundary order from decoded min/max values.
auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
column_index_.__set_boundary_order(ToThrift(boundary_order));
+
+ // Finalize level histogram.
+ const int64_t num_pages = column_index_.null_pages.size();
+ const int64_t def_level_hist_size =
column_index_.definition_level_histograms.size();
+ const int64_t rep_level_hist_size =
column_index_.repetition_level_histograms.size();
+ if (def_level_hist_size != 0 &&
+ def_level_hist_size != (descr_->max_definition_level() + 1) *
num_pages) {
+ std::stringstream ss;
+ ss << "Invalid definition level histogram size: " << def_level_hist_size
+ << ", expected: " << (descr_->max_definition_level() + 1) * num_pages;
+ throw ParquetException(ss.str());
+ }
+ if (rep_level_hist_size != 0 &&
+ rep_level_hist_size != (descr_->max_repetition_level() + 1) *
num_pages) {
+ std::stringstream ss;
+ ss << "Invalid repetition level histogram size: " << rep_level_hist_size
+ << ", expected: " << (descr_->max_repetition_level() + 1) * num_pages;
+ throw ParquetException(ss.str());
+ }
+ column_index_.__isset.definition_level_histograms =
+ !column_index_.definition_level_histograms.empty();
+ column_index_.__isset.repetition_level_histograms =
+ !column_index_.repetition_level_histograms.empty();
}
void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const
override {
@@ -604,8 +663,8 @@ class OffsetIndexBuilderImpl final : public
OffsetIndexBuilder {
public:
OffsetIndexBuilderImpl() = default;
- void AddPage(int64_t offset, int32_t compressed_page_size,
- int64_t first_row_index) override {
+ void AddPage(int64_t offset, int32_t compressed_page_size, int64_t
first_row_index,
+ std::optional<int64_t> unencoded_byte_array_length) override {
if (state_ == BuilderState::kFinished) {
throw ParquetException("Cannot add page to finished
OffsetIndexBuilder.");
} else if (state_ == BuilderState::kDiscarded) {
@@ -620,6 +679,10 @@ class OffsetIndexBuilderImpl final : public
OffsetIndexBuilder {
page_location.__set_compressed_page_size(compressed_page_size);
page_location.__set_first_row_index(first_row_index);
offset_index_.page_locations.emplace_back(std::move(page_location));
+ if (unencoded_byte_array_length.has_value()) {
+ offset_index_.unencoded_byte_array_data_bytes.emplace_back(
+ unencoded_byte_array_length.value());
+ }
}
void Finish(int64_t final_position) override {
@@ -636,6 +699,19 @@ class OffsetIndexBuilderImpl final : public
OffsetIndexBuilder {
page_location.__set_offset(page_location.offset + final_position);
}
}
+
+ // Finalize unencoded_byte_array_data_bytes: it must be either empty or hold
+ // exactly one entry per page.
+ if (offset_index_.page_locations.size() ==
+ offset_index_.unencoded_byte_array_data_bytes.size()) {
+ offset_index_.__isset.unencoded_byte_array_data_bytes = true;
+ } else if (!offset_index_.unencoded_byte_array_data_bytes.empty()) {
+ std::stringstream ss;
+ ss << "Invalid count of unencoded BYTE_ARRAY data bytes: "
+ << offset_index_.unencoded_byte_array_data_bytes.size()
+ << ", expected page count: " <<
offset_index_.page_locations.size();
+ throw ParquetException(ss.str());
+ }
+
state_ = BuilderState::kFinished;
break;
}
@@ -813,6 +889,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder
{
} // namespace
+/// Convenience overload: forwards the fields of `page_location` together with
+/// the page's unencoded BYTE_ARRAY size (if any) to the virtual AddPage().
+void OffsetIndexBuilder::AddPage(const PageLocation& page_location,
+                                 const SizeStatistics& size_stats) {
+  // When `size_stats` is not set, unencoded_byte_array_data_bytes is
+  // necessarily std::nullopt (see SizeStatistics::is_set()), so the optional
+  // can be forwarded as is.
+  const std::optional<int64_t>& unencoded_bytes =
+      size_stats.unencoded_byte_array_data_bytes;
+  this->AddPage(page_location.offset, page_location.compressed_page_size,
+                page_location.first_row_index, unencoded_bytes);
+}
+
RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup(
const RowGroupMetaData& row_group_metadata, const std::vector<int32_t>&
columns) {
int64_t ci_start = std::numeric_limits<int64_t>::max();
diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h
index d45c59cab2..3083159783 100644
--- a/cpp/src/parquet/page_index.h
+++ b/cpp/src/parquet/page_index.h
@@ -19,6 +19,7 @@
#include "arrow/io/interfaces.h"
#include "parquet/encryption/type_fwd.h"
+#include "parquet/type_fwd.h"
#include "parquet/types.h"
#include <optional>
@@ -26,9 +27,6 @@
namespace parquet {
-class EncodedStatistics;
-struct PageIndexLocation;
-
/// \brief ColumnIndex is a proxy around format::ColumnIndex.
class PARQUET_EXPORT ColumnIndex {
public:
@@ -76,6 +74,18 @@ class PARQUET_EXPORT ColumnIndex {
/// \brief A vector of page indices for non-null pages.
virtual const std::vector<int32_t>& non_null_page_indices() const = 0;
+
+ /// \brief Whether definition level histogram is available.
+ virtual bool has_definition_level_histograms() const = 0;
+
+ /// \brief Whether repetition level histogram is available.
+ virtual bool has_repetition_level_histograms() const = 0;
+
+ /// \brief List of definition level histograms for each page concatenated
+ /// together in page order (max_definition_level + 1 entries per page when
+ /// present).
+ virtual const std::vector<int64_t>& definition_level_histograms() const = 0;
+
+ /// \brief List of repetition level histograms for each page concatenated
+ /// together in page order (max_repetition_level + 1 entries per page when
+ /// present).
+ virtual const std::vector<int64_t>& repetition_level_histograms() const = 0;
};
/// \brief Typed implementation of ColumnIndex.
@@ -129,6 +139,10 @@ class PARQUET_EXPORT OffsetIndex {
/// \brief A vector of locations for each data page in this column.
virtual const std::vector<PageLocation>& page_locations() const = 0;
+
+ /// \brief A vector of the unencoded/uncompressed size of each page for
+ /// BYTE_ARRAY types, or empty for other types (or when the writer did not
+ /// record it).
+ virtual const std::vector<int64_t>& unencoded_byte_array_data_bytes() const
= 0;
};
/// \brief Interface for reading the page index for a Parquet row group.
@@ -266,7 +280,9 @@ class PARQUET_EXPORT ColumnIndexBuilder {
/// not update statistics anymore.
///
/// \param stats Page statistics in the encoded form.
- virtual void AddPage(const EncodedStatistics& stats) = 0;
+ /// \param size_stats Size statistics of the page if available.
+ virtual void AddPage(const EncodedStatistics& stats,
+ const SizeStatistics& size_stats) = 0;
/// \brief Complete the column index.
///
@@ -299,15 +315,13 @@ class PARQUET_EXPORT OffsetIndexBuilder {
virtual ~OffsetIndexBuilder() = default;
- /// \brief Add page location of a data page.
+ /// \brief Add page location and size stats of a data page.
virtual void AddPage(int64_t offset, int32_t compressed_page_size,
- int64_t first_row_index) = 0;
+ int64_t first_row_index,
+ std::optional<int64_t> unencoded_byte_array_length =
{}) = 0;
- /// \brief Add page location of a data page.
- void AddPage(const PageLocation& page_location) {
- AddPage(page_location.offset, page_location.compressed_page_size,
- page_location.first_row_index);
- }
+ /// \brief Add page location and size stats of a data page.
+ void AddPage(const PageLocation& page_location, const SizeStatistics&
size_stats);
/// \brief Complete the offset index.
///
diff --git a/cpp/src/parquet/page_index_benchmark.cc
b/cpp/src/parquet/page_index_benchmark.cc
index 5631034105..e94fa0365d 100644
--- a/cpp/src/parquet/page_index_benchmark.cc
+++ b/cpp/src/parquet/page_index_benchmark.cc
@@ -82,7 +82,7 @@ void BM_ReadColumnIndex(::benchmark::State& state) {
GenerateBenchmarkData(values_per_page, /*seed=*/0, values.data(), &heap,
kDataStringLength);
stats->Update(values.data(), values_per_page, /*null_count=*/0);
- builder->AddPage(stats->Encode());
+ builder->AddPage(stats->Encode(), /*size_stats=*/{});
}
builder->Finish();
diff --git a/cpp/src/parquet/page_index_test.cc
b/cpp/src/parquet/page_index_test.cc
index 4db49b4267..916e28f8ce 100644
--- a/cpp/src/parquet/page_index_test.cc
+++ b/cpp/src/parquet/page_index_test.cc
@@ -419,15 +419,20 @@ TEST(PageIndex,
DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
-1);
}
-TEST(PageIndex, WriteOffsetIndex) {
+void TestWriteOffsetIndex(bool write_size_stats) {
/// Create offset index via the OffsetIndexBuilder interface.
auto builder = OffsetIndexBuilder::Make();
const size_t num_pages = 5;
const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000,
40000};
+ const std::vector<int64_t> unencoded_byte_array_lengths = {1111, 2222, 0,
3333, 4444};
for (size_t i = 0; i < num_pages; ++i) {
- builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+ auto unencoded_byte_array_length =
+ write_size_stats ? std::make_optional(unencoded_byte_array_lengths[i])
+ : std::nullopt;
+ builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i],
+ unencoded_byte_array_length);
}
const int64_t final_position = 4096;
builder->Finish(final_position);
@@ -446,23 +451,73 @@ TEST(PageIndex, WriteOffsetIndex) {
/// Verify the data of the offset index.
for (const auto& offset_index : offset_indexes) {
ASSERT_EQ(num_pages, offset_index->page_locations().size());
+ if (write_size_stats) {
+ ASSERT_EQ(num_pages,
offset_index->unencoded_byte_array_data_bytes().size());
+ } else {
+ ASSERT_TRUE(offset_index->unencoded_byte_array_data_bytes().empty());
+ }
for (size_t i = 0; i < num_pages; ++i) {
const auto& page_location = offset_index->page_locations().at(i);
ASSERT_EQ(offsets[i] + final_position, page_location.offset);
ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+ if (write_size_stats) {
+ ASSERT_EQ(unencoded_byte_array_lengths[i],
+ offset_index->unencoded_byte_array_data_bytes()[i]);
+ }
}
}
}
+TEST(PageIndex, WriteOffsetIndexWithoutSizeStats) {
+  // Page locations only: no unencoded BYTE_ARRAY sizes are passed to the builder.
+  constexpr bool kWriteSizeStats = false;
+  TestWriteOffsetIndex(kWriteSizeStats);
+}
+
+TEST(PageIndex, WriteOffsetIndexWithSizeStats) {
+  // Every page also carries an unencoded BYTE_ARRAY size.
+  constexpr bool kWriteSizeStats = true;
+  TestWriteOffsetIndex(kWriteSizeStats);
+}
+
+/// Level histograms of a single page, used to build fake per-page SizeStatistics.
+struct PageLevelHistogram {
+  /// Count per definition level; expected to hold max_definition_level + 1 entries.
+  std::vector<int64_t> def_levels{};
+  /// Count per repetition level; expected to hold max_repetition_level + 1 entries.
+  std::vector<int64_t> rep_levels{};
+};
+
+/// Build SizeStatistics for `descr` whose level histograms are overwritten with
+/// the given page histograms; every other field keeps the value chosen by
+/// SizeStatistics::Make().
+std::unique_ptr<SizeStatistics> ConstructFakeSizeStatistics(
+    const ColumnDescriptor* descr, const PageLevelHistogram& page_level_histogram) {
+  std::unique_ptr<SizeStatistics> size_stats = SizeStatistics::Make(descr);
+  size_stats->repetition_level_histogram = page_level_histogram.rep_levels;
+  size_stats->definition_level_histogram = page_level_histogram.def_levels;
+  return size_stats;
+}
+
+/// Check that the histogram of page `page_id` inside the concatenated
+/// `all_page_levels` equals `expected_page_levels`. Each page is assumed to
+/// contribute expected_page_levels.size() consecutive entries.
+void VerifyPageLevelHistogram(size_t page_id,
+                              const std::vector<int64_t>& expected_page_levels,
+                              const std::vector<int64_t>& all_page_levels) {
+  const size_t num_levels = expected_page_levels.size();
+  const size_t offset = page_id * num_levels;
+  // Fail cleanly instead of reading past the concatenated histograms. Using the
+  // level count (rather than `size() - 1`) also avoids unsigned underflow when
+  // the expected histogram is empty.
+  ASSERT_LE(offset + num_levels, all_page_levels.size());
+  for (size_t level = 0; level < num_levels; ++level) {
+    ASSERT_EQ(expected_page_levels[level], all_page_levels[offset + level]);
+  }
+}
+
void TestWriteTypedColumnIndex(schema::NodePtr node,
const std::vector<EncodedStatistics>&
page_stats,
- BoundaryOrder::type boundary_order, bool
has_null_counts) {
- auto descr = std::make_unique<ColumnDescriptor>(node,
/*max_definition_level=*/1, 0);
-
+ BoundaryOrder::type boundary_order, bool
has_null_counts,
+ int16_t max_definition_level = 1,
+ int16_t max_repetition_level = 0,
+ const std::vector<PageLevelHistogram>&
page_levels = {}) {
+ const bool build_size_stats = !page_levels.empty();
+ if (build_size_stats) {
+ ASSERT_EQ(page_levels.size(), page_stats.size());
+ }
+ auto descr = std::make_unique<ColumnDescriptor>(node, max_definition_level,
+ max_repetition_level);
auto builder = ColumnIndexBuilder::Make(descr.get());
- for (const auto& stats : page_stats) {
- builder->AddPage(stats);
+ for (size_t i = 0; i < page_stats.size(); ++i) {
+ auto size_stats = build_size_stats
+ ? ConstructFakeSizeStatistics(descr.get(),
page_levels[i])
+ : std::make_unique<SizeStatistics>();
+ builder->AddPage(page_stats[i], *size_stats);
}
ASSERT_NO_THROW(builder->Finish());
@@ -482,6 +537,13 @@ void TestWriteTypedColumnIndex(schema::NodePtr node,
ASSERT_EQ(boundary_order, column_index->boundary_order());
ASSERT_EQ(has_null_counts, column_index->has_null_counts());
const size_t num_pages = column_index->null_pages().size();
+ if (build_size_stats) {
+ ASSERT_EQ(num_pages * (max_repetition_level + 1),
+ column_index->repetition_level_histograms().size());
+ ASSERT_EQ(num_pages * (max_definition_level + 1),
+ column_index->definition_level_histograms().size());
+ }
+
for (size_t i = 0; i < num_pages; ++i) {
ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
@@ -489,6 +551,12 @@ void TestWriteTypedColumnIndex(schema::NodePtr node,
if (has_null_counts) {
ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
}
+ if (build_size_stats) {
+ ASSERT_NO_FATAL_FAILURE(VerifyPageLevelHistogram(
+ i, page_levels[i].def_levels,
column_index->definition_level_histograms()));
+ ASSERT_NO_FATAL_FAILURE(VerifyPageLevelHistogram(
+ i, page_levels[i].rep_levels,
column_index->repetition_level_histograms()));
+ }
}
}
}
@@ -640,7 +708,7 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
auto builder = ColumnIndexBuilder::Make(&descr);
for (const auto& stats : page_stats) {
- builder->AddPage(stats);
+ builder->AddPage(stats, SizeStatistics());
}
ASSERT_NO_THROW(builder->Finish());
ASSERT_EQ(nullptr, builder->Build());
@@ -651,6 +719,31 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
EXPECT_EQ(0, buffer->size());
}
+TEST(PageIndex, WriteInt64ColumnIndexWithSizeStats) {
+  auto encode = [](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in descending order across pages. Within each page
+  // min <= max, as a real writer would produce. Both the min sequence and the
+  // max sequence decrease, so the expected boundary order is Descending.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-2)).set_max(encode(-1));
+  page_stats.at(1).set_null_count(0).set_min(encode(-3)).set_max(encode(-2));
+  page_stats.at(2).set_null_count(4).set_min(encode(-4)).set_max(encode(-3));
+
+  // Page level histograms: 4 entries per page for max_definition_level=3 and
+  // 3 entries per page for max_repetition_level=2.
+  std::vector<PageLevelHistogram> page_levels;
+  page_levels.push_back(
+      PageLevelHistogram{/*def_levels=*/{2, 4, 6, 8}, /*rep_levels=*/{10, 5, 5}});
+  page_levels.push_back(
+      PageLevelHistogram{/*def_levels=*/{1, 3, 5, 7}, /*rep_levels=*/{4, 8, 4}});
+  page_levels.push_back(
+      PageLevelHistogram{/*def_levels=*/{0, 2, 4, 6}, /*rep_levels=*/{3, 4, 5}});
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true, /*max_definition_level=*/3,
+                            /*max_repetition_level=*/2, page_levels);
+}
+
TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
schema::NodePtr root = schema::GroupNode::Make("schema",
Repetition::REPEATED, fields);
@@ -689,14 +782,15 @@ class PageIndexBuilderTest : public ::testing::Test {
for (int column = 0; column < num_columns; ++column) {
if (static_cast<size_t>(column) < page_stats[row_group].size()) {
auto column_index_builder = builder->GetColumnIndexBuilder(column);
-
ASSERT_NO_THROW(column_index_builder->AddPage(page_stats[row_group][column]));
+ ASSERT_NO_THROW(
+ column_index_builder->AddPage(page_stats[row_group][column],
{}));
ASSERT_NO_THROW(column_index_builder->Finish());
}
if (static_cast<size_t>(column) < page_locations[row_group].size()) {
auto offset_index_builder = builder->GetOffsetIndexBuilder(column);
- ASSERT_NO_THROW(
-
offset_index_builder->AddPage(page_locations[row_group][column]));
+
ASSERT_NO_THROW(offset_index_builder->AddPage(page_locations[row_group][column],
+ /*size_stats=*/{}));
ASSERT_NO_THROW(offset_index_builder->Finish(final_position));
}
}
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index a8e4430a03..c942010396 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -47,6 +47,16 @@ namespace parquet {
/// DataPageV2 at all.
enum class ParquetDataPageVersion { V1, V2 };
+/// Controls the level of size statistics that are written to the file.
+enum class SizeStatisticsLevel : uint8_t {
+  /// No size statistics are written.
+  None = 0,
+  /// Only column chunk size statistics are written.
+  ColumnChunk = 1,
+  /// Both size statistics in the column chunk and page index are written.
+  PageAndColumnChunk = 2
+};
+
/// Align the default buffer size to a small multiple of a page size.
constexpr int64_t kDefaultBufferSize = 4096 * 4;
@@ -247,7 +257,8 @@ class PARQUET_EXPORT WriterProperties {
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY),
store_decimal_as_integer_(false),
- page_checksum_enabled_(false) {}
+ page_checksum_enabled_(false),
+ size_statistics_level_(SizeStatisticsLevel::None) {}
explicit Builder(const WriterProperties& properties)
: pool_(properties.memory_pool()),
@@ -649,6 +660,16 @@ class PARQUET_EXPORT WriterProperties {
return this->disable_write_page_index(path->ToDotString());
}
+    /// \brief Set the level at which size statistics are written for all
+    /// columns. The default is SizeStatisticsLevel::None.
+    ///
+    /// Note that if page index is not enabled, page level size statistics will
+    /// not be written even if the level is set to PageAndColumnChunk.
+    ///
+    /// \param level The level to write size statistics.
+    /// \return This builder, to allow call chaining.
+    Builder* set_size_statistics_level(SizeStatisticsLevel level) {
+      this->size_statistics_level_ = level;
+      return this;
+    }
+
/// \brief Build the WriterProperties with the builder parameters.
/// \return The WriterProperties defined by the builder.
std::shared_ptr<WriterProperties> build() {
@@ -675,9 +696,9 @@ class PARQUET_EXPORT WriterProperties {
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_,
max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
- std::move(file_encryption_properties_), default_column_properties_,
- column_properties, data_page_version_, store_decimal_as_integer_,
- std::move(sorting_columns_)));
+ size_statistics_level_, std::move(file_encryption_properties_),
+ default_column_properties_, column_properties, data_page_version_,
+ store_decimal_as_integer_, std::move(sorting_columns_)));
}
private:
@@ -691,6 +712,7 @@ class PARQUET_EXPORT WriterProperties {
std::string created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
+ SizeStatisticsLevel size_statistics_level_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
@@ -729,6 +751,10 @@ class PARQUET_EXPORT WriterProperties {
inline bool page_checksum_enabled() const { return page_checksum_enabled_; }
+  /// \brief The level at which size statistics are written, as configured via
+  /// Builder::set_size_statistics_level().
+  inline SizeStatisticsLevel size_statistics_level() const {
+    return this->size_statistics_level_;
+  }
+
inline Encoding::type dictionary_index_encoding() const {
if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
return Encoding::PLAIN_DICTIONARY;
@@ -822,6 +848,7 @@ class PARQUET_EXPORT WriterProperties {
MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t
write_batch_size,
int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type
version,
const std::string& created_by, bool page_write_checksum_enabled,
+ SizeStatisticsLevel size_statistics_level,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>&
column_properties,
@@ -837,6 +864,7 @@ class PARQUET_EXPORT WriterProperties {
parquet_created_by_(created_by),
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
+ size_statistics_level_(size_statistics_level),
file_encryption_properties_(file_encryption_properties),
sorting_columns_(std::move(sorting_columns)),
default_column_properties_(default_column_properties),
@@ -852,6 +880,7 @@ class PARQUET_EXPORT WriterProperties {
std::string parquet_created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
+ SizeStatisticsLevel size_statistics_level_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
diff --git a/cpp/src/parquet/size_statistics.cc
b/cpp/src/parquet/size_statistics.cc
new file mode 100644
index 0000000000..a02cef7aba
--- /dev/null
+++ b/cpp/src/parquet/size_statistics.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/size_statistics.h"
+
+#include <algorithm>
+
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+void SizeStatistics::Merge(const SizeStatistics& other) {
+  // Refuse to combine statistics whose shapes disagree.
+  if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) {
+    throw ParquetException("Repetition level histogram size mismatch");
+  }
+  if (definition_level_histogram.size() != other.definition_level_histogram.size()) {
+    throw ParquetException("Definition level histogram size mismatch");
+  }
+  if (unencoded_byte_array_data_bytes.has_value() !=
+      other.unencoded_byte_array_data_bytes.has_value()) {
+    throw ParquetException("Unencoded byte array data bytes are not consistent");
+  }
+
+  // Accumulate the other side's counts element by element.
+  for (size_t level = 0; level < repetition_level_histogram.size(); ++level) {
+    repetition_level_histogram[level] += other.repetition_level_histogram[level];
+  }
+  for (size_t level = 0; level < definition_level_histogram.size(); ++level) {
+    definition_level_histogram[level] += other.definition_level_histogram[level];
+  }
+  if (unencoded_byte_array_data_bytes.has_value()) {
+    *unencoded_byte_array_data_bytes += *other.unencoded_byte_array_data_bytes;
+  }
+}
+
+void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) {
+  // The counter must already exist (Make() creates it for BYTE_ARRAY columns).
+  ARROW_CHECK(unencoded_byte_array_data_bytes.has_value());
+  *unencoded_byte_array_data_bytes += value;
+}
+
+/// Check that these statistics are consistent with `descr`.
+/// Throws ParquetException on the first inconsistency found.
+void SizeStatistics::Validate(const ColumnDescriptor* descr) const {
+  // Level histograms are only meaningful "when present". Writers may omit them
+  // (zero-length vector), e.g. when the maximum level is 0. An omitted histogram
+  // must therefore be accepted; only a histogram that is actually present must
+  // have exactly max_level + 1 entries.
+  if (!repetition_level_histogram.empty() &&
+      repetition_level_histogram.size() !=
+          static_cast<size_t>(descr->max_repetition_level() + 1)) {
+    throw ParquetException("Repetition level histogram size mismatch");
+  }
+  if (!definition_level_histogram.empty() &&
+      definition_level_histogram.size() !=
+          static_cast<size_t>(descr->max_definition_level() + 1)) {
+    throw ParquetException("Definition level histogram size mismatch");
+  }
+  // unencoded_byte_array_data_bytes must be present for BYTE_ARRAY columns
+  // and absent for every other physical type.
+  if (unencoded_byte_array_data_bytes.has_value() &&
+      descr->physical_type() != Type::BYTE_ARRAY) {
+    throw ParquetException("Unencoded byte array data bytes does not support " +
+                           TypeToString(descr->physical_type()));
+  }
+  if (!unencoded_byte_array_data_bytes.has_value() &&
+      descr->physical_type() == Type::BYTE_ARRAY) {
+    throw ParquetException("Missing unencoded byte array data bytes");
+  }
+}
+
+void SizeStatistics::Reset() {
+  // Zero every counter while keeping histogram sizes and the presence of
+  // unencoded_byte_array_data_bytes unchanged.
+  std::fill(repetition_level_histogram.begin(), repetition_level_histogram.end(), 0);
+  std::fill(definition_level_histogram.begin(), definition_level_histogram.end(), 0);
+  if (unencoded_byte_array_data_bytes) {
+    unencoded_byte_array_data_bytes = 0;
+  }
+}
+
+std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* descr) {
+  // One zero-initialized slot per level, 0..max_level inclusive.
+  const size_t num_rep_levels = static_cast<size_t>(descr->max_repetition_level() + 1);
+  const size_t num_def_levels = static_cast<size_t>(descr->max_definition_level() + 1);
+  auto size_stats = std::make_unique<SizeStatistics>();
+  size_stats->repetition_level_histogram.assign(num_rep_levels, 0);
+  size_stats->definition_level_histogram.assign(num_def_levels, 0);
+  // Only BYTE_ARRAY columns track unencoded data bytes.
+  if (descr->physical_type() == Type::BYTE_ARRAY) {
+    size_stats->unencoded_byte_array_data_bytes = 0;
+  }
+  return size_stats;
+}
+
+} // namespace parquet
diff --git a/cpp/src/parquet/size_statistics.h
b/cpp/src/parquet/size_statistics.h
new file mode 100644
index 0000000000..c25e70ee36
--- /dev/null
+++ b/cpp/src/parquet/size_statistics.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "parquet/platform.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+/// A structure for capturing metadata for estimating the unencoded,
+/// uncompressed size of data written. This is useful for readers to estimate
+/// how much memory is needed to reconstruct data in their memory model and for
+/// fine-grained filter push down on nested structures (the histograms contained
+/// in this structure can help determine the number of nulls at a particular
+/// nesting level and maximum length of lists).
+struct PARQUET_EXPORT SizeStatistics {
+  /// When present, there is expected to be one element corresponding to each
+  /// definition level (i.e. size = max_definition_level + 1) where each element
+  /// represents the number of times that definition level was observed in the
+  /// data.
+  ///
+  /// This field may be omitted (i.e. left as a zero-length vector) if
+  /// max_definition_level is 0, without loss of information.
+  std::vector<int64_t> definition_level_histogram;
+
+  /// Same as definition_level_histogram except for repetition levels
+  /// (size = max_repetition_level + 1 when present).
+  ///
+  /// This field may be omitted (i.e. left as a zero-length vector) if
+  /// max_repetition_level is 0, without loss of information.
+  std::vector<int64_t> repetition_level_histogram;
+
+  /// The number of physical bytes stored for BYTE_ARRAY data values assuming
+  /// no encoding. This is exclusive of the bytes needed to store the length of
+  /// each byte array. In other words, this field is equivalent to the `(size
+  /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values
+  /// written)`. To determine unencoded sizes of other types readers can use
+  /// schema information multiplied by the number of non-null and null values.
+  /// The number of null/non-null values can be inferred from the level
+  /// histograms above.
+  ///
+  /// For example, if a column chunk is dictionary-encoded with dictionary
+  /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2],
+  /// then this value for that data page should be 7 (1 + 1 + 2 + 3).
+  ///
+  /// This field should only be set for types that use BYTE_ARRAY as their
+  /// physical type.
+  std::optional<int64_t> unencoded_byte_array_data_bytes;
+
+  /// \brief Check if the SizeStatistics is set, i.e. if either level histogram
+  /// is non-empty or unencoded_byte_array_data_bytes has a value.
+  bool is_set() const {
+    return !repetition_level_histogram.empty() || !definition_level_histogram.empty() ||
+           unencoded_byte_array_data_bytes.has_value();
+  }
+
+  /// \brief Increment the unencoded byte array data bytes by `value`.
+  ///
+  /// unencoded_byte_array_data_bytes must already have a value (as it does for
+  /// BYTE_ARRAY columns created via Make()).
+  void IncrementUnencodedByteArrayDataBytes(int64_t value);
+
+  /// \brief Merge `other` into this object by adding its histogram counts and
+  /// its unencoded_byte_array_data_bytes to ours.
+  /// \throws ParquetException if SizeStatistics to merge is not compatible
+  /// (different histogram sizes, or only one side has
+  /// unencoded_byte_array_data_bytes).
+  void Merge(const SizeStatistics& other);
+
+  /// \brief Validate the SizeStatistics against a column descriptor.
+  /// \throws ParquetException if the histograms don't have the right length,
+  /// or if unencoded_byte_array_data_bytes is present for a non-BYTE_ARRAY
+  /// column or missing for a BYTE_ARRAY column.
+  void Validate(const ColumnDescriptor* descr) const;
+
+  /// \brief Reset all counters to zero. Histogram sizes and the presence of
+  /// unencoded_byte_array_data_bytes are left unchanged.
+  void Reset();
+
+  /// \brief Make a SizeStatistics for `descr` with zero-filled level histograms
+  /// (max_level + 1 entries each) and, for BYTE_ARRAY columns,
+  /// unencoded_byte_array_data_bytes set to 0.
+  static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
+};
+
+} // namespace parquet
diff --git a/cpp/src/parquet/size_statistics_test.cc
b/cpp/src/parquet/size_statistics_test.cc
new file mode 100644
index 0000000000..cefd31dce2
--- /dev/null
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include <algorithm>
+#include <random>
+
+#include "arrow/buffer.h"
+#include "arrow/table.h"
+#include "arrow/testing/builder.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/span.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/column_writer.h"
+#include "parquet/file_writer.h"
+#include "parquet/page_index.h"
+#include "parquet/schema.h"
+#include "parquet/size_statistics.h"
+#include "parquet/test_util.h"
+#include "parquet/thrift_internal.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+// Round-trips SizeStatistics through its Thrift representation for a non-BYTE_ARRAY
+// and a BYTE_ARRAY column; only the latter carries unencoded_byte_array_data_bytes.
+TEST(SizeStatistics, ThriftSerDe) {
+  // Histogram lengths are max level + 1 for the descriptors below.
+  const std::vector<int64_t> kDefLevels = {128, 64, 32, 16};
+  const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20};
+  constexpr int64_t kUnencodedByteArrayDataBytes = 1234;
+
+  for (const auto& descr :
+       {std::make_unique<ColumnDescriptor>(schema::Int32("a"), /*max_def_level=*/3,
+                                           /*max_rep_level=*/4),
+        std::make_unique<ColumnDescriptor>(schema::ByteArray("a"), /*max_def_level=*/3,
+                                           /*max_rep_level=*/4)}) {
+    const bool is_byte_array = descr->physical_type() == Type::BYTE_ARRAY;
+    auto size_statistics = SizeStatistics::Make(descr.get());
+    size_statistics->repetition_level_histogram = kRepLevels;
+    size_statistics->definition_level_histogram = kDefLevels;
+    if (is_byte_array) {
+      size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes);
+    }
+    auto thrift_statistics = ToThrift(*size_statistics);
+    auto restored_statistics = FromThrift(thrift_statistics);
+    EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels);
+    EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels);
+    // Compare the optional as a whole: a missing value then shows up as a normal
+    // expectation failure instead of std::bad_optional_access thrown by value()
+    // after a merely non-fatal EXPECT_TRUE(has_value()).
+    const std::optional<int64_t> expected_unencoded_bytes =
+        is_byte_array ? std::make_optional(kUnencodedByteArrayDataBytes) : std::nullopt;
+    EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes,
+              expected_unencoded_bytes);
+  }
+}
+
+// Field-wise equality, used by the gmock ElementsAre() matchers in this file.
+bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) {
+  if (lhs.definition_level_histogram != rhs.definition_level_histogram) return false;
+  if (lhs.repetition_level_histogram != rhs.repetition_level_histogram) return false;
+  return lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes;
+}
+
+// Page-level size statistics of one column chunk as read back from the page index:
+// level histograms from the column index, byte-array sizes from the offset index.
+struct PageSizeStatistics {
+  std::vector<int64_t> def_levels;        // per-page definition level histograms
+  std::vector<int64_t> rep_levels;        // per-page repetition level histograms
+  std::vector<int64_t> byte_array_bytes;  // unencoded byte-array bytes, one per page
+  bool operator==(const PageSizeStatistics& other) const {
+    if (byte_array_bytes != other.byte_array_bytes) return false;
+    return rep_levels == other.rep_levels && def_levels == other.def_levels;
+  }
+};
+
+// Fixture that writes an Arrow table to an in-memory Parquet file with the page index
+// enabled and reads back the column chunk and page level size statistics.
+class SizeStatisticsRoundTripTest : public ::testing::Test {
+ public:
+  // Writes `table` into buffer_, collecting size statistics at `level`.
+  void WriteFile(SizeStatisticsLevel level,
+                 const std::shared_ptr<::arrow::Table>& table) {
+    // Drop any previously written file so that a failed write cannot leave a stale
+    // buffer behind for ReadSizeStatistics() to pick up.
+    buffer_.reset();
+
+    auto writer_properties = WriterProperties::Builder()
+                                 .max_row_group_length(2) /* every row group has 2 rows */
+                                 ->data_pagesize(1)       /* every page has 1 row */
+                                 ->enable_write_page_index()
+                                 ->enable_statistics()
+                                 ->set_size_statistics_level(level)
+                                 ->build();
+
+    // Get schema from table.
+    auto schema = table->schema();
+    std::shared_ptr<SchemaDescriptor> parquet_schema;
+    auto arrow_writer_properties = default_arrow_writer_properties();
+    ASSERT_OK_NO_THROW(arrow::ToParquetSchema(schema.get(), *writer_properties,
+                                              *arrow_writer_properties, &parquet_schema));
+    auto schema_node =
+        std::static_pointer_cast<schema::GroupNode>(parquet_schema->schema_root());
+
+    // Write table to buffer.
+    auto sink = CreateOutputStream();
+    auto pool = ::arrow::default_memory_pool();
+    auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+    std::unique_ptr<arrow::FileWriter> arrow_writer;
+    ASSERT_OK(arrow::FileWriter::Make(pool, std::move(writer), schema,
+                                      arrow_writer_properties, &arrow_writer));
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+    ASSERT_OK_NO_THROW(arrow_writer->Close());
+    ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
+  }
+
+  // Reads buffer_ back and appends, in row-group-major order, one SizeStatistics per
+  // column chunk to row_group_stats_ and one PageSizeStatistics per column chunk to
+  // page_stats_.
+  void ReadSizeStatistics() {
+    // WriteFile() reports errors through gtest assertions and leaves buffer_ unset;
+    // fail cleanly here instead of opening a reader over a null buffer.
+    ASSERT_NE(buffer_, nullptr);
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
+
+    // Read row group size statistics in order. Chunks without size statistics are
+    // recorded as an empty SizeStatistics.
+    auto metadata = reader->metadata();
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      auto row_group_metadata = metadata->RowGroup(i);
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        auto column_metadata = row_group_metadata->ColumnChunk(j);
+        auto size_stats = column_metadata->size_statistics();
+        row_group_stats_.push_back(size_stats ? *size_stats : SizeStatistics{});
+      }
+    }
+
+    // Read page size statistics in order.
+    auto page_index_reader = reader->GetPageIndexReader();
+    ASSERT_NE(page_index_reader, nullptr);
+
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      auto row_group_index_reader = page_index_reader->RowGroup(i);
+      ASSERT_NE(row_group_index_reader, nullptr);
+
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        PageSizeStatistics page_stats;
+
+        auto column_index = row_group_index_reader->GetColumnIndex(j);
+        if (column_index != nullptr) {
+          if (column_index->has_definition_level_histograms()) {
+            page_stats.def_levels = column_index->definition_level_histograms();
+          }
+          if (column_index->has_repetition_level_histograms()) {
+            page_stats.rep_levels = column_index->repetition_level_histograms();
+          }
+        }
+
+        auto offset_index = row_group_index_reader->GetOffsetIndex(j);
+        if (offset_index != nullptr) {
+          page_stats.byte_array_bytes = offset_index->unencoded_byte_array_data_bytes();
+        }
+
+        page_stats_.emplace_back(std::move(page_stats));
+      }
+    }
+  }
+
+  // Clears all state so the fixture can be reused for another write/read cycle.
+  void Reset() {
+    buffer_.reset();
+    row_group_stats_.clear();
+    page_stats_.clear();
+  }
+
+ protected:
+  // Serialized Parquet file produced by WriteFile().
+  std::shared_ptr<Buffer> buffer_;
+  // One entry per column chunk, ordered by row group and then by column.
+  std::vector<SizeStatistics> row_group_stats_;
+  // One entry per column chunk, in the same order as row_group_stats_.
+  std::vector<PageSizeStatistics> page_stats_;
+  // Expected values when no size statistics were written.
+  inline static const SizeStatistics kEmptyRowGroupStats{};
+  inline static const PageSizeStatistics kEmptyPageStats{};
+};
+
+TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
+  auto schema = ::arrow::schema({
+      ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
+      ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
+  });
+  // First two rows are in one row group, and the other two rows are in another row group.
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+ [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ],
+ [ [[0,1,null]], [["foo","bar",null]] ],
+ [ [], [] ],
+ [ [[],[null],null], [[],[null],null] ]
+ ])"});
+
+  for (auto size_stats_level :
+       {SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk,
+        SizeStatisticsLevel::PageAndColumnChunk}) {
+    // Tag every failure below with the level being exercised.
+    SCOPED_TRACE(static_cast<int>(size_stats_level));
+    // The helpers report errors via gtest assertions that only return from the
+    // helper; stop here instead of checking expectations against stale state.
+    ASSERT_NO_FATAL_FAILURE(WriteFile(size_stats_level, table));
+    ASSERT_NO_FATAL_FAILURE(ReadSizeStatistics());
+
+    // Entries are ordered (row group 0, a), (row group 0, b), (row group 1, a),
+    // (row group 1, b); only the utf8 column "b" has unencoded byte-array bytes.
+    if (size_stats_level == SizeStatisticsLevel::None) {
+      EXPECT_THAT(row_group_stats_,
+                  ::testing::ElementsAre(kEmptyRowGroupStats, kEmptyRowGroupStats,
+                                         kEmptyRowGroupStats, kEmptyRowGroupStats));
+    } else {
+      EXPECT_THAT(row_group_stats_,
+                  ::testing::ElementsAre(
+                      SizeStatistics{/*def_levels=*/{0, 0, 0, 0, 1, 8},
+                                     /*rep_levels=*/{2, 2, 5},
+                                     /*byte_array_bytes=*/std::nullopt},
+                      SizeStatistics{/*def_levels=*/{0, 0, 0, 0, 1, 8},
+                                     /*rep_levels=*/{2, 2, 5},
+                                     /*byte_array_bytes=*/12},
+                      SizeStatistics{/*def_levels=*/{0, 1, 1, 1, 1, 0},
+                                     /*rep_levels=*/{2, 2, 0},
+                                     /*byte_array_bytes=*/std::nullopt},
+                      SizeStatistics{/*def_levels=*/{0, 1, 1, 1, 1, 0},
+                                     /*rep_levels=*/{2, 2, 0},
+                                     /*byte_array_bytes=*/0}));
+    }
+
+    if (size_stats_level == SizeStatisticsLevel::PageAndColumnChunk) {
+      EXPECT_THAT(
+          page_stats_,
+          ::testing::ElementsAre(
+              PageSizeStatistics{/*def_levels=*/{0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 1, 2},
+                                 /*rep_levels=*/{1, 2, 3, 1, 0, 2},
+                                 /*byte_array_bytes=*/{}},
+              PageSizeStatistics{/*def_levels=*/{0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 1, 2},
+                                 /*rep_levels=*/{1, 2, 3, 1, 0, 2},
+                                 /*byte_array_bytes=*/{6, 6}},
+              PageSizeStatistics{/*def_levels=*/{0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0},
+                                 /*rep_levels=*/{1, 0, 0, 1, 2, 0},
+                                 /*byte_array_bytes=*/{}},
+              PageSizeStatistics{/*def_levels=*/{0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0},
+                                 /*rep_levels=*/{1, 0, 0, 1, 2, 0},
+                                 /*byte_array_bytes=*/{0, 0}}));
+    } else {
+      EXPECT_THAT(page_stats_, ::testing::ElementsAre(kEmptyPageStats, kEmptyPageStats,
+                                                      kEmptyPageStats, kEmptyPageStats));
+    }
+
+    Reset();
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
+  auto schema = ::arrow::schema(
+      {::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))});
+  // Six rows with two rows per row group yield three row groups of one column each.
+  const auto table =
+      ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"});
+  // The helpers report errors via gtest assertions that only return from the
+  // helper; stop here instead of checking expectations against empty state.
+  ASSERT_NO_FATAL_FAILURE(WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table));
+  ASSERT_NO_FATAL_FAILURE(ReadSizeStatistics());
+
+  // byte_array_bytes counts the string lengths of the values, not the dictionary
+  // indices: "aa" + "aaa" = 5, null + "a" = 1, "aaa" + "a" = 4.
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
+                                                    /*rep_levels=*/{2},
+                                                    /*byte_array_bytes=*/5},
+                                     SizeStatistics{/*def_levels=*/{1, 1},
+                                                    /*rep_levels=*/{2},
+                                                    /*byte_array_bytes=*/1},
+                                     SizeStatistics{/*def_levels=*/{0, 2},
+                                                    /*rep_levels=*/{2},
+                                                    /*byte_array_bytes=*/4}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2},
+                                                        /*rep_levels=*/{2},
+                                                        /*byte_array_bytes=*/{5}},
+                                     PageSizeStatistics{/*def_levels=*/{1, 1},
+                                                        /*rep_levels=*/{2},
+                                                        /*byte_array_bytes=*/{1}},
+                                     PageSizeStatistics{/*def_levels=*/{0, 2},
+                                                        /*rep_levels=*/{2},
+                                                        /*byte_array_bytes=*/{4}}));
+}
+
+} // namespace parquet
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index e7bfd434c8..744af74311 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -43,6 +43,7 @@
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
+#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
@@ -254,6 +255,14 @@ static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_colu
return sorting_column;
}
+// Converts a Thrift SizeStatistics into SizeStatistics. Both level histograms are
+// copied as-is regardless of their __isset flags; unencoded_byte_array_data_bytes
+// is populated only when the Thrift field is marked as set.
+static inline SizeStatistics FromThrift(const format::SizeStatistics& size_stats) {
+  SizeStatistics size_statistics{};
+  size_statistics.definition_level_histogram = size_stats.definition_level_histogram;
+  size_statistics.repetition_level_histogram = size_stats.repetition_level_histogram;
+  if (size_stats.__isset.unencoded_byte_array_data_bytes) {
+    size_statistics.unencoded_byte_array_data_bytes =
+        size_stats.unencoded_byte_array_data_bytes;
+  }
+  return size_statistics;
+}
+
// ----------------------------------------------------------------------
// Convert Thrift enums from Parquet enums
@@ -383,6 +392,17 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio
return encryption_algorithm;
}
+// Converts SizeStatistics to its Thrift form. Each field is written only when it
+// holds data: an empty level histogram means "not collected" (as in
+// SizeStatistics::is_set()), and serializing it as an empty list would produce a
+// histogram without the max_level + 1 entries Validate() requires. FromThrift()
+// restores an omitted histogram as an empty vector, so round-trips are unchanged.
+static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) {
+  format::SizeStatistics size_statistics;
+  if (!size_stats.definition_level_histogram.empty()) {
+    size_statistics.__set_definition_level_histogram(
+        size_stats.definition_level_histogram);
+  }
+  if (!size_stats.repetition_level_histogram.empty()) {
+    size_statistics.__set_repetition_level_histogram(
+        size_stats.repetition_level_histogram);
+  }
+  if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
+    size_statistics.__set_unencoded_byte_array_data_bytes(
+        size_stats.unencoded_byte_array_data_bytes.value());
+  }
+  return size_statistics;
+}
+
// ----------------------------------------------------------------------
// Thrift struct serialization / deserialization utilities
diff --git a/cpp/src/parquet/type_fwd.h b/cpp/src/parquet/type_fwd.h
index da0d0f7bde..cda0dc5a77 100644
--- a/cpp/src/parquet/type_fwd.h
+++ b/cpp/src/parquet/type_fwd.h
@@ -68,7 +68,10 @@ struct ParquetVersion {
};
};
+struct PageIndexLocation;
+
class FileMetaData;
+class FileCryptoMetaData;
class RowGroupMetaData;
class ColumnDescriptor;
@@ -82,10 +85,22 @@ class WriterPropertiesBuilder;
class ArrowWriterProperties;
class ArrowWriterPropertiesBuilder;
+class EncodedStatistics;
+class Statistics;
+struct SizeStatistics;
+
+class ColumnIndex;
+class OffsetIndex;
+
namespace arrow {
class FileWriter;
class FileReader;
} // namespace arrow
+
+namespace schema {
+class ColumnPath;
+} // namespace schema
+
} // namespace parquet