This is an automated email from the ASF dual-hosted git repository.

brycemecum pushed a commit to branch maint-19.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 36bdab582951119f97a3f3ffc7b705f84907efb5
Author: Gang Wu <[email protected]>
AuthorDate: Tue Jan 21 17:28:43 2025 +0800

    GH-45283: [C++][Parquet] Omit level histogram when max level is 0 (#45285)
    
    The level histogram of size statistics can be omitted if its max level is 
0. We haven't implemented this yet and enforce the histogram size to be equal to 
`max_level + 1`. As a result, an exception is thrown when reading a Parquet 
file whose level histogram has been omitted.
    
    Omit level histogram when max level is 0.
    
    Are these changes tested? Yes, a test case has been added to reflect the change.
    
    Are there any user-facing changes? No.
    * GitHub Issue: #45283
    
    Lead-authored-by: Gang Wu <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 cpp/src/parquet/column_writer.cc        |   9 +--
 cpp/src/parquet/size_statistics.cc      |  40 +++++++-----
 cpp/src/parquet/size_statistics_test.cc | 105 +++++++++++++++++++++++++++++---
 3 files changed, 126 insertions(+), 28 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 12cbcf20af..5525c91316 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1606,11 +1606,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     }
 
     auto add_levels = [](std::vector<int64_t>& level_histogram,
-                         ::arrow::util::span<const int16_t> levels) {
-      for (int16_t level : levels) {
-        ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
-        ++level_histogram[level];
+                         ::arrow::util::span<const int16_t> levels, int16_t 
max_level) {
+      if (max_level == 0) {
+        return;
       }
+      ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, 
level_histogram.size());
+      ::parquet::UpdateLevelHistogram(levels, level_histogram);
     };
 
     if (descr_->max_definition_level() > 0) {
diff --git a/cpp/src/parquet/size_statistics.cc 
b/cpp/src/parquet/size_statistics.cc
index a02cef7aba..7efc2777b5 100644
--- a/cpp/src/parquet/size_statistics.cc
+++ b/cpp/src/parquet/size_statistics.cc
@@ -54,23 +54,28 @@ void 
SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) {
 }
 
 void SizeStatistics::Validate(const ColumnDescriptor* descr) const {
-  if (repetition_level_histogram.size() !=
-      static_cast<size_t>(descr->max_repetition_level() + 1)) {
-    throw ParquetException("Repetition level histogram size mismatch");
-  }
-  if (definition_level_histogram.size() !=
-      static_cast<size_t>(descr->max_definition_level() + 1)) {
-    throw ParquetException("Definition level histogram size mismatch");
-  }
+  auto validate_histogram = [](const std::vector<int64_t>& histogram, int16_t 
max_level,
+                               const std::string& name) {
+    if (histogram.empty()) {
+      // A levels histogram is always allowed to be missing.
+      return;
+    }
+    if (histogram.size() != static_cast<size_t>(max_level + 1)) {
+      std::stringstream ss;
+      ss << name << " level histogram size mismatch, size: " << 
histogram.size()
+         << ", expected: " << (max_level + 1);
+      throw ParquetException(ss.str());
+    }
+  };
+  validate_histogram(repetition_level_histogram, descr->max_repetition_level(),
+                     "Repetition");
+  validate_histogram(definition_level_histogram, descr->max_definition_level(),
+                     "Definition");
   if (unencoded_byte_array_data_bytes.has_value() &&
       descr->physical_type() != Type::BYTE_ARRAY) {
     throw ParquetException("Unencoded byte array data bytes does not support " 
+
                            TypeToString(descr->physical_type()));
   }
-  if (!unencoded_byte_array_data_bytes.has_value() &&
-      descr->physical_type() == Type::BYTE_ARRAY) {
-    throw ParquetException("Missing unencoded byte array data bytes");
-  }
 }
 
 void SizeStatistics::Reset() {
@@ -83,8 +88,15 @@ void SizeStatistics::Reset() {
 
 std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* 
descr) {
   auto size_stats = std::make_unique<SizeStatistics>();
-  size_stats->repetition_level_histogram.resize(descr->max_repetition_level() 
+ 1, 0);
-  size_stats->definition_level_histogram.resize(descr->max_definition_level() 
+ 1, 0);
+  // If the max level is 0, the level histogram can be omitted because it 
contains
+  // only a single level (i.e. 0) and its count is equivalent to `num_values` 
of the
+  // column chunk or data page.
+  if (descr->max_repetition_level() != 0) {
+    
size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 
1, 0);
+  }
+  if (descr->max_definition_level() != 0) {
+    
size_stats->definition_level_histogram.resize(descr->max_definition_level() + 
1, 0);
+  }
   if (descr->physical_type() == Type::BYTE_ARRAY) {
     size_stats->unencoded_byte_array_data_bytes = 0;
   }
diff --git a/cpp/src/parquet/size_statistics_test.cc 
b/cpp/src/parquet/size_statistics_test.cc
index cefd31dce2..ff79e5c35a 100644
--- a/cpp/src/parquet/size_statistics_test.cc
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -168,12 +168,22 @@ class SizeStatisticsRoundTripTest : public 
::testing::Test {
     }
   }
 
-  void Reset() {
-    buffer_.reset();
-    row_group_stats_.clear();
-    page_stats_.clear();
+  void ReadData() {
+    auto reader =
+        
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
+    auto metadata = reader->metadata();
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      int64_t num_rows = metadata->RowGroup(i)->num_rows();
+      auto row_group_reader = reader->RowGroup(i);
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        auto column_reader = row_group_reader->RecordReader(j);
+        ASSERT_EQ(column_reader->ReadRecords(num_rows + 1), num_rows);
+      }
+    }
   }
 
+  void Reset() { buffer_.reset(); }
+
  protected:
   std::shared_ptr<Buffer> buffer_;
   std::vector<SizeStatistics> row_group_stats_;
@@ -256,24 +266,99 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) 
{
   ReadSizeStatistics();
   EXPECT_THAT(row_group_stats_,
               ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/5},
                                      SizeStatistics{/*def_levels=*/{1, 1},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/1},
                                      SizeStatistics{/*def_levels=*/{0, 2},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/4}));
   EXPECT_THAT(page_stats_,
               ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         
/*byte_array_bytes=*/{5}},
                                      PageSizeStatistics{/*def_levels=*/{1, 1},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         
/*byte_array_bytes=*/{1}},
                                      PageSizeStatistics{/*def_levels=*/{0, 2},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         
/*byte_array_bytes=*/{4}}));
 }
 
+TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) {
+  // Rep/def level histograms are updated in batches of `write_batch_size` 
levels
+  // inside a single page. Exercise the logic with more than one batch per 
page.
+  auto schema = ::arrow::schema({::arrow::field("a", 
::arrow::list(::arrow::utf8()))});
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      [ [null,"a","ab"] ],
+      [ null ],
+      [ [] ],
+      [ [null,"d","de"] ],
+      [ ["g","gh",null] ],
+      [ ["j","jk",null] ]
+    ])"});
+  for (int write_batch_size : {100, 5, 4, 3, 2, 1}) {
+    ARROW_SCOPED_TRACE("write_batch_size = ", write_batch_size);
+    WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+              /*max_row_group_length=*/1000, /*page_size=*/1000, 
write_batch_size);
+    ReadSizeStatistics();
+    EXPECT_THAT(row_group_stats_,
+                ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{1, 1, 4, 
8},
+                                                      /*rep_levels=*/{6, 8},
+                                                      
/*byte_array_bytes=*/12}));
+    EXPECT_THAT(page_stats_,
+                ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{1, 
1, 4, 8},
+                                                          /*rep_levels=*/{6, 
8},
+                                                          
/*byte_array_bytes=*/{12}}));
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, LargePage) {
+  // When max_level is 1, the levels are summed in 2**30 chunks; exercise this
+  // by testing with a 90000-row table.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::utf8())});
+  auto seed_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+    [ "a" ],
+    [ "bc" ],
+    [ null ]
+  ])");
+  ASSERT_OK_AND_ASSIGN(auto table, ::arrow::Table::FromRecordBatches(
+                                       ::arrow::RecordBatchVector(30000, 
seed_batch)));
+  ASSERT_OK_AND_ASSIGN(table, table->CombineChunks());
+  ASSERT_EQ(table->num_rows(), 90000);
+
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+            /*max_row_group_length=*/1 << 30, /*page_size=*/1 << 30,
+            /*write_batch_size=*/50000);
+  ReadSizeStatistics();
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{30000, 
60000},
+                                                    /*rep_levels=*/{},
+                                                    
/*byte_array_bytes=*/90000}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{30000, 
60000},
+                                                        /*rep_levels=*/{},
+                                                        
/*byte_array_bytes=*/{90000}}));
+}
+
+TEST_F(SizeStatisticsRoundTripTest, MaxLevelZero) {
+  auto schema =
+      ::arrow::schema({::arrow::field("a", ::arrow::utf8(), 
/*nullable=*/false)});
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk,
+            ::arrow::TableFromJSON(schema, {R"([["foo"],["bar"]])"}),
+            /*max_row_group_length=*/2,
+            /*page_size=*/1024);
+  ASSERT_NO_FATAL_FAILURE(ReadSizeStatistics());
+  ASSERT_NO_FATAL_FAILURE(ReadData());
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{},
+                                                    /*rep_levels=*/{},
+                                                    /*byte_array_bytes=*/6}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{},
+                                                        /*rep_levels=*/{},
+                                                        
/*byte_array_bytes=*/{6}}));
+}
+
 }  // namespace parquet

Reply via email to