This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b5f56ca99 GH-45201: [C++][Parquet] Improve performance of generating size statistics (#45202)
2b5f56ca99 is described below

commit 2b5f56ca999678411f35862539f4f4a53b38de5a
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Jan 9 15:57:04 2025 +0100

    GH-45201: [C++][Parquet] Improve performance of generating size statistics (#45202)
    
    ### Rationale for this change
    
    We found out in https://github.com/apache/arrow/pull/45085 that there is a non-trivial write overhead when size statistics are enabled.
    
    ### What changes are included in this PR?
    
    Dramatically reduce the overhead by speeding up def/rep level histogram updates. In the benchmarks below, the write overhead relative to the SizeStatisticsLevel::None baseline drops from roughly 15-25% without this PR to roughly 0.5-6.5% with it.
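    
    For reference, here is a minimal, self-contained sketch of the two fast paths
    used by the new UpdateLevelHistogram helper (illustrative only; the actual
    implementation is in the diff below): when max_level == 1, the levels are
    summed in overflow-safe int16_t chunks, otherwise increments are interleaved
    across four partial histograms to limit store-to-load dependencies.
    ```
    // Illustrative sketch of the optimized level histogram update; the exact
    // committed code lives in cpp/src/parquet/size_statistics.cc below.
    #include <algorithm>
    #include <array>
    #include <cstdint>
    #include <functional>
    #include <numeric>
    #include <vector>
    
    void UpdateLevelHistogramSketch(const std::vector<int16_t>& levels,
                                    std::vector<int64_t>& histogram) {
      const int16_t max_level = static_cast<int16_t>(histogram.size() - 1);
      const int64_t num_levels = static_cast<int64_t>(levels.size());
      if (max_level == 0) {
        histogram[0] += num_levels;  // only level 0 is possible
        return;
      }
      if (max_level == 1) {
        // Levels are 0 or 1, so summing them counts the 1s. Sum in chunks small
        // enough that an int16_t accumulator cannot overflow; such plain sums
        // are easily vectorized by the compiler.
        constexpr int64_t kChunkSize = 1 << 14;
        int64_t ones = 0;
        for (int64_t i = 0; i < num_levels; i += kChunkSize) {
          const int64_t chunk = std::min(kChunkSize, num_levels - i);
          ones += std::accumulate(levels.begin() + i, levels.begin() + i + chunk,
                                  int16_t{0});
        }
        histogram[0] += num_levels - ones;
        histogram[1] += ones;
        return;
      }
      // Generic case: interleave increments into four partial histograms to
      // limit store-to-load dependencies, then merge them at the end.
      constexpr int kUnroll = 4;
      std::array<std::vector<int64_t>, kUnroll> partial;
      for (auto& hist : partial) {
        hist.assign(histogram.size(), 0);
      }
      int64_t i = 0;
      for (; i + kUnroll <= num_levels; i += kUnroll) {
        for (int j = 0; j < kUnroll; ++j) {
          ++partial[j][levels[i + j]];
        }
      }
      for (; i < num_levels; ++i) {
        ++partial[0][levels[i]];
      }
      for (const auto& hist : partial) {
        std::transform(histogram.begin(), histogram.end(), hist.begin(),
                       histogram.begin(), std::plus<>());
      }
    }
    ```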
    
    Performance results on the author's machine:
    ```
    ------------------------------------------------------------------------------------------------------------------------------------------------
    Benchmark                                                                                    Time             CPU   Iterations UserCounters...
    ------------------------------------------------------------------------------------------------------------------------------------------------
    BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                   8103053 ns      8098569 ns           86 bytes_per_second=1003.26Mi/s items_per_second=129.477M/s output_size=537.472k page_index_size=33
    BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>            8153499 ns      8148492 ns           86 bytes_per_second=997.117Mi/s items_per_second=128.683M/s output_size=537.488k page_index_size=33
    BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>     8212560 ns      8207754 ns           83 bytes_per_second=989.918Mi/s items_per_second=127.754M/s output_size=537.502k page_index_size=47
    
    BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::StringType>                 10405020 ns     10400775 ns           67 bytes_per_second=444.142Mi/s items_per_second=100.817M/s output_size=848.305k page_index_size=34
    BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>          10464784 ns     10460778 ns           66 bytes_per_second=441.594Mi/s items_per_second=100.239M/s output_size=848.325k page_index_size=34
    BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>   10469832 ns     10465739 ns           67 bytes_per_second=441.385Mi/s items_per_second=100.191M/s output_size=848.344k page_index_size=48
    
    BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                       13004962 ns     12992678 ns           52 bytes_per_second=657.101Mi/s items_per_second=80.7052M/s output_size=617.464k page_index_size=34
    BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>                13718352 ns     13705599 ns           50 bytes_per_second=622.921Mi/s items_per_second=76.5071M/s output_size=617.486k page_index_size=34
    BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>         13845553 ns     13832138 ns           52 bytes_per_second=617.222Mi/s items_per_second=75.8072M/s output_size=617.506k page_index_size=54
    
    BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::StringType>                      15715263 ns     15702707 ns           44 bytes_per_second=320.449Mi/s items_per_second=66.7768M/s output_size=927.326k page_index_size=35
    BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>               16507328 ns     16493800 ns           43 bytes_per_second=305.079Mi/s items_per_second=63.5739M/s output_size=927.352k page_index_size=35
    BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>        16575359 ns     16561311 ns           42 bytes_per_second=303.836Mi/s items_per_second=63.3148M/s output_size=927.377k page_index_size=55
    ```
    
    Performance results without this PR:
    ```
    ------------------------------------------------------------------------------------------------------------------------------------------------
    Benchmark                                                                                    Time             CPU   Iterations UserCounters...
    ------------------------------------------------------------------------------------------------------------------------------------------------
    BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                   8042576 ns      8037678 ns           87 bytes_per_second=1010.86Mi/s items_per_second=130.458M/s output_size=537.472k page_index_size=33
    BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>            9576627 ns      9571279 ns           73 bytes_per_second=848.894Mi/s items_per_second=109.554M/s output_size=537.488k page_index_size=33
    BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>     9570204 ns      9563595 ns           73 bytes_per_second=849.576Mi/s items_per_second=109.642M/s output_size=537.502k page_index_size=47
    
    BM_WritePrimitiveColumn<SizeStatisticsLevel::None, ::arrow::StringType>                 10165397 ns     10160868 ns           69 bytes_per_second=454.628Mi/s items_per_second=103.197M/s output_size=848.305k page_index_size=34
    BM_WritePrimitiveColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>          11662568 ns     11657396 ns           60 bytes_per_second=396.265Mi/s items_per_second=89.9494M/s output_size=848.325k page_index_size=34
    BM_WritePrimitiveColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>   11657135 ns     11653063 ns           60 bytes_per_second=396.412Mi/s items_per_second=89.9829M/s output_size=848.344k page_index_size=48
    
    BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::Int64Type>                       13182006 ns     13168704 ns           51 bytes_per_second=648.318Mi/s items_per_second=79.6264M/s output_size=617.464k page_index_size=34
    BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::Int64Type>                16438205 ns     16421762 ns           43 bytes_per_second=519.89Mi/s items_per_second=63.8528M/s output_size=617.486k page_index_size=34
    BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::Int64Type>         16424615 ns     16409032 ns           42 bytes_per_second=520.293Mi/s items_per_second=63.9024M/s output_size=617.506k page_index_size=54
    
    BM_WriteListColumn<SizeStatisticsLevel::None, ::arrow::StringType>                      15387808 ns     15373086 ns           46 bytes_per_second=327.32Mi/s items_per_second=68.2086M/s output_size=927.326k page_index_size=35
    BM_WriteListColumn<SizeStatisticsLevel::ColumnChunk, ::arrow::StringType>               18319628 ns     18302938 ns           37 bytes_per_second=274.924Mi/s items_per_second=57.29M/s output_size=927.352k page_index_size=35
    BM_WriteListColumn<SizeStatisticsLevel::PageAndColumnChunk, ::arrow::StringType>        18346665 ns     18329336 ns           37 bytes_per_second=274.528Mi/s items_per_second=57.2075M/s output_size=927.377k page_index_size=55
    ```
    
    ### Are these changes tested?
    
    Tested by existing tests, validated by existing benchmarks.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #45201
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/column_writer.cc        |  53 ++++++-------
 cpp/src/parquet/size_statistics.cc      | 106 +++++++++++++++++++++++--
 cpp/src/parquet/size_statistics.h       |  10 +++
 cpp/src/parquet/size_statistics_test.cc | 132 ++++++++++++++++++++++++++++----
 4 files changed, 249 insertions(+), 52 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 12cbcf20af..683a5ab735 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1468,42 +1468,43 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   // which case we call back to the dense write path)
   std::shared_ptr<::arrow::Array> preserved_dictionary_;
 
-  int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
+  int64_t WriteLevels(int64_t num_levels, const int16_t* def_levels,
                       const int16_t* rep_levels) {
+    // Update histograms now, to maximize cache efficiency.
+    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
+
     int64_t values_to_write = 0;
     // If the field is required and non-repeated, there are no definition levels
     if (descr_->max_definition_level() > 0) {
-      for (int64_t i = 0; i < num_values; ++i) {
+      for (int64_t i = 0; i < num_levels; ++i) {
         if (def_levels[i] == descr_->max_definition_level()) {
           ++values_to_write;
         }
       }
 
-      WriteDefinitionLevels(num_values, def_levels);
+      WriteDefinitionLevels(num_levels, def_levels);
     } else {
       // Required field, write all values
-      values_to_write = num_values;
+      values_to_write = num_levels;
     }
 
     // Not present for non-repeated fields
     if (descr_->max_repetition_level() > 0) {
       // A row could include more than one value
       // Count the occasions where we start a new row
-      for (int64_t i = 0; i < num_values; ++i) {
+      for (int64_t i = 0; i < num_levels; ++i) {
         if (rep_levels[i] == 0) {
           rows_written_++;
           num_buffered_rows_++;
         }
       }
 
-      WriteRepetitionLevels(num_values, rep_levels);
+      WriteRepetitionLevels(num_levels, rep_levels);
     } else {
       // Each value is exactly one row
-      rows_written_ += num_values;
-      num_buffered_rows_ += num_values;
+      rows_written_ += num_levels;
+      num_buffered_rows_ += num_levels;
     }
-
-    UpdateLevelHistogram(num_values, def_levels, rep_levels);
     return values_to_write;
   }
 
@@ -1575,6 +1576,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 
   void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels) {
+    // Update histograms now, to maximize cache efficiency.
+    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
+
     // If the field is required and non-repeated, there are no definition levels
     if (descr_->max_definition_level() > 0) {
       WriteDefinitionLevels(num_levels, def_levels);
@@ -1595,8 +1599,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
       rows_written_ += num_levels;
       num_buffered_rows_ += num_levels;
     }
-
-    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
   }
 
   void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
@@ -1606,26 +1608,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     }
 
     auto add_levels = [](std::vector<int64_t>& level_histogram,
-                         ::arrow::util::span<const int16_t> levels) {
-      for (int16_t level : levels) {
-        ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
-        ++level_histogram[level];
-      }
+                         ::arrow::util::span<const int16_t> levels, int16_t max_level) {
+      ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size());
+      ::parquet::UpdateLevelHistogram(levels, level_histogram);
     };
 
-    if (descr_->max_definition_level() > 0) {
-      add_levels(page_size_statistics_->definition_level_histogram,
-                 {def_levels, static_cast<size_t>(num_levels)});
-    } else {
-      page_size_statistics_->definition_level_histogram[0] += num_levels;
-    }
-
-    if (descr_->max_repetition_level() > 0) {
-      add_levels(page_size_statistics_->repetition_level_histogram,
-                 {rep_levels, static_cast<size_t>(num_levels)});
-    } else {
-      page_size_statistics_->repetition_level_histogram[0] += num_levels;
-    }
+    add_levels(page_size_statistics_->definition_level_histogram,
+               {def_levels, static_cast<size_t>(num_levels)},
+               descr_->max_definition_level());
+    add_levels(page_size_statistics_->repetition_level_histogram,
+               {rep_levels, static_cast<size_t>(num_levels)},
+               descr_->max_repetition_level());
   }
 
   // Update the unencoded data bytes for ByteArray only per the specification.
diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc
index a02cef7aba..7292f9222a 100644
--- a/cpp/src/parquet/size_statistics.cc
+++ b/cpp/src/parquet/size_statistics.cc
@@ -18,6 +18,9 @@
 #include "parquet/size_statistics.h"
 
 #include <algorithm>
+#include <numeric>
+#include <ostream>
+#include <string_view>
 
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
@@ -25,6 +28,17 @@
 
 namespace parquet {
 
+namespace {
+
+void MergeLevelHistogram(::arrow::util::span<int64_t> histogram,
+                         ::arrow::util::span<const int64_t> other) {
+  ARROW_DCHECK_EQ(histogram.size(), other.size());
+  std::transform(histogram.begin(), histogram.end(), other.begin(), histogram.begin(),
+                 std::plus<>());
+}
+
+}  // namespace
+
 void SizeStatistics::Merge(const SizeStatistics& other) {
   if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) {
     throw ParquetException("Repetition level histogram size mismatch");
@@ -36,12 +50,8 @@ void SizeStatistics::Merge(const SizeStatistics& other) {
       other.unencoded_byte_array_data_bytes.has_value()) {
     throw ParquetException("Unencoded byte array data bytes are not consistent");
   }
-  std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(),
-                 other.repetition_level_histogram.begin(),
-                 repetition_level_histogram.begin(), std::plus<>());
-  std::transform(definition_level_histogram.begin(), definition_level_histogram.end(),
-                 other.definition_level_histogram.begin(),
-                 definition_level_histogram.begin(), std::plus<>());
+  MergeLevelHistogram(repetition_level_histogram, other.repetition_level_histogram);
+  MergeLevelHistogram(definition_level_histogram, other.definition_level_histogram);
   if (unencoded_byte_array_data_bytes.has_value()) {
     unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() +
                                       other.unencoded_byte_array_data_bytes.value();
@@ -91,4 +101,88 @@ std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* des
   return size_stats;
 }
 
+std::ostream& operator<<(std::ostream& os, const SizeStatistics& size_stats) {
+  constexpr std::string_view kComma = ", ";
+  os << "SizeStatistics{";
+  std::string_view sep = "";
+  if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
+    os << "unencoded_byte_array_data_bytes="
+       << *size_stats.unencoded_byte_array_data_bytes;
+    sep = kComma;
+  }
+  auto print_histogram = [&](std::string_view name,
+                             const std::vector<int64_t>& histogram) {
+    if (!histogram.empty()) {
+      os << sep << name << "={";
+      sep = kComma;
+      std::string_view value_sep = "";
+      for (int64_t v : histogram) {
+        os << value_sep << v;
+        value_sep = kComma;
+      }
+      os << "}";
+    }
+  };
+  print_histogram("repetition_level_histogram", size_stats.repetition_level_histogram);
+  print_histogram("definition_level_histogram", size_stats.definition_level_histogram);
+  os << "}";
+  return os;
+}
+
+void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
+                          ::arrow::util::span<int64_t> histogram) {
+  const int64_t num_levels = static_cast<int64_t>(levels.size());
+  DCHECK_GE(histogram.size(), 1);
+  const int16_t max_level = static_cast<int16_t>(histogram.size() - 1);
+  if (max_level == 0) {
+    histogram[0] += num_levels;
+    return;
+  }
+
+#ifndef NDEBUG
+  for (auto level : levels) {
+    ARROW_DCHECK_LE(level, max_level);
+  }
+#endif
+
+  if (max_level == 1) {
+    // Specialize the common case for non-repeated non-nested columns.
+    // Summing the levels gives us the number of 1s, and the number of 0s follows.
+    // We do repeated sums in the int16_t space, which the compiler is likely
+    // to vectorize efficiently.
+    constexpr int64_t kChunkSize = 1 << 14;  // to avoid int16_t overflows
+    int64_t hist1 = 0;
+    auto it = levels.begin();
+    while (it != levels.end()) {
+      const auto chunk_size = std::min<int64_t>(levels.end() - it, kChunkSize);
+      hist1 += std::accumulate(it, it + chunk_size, int16_t{0});
+      it += chunk_size;
+    }
+    histogram[0] += num_levels - hist1;
+    histogram[1] += hist1;
+    return;
+  }
+
+  // The generic implementation issues a series of histogram load-stores.
+  // However, it limits store-to-load dependencies by interleaving partial histogram
+  // updates.
+  constexpr int kUnroll = 4;
+  std::array<std::vector<int64_t>, kUnroll> partial_hist;
+  for (auto& hist : partial_hist) {
+    hist.assign(histogram.size(), 0);
+  }
+  int64_t i = 0;
+  for (; i <= num_levels - kUnroll; i += kUnroll) {
+    for (int j = 0; j < kUnroll; ++j) {
+      ++partial_hist[j][levels[i + j]];
+    }
+  }
+  for (; i < num_levels; ++i) {
+    ++partial_hist[0][levels[i]];
+  }
+  for (const auto& hist : partial_hist) {
+    MergeLevelHistogram(histogram, hist);
+  }
+}
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/size_statistics.h b/cpp/src/parquet/size_statistics.h
index c25e70ee36..ec79b8c4f8 100644
--- a/cpp/src/parquet/size_statistics.h
+++ b/cpp/src/parquet/size_statistics.h
@@ -17,9 +17,12 @@
 
 #pragma once
 
+#include <cstdint>
+#include <iosfwd>
 #include <optional>
 #include <vector>
 
+#include "arrow/util/span.h"
 #include "parquet/platform.h"
 #include "parquet/type_fwd.h"
 
@@ -89,4 +92,11 @@ struct PARQUET_EXPORT SizeStatistics {
   static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
 };
 
+PARQUET_EXPORT
+std::ostream& operator<<(std::ostream&, const SizeStatistics&);
+
+PARQUET_EXPORT
+void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
+                          ::arrow::util::span<int64_t> histogram);
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc
index cefd31dce2..0958ae4dec 100644
--- a/cpp/src/parquet/size_statistics_test.cc
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -19,16 +19,14 @@
 #include "gtest/gtest.h"
 
 #include <algorithm>
+#include <ostream>
 #include <random>
 
 #include "arrow/buffer.h"
 #include "arrow/table.h"
-#include "arrow/testing/builder.h"
 #include "arrow/testing/gtest_util.h"
-#include "arrow/util/bit_util.h"
 #include "arrow/util/span.h"
 #include "parquet/arrow/reader.h"
-#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/column_writer.h"
@@ -42,6 +40,29 @@
 
 namespace parquet {
 
+TEST(SizeStatistics, UpdateLevelHistogram) {
+  {
+    // max_level = 1
+    std::vector<int64_t> histogram(2, 0);
+    UpdateLevelHistogram(std::vector<int16_t>{0, 1, 1, 1, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(2, 3));
+    UpdateLevelHistogram(std::vector<int16_t>{1, 1, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 5));
+    UpdateLevelHistogram(std::vector<int16_t>{}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 5));
+  }
+  {
+    // max_level > 1
+    std::vector<int64_t> histogram(3, 0);
+    UpdateLevelHistogram(std::vector<int16_t>{0, 1, 2, 2, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(2, 1, 2));
+    UpdateLevelHistogram(std::vector<int16_t>{1, 1, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 3, 2));
+    UpdateLevelHistogram(std::vector<int16_t>{}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 3, 2));
+  }
+}
+
 TEST(SizeStatistics, ThriftSerDe) {
   const std::vector<int64_t> kDefLevels = {128, 64, 32, 16};
   const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20};
@@ -88,13 +109,38 @@ struct PageSizeStatistics {
   }
 };
 
+std::ostream& operator<<(std::ostream& os, const PageSizeStatistics& page_stats) {
+  constexpr std::string_view kComma = ", ";
+  os << "PageSizeStatistics{";
+  std::string_view sep = "";
+  auto print_vector = [&](std::string_view name, const std::vector<int64_t>& values) {
+    if (!values.empty()) {
+      os << sep << name << "={";
+      sep = kComma;
+      std::string_view value_sep = "";
+      for (int64_t v : values) {
+        os << value_sep << v;
+        value_sep = kComma;
+      }
+      os << "}";
+    }
+  };
+  print_vector("def_levels", page_stats.def_levels);
+  print_vector("rep_levels", page_stats.rep_levels);
+  print_vector("byte_array_bytes", page_stats.byte_array_bytes);
+  os << "}";
+  return os;
+}
+
 class SizeStatisticsRoundTripTest : public ::testing::Test {
  public:
-  void WriteFile(SizeStatisticsLevel level,
-                 const std::shared_ptr<::arrow::Table>& table) {
+  void WriteFile(SizeStatisticsLevel level, const std::shared_ptr<::arrow::Table>& table,
+                 int max_row_group_length, int page_size,
+                 int write_batch_size = DEFAULT_WRITE_BATCH_SIZE) {
     auto writer_properties = WriterProperties::Builder()
-                                 .max_row_group_length(2) /* every row group has 2 rows */
-                                 ->data_pagesize(1)       /* every page has 1 row */
+                                 .max_row_group_length(max_row_group_length)
+                                 ->data_pagesize(page_size)
+                                 ->write_batch_size(write_batch_size)
                                  ->enable_write_page_index()
                                  ->enable_statistics()
                                  ->set_size_statistics_level(level)
@@ -127,6 +173,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
 
     // Read row group size statistics in order.
+    row_group_stats_.clear();
     auto metadata = reader->metadata();
     for (int i = 0; i < metadata->num_row_groups(); ++i) {
       auto row_group_metadata = metadata->RowGroup(i);
@@ -138,6 +185,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     }
 
     // Read page size statistics in order.
+    page_stats_.clear();
     auto page_index_reader = reader->GetPageIndexReader();
     ASSERT_NE(page_index_reader, nullptr);
 
@@ -168,11 +216,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     }
   }
 
-  void Reset() {
-    buffer_.reset();
-    row_group_stats_.clear();
-    page_stats_.clear();
-  }
+  void Reset() { buffer_.reset(); }
 
  protected:
   std::shared_ptr<Buffer> buffer_;
@@ -187,7 +231,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
       ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
       ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
   });
-  // First two rows are in one row group, and the other two rows are in another row group.
+  // First two rows will be in one row group, and the other two rows in another row group.
   auto table = ::arrow::TableFromJSON(schema, {R"([
       [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ],
       [ [[0,1,null]],        [["foo","bar",null]]            ],
@@ -198,7 +242,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
   for (auto size_stats_level :
        {SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk,
         SizeStatisticsLevel::PageAndColumnChunk}) {
-    WriteFile(size_stats_level, table);
+    WriteFile(size_stats_level, table, /*max_row_group_length=*/2, /*page_size=*/1);
     ReadSizeStatistics();
 
     if (size_stats_level == SizeStatisticsLevel::None) {
@@ -251,8 +295,8 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
       {::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))});
   WriteFile(
       SizeStatisticsLevel::PageAndColumnChunk,
-      ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}));
-
+      ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}),
+      /*max_row_group_length=*/2, /*page_size=*/1);
   ReadSizeStatistics();
   EXPECT_THAT(row_group_stats_,
               ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
@@ -276,4 +320,60 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
                                                          /*byte_array_bytes=*/{4}}));
 }
 
+TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) {
+  // Rep/def level histograms are updated in batches of `write_batch_size` levels
+  // inside a single page. Exercise the logic with more than one batch per page.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::list(::arrow::utf8()))});
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      [ [null,"a","ab"] ],
+      [ null ],
+      [ [] ],
+      [ [null,"d","de"] ],
+      [ ["g","gh",null] ],
+      [ ["j","jk",null] ]
+    ])"});
+  for (int write_batch_size : {100, 5, 4, 3, 2, 1}) {
+    ARROW_SCOPED_TRACE("write_batch_size = ", write_batch_size);
+    WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+              /*max_row_group_length=*/1000, /*page_size=*/1000, write_batch_size);
+    ReadSizeStatistics();
+    EXPECT_THAT(row_group_stats_,
+                ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                      /*rep_levels=*/{6, 8},
+                                                      /*byte_array_bytes=*/12}));
+    EXPECT_THAT(page_stats_,
+                ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                          /*rep_levels=*/{6, 8},
+                                                          /*byte_array_bytes=*/{12}}));
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, LargePage) {
+  // When max_level is 1, the levels are summed in chunks of 2**14 levels;
+  // exercise this by testing with a 90000-row table.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::utf8())});
+  auto seed_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+    [ "a" ],
+    [ "bc" ],
+    [ null ]
+  ])");
+  ASSERT_OK_AND_ASSIGN(auto table, ::arrow::Table::FromRecordBatches(
+                                       ::arrow::RecordBatchVector(30000, seed_batch)));
+  ASSERT_OK_AND_ASSIGN(table, table->CombineChunks());
+  ASSERT_EQ(table->num_rows(), 90000);
+
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+            /*max_row_group_length=*/1 << 30, /*page_size=*/1 << 30,
+            /*write_batch_size=*/50000);
+  ReadSizeStatistics();
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{30000, 60000},
+                                                    /*rep_levels=*/{90000},
+                                                    /*byte_array_bytes=*/90000}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{30000, 60000},
+                                                        /*rep_levels=*/{90000},
+                                                        /*byte_array_bytes=*/{90000}}));
+}
+
 }  // namespace parquet
