This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b888f4d6c7 GH-34142: [C++][Parquet] Fix record not to span multiple 
pages (#34193)
b888f4d6c7 is described below

commit b888f4d6c7dc490ce17b9f64d32af23ffc6f4617
Author: Gang Wu <[email protected]>
AuthorDate: Fri Feb 24 06:09:37 2023 +0800

    GH-34142: [C++][Parquet] Fix record not to span multiple pages (#34193)
    
    ### Rationale for this change
    
    Parquet data page v2 requires that pages change only on record boundaries,
    i.e. a record must not span multiple pages. Currently the parquet-cpp
    writer does not enforce this.
    
    ### What changes are included in this PR?
    
    Change `ColumnWriter` to split data pages on record boundaries when data
    page v2 is used.
    
    ### Are these changes tested?
    
    Add tests `TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries)` and
    `TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesWithSmallBatches)`.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #34142
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Will Jones <[email protected]>
---
 cpp/src/parquet/column_writer.cc      |  94 +++++++++++++++----
 cpp/src/parquet/column_writer_test.cc | 170 ++++++++++++++++++++++++++++++++++
 2 files changed, 246 insertions(+), 18 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 9f4c0b6900..4a5a6819b3 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, 
/*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& 
action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. rep_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record 
boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether 
end_offset is a
+      // record boundary. Find the offset to beginning of last record in this 
chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, 
/*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);
+    }
+
+    offset = end_offset;
   }
 }
 
@@ -1083,7 +1131,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     // pagesize limit
     int64_t value_offset = 0;
 
-    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
+    auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) 
{
       int64_t values_to_write = WriteLevels(batch_size, 
AddIfNotNull(def_levels, offset),
                                             AddIfNotNull(rep_levels, offset));
 
@@ -1093,14 +1141,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
       }
       WriteValues(AddIfNotNull(values, value_offset), values_to_write,
                   batch_size - values_to_write);
-      CommitWriteAndCheckPageLimit(batch_size, values_to_write);
+      CommitWriteAndCheckPageLimit(batch_size, values_to_write, check_page);
       value_offset += values_to_write;
 
       // Dictionary size checked separately from data page size since we
       // circumvent this check when writing ::arrow::DictionaryArray directly
       CheckDictionarySizeLimit();
     };
-    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
+    DoInBatches(def_levels, rep_levels, num_values, 
properties_->write_batch_size(),
+                WriteChunk, pages_change_on_record_boundaries());
     return value_offset;
   }
 
@@ -1109,7 +1158,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
                         int64_t valid_bits_offset, const T* values) override {
     // Like WriteBatch, but for spaced values
     int64_t value_offset = 0;
-    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
+    auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) 
{
       int64_t batch_num_values = 0;
       int64_t batch_num_spaced_values = 0;
       int64_t null_count;
@@ -1128,14 +1177,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
                           batch_num_spaced_values, valid_bits,
                           valid_bits_offset + value_offset, 
/*num_levels=*/batch_size);
       }
-      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
+      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values, 
check_page);
       value_offset += batch_num_spaced_values;
 
       // Dictionary size checked separately from data page size since we
       // circumvent this check when writing ::arrow::DictionaryArray directly
       CheckDictionarySizeLimit();
     };
-    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
+    DoInBatches(def_levels, rep_levels, num_values, 
properties_->write_batch_size(),
+                WriteChunk, pages_change_on_record_boundaries());
   }
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
@@ -1228,6 +1278,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   const WriterProperties* properties() override { return properties_; }
 
+  bool pages_change_on_record_boundaries() const {
+    return properties_->data_page_version() == ParquetDataPageVersion::V2;
+  }
+
  private:
   using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
   using TypedStats = TypedStatistics<DType>;
@@ -1374,11 +1428,13 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     }
   }
 
-  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
+  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
+                                    bool check_page_size) {
     num_buffered_values_ += num_levels;
     num_buffered_encoded_values_ += num_values;
 
-    if (current_encoder_->EstimatedDataEncodedSize() >= 
properties_->data_pagesize()) {
+    if (check_page_size &&
+        current_encoder_->EstimatedDataEncodedSize() >= 
properties_->data_pagesize()) {
       AddDataPage();
     }
   }
@@ -1518,7 +1574,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
   };
 
   int64_t value_offset = 0;
-  auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
+  auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size, bool 
check_page) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
     int64_t null_count = ::arrow::kUnknownNullCount;
@@ -1538,7 +1594,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
         writeable_indices,
         MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
     dict_encoder->PutIndices(*writeable_indices);
-    CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
+    CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
     value_offset += batch_num_spaced_values;
   };
 
@@ -1562,8 +1618,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     return WriteDense();
   }
 
-  PARQUET_CATCH_NOT_OK(
-      DoInBatches(num_levels, properties_->write_batch_size(), 
WriteIndicesChunk));
+  PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
+                                   properties_->write_batch_size(), 
WriteIndicesChunk,
+                                   pages_change_on_record_boundaries()));
   return Status::OK();
 }
 
@@ -1995,7 +2052,7 @@ Status 
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
   }
 
   int64_t value_offset = 0;
-  auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
+  auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
     int64_t null_count = 0;
@@ -2017,13 +2074,14 @@ Status 
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
       page_statistics_->IncrementNullCount(batch_size - non_null);
       page_statistics_->IncrementNumValues(non_null);
     }
-    CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
+    CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
     CheckDictionarySizeLimit();
     value_offset += batch_num_spaced_values;
   };
 
-  PARQUET_CATCH_NOT_OK(
-      DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk));
+  PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
+                                   properties_->write_batch_size(), WriteChunk,
+                                   pages_change_on_record_boundaries()));
   return Status::OK();
 }
 
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index fe326caf1a..f97108153b 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1148,5 +1148,175 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+// The test below checks that data page v2 changes on record boundaries for
+// all repetition types (i.e. required, optional, and repeated)
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {schema::Int32("required", Repetition::REQUIRED),
+                       schema::Int32("optional", Repetition::OPTIONAL),
+                       schema::Int32("repeated", Repetition::REPEATED)}));
+  // Write at most 11 levels per batch.
+  constexpr int64_t batch_size = 11;
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->write_batch_size(batch_size)
+                        ->data_pagesize(1) /* every page size check creates a 
new page */
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_levels = 100;
+  const std::vector<int32_t> values(num_levels, 1024);
+  std::array<int16_t, num_levels> def_levels;
+  std::array<int16_t, num_levels> rep_levels;
+  for (int32_t i = 0; i < num_levels; i++) {
+    def_levels[i] = i % 2 == 0 ? 1 : 0;
+    rep_levels[i] = i % 2 == 0 ? 0 : 1;
+  }
+
+  auto required_writer = 
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data());
+
+  // Write a null value at every other row.
+  auto optional_writer = 
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr, 
values.data());
+
+  // Each row has two values (rep_levels alternate between 0 and 1).
+  auto repeated_writer = 
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Check if pages are changed on record boundaries.
+  constexpr int num_columns = 3;
+  const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19};
+  for (int i = 0; i < num_columns; ++i) {
+    auto page_reader = row_group_reader->GetColumnPageReader(i);
+    int64_t num_rows = 0;
+    int64_t num_pages = 0;
+    std::shared_ptr<Page> page;
+    while ((page = page_reader->NextPage()) != nullptr) {
+      auto data_page = std::static_pointer_cast<DataPageV2>(page);
+      if (i < 2) {
+        EXPECT_EQ(data_page->num_values(), data_page->num_rows());
+      } else {
+        // Make sure repeated column has 2 values per row and not span 
multiple pages.
+        EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows());
+      }
+      num_rows += data_page->num_rows();
+      num_pages++;
+    }
+    EXPECT_EQ(num_levels, num_rows);
+    EXPECT_EQ(expected_num_pages[i], num_pages);
+  }
+}
+
+// The test below checks that data page v2 changes on record boundaries for
+// repeated columns with small batches.
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesWithSmallBatches) 
{
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {schema::Int32("tiny_repeat", Repetition::REPEATED),
+                       schema::Int32("small_repeat", Repetition::REPEATED),
+                       schema::Int32("medium_repeat", Repetition::REPEATED),
+                       schema::Int32("large_repeat", Repetition::REPEATED)}));
+
+  // The batch_size is large enough so each WriteBatch call checks page size 
at most once.
+  constexpr int64_t batch_size = std::numeric_limits<int64_t>::max();
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->write_batch_size(batch_size)
+                        ->data_pagesize(1) /* every page size check creates a 
new page */
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_cols = 4;
+  constexpr int64_t num_rows = 400;
+  constexpr int64_t num_levels = 100;
+  constexpr std::array<int64_t, num_cols> num_levels_per_row_by_col = {1, 50, 
99, 150};
+
+  // All values are not null and fixed to 1024 for simplicity.
+  const std::vector<int32_t> values(num_levels, 1024);
+  const std::vector<int16_t> def_levels(num_levels, 1);
+  std::vector<int16_t> rep_levels(num_levels, 0);
+
+  for (int32_t i = 0; i < num_cols; ++i) {
+    auto writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+    const auto num_levels_per_row = num_levels_per_row_by_col[i];
+    int64_t num_rows_written = 0;
+    int64_t num_levels_written_curr_row = 0;
+    while (num_rows_written < num_rows) {
+      int32_t num_levels_to_write = 0;
+      while (num_levels_to_write < num_levels) {
+        if (num_levels_written_curr_row == 0) {
+          // A new record.
+          rep_levels[num_levels_to_write++] = 0;
+        } else {
+          rep_levels[num_levels_to_write++] = 1;
+        }
+
+        if (++num_levels_written_curr_row == num_levels_per_row) {
+          // Current row has enough levels.
+          num_levels_written_curr_row = 0;
+          if (++num_rows_written == num_rows) {
+            // Enough rows have been written.
+            break;
+          }
+        }
+      }
+
+      writer->WriteBatch(num_levels_to_write, def_levels.data(), 
rep_levels.data(),
+                         values.data());
+    }
+  }
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), 
default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Check if pages are changed on record boundaries.
+  const std::array<int64_t, num_cols> expect_num_pages_by_col = {5, 201, 397, 
201};
+  const std::array<int64_t, num_cols> expect_num_rows_1st_page_by_col = {99, 
1, 1, 1};
+  const std::array<int64_t, num_cols> expect_num_vals_1st_page_by_col = {99, 
50, 99, 150};
+  for (int32_t i = 0; i < num_cols; ++i) {
+    auto page_reader = row_group_reader->GetColumnPageReader(i);
+    int64_t num_rows_read = 0;
+    int64_t num_pages_read = 0;
+    int64_t num_values_read = 0;
+    std::shared_ptr<Page> page;
+    while ((page = page_reader->NextPage()) != nullptr) {
+      auto data_page = std::static_pointer_cast<DataPageV2>(page);
+      num_values_read += data_page->num_values();
+      num_rows_read += data_page->num_rows();
+      if (num_pages_read++ == 0) {
+        EXPECT_EQ(expect_num_rows_1st_page_by_col[i], data_page->num_rows());
+        EXPECT_EQ(expect_num_vals_1st_page_by_col[i], data_page->num_values());
+      }
+    }
+    EXPECT_EQ(num_rows, num_rows_read);
+    EXPECT_EQ(expect_num_pages_by_col[i], num_pages_read);
+    EXPECT_EQ(num_levels_per_row_by_col[i] * num_rows, num_values_read);
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

Reply via email to