This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b888f4d6c7 GH-34142: [C++][Parquet] Fix record not to span multiple
pages (#34193)
b888f4d6c7 is described below
commit b888f4d6c7dc490ce17b9f64d32af23ffc6f4617
Author: Gang Wu <[email protected]>
AuthorDate: Fri Feb 24 06:09:37 2023 +0800
GH-34142: [C++][Parquet] Fix record not to span multiple pages (#34193)
### Rationale for this change
Parquet data page v2 requires pages change on record boundaries. Currently
the parquet-cpp writer does not enforce this.
### What changes are included in this PR?
Change `ColumnWriter` to split data page on record boundaries when data
page v2 is applied.
### Are these changes tested?
Add test `TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries)`
### Are there any user-facing changes?
No.
* Closes: #34142
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Will Jones <[email protected]>
---
cpp/src/parquet/column_writer.cc | 94 +++++++++++++++----
cpp/src/parquet/column_writer_test.cc | 170 ++++++++++++++++++++++++++++++++++
2 files changed, 246 insertions(+), 18 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 9f4c0b6900..4a5a6819b3 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1015,11 +1015,59 @@ template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
int64_t num_batches = static_cast<int>(total / batch_size);
for (int round = 0; round < num_batches; round++) {
- action(round * batch_size, batch_size);
+ action(round * batch_size, batch_size, /*check_page_size=*/true);
}
// Write the remaining values
if (total % batch_size > 0) {
- action(num_batches * batch_size, total % batch_size);
+ action(num_batches * batch_size, total % batch_size,
/*check_page_size=*/true);
+ }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+ int64_t num_levels, int64_t batch_size, Action&&
action,
+ bool pages_change_on_record_boundaries) {
+ if (!pages_change_on_record_boundaries || !rep_levels) {
+ // If rep_levels is null, then we are writing a non-repeated column.
+ // In this case, every record contains only one level.
+ return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+ }
+
+ int64_t offset = 0;
+ while (offset < num_levels) {
+ int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. rep_level = 0)
+ while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+ end_offset++;
+ }
+
+ if (end_offset < num_levels) {
+ // This is not the last chunk of batch and end_offset is a record
boundary.
+ // It is a good chance to check the page size.
+ action(offset, end_offset - offset, /*check_page_size=*/true);
+ } else {
+ DCHECK_EQ(end_offset, num_levels);
+ // This is the last chunk of batch, and we do not know whether
end_offset is a
+ // record boundary. Find the offset to beginning of last record in this
chunk,
+ // so we can check page size.
+ int64_t last_record_begin_offset = num_levels - 1;
+ while (last_record_begin_offset >= offset &&
+ rep_levels[last_record_begin_offset] != 0) {
+ last_record_begin_offset--;
+ }
+
+ if (offset < last_record_begin_offset) {
+ // We have found the beginning of last record and can check page size.
+ action(offset, last_record_begin_offset - offset,
/*check_page_size=*/true);
+ offset = last_record_begin_offset;
+ }
+
+      // There is no record boundary in this chunk, so we cannot check page size.
+ action(offset, end_offset - offset, /*check_page_size=*/false);
+ }
+
+ offset = end_offset;
}
}
@@ -1083,7 +1131,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
// pagesize limit
int64_t value_offset = 0;
- auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
+ auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page)
{
int64_t values_to_write = WriteLevels(batch_size,
AddIfNotNull(def_levels, offset),
AddIfNotNull(rep_levels, offset));
@@ -1093,14 +1141,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
}
WriteValues(AddIfNotNull(values, value_offset), values_to_write,
batch_size - values_to_write);
- CommitWriteAndCheckPageLimit(batch_size, values_to_write);
+ CommitWriteAndCheckPageLimit(batch_size, values_to_write, check_page);
value_offset += values_to_write;
// Dictionary size checked separately from data page size since we
// circumvent this check when writing ::arrow::DictionaryArray directly
CheckDictionarySizeLimit();
};
- DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
+ DoInBatches(def_levels, rep_levels, num_values,
properties_->write_batch_size(),
+ WriteChunk, pages_change_on_record_boundaries());
return value_offset;
}
@@ -1109,7 +1158,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
int64_t valid_bits_offset, const T* values) override {
// Like WriteBatch, but for spaced values
int64_t value_offset = 0;
- auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
+ auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page)
{
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count;
@@ -1128,14 +1177,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
batch_num_spaced_values, valid_bits,
valid_bits_offset + value_offset,
/*num_levels=*/batch_size);
}
- CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
+ CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values,
check_page);
value_offset += batch_num_spaced_values;
// Dictionary size checked separately from data page size since we
// circumvent this check when writing ::arrow::DictionaryArray directly
CheckDictionarySizeLimit();
};
- DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
+ DoInBatches(def_levels, rep_levels, num_values,
properties_->write_batch_size(),
+ WriteChunk, pages_change_on_record_boundaries());
}
Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
@@ -1228,6 +1278,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
const WriterProperties* properties() override { return properties_; }
+ bool pages_change_on_record_boundaries() const {
+ return properties_->data_page_version() == ParquetDataPageVersion::V2;
+ }
+
private:
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
@@ -1374,11 +1428,13 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
}
}
- void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
+ void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
+ bool check_page_size) {
num_buffered_values_ += num_levels;
num_buffered_encoded_values_ += num_values;
- if (current_encoder_->EstimatedDataEncodedSize() >=
properties_->data_pagesize()) {
+ if (check_page_size &&
+ current_encoder_->EstimatedDataEncodedSize() >=
properties_->data_pagesize()) {
AddDataPage();
}
}
@@ -1518,7 +1574,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
};
int64_t value_offset = 0;
- auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
+ auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size, bool
check_page) {
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count = ::arrow::kUnknownNullCount;
@@ -1538,7 +1594,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
- CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
+ CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
value_offset += batch_num_spaced_values;
};
@@ -1562,8 +1618,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
return WriteDense();
}
- PARQUET_CATCH_NOT_OK(
- DoInBatches(num_levels, properties_->write_batch_size(),
WriteIndicesChunk));
+ PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
+ properties_->write_batch_size(),
WriteIndicesChunk,
+ pages_change_on_record_boundaries()));
return Status::OK();
}
@@ -1995,7 +2052,7 @@ Status
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
}
int64_t value_offset = 0;
- auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
+ auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count = 0;
@@ -2017,13 +2074,14 @@ Status
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
- CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
+ CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
CheckDictionarySizeLimit();
value_offset += batch_num_spaced_values;
};
- PARQUET_CATCH_NOT_OK(
- DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk));
+ PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
+ properties_->write_batch_size(), WriteChunk,
+ pages_change_on_record_boundaries()));
return Status::OK();
}
diff --git a/cpp/src/parquet/column_writer_test.cc
b/cpp/src/parquet/column_writer_test.cc
index fe326caf1a..f97108153b 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1148,5 +1148,175 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
}
}
+// The test below checks that data page v2 changes on record boundaries for
+// all repetition types (i.e. required, optional, and repeated)
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) {
+ auto sink = CreateOutputStream();
+ auto schema = std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED,
+ {schema::Int32("required", Repetition::REQUIRED),
+ schema::Int32("optional", Repetition::OPTIONAL),
+ schema::Int32("repeated", Repetition::REPEATED)}));
+ // Write at most 11 levels per batch.
+ constexpr int64_t batch_size = 11;
+ auto properties = WriterProperties::Builder()
+ .disable_dictionary()
+ ->data_page_version(ParquetDataPageVersion::V2)
+ ->write_batch_size(batch_size)
+ ->data_pagesize(1) /* every page size check creates a
new page */
+ ->build();
+ auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+ auto rg_writer = file_writer->AppendRowGroup();
+
+ constexpr int32_t num_levels = 100;
+ const std::vector<int32_t> values(num_levels, 1024);
+ std::array<int16_t, num_levels> def_levels;
+ std::array<int16_t, num_levels> rep_levels;
+ for (int32_t i = 0; i < num_levels; i++) {
+ def_levels[i] = i % 2 == 0 ? 1 : 0;
+ rep_levels[i] = i % 2 == 0 ? 0 : 1;
+ }
+
+ auto required_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data());
+
+ // Write a null value at every other row.
+ auto optional_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr,
values.data());
+
+  // Each row is repeated twice.
+ auto repeated_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+ values.data());
+ repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+ values.data());
+
+ ASSERT_NO_THROW(file_writer->Close());
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(
+ std::make_shared<::arrow::io::BufferReader>(buffer),
default_reader_properties());
+ auto metadata = file_reader->metadata();
+ ASSERT_EQ(1, metadata->num_row_groups());
+ auto row_group_reader = file_reader->RowGroup(0);
+
+ // Check if pages are changed on record boundaries.
+ constexpr int num_columns = 3;
+ const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19};
+ for (int i = 0; i < num_columns; ++i) {
+ auto page_reader = row_group_reader->GetColumnPageReader(i);
+ int64_t num_rows = 0;
+ int64_t num_pages = 0;
+ std::shared_ptr<Page> page;
+ while ((page = page_reader->NextPage()) != nullptr) {
+ auto data_page = std::static_pointer_cast<DataPageV2>(page);
+ if (i < 2) {
+ EXPECT_EQ(data_page->num_values(), data_page->num_rows());
+ } else {
+        // Make sure the repeated column has 2 values per row and does not span
multiple pages.
+ EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows());
+ }
+ num_rows += data_page->num_rows();
+ num_pages++;
+ }
+ EXPECT_EQ(num_levels, num_rows);
+ EXPECT_EQ(expected_num_pages[i], num_pages);
+ }
+}
+
+// The test below checks that data page v2 changes on record boundaries for
+// repeated columns with small batches.
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesWithSmallBatches)
{
+ auto sink = CreateOutputStream();
+ auto schema = std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED,
+ {schema::Int32("tiny_repeat", Repetition::REPEATED),
+ schema::Int32("small_repeat", Repetition::REPEATED),
+ schema::Int32("medium_repeat", Repetition::REPEATED),
+ schema::Int32("large_repeat", Repetition::REPEATED)}));
+
+ // The batch_size is large enough so each WriteBatch call checks page size
at most once.
+ constexpr int64_t batch_size = std::numeric_limits<int64_t>::max();
+ auto properties = WriterProperties::Builder()
+ .disable_dictionary()
+ ->data_page_version(ParquetDataPageVersion::V2)
+ ->write_batch_size(batch_size)
+ ->data_pagesize(1) /* every page size check creates a
new page */
+ ->build();
+ auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+ auto rg_writer = file_writer->AppendRowGroup();
+
+ constexpr int32_t num_cols = 4;
+ constexpr int64_t num_rows = 400;
+ constexpr int64_t num_levels = 100;
+ constexpr std::array<int64_t, num_cols> num_levels_per_row_by_col = {1, 50,
99, 150};
+
+ // All values are not null and fixed to 1024 for simplicity.
+ const std::vector<int32_t> values(num_levels, 1024);
+ const std::vector<int16_t> def_levels(num_levels, 1);
+ std::vector<int16_t> rep_levels(num_levels, 0);
+
+ for (int32_t i = 0; i < num_cols; ++i) {
+ auto writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ const auto num_levels_per_row = num_levels_per_row_by_col[i];
+ int64_t num_rows_written = 0;
+ int64_t num_levels_written_curr_row = 0;
+ while (num_rows_written < num_rows) {
+ int32_t num_levels_to_write = 0;
+ while (num_levels_to_write < num_levels) {
+ if (num_levels_written_curr_row == 0) {
+ // A new record.
+ rep_levels[num_levels_to_write++] = 0;
+ } else {
+ rep_levels[num_levels_to_write++] = 1;
+ }
+
+ if (++num_levels_written_curr_row == num_levels_per_row) {
+ // Current row has enough levels.
+ num_levels_written_curr_row = 0;
+ if (++num_rows_written == num_rows) {
+ // Enough rows have been written.
+ break;
+ }
+ }
+ }
+
+ writer->WriteBatch(num_levels_to_write, def_levels.data(),
rep_levels.data(),
+ values.data());
+ }
+ }
+
+ ASSERT_NO_THROW(file_writer->Close());
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(
+ std::make_shared<::arrow::io::BufferReader>(buffer),
default_reader_properties());
+ auto metadata = file_reader->metadata();
+ ASSERT_EQ(1, metadata->num_row_groups());
+ auto row_group_reader = file_reader->RowGroup(0);
+
+ // Check if pages are changed on record boundaries.
+ const std::array<int64_t, num_cols> expect_num_pages_by_col = {5, 201, 397,
201};
+ const std::array<int64_t, num_cols> expect_num_rows_1st_page_by_col = {99,
1, 1, 1};
+ const std::array<int64_t, num_cols> expect_num_vals_1st_page_by_col = {99,
50, 99, 150};
+ for (int32_t i = 0; i < num_cols; ++i) {
+ auto page_reader = row_group_reader->GetColumnPageReader(i);
+ int64_t num_rows_read = 0;
+ int64_t num_pages_read = 0;
+ int64_t num_values_read = 0;
+ std::shared_ptr<Page> page;
+ while ((page = page_reader->NextPage()) != nullptr) {
+ auto data_page = std::static_pointer_cast<DataPageV2>(page);
+ num_values_read += data_page->num_values();
+ num_rows_read += data_page->num_rows();
+ if (num_pages_read++ == 0) {
+ EXPECT_EQ(expect_num_rows_1st_page_by_col[i], data_page->num_rows());
+ EXPECT_EQ(expect_num_vals_1st_page_by_col[i], data_page->num_values());
+ }
+ }
+ EXPECT_EQ(num_rows, num_rows_read);
+ EXPECT_EQ(expect_num_pages_by_col[i], num_pages_read);
+ EXPECT_EQ(num_levels_per_row_by_col[i] * num_rows, num_values_read);
+ }
+}
+
} // namespace test
} // namespace parquet