This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c764c3ef5 GH-34326: [C++][Parquet] Page null_count is incorrect if 
stats is disabled (#34327)
7c764c3ef5 is described below

commit 7c764c3ef55db034239dd9244874168587232edd
Author: Gang Wu <[email protected]>
AuthorDate: Sat Mar 4 01:54:00 2023 +0800

    GH-34326: [C++][Parquet] Page null_count is incorrect if stats is disabled 
(#34327)
    
    ### Rationale for this change
    
    Parquet ColumnWriter obtains null_count of a page from page stats as below 
([link](https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_writer.cc#L952))
    ```cpp
      EncodedStatistics page_stats = GetPageStatistics();
    
      int32_t null_count = static_cast<int32_t>(page_stats.null_count);
    
      DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
                        def_levels_byte_length, rep_levels_byte_length, 
uncompressed_size,
                        pager_->has_compressor(), page_stats);
    ```
    
    However, the null_count is not set if page stats are not enabled:
    ```cpp
      EncodedStatistics GetPageStatistics() override {
        EncodedStatistics result;
        if (page_statistics_) result = page_statistics_->Encode();
        return result;
      }
    ```
    
    ### What changes are included in this PR?
    
    ColumnWriter now collects null_count by itself. To be safe, it also verifies 
the value against page stats when they are available.
    
    ### Are these changes tested?
    
    Added a test case to verify that null counts of optional and repeated fields 
are properly set.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #34326
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/parquet/column_writer.cc      | 42 +++++++++++++--------
 cpp/src/parquet/column_writer_test.cc | 69 +++++++++++++++++++++++++++++++++++
 2 files changed, 96 insertions(+), 15 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 829beb5325..f8072662a1 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -672,6 +672,7 @@ class ColumnWriterImpl {
         allocator_(properties->memory_pool()),
         num_buffered_values_(0),
         num_buffered_encoded_values_(0),
+        num_buffered_nulls_(0),
         num_buffered_rows_(0),
         rows_written_(0),
         total_bytes_written_(0),
@@ -777,6 +778,9 @@ class ColumnWriterImpl {
   // values, this number may be lower than num_buffered_values_.
   int64_t num_buffered_encoded_values_;
 
+  // The total number of nulls stored in the data page.
+  int64_t num_buffered_nulls_;
+
   // Total number of rows buffered in the data page.
   int64_t num_buffered_rows_;
 
@@ -890,6 +894,7 @@ void ColumnWriterImpl::AddDataPage() {
   num_buffered_values_ = 0;
   num_buffered_encoded_values_ = 0;
   num_buffered_rows_ = 0;
+  num_buffered_nulls_ = 0;
 }
 
 void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
@@ -964,11 +969,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
   ResetPageStatistics();
 
   int32_t num_values = static_cast<int32_t>(num_buffered_values_);
-  int32_t null_count = static_cast<int32_t>(page_stats.null_count);
+  int32_t null_count = static_cast<int32_t>(num_buffered_nulls_);
   int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
   int32_t def_levels_byte_length = 
static_cast<int32_t>(definition_levels_rle_size);
   int32_t rep_levels_byte_length = 
static_cast<int32_t>(repetition_levels_rle_size);
 
+  // page_stats.null_count is not set when page_statistics_ is nullptr. It is 
only used
+  // here for safety check.
+  DCHECK(!page_stats.has_null_count || page_stats.null_count == null_count);
+
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
   if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary 
encoding
@@ -1154,9 +1163,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       if (values_to_write > 0) {
         DCHECK_NE(nullptr, values);
       }
-      WriteValues(AddIfNotNull(values, value_offset), values_to_write,
-                  batch_size - values_to_write);
-      CommitWriteAndCheckPageLimit(batch_size, values_to_write, check_page);
+      const int64_t num_nulls = batch_size - values_to_write;
+      WriteValues(AddIfNotNull(values, value_offset), values_to_write, 
num_nulls);
+      CommitWriteAndCheckPageLimit(batch_size, values_to_write, num_nulls, 
check_page);
       value_offset += values_to_write;
 
       // Dictionary size checked separately from data page size since we
@@ -1186,13 +1195,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       if (bits_buffer_ != nullptr) {
         WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values,
                           batch_num_spaced_values, bits_buffer_->data(), 
/*offset=*/0,
-                          /*num_levels=*/batch_size);
+                          /*num_levels=*/batch_size, null_count);
       } else {
         WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values,
                           batch_num_spaced_values, valid_bits,
-                          valid_bits_offset + value_offset, 
/*num_levels=*/batch_size);
+                          valid_bits_offset + value_offset, 
/*num_levels=*/batch_size,
+                          null_count);
       }
-      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values, 
check_page);
+      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values, 
null_count,
+                                   check_page);
       value_offset += batch_num_spaced_values;
 
       // Dictionary size checked separately from data page size since we
@@ -1381,7 +1392,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
           *out_spaced_values_to_write +=
               def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
         }
-        *null_count = *out_values_to_write - *out_spaced_values_to_write;
+        *null_count = batch_size - *out_values_to_write;
       }
       return;
     }
@@ -1448,9 +1459,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
   }
 
   void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
-                                    bool check_page_size) {
+                                    int64_t num_nulls, bool check_page_size) {
     num_buffered_values_ += num_levels;
     num_buffered_encoded_values_ += num_values;
+    num_buffered_nulls_ += num_nulls;
 
     if (check_page_size &&
         current_encoder_->EstimatedDataEncodedSize() >= 
properties_->data_pagesize()) {
@@ -1501,7 +1513,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t 
num_spaced_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset,
-                         int64_t num_levels) {
+                         int64_t num_levels, int64_t num_nulls) {
     if (num_values != num_spaced_values) {
       current_value_encoder_->PutSpaced(values, 
static_cast<int>(num_spaced_values),
                                         valid_bits, valid_bits_offset);
@@ -1509,7 +1521,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       current_value_encoder_->Put(values, static_cast<int>(num_values));
     }
     if (page_statistics_ != nullptr) {
-      const int64_t num_nulls = num_levels - num_values;
       page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
                                      num_spaced_values, num_values, num_nulls);
     }
@@ -1613,7 +1624,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
         writeable_indices,
         MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
     dict_encoder->PutIndices(*writeable_indices);
-    CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
+    CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, 
check_page);
     value_offset += batch_num_spaced_values;
   };
 
@@ -2086,14 +2097,15 @@ Status 
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
         data_slice, MaybeReplaceValidity(data_slice, null_count, 
ctx->memory_pool));
 
     current_encoder_->Put(*data_slice);
+    // Null values in ancestors count as nulls.
+    const int64_t non_null = data_slice->length() - data_slice->null_count();
     if (page_statistics_ != nullptr) {
       page_statistics_->Update(*data_slice, /*update_counts=*/false);
-      // Null values in ancestors count as nulls.
-      int64_t non_null = data_slice->length() - data_slice->null_count();
       page_statistics_->IncrementNullCount(batch_size - non_null);
       page_statistics_->IncrementNumValues(non_null);
     }
-    CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
+    CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - 
non_null,
+                                 check_page);
     CheckDictionarySizeLimit();
     value_offset += batch_num_spaced_values;
   };
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index aa05f4e791..65c1ccbdfd 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1482,5 +1482,74 @@ TEST_F(ColumnWriterTestSizeEstimated, 
BufferedCompression) {
   EXPECT_GT(written_size, required_writer->total_compressed_bytes_written());
 }
 
+TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
+  auto sink = CreateOutputStream();
+  auto list_type = GroupNode::Make("list", Repetition::REPEATED,
+                                   {schema::Int32("elem", 
Repetition::OPTIONAL)});
+  auto schema = std::static_pointer_cast<GroupNode>(GroupNode::Make(
+      "schema", Repetition::REQUIRED,
+      {
+          schema::Int32("non_null", Repetition::OPTIONAL),
+          schema::Int32("half_null", Repetition::OPTIONAL),
+          schema::Int32("all_null", Repetition::OPTIONAL),
+          GroupNode::Make("half_null_list", Repetition::OPTIONAL, {list_type}),
+          GroupNode::Make("half_empty_list", Repetition::OPTIONAL, 
{list_type}),
+          GroupNode::Make("half_list_of_null", Repetition::OPTIONAL, 
{list_type}),
+          GroupNode::Make("all_single_list", Repetition::OPTIONAL, 
{list_type}),
+      }));
+  auto properties = WriterProperties::Builder()
+                        /* Use V2 data page to read null_count from header */
+                        .data_page_version(ParquetDataPageVersion::V2)
+                        /* Disable stats to test null_count is properly set */
+                        ->disable_statistics()
+                        ->disable_dictionary()
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_rows = 10;
+  constexpr int32_t num_cols = 7;
+  const std::vector<std::vector<int16_t>> def_levels_by_col = {
+      {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, {1, 0, 1, 0, 1, 0, 1, 0, 1, 0},
+      {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, {0, 3, 0, 3, 0, 3, 0, 3, 0, 3},
+      {1, 3, 1, 3, 1, 3, 1, 3, 1, 3}, {2, 3, 2, 3, 2, 3, 2, 3, 2, 3},
+      {3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
+  };
+  const std::vector<int16_t> ref_levels(num_rows, 0);
+  const std::vector<int32_t> values(num_rows, 123);
+  const std::vector<int64_t> expect_null_count_by_col = {0, 5, 10, 5, 5, 5, 0};
+
+  for (int32_t i = 0; i < num_cols; ++i) {
+    auto writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+    writer->WriteBatch(num_rows, def_levels_by_col[i].data(),
+                       i >= 3 ? ref_levels.data() : nullptr, values.data());
+  }
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  std::shared_ptr<Page> page;
+  for (int32_t i = 0; i < num_cols; ++i) {
+    auto page_reader = row_group_reader->GetColumnPageReader(i);
+    int64_t num_nulls_read = 0;
+    int64_t num_rows_read = 0;
+    int64_t num_values_read = 0;
+    while ((page = page_reader->NextPage()) != nullptr) {
+      auto data_page = std::static_pointer_cast<DataPageV2>(page);
+      num_nulls_read += data_page->num_nulls();
+      num_rows_read += data_page->num_rows();
+      num_values_read += data_page->num_values();
+    }
+    EXPECT_EQ(expect_null_count_by_col[i], num_nulls_read);
+    EXPECT_EQ(num_rows, num_rows_read);
+    EXPECT_EQ(num_rows, num_values_read);
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

Reply via email to