This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 05c8b222c4 GH-34086: [C++][Parquet] Fix writing num_rows to data page 
v2 (#34096)
05c8b222c4 is described below

commit 05c8b222c4d409d538cc5f92c802d4d683407ad5
Author: Gang Wu <[email protected]>
AuthorDate: Sat Feb 11 01:41:15 2023 +0800

    GH-34086: [C++][Parquet] Fix writing num_rows to data page v2 (#34096)
    
    ### Rationale for this change
    
    The C++ Parquet writer does not correctly fill the num_rows field in the DataPageV2 
header.
    
    ### What changes are included in this PR?
    
    ColumnWriter keeps track of the number of rows buffered in the current data 
page and then writes it into the header of data page v2.
    
    ### Are these changes tested?
    
    A test case has been added to make sure the data page header has been set 
correctly for required, optional and repeated columns.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #34086
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Will Jones <[email protected]>
---
 cpp/src/parquet/column_writer.cc      | 35 ++++++++++-----
 cpp/src/parquet/column_writer_test.cc | 85 +++++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 12 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index f549331faf..26d25ba17f 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -657,6 +657,7 @@ class ColumnWriterImpl {
         allocator_(properties->memory_pool()),
         num_buffered_values_(0),
         num_buffered_encoded_values_(0),
+        num_buffered_rows_(0),
         rows_written_(0),
         total_bytes_written_(0),
         total_compressed_bytes_(0),
@@ -757,10 +758,13 @@ class ColumnWriterImpl {
   // case.
   int64_t num_buffered_values_;
 
-  // The total number of stored values. For repeated or optional values, this
-  // number may be lower than num_buffered_values_.
+  // The total number of stored values in the data page. For repeated or 
optional
+  // values, this number may be lower than num_buffered_values_.
   int64_t num_buffered_encoded_values_;
 
+  // Total number of rows buffered in the data page.
+  int64_t num_buffered_rows_;
+
   // Total number of rows written with this ColumnWriter
   int64_t rows_written_;
 
@@ -869,6 +873,7 @@ void ColumnWriterImpl::AddDataPage() {
   InitSinks();
   num_buffered_values_ = 0;
   num_buffered_encoded_values_ = 0;
+  num_buffered_rows_ = 0;
 }
 
 void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
@@ -894,6 +899,8 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t 
definition_levels_rle_size,
     compressed_data = uncompressed_data_;
   }
 
+  int32_t num_values = static_cast<int32_t>(num_buffered_values_);
+
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
   if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary 
encoding
@@ -901,15 +908,14 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t 
definition_levels_rle_size,
         auto compressed_data_copy,
         compressed_data->CopySlice(0, compressed_data->size(), allocator_));
     std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
-        compressed_data_copy, static_cast<int32_t>(num_buffered_values_), 
encoding_,
-        Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+        compressed_data_copy, num_values, encoding_, Encoding::RLE, 
Encoding::RLE,
+        uncompressed_size, page_stats);
     total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
 
     data_pages_.push_back(std::move(page_ptr));
   } else {  // Eagerly write pages
-    DataPageV1 page(compressed_data, 
static_cast<int32_t>(num_buffered_values_),
-                    encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
-                    page_stats);
+    DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, 
Encoding::RLE,
+                    uncompressed_size, page_stats);
     WriteDataPage(page);
   }
 }
@@ -943,6 +949,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
 
   int32_t num_values = static_cast<int32_t>(num_buffered_values_);
   int32_t null_count = static_cast<int32_t>(page_stats.null_count);
+  int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
   int32_t def_levels_byte_length = 
static_cast<int32_t>(definition_levels_rle_size);
   int32_t rep_levels_byte_length = 
static_cast<int32_t>(repetition_levels_rle_size);
 
@@ -952,12 +959,12 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
     PARQUET_ASSIGN_OR_THROW(auto data_copy,
                             combined->CopySlice(0, combined->size(), 
allocator_));
     std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
-        combined, num_values, null_count, num_values, encoding_, 
def_levels_byte_length,
+        combined, num_values, null_count, num_rows, encoding_, 
def_levels_byte_length,
         rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), 
page_stats);
     total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
     data_pages_.push_back(std::move(page_ptr));
   } else {
-    DataPageV2 page(combined, num_values, null_count, num_values, encoding_,
+    DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
                     def_levels_byte_length, rep_levels_byte_length, 
uncompressed_size,
                     pager_->has_compressor(), page_stats);
     WriteDataPage(page);
@@ -1263,6 +1270,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       for (int64_t i = 0; i < num_values; ++i) {
         if (rep_levels[i] == 0) {
           rows_written_++;
+          num_buffered_rows_++;
         }
       }
 
@@ -1270,6 +1278,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     } else {
       // Each value is exactly one row
       rows_written_ += num_values;
+      num_buffered_rows_ += num_values;
     }
     return values_to_write;
   }
@@ -1353,12 +1362,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       for (int64_t i = 0; i < num_levels; ++i) {
         if (rep_levels[i] == 0) {
           rows_written_++;
+          num_buffered_rows_++;
         }
       }
       WriteRepetitionLevels(num_levels, rep_levels);
     } else {
       // Each value is exactly one row
       rows_written_ += num_levels;
+      num_buffered_rows_ += num_levels;
     }
   }
 
@@ -1477,9 +1488,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
     int64_t null_count = ::arrow::kUnknownNullCount;
-    // Bits is not null for nullable values.  At this point in the code we 
can't determine
-    // if the leaf array has the same null values as any parents it might have 
had so we
-    // need to recompute it from def levels.
+    // Bits is not null for nullable values.  At this point in the code we 
can't
+    // determine if the leaf array has the same null values as any parents it 
might have
+    // had so we need to recompute it from def levels.
     MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
                                &batch_num_values, &batch_num_spaced_values, 
&null_count);
     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index 78d627c382..20a6799c57 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -27,6 +27,7 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
+#include "parquet/file_reader.h"
 #include "parquet/file_writer.h"
 #include "parquet/metadata.h"
 #include "parquet/platform.h"
@@ -1058,5 +1059,89 @@ TEST(TestLevelEncoder, MinimumBufferSize2) {
   }
 }
 
+TEST(TestColumnWriter, WriteDataPageV2Header) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          schema::Int32("required", Repetition::REQUIRED),
+                          schema::Int32("optional", Repetition::OPTIONAL),
+                          schema::Int32("repeated", Repetition::REPEATED),
+                      }));
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_rows = 100;
+
+  auto required_writer = 
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int32_t i = 0; i < num_rows; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+
+  // Write a null value at every other row.
+  auto optional_writer = 
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int32_t i = 0; i < num_rows; i++) {
+    int16_t definition_level = i % 2 == 0 ? 1 : 0;
+    optional_writer->WriteBatch(1, &definition_level, nullptr, &i);
+  }
+
+  // Each row has two repeated values.
+  auto repeated_writer = 
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int i = 0; i < 2 * num_rows; i++) {
+    int32_t value = i * 1000;
+    int16_t definition_level = 1;
+    int16_t repetition_level = i % 2 == 0 ? 1 : 0;
+    repeated_writer->WriteBatch(1, &definition_level, &repetition_level, 
&value);
+  }
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Verify required column.
+  {
+    auto page_reader = row_group_reader->GetColumnPageReader(0);
+    auto page = page_reader->NextPage();
+    ASSERT_NE(page, nullptr);
+    auto data_page = std::static_pointer_cast<DataPageV2>(page);
+    EXPECT_EQ(num_rows, data_page->num_rows());
+    EXPECT_EQ(num_rows, data_page->num_values());
+    EXPECT_EQ(0, data_page->num_nulls());
+    EXPECT_EQ(page_reader->NextPage(), nullptr);
+  }
+
+  // Verify optional column.
+  {
+    auto page_reader = row_group_reader->GetColumnPageReader(1);
+    auto page = page_reader->NextPage();
+    ASSERT_NE(page, nullptr);
+    auto data_page = std::static_pointer_cast<DataPageV2>(page);
+    EXPECT_EQ(num_rows, data_page->num_rows());
+    EXPECT_EQ(num_rows, data_page->num_values());
+    EXPECT_EQ(num_rows / 2, data_page->num_nulls());
+    EXPECT_EQ(page_reader->NextPage(), nullptr);
+  }
+
+  // Verify repeated column.
+  {
+    auto page_reader = row_group_reader->GetColumnPageReader(2);
+    auto page = page_reader->NextPage();
+    ASSERT_NE(page, nullptr);
+    auto data_page = std::static_pointer_cast<DataPageV2>(page);
+    EXPECT_EQ(num_rows, data_page->num_rows());
+    EXPECT_EQ(num_rows * 2, data_page->num_values());
+    EXPECT_EQ(0, data_page->num_nulls());
+    EXPECT_EQ(page_reader->NextPage(), nullptr);
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

Reply via email to