This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new f93004f23f GH-40592: [C++][Parquet] Implement SizeStatistics (#40594)
f93004f23f is described below

commit f93004f23f7cb1a641abb805b10fb845c77bb23f
Author: Gang Wu <[email protected]>
AuthorDate: Wed Dec 18 16:48:23 2024 +0800

    GH-40592: [C++][Parquet] Implement SizeStatistics (#40594)
    
    ### Rationale for this change
    
    Parquet format 2.10.0 has introduced SizeStatistics. parquet-mr has also 
implemented this: https://github.com/apache/parquet-mr/pull/1177. Now it is 
time for parquet-cpp to pick up the ball.
    
    ### What changes are included in this PR?
    
    Implement reading and writing size statistics for parquet-cpp.
    
    ### Are these changes tested?
    
    Yes, a bunch of test cases have been added.
    
    ### Are there any user-facing changes?
    
    Yes, now parquet users are able to read and write size statistics.
    
    * GitHub Issue: #40592
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/util/hashing.h            |   8 +
 cpp/src/parquet/CMakeLists.txt          |   2 +
 cpp/src/parquet/column_page.h           |  22 ++-
 cpp/src/parquet/column_writer.cc        | 123 +++++++++++---
 cpp/src/parquet/encoder.cc              |  30 ++++
 cpp/src/parquet/encoding.h              |   5 +
 cpp/src/parquet/encoding_test.cc        |  28 ++++
 cpp/src/parquet/metadata.cc             |  23 +++
 cpp/src/parquet/metadata.h              |  16 +-
 cpp/src/parquet/page_index.cc           |  90 ++++++++++-
 cpp/src/parquet/page_index.h            |  36 +++--
 cpp/src/parquet/page_index_benchmark.cc |   2 +-
 cpp/src/parquet/page_index_test.cc      | 116 +++++++++++--
 cpp/src/parquet/properties.h            |  37 ++++-
 cpp/src/parquet/size_statistics.cc      |  94 +++++++++++
 cpp/src/parquet/size_statistics.h       |  92 +++++++++++
 cpp/src/parquet/size_statistics_test.cc | 279 ++++++++++++++++++++++++++++++++
 cpp/src/parquet/thrift_internal.h       |  20 +++
 cpp/src/parquet/type_fwd.h              |  15 ++
 19 files changed, 967 insertions(+), 71 deletions(-)

diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h
index 4ead1a7283..52525a83aa 100644
--- a/cpp/src/arrow/util/hashing.h
+++ b/cpp/src/arrow/util/hashing.h
@@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable {
     }
   }
 
+  // Visit the stored value at a specific index in insertion order.
+  // The visitor function should have the signature `void(std::string_view)`
+  // or `void(const std::string_view&)`.
+  template <typename VisitFunc>
+  void VisitValue(int32_t idx, VisitFunc&& visit) const {
+    visit(binary_builder_.GetView(idx));
+  }
+
  protected:
   struct Payload {
     int32_t memo_index;
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 9c28b749e4..0a9f92cebb 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -181,6 +181,7 @@ set(PARQUET_SRCS
     printer.cc
     properties.cc
     schema.cc
+    size_statistics.cc
     statistics.cc
     stream_reader.cc
     stream_writer.cc
@@ -373,6 +374,7 @@ add_parquet_test(internals-test
                  metadata_test.cc
                  page_index_test.cc
                  public_api_test.cc
+                 size_statistics_test.cc
                  types_test.cc)
 
 set_source_files_properties(public_api_test.cc PROPERTIES 
SKIP_PRECOMPILE_HEADERS ON
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index b389ffd98e..111265a842 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -26,6 +26,7 @@
 #include <optional>
 #include <string>
 
+#include "parquet/size_statistics.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
 
@@ -69,20 +70,22 @@ class DataPage : public Page {
   /// Currently it is only present from data pages created by ColumnWriter in 
order
   /// to collect page index.
   std::optional<int64_t> first_row_index() const { return first_row_index_; }
+  const SizeStatistics& size_statistics() const { return size_statistics_; }
 
   virtual ~DataPage() = default;
 
  protected:
   DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t 
num_values,
            Encoding::type encoding, int64_t uncompressed_size,
-           EncodedStatistics statistics = EncodedStatistics(),
-           std::optional<int64_t> first_row_index = std::nullopt)
+           EncodedStatistics statistics, std::optional<int64_t> 
first_row_index,
+           SizeStatistics size_statistics)
       : Page(buffer, type),
         num_values_(num_values),
         encoding_(encoding),
         uncompressed_size_(uncompressed_size),
         statistics_(std::move(statistics)),
-        first_row_index_(std::move(first_row_index)) {}
+        first_row_index_(std::move(first_row_index)),
+        size_statistics_(std::move(size_statistics)) {}
 
   int32_t num_values_;
   Encoding::type encoding_;
@@ -90,6 +93,7 @@ class DataPage : public Page {
   EncodedStatistics statistics_;
   /// Row ordinal within the row group to the first row in the data page.
   std::optional<int64_t> first_row_index_;
+  SizeStatistics size_statistics_;
 };
 
 class DataPageV1 : public DataPage {
@@ -98,9 +102,11 @@ class DataPageV1 : public DataPage {
              Encoding::type encoding, Encoding::type definition_level_encoding,
              Encoding::type repetition_level_encoding, int64_t 
uncompressed_size,
              EncodedStatistics statistics = EncodedStatistics(),
-             std::optional<int64_t> first_row_index = std::nullopt)
+             std::optional<int64_t> first_row_index = std::nullopt,
+             SizeStatistics size_statistics = SizeStatistics())
       : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, 
uncompressed_size,
-                 std::move(statistics), std::move(first_row_index)),
+                 std::move(statistics), std::move(first_row_index),
+                 std::move(size_statistics)),
         definition_level_encoding_(definition_level_encoding),
         repetition_level_encoding_(repetition_level_encoding) {}
 
@@ -120,9 +126,11 @@ class DataPageV2 : public DataPage {
              int32_t definition_levels_byte_length, int32_t 
repetition_levels_byte_length,
              int64_t uncompressed_size, bool is_compressed = false,
              EncodedStatistics statistics = EncodedStatistics(),
-             std::optional<int64_t> first_row_index = std::nullopt)
+             std::optional<int64_t> first_row_index = std::nullopt,
+             SizeStatistics size_statistics = SizeStatistics())
       : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, 
uncompressed_size,
-                 std::move(statistics), std::move(first_row_index)),
+                 std::move(statistics), std::move(first_row_index),
+                 std::move(size_statistics)),
         num_nulls_(num_nulls),
         num_rows_(num_rows),
         definition_levels_byte_length_(definition_levels_byte_length),
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index d3e0fdfa81..12cbcf20af 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -55,6 +55,7 @@
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/size_statistics.h"
 #include "parquet/statistics.h"
 #include "parquet/thrift_internal.h"
 #include "parquet/types.h"
@@ -437,7 +438,7 @@ class SerializedPageWriter : public PageWriter {
 
     /// Collect page index
     if (column_index_builder_ != nullptr) {
-      column_index_builder_->AddPage(page.statistics());
+      column_index_builder_->AddPage(page.statistics(), 
page.size_statistics());
     }
     if (offset_index_builder_ != nullptr) {
       const int64_t compressed_size = output_data_len + header_size;
@@ -451,8 +452,9 @@ class SerializedPageWriter : public PageWriter {
       /// start_pos is a relative offset in the buffered mode. It should be
       /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
       /// has flushed all data pages.
-      offset_index_builder_->AddPage(start_pos, 
static_cast<int32_t>(compressed_size),
-                                     *page.first_row_index());
+      offset_index_builder_->AddPage(
+          start_pos, static_cast<int32_t>(compressed_size), 
*page.first_row_index(),
+          page.size_statistics().unencoded_byte_array_data_bytes);
     }
 
     total_uncompressed_size_ += uncompressed_size + header_size;
@@ -774,11 +776,17 @@ class ColumnWriterImpl {
   // Serializes Dictionary Page if enabled
   virtual void WriteDictionaryPage() = 0;
 
+  // A convenience struct to combine the encoded statistics and size statistics
+  struct StatisticsPair {
+    EncodedStatistics encoded_stats;
+    SizeStatistics size_stats;
+  };
+
   // Plain-encoded statistics of the current page
-  virtual EncodedStatistics GetPageStatistics() = 0;
+  virtual StatisticsPair GetPageStatistics() = 0;
 
   // Plain-encoded statistics of the whole chunk
-  virtual EncodedStatistics GetChunkStatistics() = 0;
+  virtual StatisticsPair GetChunkStatistics() = 0;
 
   // Merges page statistics into chunk statistics, then resets the values
   virtual void ResetPageStatistics() = 0;
@@ -981,8 +989,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t 
definition_levels_rle_size,
   PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
   ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, 
values,
                      uncompressed_data_->mutable_data());
-
-  EncodedStatistics page_stats = GetPageStatistics();
+  auto [page_stats, page_size_stats] = GetPageStatistics();
   
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
   page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
   ResetPageStatistics();
@@ -1006,13 +1013,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t 
definition_levels_rle_size,
         compressed_data->CopySlice(0, compressed_data->size(), allocator_));
     std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
         compressed_data_copy, num_values, encoding_, Encoding::RLE, 
Encoding::RLE,
-        uncompressed_size, std::move(page_stats), first_row_index);
+        uncompressed_size, std::move(page_stats), first_row_index,
+        std::move(page_size_stats));
     total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
 
     data_pages_.push_back(std::move(page_ptr));
   } else {  // Eagerly write pages
     DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, 
Encoding::RLE,
-                    uncompressed_size, std::move(page_stats), first_row_index);
+                    uncompressed_size, std::move(page_stats), first_row_index,
+                    std::move(page_size_stats));
     WriteDataPage(page);
   }
 }
@@ -1039,7 +1048,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
   ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
                      compressed_values, combined->mutable_data());
 
-  EncodedStatistics page_stats = GetPageStatistics();
+  auto [page_stats, page_size_stats] = GetPageStatistics();
   
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
   page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
   ResetPageStatistics();
@@ -1062,14 +1071,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
                             combined->CopySlice(0, combined->size(), 
allocator_));
     std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
         combined, num_values, null_count, num_rows, encoding_, 
def_levels_byte_length,
-        rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), 
page_stats,
-        first_row_index);
+        rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
+        std::move(page_stats), first_row_index, std::move(page_size_stats));
     total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
     data_pages_.push_back(std::move(page_ptr));
   } else {
     DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
                     def_levels_byte_length, rep_levels_byte_length, 
uncompressed_size,
-                    pager_->has_compressor(), page_stats, first_row_index);
+                    pager_->has_compressor(), std::move(page_stats), 
first_row_index,
+                    std::move(page_size_stats));
     WriteDataPage(page);
   }
 }
@@ -1083,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() {
 
     FlushBufferedDataPages();
 
-    EncodedStatistics chunk_statistics = GetChunkStatistics();
+    auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
     chunk_statistics.ApplyStatSizeLimits(
         properties_->max_statistics_size(descr_->path()));
     chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
@@ -1092,6 +1102,9 @@ int64_t ColumnWriterImpl::Close() {
     if (rows_written_ > 0 && chunk_statistics.is_set()) {
       metadata_->SetStatistics(chunk_statistics);
     }
+    if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
+      metadata_->SetSizeStatistics(chunk_size_statistics);
+    }
     metadata_->SetKeyValueMetadata(key_value_metadata_);
     pager_->Close(has_dictionary_, fallback_);
   }
@@ -1217,6 +1230,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
       chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
     }
+    if (properties->size_statistics_level() == 
SizeStatisticsLevel::ColumnChunk ||
+        properties->size_statistics_level() == 
SizeStatisticsLevel::PageAndColumnChunk) {
+      page_size_statistics_ = SizeStatistics::Make(descr_);
+      chunk_size_statistics_ = SizeStatistics::Make(descr_);
+    }
     pages_change_on_record_boundaries_ =
         properties->data_page_version() == ParquetDataPageVersion::V2 ||
         properties->page_index_enabled(descr_->path());
@@ -1355,15 +1373,26 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     total_bytes_written_ += pager_->WriteDictionaryPage(page);
   }
 
-  EncodedStatistics GetPageStatistics() override {
-    EncodedStatistics result;
-    if (page_statistics_) result = page_statistics_->Encode();
+  StatisticsPair GetPageStatistics() override {
+    StatisticsPair result;
+    if (page_statistics_) {
+      result.encoded_stats = page_statistics_->Encode();
+    }
+    if (properties_->size_statistics_level() == 
SizeStatisticsLevel::PageAndColumnChunk) {
+      ARROW_DCHECK(page_size_statistics_ != nullptr);
+      result.size_stats = *page_size_statistics_;
+    }
     return result;
   }
 
-  EncodedStatistics GetChunkStatistics() override {
-    EncodedStatistics result;
-    if (chunk_statistics_) result = chunk_statistics_->Encode();
+  StatisticsPair GetChunkStatistics() override {
+    StatisticsPair result;
+    if (chunk_statistics_) {
+      result.encoded_stats = chunk_statistics_->Encode();
+    }
+    if (chunk_size_statistics_) {
+      result.size_stats = *chunk_size_statistics_;
+    }
     return result;
   }
 
@@ -1372,6 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       chunk_statistics_->Merge(*page_statistics_);
       page_statistics_->Reset();
     }
+    if (page_size_statistics_ != nullptr) {
+      chunk_size_statistics_->Merge(*page_size_statistics_);
+      page_size_statistics_->Reset();
+    }
   }
 
   Type::type type() const override { return descr_->physical_type(); }
@@ -1425,6 +1458,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
   DictEncoder<DType>* current_dict_encoder_;
   std::shared_ptr<TypedStats> page_statistics_;
   std::shared_ptr<TypedStats> chunk_statistics_;
+  std::unique_ptr<SizeStatistics> page_size_statistics_;
+  std::shared_ptr<SizeStatistics> chunk_size_statistics_;
   bool pages_change_on_record_boundaries_;
 
   // If writing a sequence of ::arrow::DictionaryArray to the writer, we keep 
the
@@ -1467,6 +1502,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       rows_written_ += num_values;
       num_buffered_rows_ += num_values;
     }
+
+    UpdateLevelHistogram(num_values, def_levels, rep_levels);
     return values_to_write;
   }
 
@@ -1558,6 +1595,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       rows_written_ += num_levels;
       num_buffered_rows_ += num_levels;
     }
+
+    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
+  }
+
+  void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
+                            const int16_t* rep_levels) const {
+    if (page_size_statistics_ == nullptr) {
+      return;
+    }
+
+    auto add_levels = [](std::vector<int64_t>& level_histogram,
+                         ::arrow::util::span<const int16_t> levels) {
+      for (int16_t level : levels) {
+        ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
+        ++level_histogram[level];
+      }
+    };
+
+    if (descr_->max_definition_level() > 0) {
+      add_levels(page_size_statistics_->definition_level_histogram,
+                 {def_levels, static_cast<size_t>(num_levels)});
+    } else {
+      page_size_statistics_->definition_level_histogram[0] += num_levels;
+    }
+
+    if (descr_->max_repetition_level() > 0) {
+      add_levels(page_size_statistics_->repetition_level_histogram,
+                 {rep_levels, static_cast<size_t>(num_levels)});
+    } else {
+      page_size_statistics_->repetition_level_histogram[0] += num_levels;
+    }
+  }
+
+  // Update the unencoded data bytes for ByteArray only per the specification.
+  void UpdateUnencodedDataBytes() const {
+    if constexpr (std::is_same_v<T, ByteArray>) {
+      if (page_size_statistics_ != nullptr) {
+        page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
+            current_encoder_->ReportUnencodedDataBytes());
+      }
+    }
   }
 
   void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
@@ -1611,6 +1689,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     if (page_statistics_ != nullptr) {
       page_statistics_->Update(values, num_values, num_nulls);
     }
+    UpdateUnencodedDataBytes();
   }
 
   /// \brief Write values with spaces and update page statistics accordingly.
@@ -1639,6 +1718,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
                                      num_spaced_values, num_values, num_nulls);
     }
+    UpdateUnencodedDataBytes();
   }
 };
 
@@ -1739,6 +1819,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
         writeable_indices,
         MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
     dict_encoder->PutIndices(*writeable_indices);
+    // Update unencoded byte array data size to size statistics
+    UpdateUnencodedDataBytes();
     CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, 
check_page);
     value_offset += batch_num_spaced_values;
   };
@@ -2219,6 +2301,7 @@ Status 
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
       page_statistics_->IncrementNullCount(batch_size - non_null);
       page_statistics_->IncrementNumValues(non_null);
     }
+    UpdateUnencodedDataBytes();
     CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - 
non_null,
                                  check_page);
     CheckDictionarySizeLimit();
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 89d5d44c52..f41eb9a191 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -79,6 +79,15 @@ class EncoderImpl : virtual public Encoder {
 
   MemoryPool* memory_pool() const override { return pool_; }
 
+  int64_t ReportUnencodedDataBytes() override {
+    if (descr_->physical_type() != Type::BYTE_ARRAY) {
+      throw ParquetException("ReportUnencodedDataBytes is only supported for 
BYTE_ARRAY");
+    }
+    int64_t bytes = unencoded_byte_array_data_bytes_;
+    unencoded_byte_array_data_bytes_ = 0;
+    return bytes;
+  }
+
  protected:
   // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
   const ColumnDescriptor* descr_;
@@ -87,6 +96,8 @@ class EncoderImpl : virtual public Encoder {
 
   /// Type length from descr
   const int type_length_;
+  /// Number of unencoded bytes written to the encoder. Used for ByteArray 
type only.
+  int64_t unencoded_byte_array_data_bytes_ = 0;
 };
 
 // ----------------------------------------------------------------------
@@ -132,6 +143,7 @@ class PlainEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
     DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
     sink_.UnsafeAppend(&length, sizeof(uint32_t));
     sink_.UnsafeAppend(data, static_cast<int64_t>(length));
+    unencoded_byte_array_data_bytes_ += length;
   }
 
   void Put(const ByteArray& val) {
@@ -513,6 +525,18 @@ class DictEncoderImpl : public EncoderImpl, virtual public 
DictEncoder<DType> {
                 static_cast<int32_t>(values[i + position]);
           }
         });
+
+    // Track unencoded bytes based on dictionary value type
+    if constexpr (std::is_same_v<DType, ByteArrayType>) {
+      // For ByteArray, need to look up actual lengths from dictionary
+      for (size_t idx =
+               buffer_position - static_cast<size_t>(data.length() - 
data.null_count());
+           idx < buffer_position; ++idx) {
+        memo_table_.VisitValue(buffered_indices_[idx], [&](std::string_view 
value) {
+          unencoded_byte_array_data_bytes_ += value.length();
+        });
+      }
+    }
   }
 
   void PutIndices(const ::arrow::Array& data) override {
@@ -656,6 +680,7 @@ inline void 
DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
   PARQUET_THROW_NOT_OK(
       memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, 
&memo_index));
   buffered_indices_.push_back(memo_index);
+  unencoded_byte_array_data_bytes_ += length;
 }
 
 template <>
@@ -1268,6 +1293,7 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl,
           }
           length_encoder_.Put({static_cast<int32_t>(view.length())}, 1);
           PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length()));
+          unencoded_byte_array_data_bytes_ += view.size();
           return Status::OK();
         },
         []() { return Status::OK(); }));
@@ -1313,6 +1339,7 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int 
num_values) {
   for (int idx = 0; idx < num_values; idx++) {
     sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
   }
+  unencoded_byte_array_data_bytes_ += total_increment_size;
 }
 
 void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values,
@@ -1444,6 +1471,8 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual 
public TypedEncoder<DT
         // Convert to ByteArray, so it can be passed to the suffix_encoder_.
         const ByteArray suffix(suffix_length, suffix_ptr);
         suffixes[j] = suffix;
+
+        unencoded_byte_array_data_bytes_ += len;
       }
       suffix_encoder_.Put(suffixes.data(), batch_size);
       prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
@@ -1488,6 +1517,7 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual 
public TypedEncoder<DT
           const ByteArray suffix(suffix_length, suffix_ptr);
           suffix_encoder_.Put(&suffix, 1);
 
+          unencoded_byte_array_data_bytes_ += len;
           return Status::OK();
         },
         []() { return Status::OK(); }));
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 5717886f10..f2d64dfc80 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -159,6 +159,11 @@ class Encoder {
 
   virtual void Put(const ::arrow::Array& values) = 0;
 
+  // Report the number of bytes written to the encoder since the last report.
+  // It only works for BYTE_ARRAY type and throws for other types.
+  // This call is not idempotent since it resets the internal counter.
+  virtual int64_t ReportUnencodedDataBytes() = 0;
+
   virtual MemoryPool* memory_pool() const = 0;
 };
 
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 78bf26587e..974e9ce622 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -178,6 +178,7 @@ class TestEncodingBase : public ::testing::Test {
   void SetUp() {
     descr_ = ExampleDescr<Type>();
     type_length_ = descr_->type_length();
+    unencoded_byte_array_data_bytes_ = 0;
     allocator_ = default_memory_pool();
   }
 
@@ -197,6 +198,8 @@ class TestEncodingBase : public ::testing::Test {
         draws_[nvalues * j + i] = draws_[i];
       }
     }
+
+    InitUnencodedByteArrayDataBytes();
   }
 
   virtual void CheckRoundtrip() = 0;
@@ -222,6 +225,16 @@ class TestEncodingBase : public ::testing::Test {
     }
   }
 
+  void InitUnencodedByteArrayDataBytes() {
+    // Calculate expected unencoded bytes based on type
+    if constexpr (std::is_same_v<Type, ByteArrayType>) {
+      unencoded_byte_array_data_bytes_ = 0;
+      for (int i = 0; i < num_values_; i++) {
+        unencoded_byte_array_data_bytes_ += draws_[i].len;
+      }
+    }
+  }
+
  protected:
   MemoryPool* allocator_;
 
@@ -235,6 +248,7 @@ class TestEncodingBase : public ::testing::Test {
 
   std::shared_ptr<Buffer> encode_buffer_;
   std::shared_ptr<ColumnDescriptor> descr_;
+  int64_t unencoded_byte_array_data_bytes_;  // unencoded data size for dense 
values
 };
 
 // Member variables are not visible to templated subclasses. Possibly figure
@@ -261,6 +275,10 @@ class TestPlainEncoding : public TestEncodingBase<Type> {
     auto decoder = MakeTypedDecoder<Type>(Encoding::PLAIN, descr_.get());
     encoder->Put(draws_, num_values_);
     encode_buffer_ = encoder->FlushValues();
+    if constexpr (std::is_same_v<Type, ByteArrayType>) {
+      ASSERT_EQ(encoder->ReportUnencodedDataBytes(),
+                this->unencoded_byte_array_data_bytes_);
+    }
 
     decoder->SetData(num_values_, encode_buffer_->data(),
                      static_cast<int>(encode_buffer_->size()));
@@ -346,6 +364,10 @@ class TestDictionaryEncoding : public 
TestEncodingBase<Type> {
         AllocateBuffer(default_memory_pool(), 
dict_traits->dict_encoded_size());
     dict_traits->WriteDict(dict_buffer_->mutable_data());
     std::shared_ptr<Buffer> indices = encoder->FlushValues();
+    if constexpr (std::is_same_v<Type, ByteArrayType>) {
+      ASSERT_EQ(encoder->ReportUnencodedDataBytes(),
+                this->unencoded_byte_array_data_bytes_);
+    }
 
     auto base_spaced_encoder =
         MakeEncoder(Type::type_num, Encoding::PLAIN, true, descr_.get());
@@ -1992,6 +2014,10 @@ class TestDeltaLengthByteArrayEncoding : public 
TestEncodingBase<Type> {
 
     encoder->Put(draws_, num_values_);
     encode_buffer_ = encoder->FlushValues();
+    if constexpr (std::is_same_v<Type, ByteArrayType>) {
+      ASSERT_EQ(encoder->ReportUnencodedDataBytes(),
+                this->unencoded_byte_array_data_bytes_);
+    }
 
     decoder->SetData(num_values_, encode_buffer_->data(),
                      static_cast<int>(encode_buffer_->size()));
@@ -2296,6 +2322,8 @@ class TestDeltaByteArrayEncoding : public 
TestDeltaLengthByteArrayEncoding<Type>
         draws_[nvalues * j + i] = draws_[i];
       }
     }
+
+    TestEncodingBase<Type>::InitUnencodedByteArrayDataBytes();
   }
 
   Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; }
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 8f577be45b..f47c614219 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -37,6 +37,7 @@
 #include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/schema_internal.h"
+#include "parquet/size_statistics.h"
 #include "parquet/thrift_internal.h"
 
 namespace parquet {
@@ -265,6 +266,11 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
                                  LoadEnumSafe(&encoding_stats.encoding),
                                  encoding_stats.count});
     }
+    if (column_metadata_->__isset.size_statistics) {
+      size_statistics_ =
+          
std::make_shared<SizeStatistics>(FromThrift(column_metadata_->size_statistics));
+      size_statistics_->Validate(descr_);
+    }
     possible_stats_ = nullptr;
     InitKeyValueMetadata();
   }
@@ -308,6 +314,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
     return is_stats_set() ? possible_stats_ : nullptr;
   }
 
+  inline std::shared_ptr<SizeStatistics> size_statistics() const {
+    return size_statistics_;
+  }
+
   inline Compression::type compression() const {
     return LoadEnumSafe(&column_metadata_->codec);
   }
@@ -396,6 +406,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
   const ReaderProperties properties_;
   const ApplicationVersion* writer_version_;
   std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+  std::shared_ptr<SizeStatistics> size_statistics_;
 };
 
 std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
@@ -439,6 +450,10 @@ std::shared_ptr<Statistics> 
ColumnChunkMetaData::statistics() const {
 
 bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); 
}
 
+std::shared_ptr<SizeStatistics> ColumnChunkMetaData::size_statistics() const {
+  return impl_->size_statistics();
+}
+
 std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
   return impl_->bloom_filter_offset();
 }
@@ -1543,6 +1558,10 @@ class 
ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_statistics(ToThrift(val));
   }
 
+  void SetSizeStatistics(const SizeStatistics& size_stats) {
+    column_chunk_->meta_data.__set_size_statistics(ToThrift(size_stats));
+  }
+
   void Finish(int64_t num_values, int64_t dictionary_page_offset,
               int64_t index_page_offset, int64_t data_page_offset,
               int64_t compressed_size, int64_t uncompressed_size, bool 
has_dictionary,
@@ -1752,6 +1771,10 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const 
EncodedStatistics& result)
   impl_->SetStatistics(result);
 }
 
+void ColumnChunkMetaDataBuilder::SetSizeStatistics(const SizeStatistics& 
size_stats) {
+  impl_->SetSizeStatistics(size_stats);
+}
+
 void ColumnChunkMetaDataBuilder::SetKeyValueMetadata(
     std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
   impl_->SetKeyValueMetadata(std::move(key_value_metadata));
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index dc97d816da..9a3964f7d6 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -28,23 +28,9 @@
 #include "parquet/encryption/type_fwd.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
 
 namespace parquet {
 
-class ColumnDescriptor;
-class EncodedStatistics;
-class FileCryptoMetaData;
-class Statistics;
-class SchemaDescriptor;
-
-namespace schema {
-
-class ColumnPath;
-
-}  // namespace schema
-
 using KeyValueMetadata = ::arrow::KeyValueMetadata;
 
 class PARQUET_EXPORT ApplicationVersion {
@@ -156,6 +142,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::shared_ptr<schema::ColumnPath> path_in_schema() const;
   bool is_stats_set() const;
   std::shared_ptr<Statistics> statistics() const;
+  std::shared_ptr<SizeStatistics> size_statistics() const;
 
   Compression::type compression() const;
   // Indicate if the ColumnChunk compression is supported by the current
@@ -451,6 +438,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
 
   // column metadata
   void SetStatistics(const EncodedStatistics& stats);
+  void SetSizeStatistics(const SizeStatistics& size_stats);
 
   void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> 
key_value_metadata);
 
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index afda4c6064..8cc819f10c 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -159,6 +159,22 @@ class TypedColumnIndexImpl : public 
TypedColumnIndex<DType> {
 
   const std::vector<T>& max_values() const override { return max_values_; }
 
+  bool has_definition_level_histograms() const override {
+    return column_index_.__isset.definition_level_histograms;
+  }
+
+  bool has_repetition_level_histograms() const override {
+    return column_index_.__isset.repetition_level_histograms;
+  }
+
+  const std::vector<int64_t>& definition_level_histograms() const override {
+    return column_index_.definition_level_histograms;
+  }
+
+  const std::vector<int64_t>& repetition_level_histograms() const override {
+    return column_index_.repetition_level_histograms;
+  }
+
  private:
   /// Wrapped thrift column index.
   const format::ColumnIndex column_index_;
@@ -178,14 +194,22 @@ class OffsetIndexImpl : public OffsetIndex {
                                                 
page_location.compressed_page_size,
                                                 
page_location.first_row_index});
     }
+    if (offset_index.__isset.unencoded_byte_array_data_bytes) {
+      unencoded_byte_array_data_bytes_ = 
offset_index.unencoded_byte_array_data_bytes;
+    }
   }
 
   const std::vector<PageLocation>& page_locations() const override {
     return page_locations_;
   }
 
+  const std::vector<int64_t>& unencoded_byte_array_data_bytes() const override 
{
+    return unencoded_byte_array_data_bytes_;
+  }
+
  private:
   std::vector<PageLocation> page_locations_;
+  std::vector<int64_t> unencoded_byte_array_data_bytes_;
 };
 
 class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader {
@@ -460,7 +484,8 @@ class ColumnIndexBuilderImpl final : public 
ColumnIndexBuilder {
     column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
   }
 
-  void AddPage(const EncodedStatistics& stats) override {
+  void AddPage(const EncodedStatistics& stats,
+               const SizeStatistics& size_stats) override {
     if (state_ == BuilderState::kFinished) {
       throw ParquetException("Cannot add page to finished 
ColumnIndexBuilder.");
     } else if (state_ == BuilderState::kDiscarded) {
@@ -493,6 +518,17 @@ class ColumnIndexBuilderImpl final : public 
ColumnIndexBuilder {
       column_index_.__isset.null_counts = false;
       column_index_.null_counts.clear();
     }
+
+    if (size_stats.is_set()) {
+      const auto& page_def_level_hist = size_stats.definition_level_histogram;
+      const auto& page_ref_level_hist = size_stats.repetition_level_histogram;
+      column_index_.definition_level_histograms.insert(
+          column_index_.definition_level_histograms.end(), 
page_def_level_hist.cbegin(),
+          page_def_level_hist.cend());
+      column_index_.repetition_level_histograms.insert(
+          column_index_.repetition_level_histograms.end(), 
page_ref_level_hist.cbegin(),
+          page_ref_level_hist.cend());
+    }
   }
 
   void Finish() override {
@@ -533,6 +569,29 @@ class ColumnIndexBuilderImpl final : public 
ColumnIndexBuilder {
     /// Decide the boundary order from decoded min/max values.
     auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
     column_index_.__set_boundary_order(ToThrift(boundary_order));
+
+    // Finalize level histogram.
+    const int64_t num_pages = column_index_.null_pages.size();
+    const int64_t def_level_hist_size = 
column_index_.definition_level_histograms.size();
+    const int64_t rep_level_hist_size = 
column_index_.repetition_level_histograms.size();
+    if (def_level_hist_size != 0 &&
+        def_level_hist_size != (descr_->max_definition_level() + 1) * 
num_pages) {
+      std::stringstream ss;
+      ss << "Invalid definition level histogram size: " << def_level_hist_size
+         << ", expected: " << (descr_->max_definition_level() + 1) * num_pages;
+      throw ParquetException(ss.str());
+    }
+    if (rep_level_hist_size != 0 &&
+        rep_level_hist_size != (descr_->max_repetition_level() + 1) * 
num_pages) {
+      std::stringstream ss;
+      ss << "Invalid repetition level histogram size: " << rep_level_hist_size
+         << ", expected: " << (descr_->max_repetition_level() + 1) * num_pages;
+      throw ParquetException(ss.str());
+    }
+    column_index_.__isset.definition_level_histograms =
+        !column_index_.definition_level_histograms.empty();
+    column_index_.__isset.repetition_level_histograms =
+        !column_index_.repetition_level_histograms.empty();
   }
 
   void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const 
override {
@@ -604,8 +663,8 @@ class OffsetIndexBuilderImpl final : public 
OffsetIndexBuilder {
  public:
   OffsetIndexBuilderImpl() = default;
 
-  void AddPage(int64_t offset, int32_t compressed_page_size,
-               int64_t first_row_index) override {
+  void AddPage(int64_t offset, int32_t compressed_page_size, int64_t 
first_row_index,
+               std::optional<int64_t> unencoded_byte_array_length) override {
     if (state_ == BuilderState::kFinished) {
       throw ParquetException("Cannot add page to finished 
OffsetIndexBuilder.");
     } else if (state_ == BuilderState::kDiscarded) {
@@ -620,6 +679,10 @@ class OffsetIndexBuilderImpl final : public 
OffsetIndexBuilder {
     page_location.__set_compressed_page_size(compressed_page_size);
     page_location.__set_first_row_index(first_row_index);
     offset_index_.page_locations.emplace_back(std::move(page_location));
+    if (unencoded_byte_array_length.has_value()) {
+      offset_index_.unencoded_byte_array_data_bytes.emplace_back(
+          unencoded_byte_array_length.value());
+    }
   }
 
   void Finish(int64_t final_position) override {
@@ -636,6 +699,19 @@ class OffsetIndexBuilderImpl final : public 
OffsetIndexBuilder {
             page_location.__set_offset(page_location.offset + final_position);
           }
         }
+
+        // Finalize unencoded_byte_array_data_bytes and check that its entry
+        // count matches the number of pages.
+        if (offset_index_.page_locations.size() ==
+            offset_index_.unencoded_byte_array_data_bytes.size()) {
+          offset_index_.__isset.unencoded_byte_array_data_bytes = true;
+        } else if (!offset_index_.unencoded_byte_array_data_bytes.empty()) {
+          std::stringstream ss;
+          ss << "Invalid count of unencoded BYTE_ARRAY data bytes: "
+             << offset_index_.unencoded_byte_array_data_bytes.size()
+             << ", expected page count: " << 
offset_index_.page_locations.size();
+          throw ParquetException(ss.str());
+        }
+
         state_ = BuilderState::kFinished;
         break;
       }
@@ -813,6 +889,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder 
{
 
 }  // namespace
 
+void OffsetIndexBuilder::AddPage(const PageLocation& page_location,
+                                 const SizeStatistics& size_stats) {
+  this->AddPage(
+      page_location.offset, page_location.compressed_page_size,
+      page_location.first_row_index,
+      size_stats.is_set() ? size_stats.unencoded_byte_array_data_bytes : 
std::nullopt);
+}
+
 RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup(
     const RowGroupMetaData& row_group_metadata, const std::vector<int32_t>& 
columns) {
   int64_t ci_start = std::numeric_limits<int64_t>::max();
diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h
index d45c59cab2..3083159783 100644
--- a/cpp/src/parquet/page_index.h
+++ b/cpp/src/parquet/page_index.h
@@ -19,6 +19,7 @@
 
 #include "arrow/io/interfaces.h"
 #include "parquet/encryption/type_fwd.h"
+#include "parquet/type_fwd.h"
 #include "parquet/types.h"
 
 #include <optional>
@@ -26,9 +27,6 @@
 
 namespace parquet {
 
-class EncodedStatistics;
-struct PageIndexLocation;
-
 /// \brief ColumnIndex is a proxy around format::ColumnIndex.
 class PARQUET_EXPORT ColumnIndex {
  public:
@@ -76,6 +74,18 @@ class PARQUET_EXPORT ColumnIndex {
 
   /// \brief A vector of page indices for non-null pages.
   virtual const std::vector<int32_t>& non_null_page_indices() const = 0;
+
+  /// \brief Whether definition level histogram is available.
+  virtual bool has_definition_level_histograms() const = 0;
+
+  /// \brief Whether repetition level histogram is available.
+  virtual bool has_repetition_level_histograms() const = 0;
+
+  /// \brief List of definition level histograms for each page concatenated 
together.
+  virtual const std::vector<int64_t>& definition_level_histograms() const = 0;
+
+  /// \brief List of repetition level histograms for each page concatenated 
together.
+  virtual const std::vector<int64_t>& repetition_level_histograms() const = 0;
 };
 
 /// \brief Typed implementation of ColumnIndex.
@@ -129,6 +139,10 @@ class PARQUET_EXPORT OffsetIndex {
 
   /// \brief A vector of locations for each data page in this column.
   virtual const std::vector<PageLocation>& page_locations() const = 0;
+
+  /// \brief A vector of unencoded/uncompressed size of each page for 
BYTE_ARRAY types,
+  /// or empty for other types.
+  virtual const std::vector<int64_t>& unencoded_byte_array_data_bytes() const 
= 0;
 };
 
 /// \brief Interface for reading the page index for a Parquet row group.
@@ -266,7 +280,9 @@ class PARQUET_EXPORT ColumnIndexBuilder {
   /// not update statistics anymore.
   ///
   /// \param stats Page statistics in the encoded form.
-  virtual void AddPage(const EncodedStatistics& stats) = 0;
+  /// \param size_stats Size statistics of the page if available.
+  virtual void AddPage(const EncodedStatistics& stats,
+                       const SizeStatistics& size_stats) = 0;
 
   /// \brief Complete the column index.
   ///
@@ -299,15 +315,13 @@ class PARQUET_EXPORT OffsetIndexBuilder {
 
   virtual ~OffsetIndexBuilder() = default;
 
-  /// \brief Add page location of a data page.
+  /// \brief Add page location and size stats of a data page.
   virtual void AddPage(int64_t offset, int32_t compressed_page_size,
-                       int64_t first_row_index) = 0;
+                       int64_t first_row_index,
+                       std::optional<int64_t> unencoded_byte_array_length = 
{}) = 0;
 
-  /// \brief Add page location of a data page.
-  void AddPage(const PageLocation& page_location) {
-    AddPage(page_location.offset, page_location.compressed_page_size,
-            page_location.first_row_index);
-  }
+  /// \brief Add page location and size stats of a data page.
+  void AddPage(const PageLocation& page_location, const SizeStatistics& 
size_stats);
 
   /// \brief Complete the offset index.
   ///
diff --git a/cpp/src/parquet/page_index_benchmark.cc 
b/cpp/src/parquet/page_index_benchmark.cc
index 5631034105..e94fa0365d 100644
--- a/cpp/src/parquet/page_index_benchmark.cc
+++ b/cpp/src/parquet/page_index_benchmark.cc
@@ -82,7 +82,7 @@ void BM_ReadColumnIndex(::benchmark::State& state) {
     GenerateBenchmarkData(values_per_page, /*seed=*/0, values.data(), &heap,
                           kDataStringLength);
     stats->Update(values.data(), values_per_page, /*null_count=*/0);
-    builder->AddPage(stats->Encode());
+    builder->AddPage(stats->Encode(), /*size_stats=*/{});
   }
 
   builder->Finish();
diff --git a/cpp/src/parquet/page_index_test.cc 
b/cpp/src/parquet/page_index_test.cc
index 4db49b4267..916e28f8ce 100644
--- a/cpp/src/parquet/page_index_test.cc
+++ b/cpp/src/parquet/page_index_test.cc
@@ -419,15 +419,20 @@ TEST(PageIndex, 
DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
-TEST(PageIndex, WriteOffsetIndex) {
+void TestWriteOffsetIndex(bool write_size_stats) {
   /// Create offset index via the OffsetIndexBuilder interface.
   auto builder = OffsetIndexBuilder::Make();
   const size_t num_pages = 5;
   const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
   const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
   const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 
40000};
+  const std::vector<int64_t> unencoded_byte_array_lengths = {1111, 2222, 0, 
3333, 4444};
   for (size_t i = 0; i < num_pages; ++i) {
-    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+    auto unencoded_byte_array_length =
+        write_size_stats ? std::make_optional(unencoded_byte_array_lengths[i])
+                         : std::nullopt;
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i],
+                     unencoded_byte_array_length);
   }
   const int64_t final_position = 4096;
   builder->Finish(final_position);
@@ -446,23 +451,73 @@ TEST(PageIndex, WriteOffsetIndex) {
   /// Verify the data of the offset index.
   for (const auto& offset_index : offset_indexes) {
     ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    if (write_size_stats) {
+      ASSERT_EQ(num_pages, 
offset_index->unencoded_byte_array_data_bytes().size());
+    } else {
+      ASSERT_TRUE(offset_index->unencoded_byte_array_data_bytes().empty());
+    }
     for (size_t i = 0; i < num_pages; ++i) {
       const auto& page_location = offset_index->page_locations().at(i);
       ASSERT_EQ(offsets[i] + final_position, page_location.offset);
       ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
       ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+      if (write_size_stats) {
+        ASSERT_EQ(unencoded_byte_array_lengths[i],
+                  offset_index->unencoded_byte_array_data_bytes()[i]);
+      }
     }
   }
 }
 
+TEST(PageIndex, WriteOffsetIndexWithoutSizeStats) {
+  TestWriteOffsetIndex(/*write_size_stats=*/false);
+}
+
+TEST(PageIndex, WriteOffsetIndexWithSizeStats) {
+  TestWriteOffsetIndex(/*write_size_stats=*/true);
+}
+
+struct PageLevelHistogram {
+  std::vector<int64_t> def_levels;
+  std::vector<int64_t> rep_levels;
+};
+
+std::unique_ptr<SizeStatistics> ConstructFakeSizeStatistics(
+    const ColumnDescriptor* descr, const PageLevelHistogram& 
page_level_histogram) {
+  auto stats = SizeStatistics::Make(descr);
+  stats->definition_level_histogram = page_level_histogram.def_levels;
+  stats->repetition_level_histogram = page_level_histogram.rep_levels;
+  return stats;
+}
+
+void VerifyPageLevelHistogram(size_t page_id,
+                              const std::vector<int64_t>& expected_page_levels,
+                              const std::vector<int64_t>& all_page_levels) {
+  const size_t max_level = expected_page_levels.size() - 1;
+  const size_t offset = page_id * (max_level + 1);
+  for (size_t level = 0; level <= max_level; ++level) {
+    ASSERT_EQ(expected_page_levels[level], all_page_levels[offset + level]);
+  }
+}
+
 void TestWriteTypedColumnIndex(schema::NodePtr node,
                                const std::vector<EncodedStatistics>& 
page_stats,
-                               BoundaryOrder::type boundary_order, bool 
has_null_counts) {
-  auto descr = std::make_unique<ColumnDescriptor>(node, 
/*max_definition_level=*/1, 0);
-
+                               BoundaryOrder::type boundary_order, bool 
has_null_counts,
+                               int16_t max_definition_level = 1,
+                               int16_t max_repetition_level = 0,
+                               const std::vector<PageLevelHistogram>& 
page_levels = {}) {
+  const bool build_size_stats = !page_levels.empty();
+  if (build_size_stats) {
+    ASSERT_EQ(page_levels.size(), page_stats.size());
+  }
+  auto descr = std::make_unique<ColumnDescriptor>(node, max_definition_level,
+                                                  max_repetition_level);
   auto builder = ColumnIndexBuilder::Make(descr.get());
-  for (const auto& stats : page_stats) {
-    builder->AddPage(stats);
+  for (size_t i = 0; i < page_stats.size(); ++i) {
+    auto size_stats = build_size_stats
+                          ? ConstructFakeSizeStatistics(descr.get(), 
page_levels[i])
+                          : std::make_unique<SizeStatistics>();
+    builder->AddPage(page_stats[i], *size_stats);
   }
   ASSERT_NO_THROW(builder->Finish());
 
@@ -482,6 +537,13 @@ void TestWriteTypedColumnIndex(schema::NodePtr node,
     ASSERT_EQ(boundary_order, column_index->boundary_order());
     ASSERT_EQ(has_null_counts, column_index->has_null_counts());
     const size_t num_pages = column_index->null_pages().size();
+    if (build_size_stats) {
+      ASSERT_EQ(num_pages * (max_repetition_level + 1),
+                column_index->repetition_level_histograms().size());
+      ASSERT_EQ(num_pages * (max_definition_level + 1),
+                column_index->definition_level_histograms().size());
+    }
+
     for (size_t i = 0; i < num_pages; ++i) {
       ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
       ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
@@ -489,6 +551,12 @@ void TestWriteTypedColumnIndex(schema::NodePtr node,
       if (has_null_counts) {
         ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
       }
+      if (build_size_stats) {
+        ASSERT_NO_FATAL_FAILURE(VerifyPageLevelHistogram(
+            i, page_levels[i].def_levels, 
column_index->definition_level_histograms()));
+        ASSERT_NO_FATAL_FAILURE(VerifyPageLevelHistogram(
+            i, page_levels[i].rep_levels, 
column_index->repetition_level_histograms()));
+      }
     }
   }
 }
@@ -640,7 +708,7 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
   ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
   auto builder = ColumnIndexBuilder::Make(&descr);
   for (const auto& stats : page_stats) {
-    builder->AddPage(stats);
+    builder->AddPage(stats, SizeStatistics());
   }
   ASSERT_NO_THROW(builder->Finish());
   ASSERT_EQ(nullptr, builder->Build());
@@ -651,6 +719,31 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
   EXPECT_EQ(0, buffer->size());
 }
 
+TEST(PageIndex, WriteInt64ColumnIndexWithSizeStats) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  // Page level histograms.
+  std::vector<PageLevelHistogram> page_levels;
+  page_levels.push_back(
+      PageLevelHistogram{/*def_levels=*/{2, 4, 6, 8}, /*rep_levels=*/{10, 5, 
5}});
+  page_levels.push_back(
+      PageLevelHistogram{/*def_levels=*/{1, 3, 5, 7}, /*rep_levels=*/{4, 8, 
4}});
+  page_levels.push_back(
+      PageLevelHistogram{/*def_levels=*/{0, 2, 4, 6}, /*rep_levels=*/{3, 4, 
5}});
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, 
BoundaryOrder::Descending,
+                            /*has_null_counts=*/true, 
/*max_definition_level=*/3,
+                            /*max_repetition_level=*/2, page_levels);
+}
+
 TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
   schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
   schema::NodePtr root = schema::GroupNode::Make("schema", 
Repetition::REPEATED, fields);
@@ -689,14 +782,15 @@ class PageIndexBuilderTest : public ::testing::Test {
       for (int column = 0; column < num_columns; ++column) {
         if (static_cast<size_t>(column) < page_stats[row_group].size()) {
           auto column_index_builder = builder->GetColumnIndexBuilder(column);
-          
ASSERT_NO_THROW(column_index_builder->AddPage(page_stats[row_group][column]));
+          ASSERT_NO_THROW(
+              column_index_builder->AddPage(page_stats[row_group][column], 
{}));
           ASSERT_NO_THROW(column_index_builder->Finish());
         }
 
         if (static_cast<size_t>(column) < page_locations[row_group].size()) {
           auto offset_index_builder = builder->GetOffsetIndexBuilder(column);
-          ASSERT_NO_THROW(
-              
offset_index_builder->AddPage(page_locations[row_group][column]));
+          
ASSERT_NO_THROW(offset_index_builder->AddPage(page_locations[row_group][column],
+                                                        /*size_stats=*/{}));
           ASSERT_NO_THROW(offset_index_builder->Finish(final_position));
         }
       }
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index a8e4430a03..c942010396 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -47,6 +47,16 @@ namespace parquet {
 /// DataPageV2 at all.
 enum class ParquetDataPageVersion { V1, V2 };
 
+/// Controls the level of size statistics that are written to the file.
+enum class SizeStatisticsLevel : uint8_t {
+  // No size statistics are written.
+  None = 0,
+  // Only column chunk size statistics are written.
+  ColumnChunk,
+  // Both size statistics in the column chunk and page index are written.
+  PageAndColumnChunk
+};
+
 /// Align the default buffer size to a small multiple of a page size.
 constexpr int64_t kDefaultBufferSize = 4096 * 4;
 
@@ -247,7 +257,8 @@ class PARQUET_EXPORT WriterProperties {
           data_page_version_(ParquetDataPageVersion::V1),
           created_by_(DEFAULT_CREATED_BY),
           store_decimal_as_integer_(false),
-          page_checksum_enabled_(false) {}
+          page_checksum_enabled_(false),
+          size_statistics_level_(SizeStatisticsLevel::None) {}
 
     explicit Builder(const WriterProperties& properties)
         : pool_(properties.memory_pool()),
@@ -649,6 +660,16 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_write_page_index(path->ToDotString());
     }
 
+    /// \brief Set the level to write size statistics for all columns. Default 
is None.
+    ///
+    /// \param level The level to write size statistics. Note that if page 
index is not
+    /// enabled, page level size statistics will not be written even if the 
level
+    /// is set to PageAndColumnChunk.
+    Builder* set_size_statistics_level(SizeStatisticsLevel level) {
+      size_statistics_level_ = level;
+      return this;
+    }
+
     /// \brief Build the WriterProperties with the builder parameters.
     /// \return The WriterProperties defined by the builder.
     std::shared_ptr<WriterProperties> build() {
@@ -675,9 +696,9 @@ class PARQUET_EXPORT WriterProperties {
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, 
max_row_group_length_,
           pagesize_, version_, created_by_, page_checksum_enabled_,
-          std::move(file_encryption_properties_), default_column_properties_,
-          column_properties, data_page_version_, store_decimal_as_integer_,
-          std::move(sorting_columns_)));
+          size_statistics_level_, std::move(file_encryption_properties_),
+          default_column_properties_, column_properties, data_page_version_,
+          store_decimal_as_integer_, std::move(sorting_columns_)));
     }
 
    private:
@@ -691,6 +712,7 @@ class PARQUET_EXPORT WriterProperties {
     std::string created_by_;
     bool store_decimal_as_integer_;
     bool page_checksum_enabled_;
+    SizeStatisticsLevel size_statistics_level_;
 
     std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
 
@@ -729,6 +751,10 @@ class PARQUET_EXPORT WriterProperties {
 
   inline bool page_checksum_enabled() const { return page_checksum_enabled_; }
 
+  inline SizeStatisticsLevel size_statistics_level() const {
+    return size_statistics_level_;
+  }
+
   inline Encoding::type dictionary_index_encoding() const {
     if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
       return Encoding::PLAIN_DICTIONARY;
@@ -822,6 +848,7 @@ class PARQUET_EXPORT WriterProperties {
       MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t 
write_batch_size,
       int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type 
version,
       const std::string& created_by, bool page_write_checksum_enabled,
+      SizeStatisticsLevel size_statistics_level,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& 
column_properties,
@@ -837,6 +864,7 @@ class PARQUET_EXPORT WriterProperties {
         parquet_created_by_(created_by),
         store_decimal_as_integer_(store_short_decimal_as_integer),
         page_checksum_enabled_(page_write_checksum_enabled),
+        size_statistics_level_(size_statistics_level),
         file_encryption_properties_(file_encryption_properties),
         sorting_columns_(std::move(sorting_columns)),
         default_column_properties_(default_column_properties),
@@ -852,6 +880,7 @@ class PARQUET_EXPORT WriterProperties {
   std::string parquet_created_by_;
   bool store_decimal_as_integer_;
   bool page_checksum_enabled_;
+  SizeStatisticsLevel size_statistics_level_;
 
   std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
 
diff --git a/cpp/src/parquet/size_statistics.cc 
b/cpp/src/parquet/size_statistics.cc
new file mode 100644
index 0000000000..a02cef7aba
--- /dev/null
+++ b/cpp/src/parquet/size_statistics.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/size_statistics.h"
+
+#include <algorithm>
+
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+void SizeStatistics::Merge(const SizeStatistics& other) {
+  if (repetition_level_histogram.size() != 
other.repetition_level_histogram.size()) {
+    throw ParquetException("Repetition level histogram size mismatch");
+  }
+  if (definition_level_histogram.size() != 
other.definition_level_histogram.size()) {
+    throw ParquetException("Definition level histogram size mismatch");
+  }
+  if (unencoded_byte_array_data_bytes.has_value() !=
+      other.unencoded_byte_array_data_bytes.has_value()) {
+    throw ParquetException("Unencoded byte array data bytes are not 
consistent");
+  }
+  std::transform(repetition_level_histogram.begin(), 
repetition_level_histogram.end(),
+                 other.repetition_level_histogram.begin(),
+                 repetition_level_histogram.begin(), std::plus<>());
+  std::transform(definition_level_histogram.begin(), 
definition_level_histogram.end(),
+                 other.definition_level_histogram.begin(),
+                 definition_level_histogram.begin(), std::plus<>());
+  if (unencoded_byte_array_data_bytes.has_value()) {
+    unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() +
+                                      
other.unencoded_byte_array_data_bytes.value();
+  }
+}
+
+void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) {
+  ARROW_CHECK(unencoded_byte_array_data_bytes.has_value());
+  unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() + 
value;
+}
+
+void SizeStatistics::Validate(const ColumnDescriptor* descr) const {
+  if (repetition_level_histogram.size() !=
+      static_cast<size_t>(descr->max_repetition_level() + 1)) {
+    throw ParquetException("Repetition level histogram size mismatch");
+  }
+  if (definition_level_histogram.size() !=
+      static_cast<size_t>(descr->max_definition_level() + 1)) {
+    throw ParquetException("Definition level histogram size mismatch");
+  }
+  if (unencoded_byte_array_data_bytes.has_value() &&
+      descr->physical_type() != Type::BYTE_ARRAY) {
+    throw ParquetException("Unencoded byte array data bytes does not support " 
+
+                           TypeToString(descr->physical_type()));
+  }
+  if (!unencoded_byte_array_data_bytes.has_value() &&
+      descr->physical_type() == Type::BYTE_ARRAY) {
+    throw ParquetException("Missing unencoded byte array data bytes");
+  }
+}
+
+void SizeStatistics::Reset() {
+  repetition_level_histogram.assign(repetition_level_histogram.size(), 0);
+  definition_level_histogram.assign(definition_level_histogram.size(), 0);
+  if (unencoded_byte_array_data_bytes.has_value()) {
+    unencoded_byte_array_data_bytes = 0;
+  }
+}
+
+std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* 
descr) {
+  auto size_stats = std::make_unique<SizeStatistics>();
+  size_stats->repetition_level_histogram.resize(descr->max_repetition_level() 
+ 1, 0);
+  size_stats->definition_level_histogram.resize(descr->max_definition_level() 
+ 1, 0);
+  if (descr->physical_type() == Type::BYTE_ARRAY) {
+    size_stats->unencoded_byte_array_data_bytes = 0;
+  }
+  return size_stats;
+}
+
+}  // namespace parquet
diff --git a/cpp/src/parquet/size_statistics.h 
b/cpp/src/parquet/size_statistics.h
new file mode 100644
index 0000000000..c25e70ee36
--- /dev/null
+++ b/cpp/src/parquet/size_statistics.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+
+#include "parquet/platform.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+/// A structure for capturing metadata for estimating the unencoded,
+/// uncompressed size of data written. This is useful for readers to estimate
+/// how much memory is needed to reconstruct data in their memory model and for
+/// fine-grained filter push down on nested structures (the histograms 
contained
+/// in this structure can help determine the number of nulls at a particular
+/// nesting level and maximum length of lists).
+struct PARQUET_EXPORT SizeStatistics {
+  /// When present, there is expected to be one element corresponding to each
+  /// definition (i.e. size=max definition+1) where each element
+  /// represents the number of times the definition level was observed in the
+  /// data.
+  ///
+  /// This field may be omitted (a.k.a. zero-length vector) if 
max_definition_level
+  /// is 0 without loss of information.
+  std::vector<int64_t> definition_level_histogram;
+
+  /// Same as definition_level_histogram except for repetition levels.
+  ///
+  /// This field may be omitted (a.k.a. zero-length vector) if 
max_repetition_level
+  /// is 0 without loss of information.
+  std::vector<int64_t> repetition_level_histogram;
+
+  /// The number of physical bytes stored for BYTE_ARRAY data values assuming
+  /// no encoding. This is exclusive of the bytes needed to store the length of
+  /// each byte array. In other words, this field is equivalent to the `(size
+  /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values
+  /// written)`. To determine unencoded sizes of other types readers can use
+  /// schema information multiplied by the number of non-null and null values.
+  /// The number of null/non-null values can be inferred from the histograms
+  /// below.
+  ///
+  /// For example, if a column chunk is dictionary-encoded with dictionary
+  /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2],
+  /// then this value for that data page should be 7 (1 + 1 + 2 + 3).
+  ///
+  /// This field should only be set for types that use BYTE_ARRAY as their
+  /// physical type.
+  std::optional<int64_t> unencoded_byte_array_data_bytes;
+
+  /// \brief Check if the SizeStatistics is set.
+  bool is_set() const {
+    return !repetition_level_histogram.empty() || !definition_level_histogram.empty() ||
+           unencoded_byte_array_data_bytes.has_value();
+  }
+
+  /// \brief Increment the unencoded byte array data bytes.
+  void IncrementUnencodedByteArrayDataBytes(int64_t value);
+
+  /// \brief Merge two SizeStatistics.
+  /// \throws ParquetException if SizeStatistics to merge is not compatible.
+  void Merge(const SizeStatistics& other);
+
+  /// \brief Validate the SizeStatistics
+  /// \throws ParquetException if the histograms don't have the right length,
+  /// or if unencoded_byte_array_data_bytes is present for a non-BYTE_ARRAY 
column.
+  void Validate(const ColumnDescriptor* descr) const;
+
+  /// \brief Reset the SizeStatistics to be empty.
+  void Reset();
+
+  /// \brief Make an empty SizeStatistics object for specific type.
+  static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
+};
+
+}  // namespace parquet
diff --git a/cpp/src/parquet/size_statistics_test.cc 
b/cpp/src/parquet/size_statistics_test.cc
new file mode 100644
index 0000000000..cefd31dce2
--- /dev/null
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include <algorithm>
+#include <random>
+
+#include "arrow/buffer.h"
+#include "arrow/table.h"
+#include "arrow/testing/builder.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/span.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/column_writer.h"
+#include "parquet/file_writer.h"
+#include "parquet/page_index.h"
+#include "parquet/schema.h"
+#include "parquet/size_statistics.h"
+#include "parquet/test_util.h"
+#include "parquet/thrift_internal.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+TEST(SizeStatistics, ThriftSerDe) {
+  const std::vector<int64_t> kDefLevels = {128, 64, 32, 16};
+  const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20};
+  constexpr int64_t kUnencodedByteArrayDataBytes = 1234;
+
+  for (const auto& descr :
+       {std::make_unique<ColumnDescriptor>(schema::Int32("a"), 
/*max_def_level=*/3,
+                                           /*max_rep_level=*/4),
+        std::make_unique<ColumnDescriptor>(schema::ByteArray("a"), 
/*max_def_level=*/3,
+                                           /*max_rep_level=*/4)}) {
+    auto size_statistics = SizeStatistics::Make(descr.get());
+    size_statistics->repetition_level_histogram = kRepLevels;
+    size_statistics->definition_level_histogram = kDefLevels;
+    if (descr->physical_type() == Type::BYTE_ARRAY) {
+      
size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes);
+    }
+    auto thrift_statistics = ToThrift(*size_statistics);
+    auto restored_statistics = FromThrift(thrift_statistics);
+    EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels);
+    EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels);
+    if (descr->physical_type() == Type::BYTE_ARRAY) {
+      
EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value());
+      EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(),
+                kUnencodedByteArrayDataBytes);
+    } else {
+      
EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value());
+    }
+  }
+}
+
+bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) {
+  return lhs.repetition_level_histogram == rhs.repetition_level_histogram &&
+         lhs.definition_level_histogram == rhs.definition_level_histogram &&
+         lhs.unencoded_byte_array_data_bytes == 
rhs.unencoded_byte_array_data_bytes;
+}
+
+struct PageSizeStatistics {
+  std::vector<int64_t> def_levels;
+  std::vector<int64_t> rep_levels;
+  std::vector<int64_t> byte_array_bytes;
+  bool operator==(const PageSizeStatistics& other) const {
+    return def_levels == other.def_levels && rep_levels == other.rep_levels &&
+           byte_array_bytes == other.byte_array_bytes;
+  }
+};
+
+class SizeStatisticsRoundTripTest : public ::testing::Test {
+ public:
+  void WriteFile(SizeStatisticsLevel level,
+                 const std::shared_ptr<::arrow::Table>& table) {
+    auto writer_properties = WriterProperties::Builder()
+                                 .max_row_group_length(2) /* every row group 
has 2 rows */
+                                 ->data_pagesize(1)       /* every page has 1 
row */
+                                 ->enable_write_page_index()
+                                 ->enable_statistics()
+                                 ->set_size_statistics_level(level)
+                                 ->build();
+
+    // Get schema from table.
+    auto schema = table->schema();
+    std::shared_ptr<SchemaDescriptor> parquet_schema;
+    auto arrow_writer_properties = default_arrow_writer_properties();
+    ASSERT_OK_NO_THROW(arrow::ToParquetSchema(schema.get(), *writer_properties,
+                                              *arrow_writer_properties, 
&parquet_schema));
+    auto schema_node =
+        
std::static_pointer_cast<schema::GroupNode>(parquet_schema->schema_root());
+
+    // Write table to buffer.
+    auto sink = CreateOutputStream();
+    auto pool = ::arrow::default_memory_pool();
+    auto writer = ParquetFileWriter::Open(sink, schema_node, 
writer_properties);
+    std::unique_ptr<arrow::FileWriter> arrow_writer;
+    ASSERT_OK(arrow::FileWriter::Make(pool, std::move(writer), schema,
+                                      arrow_writer_properties, &arrow_writer));
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+    ASSERT_OK_NO_THROW(arrow_writer->Close());
+    ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
+  }
+
+  void ReadSizeStatistics() {
+    auto read_properties = default_arrow_reader_properties();
+    auto reader =
+        
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
+
+    // Read row group size statistics in order.
+    auto metadata = reader->metadata();
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      auto row_group_metadata = metadata->RowGroup(i);
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        auto column_metadata = row_group_metadata->ColumnChunk(j);
+        auto size_stats = column_metadata->size_statistics();
+        row_group_stats_.push_back(size_stats ? *size_stats : 
SizeStatistics{});
+      }
+    }
+
+    // Read page size statistics in order.
+    auto page_index_reader = reader->GetPageIndexReader();
+    ASSERT_NE(page_index_reader, nullptr);
+
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      auto row_group_index_reader = page_index_reader->RowGroup(i);
+      ASSERT_NE(row_group_index_reader, nullptr);
+
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        PageSizeStatistics page_stats;
+
+        auto column_index = row_group_index_reader->GetColumnIndex(j);
+        if (column_index != nullptr) {
+          if (column_index->has_definition_level_histograms()) {
+            page_stats.def_levels = 
column_index->definition_level_histograms();
+          }
+          if (column_index->has_repetition_level_histograms()) {
+            page_stats.rep_levels = 
column_index->repetition_level_histograms();
+          }
+        }
+
+        auto offset_index = row_group_index_reader->GetOffsetIndex(j);
+        if (offset_index != nullptr) {
+          page_stats.byte_array_bytes = 
offset_index->unencoded_byte_array_data_bytes();
+        }
+
+        page_stats_.emplace_back(std::move(page_stats));
+      }
+    }
+  }
+
+  void Reset() {
+    buffer_.reset();
+    row_group_stats_.clear();
+    page_stats_.clear();
+  }
+
+ protected:
+  std::shared_ptr<Buffer> buffer_;
+  std::vector<SizeStatistics> row_group_stats_;
+  std::vector<PageSizeStatistics> page_stats_;
+  inline static const SizeStatistics kEmptyRowGroupStats{};
+  inline static const PageSizeStatistics kEmptyPageStats{};
+};
+
+TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
+  auto schema = ::arrow::schema({
+      ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
+      ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
+  });
+  // First two rows are in one row group, and the other two rows are in 
another row group.
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ],
+      [ [[0,1,null]],        [["foo","bar",null]]            ],
+      [ [],                  []                              ],
+      [ [[],[null],null],    [[],[null],null]                ]
+    ])"});
+
+  for (auto size_stats_level :
+       {SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk,
+        SizeStatisticsLevel::PageAndColumnChunk}) {
+    WriteFile(size_stats_level, table);
+    ReadSizeStatistics();
+
+    if (size_stats_level == SizeStatisticsLevel::None) {
+      EXPECT_THAT(row_group_stats_,
+                  ::testing::ElementsAre(kEmptyRowGroupStats, 
kEmptyRowGroupStats,
+                                         kEmptyRowGroupStats, 
kEmptyRowGroupStats));
+    } else {
+      EXPECT_THAT(row_group_stats_, ::testing::ElementsAre(
+                                        SizeStatistics{/*def_levels=*/{0, 0, 
0, 0, 1, 8},
+                                                       /*rep_levels=*/{2, 2, 
5},
+                                                       
/*byte_array_bytes=*/std::nullopt},
+                                        SizeStatistics{/*def_levels=*/{0, 0, 
0, 0, 1, 8},
+                                                       /*rep_levels=*/{2, 2, 
5},
+                                                       
/*byte_array_bytes=*/12},
+                                        SizeStatistics{/*def_levels=*/{0, 1, 
1, 1, 1, 0},
+                                                       /*rep_levels=*/{2, 2, 
0},
+                                                       
/*byte_array_bytes=*/std::nullopt},
+                                        SizeStatistics{/*def_levels=*/{0, 1, 
1, 1, 1, 0},
+                                                       /*rep_levels=*/{2, 2, 
0},
+                                                       
/*byte_array_bytes=*/0}));
+    }
+
+    if (size_stats_level == SizeStatisticsLevel::PageAndColumnChunk) {
+      EXPECT_THAT(
+          page_stats_,
+          ::testing::ElementsAre(
+              PageSizeStatistics{/*def_levels=*/{0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 
1, 2},
+                                 /*rep_levels=*/{1, 2, 3, 1, 0, 2},
+                                 /*byte_array_bytes=*/{}},
+              PageSizeStatistics{/*def_levels=*/{0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 
1, 2},
+                                 /*rep_levels=*/{1, 2, 3, 1, 0, 2},
+                                 /*byte_array_bytes=*/{6, 6}},
+              PageSizeStatistics{/*def_levels=*/{0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 
1, 0},
+                                 /*rep_levels=*/{1, 0, 0, 1, 2, 0},
+                                 /*byte_array_bytes=*/{}},
+              PageSizeStatistics{/*def_levels=*/{0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 
1, 0},
+                                 /*rep_levels=*/{1, 0, 0, 1, 2, 0},
+                                 /*byte_array_bytes=*/{0, 0}}));
+    } else {
+      EXPECT_THAT(page_stats_, ::testing::ElementsAre(kEmptyPageStats, 
kEmptyPageStats,
+                                                      kEmptyPageStats, 
kEmptyPageStats));
+    }
+
+    Reset();
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
+  auto schema = ::arrow::schema(
+      {::arrow::field("a", ::arrow::dictionary(::arrow::int16(), 
::arrow::utf8()))});
+  WriteFile(
+      SizeStatisticsLevel::PageAndColumnChunk,
+      ::arrow::TableFromJSON(schema, 
{R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}));
+
+  ReadSizeStatistics();
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
+                                                    /*rep_levels=*/{2},
+                                                    /*byte_array_bytes=*/5},
+                                     SizeStatistics{/*def_levels=*/{1, 1},
+                                                    /*rep_levels=*/{2},
+                                                    /*byte_array_bytes=*/1},
+                                     SizeStatistics{/*def_levels=*/{0, 2},
+                                                    /*rep_levels=*/{2},
+                                                    /*byte_array_bytes=*/4}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2},
+                                                        /*rep_levels=*/{2},
+                                                        
/*byte_array_bytes=*/{5}},
+                                     PageSizeStatistics{/*def_levels=*/{1, 1},
+                                                        /*rep_levels=*/{2},
+                                                        
/*byte_array_bytes=*/{1}},
+                                     PageSizeStatistics{/*def_levels=*/{0, 2},
+                                                        /*rep_levels=*/{2},
+                                                        
/*byte_array_bytes=*/{4}}));
+}
+
+}  // namespace parquet
diff --git a/cpp/src/parquet/thrift_internal.h 
b/cpp/src/parquet/thrift_internal.h
index e7bfd434c8..744af74311 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -43,6 +43,7 @@
 #include "parquet/exception.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
+#include "parquet/size_statistics.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
 
@@ -254,6 +255,14 @@ static inline SortingColumn 
FromThrift(format::SortingColumn thrift_sorting_colu
   return sorting_column;
 }
 
+static inline SizeStatistics FromThrift(const format::SizeStatistics& 
size_stats) {
+  return SizeStatistics{
+      size_stats.definition_level_histogram, 
size_stats.repetition_level_histogram,
+      size_stats.__isset.unencoded_byte_array_data_bytes
+          ? std::make_optional(size_stats.unencoded_byte_array_data_bytes)
+          : std::nullopt};
+}
+
 // ----------------------------------------------------------------------
 // Convert Thrift enums from Parquet enums
 
@@ -383,6 +392,17 @@ static inline format::EncryptionAlgorithm 
ToThrift(EncryptionAlgorithm encryptio
   return encryption_algorithm;
 }
 
+static inline format::SizeStatistics ToThrift(const SizeStatistics& 
size_stats) {
+  format::SizeStatistics size_statistics;
+  
size_statistics.__set_definition_level_histogram(size_stats.definition_level_histogram);
+  
size_statistics.__set_repetition_level_histogram(size_stats.repetition_level_histogram);
+  if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
+    size_statistics.__set_unencoded_byte_array_data_bytes(
+        size_stats.unencoded_byte_array_data_bytes.value());
+  }
+  return size_statistics;
+}
+
 // ----------------------------------------------------------------------
 // Thrift struct serialization / deserialization utilities
 
diff --git a/cpp/src/parquet/type_fwd.h b/cpp/src/parquet/type_fwd.h
index da0d0f7bde..cda0dc5a77 100644
--- a/cpp/src/parquet/type_fwd.h
+++ b/cpp/src/parquet/type_fwd.h
@@ -68,7 +68,10 @@ struct ParquetVersion {
   };
 };
 
+struct PageIndexLocation;
+
 class FileMetaData;
+class FileCryptoMetaData;
 class RowGroupMetaData;
 
 class ColumnDescriptor;
@@ -82,10 +85,22 @@ class WriterPropertiesBuilder;
 class ArrowWriterProperties;
 class ArrowWriterPropertiesBuilder;
 
+class EncodedStatistics;
+class Statistics;
+struct SizeStatistics;
+
+class ColumnIndex;
+class OffsetIndex;
+
 namespace arrow {
 
 class FileWriter;
 class FileReader;
 
 }  // namespace arrow
+
+namespace schema {
+class ColumnPath;
+}  // namespace schema
+
 }  // namespace parquet

Reply via email to