emkornfield commented on a change in pull request #8219: URL: https://github.com/apache/arrow/pull/8219#discussion_r492461706
########## File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc ########## @@ -2360,6 +2361,49 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) { 3); } +TEST(ArrowReadWrite, DisagreeingValidityBitmap) {} + +TEST(ArrowReadWrite, NestedRequiredField) { + auto int_field = ::arrow::field("int_array", ::arrow::int32(), /*nullable=*/false); + auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 5, 7, 8]"); + auto struct_field = + ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true); + std::shared_ptr<Buffer> validity_bitmap; + ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8)); + validity_bitmap->mutable_data()[0] = 0xCC; + + auto struct_data = std::make_shared<ArrayData>( + struct_field->type(), /*length=*/8, + std::vector<std::shared_ptr<Buffer>>{validity_bitmap}, + std::vector<std::shared_ptr<ArrayData>>{int_array->data()}); + CheckSimpleRoundtrip( + ::arrow::Table::Make( + ::arrow::schema({struct_field}), + {std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}), + /*row_group_size=*/8); +} + +TEST(ArrowReadWrite, NestedNullableField) { + auto int_field = ::arrow::field("int_array", ::arrow::int32()); + auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, null, 2, null, 4, 5, null, 8]"); + auto struct_field = + ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true); + std::shared_ptr<Buffer> validity_bitmap; + ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8)); + validity_bitmap->mutable_data()[0] = 0xCC; + + auto struct_data = std::make_shared<ArrayData>( + struct_field->type(), /*length=*/8, + std::vector<std::shared_ptr<Buffer>>{validity_bitmap}, + std::vector<std::shared_ptr<ArrayData>>{int_array->data()}); Review comment: thanks, I somehow keep forgetting this. 
########## File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc ########## @@ -2360,6 +2361,49 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) { 3); } +TEST(ArrowReadWrite, DisagreeingValidityBitmap) {} + +TEST(ArrowReadWrite, NestedRequiredField) { + auto int_field = ::arrow::field("int_array", ::arrow::int32(), /*nullable=*/false); + auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 5, 7, 8]"); + auto struct_field = + ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true); + std::shared_ptr<Buffer> validity_bitmap; + ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8)); + validity_bitmap->mutable_data()[0] = 0xCC; + + auto struct_data = std::make_shared<ArrayData>( + struct_field->type(), /*length=*/8, + std::vector<std::shared_ptr<Buffer>>{validity_bitmap}, + std::vector<std::shared_ptr<ArrayData>>{int_array->data()}); + CheckSimpleRoundtrip( + ::arrow::Table::Make( + ::arrow::schema({struct_field}), + {std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}), + /*row_group_size=*/8); Review comment: it turns out this could be simplified as well, so I don't think a helper function is necessary. ########## File path: cpp/src/parquet/arrow/path_internal.cc ########## @@ -871,6 +877,8 @@ class MultipathLevelBuilderImpl : public MultipathLevelBuilder { std::move(write_leaf_callback)); } + bool Nested() const override { return !data_->child_data.empty(); } Review comment: done. 
########## File path: cpp/src/parquet/arrow/writer.cc ########## @@ -134,15 +134,14 @@ class ArrowColumnWriterV2 { std::shared_ptr<Array> values_array = result.leaf_array->Slice(range.start, range.Size()); - return column_writer->WriteArrow(result.def_levels, result.rep_levels, - result.def_rep_level_count, *values_array, - ctx); + PARQUET_CATCH_AND_RETURN(column_writer->WriteArrow( + result.def_levels, result.rep_levels, result.def_rep_level_count, + *values_array, ctx, level_builder->Nested(), result.leaf_is_nullable)); Review comment: Done. I suppose it is too late to revisit this? Perhaps provide status/result-returning methods in one PR and then deprecate the exception-throwing ones? ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return values_to_write; } + void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size, + int64_t* out_values_to_write, + int64_t* out_spaced_values_to_write, + int64_t* null_count) { + if (bits_buffer_ == nullptr) { + if (!level_info_.HasNullableValues()) { + *out_values_to_write = batch_size; + *out_spaced_values_to_write = batch_size; + *null_count = 0; + } else { + for (int x = 0; x < batch_size; x++) { + *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0; + *out_spaced_values_to_write += + def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; + } + *null_count = *out_values_to_write - *out_spaced_values_to_write; + } + return; + } + // Shrink to fit possible causes another allocation, and would only be necessary + // on the last batch. 
+ int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size); + if (new_bitmap_size != bits_buffer_->size()) { + PARQUET_THROW_NOT_OK( + bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false)); + bits_buffer_->ZeroPadding(); + } + internal::ValidityBitmapInputOutput io; + io.valid_bits = bits_buffer_->mutable_data(); + io.values_read_upper_bound = batch_size; + internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io); + *out_values_to_write = io.values_read - io.null_count; + *out_spaced_values_to_write = io.values_read; + *null_count = io.null_count; + } + + std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array, Review comment: done. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return values_to_write; } + void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size, + int64_t* out_values_to_write, + int64_t* out_spaced_values_to_write, + int64_t* null_count) { + if (bits_buffer_ == nullptr) { + if (!level_info_.HasNullableValues()) { + *out_values_to_write = batch_size; + *out_spaced_values_to_write = batch_size; + *null_count = 0; + } else { + for (int x = 0; x < batch_size; x++) { + *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0; + *out_spaced_values_to_write += + def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; + } + *null_count = *out_values_to_write - *out_spaced_values_to_write; + } + return; + } + // Shrink to fit possible causes another allocation, and would only be necessary + // on the last batch. 
+ int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size); + if (new_bitmap_size != bits_buffer_->size()) { + PARQUET_THROW_NOT_OK( + bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false)); + bits_buffer_->ZeroPadding(); + } + internal::ValidityBitmapInputOutput io; + io.valid_bits = bits_buffer_->mutable_data(); + io.values_read_upper_bound = batch_size; + internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io); + *out_values_to_write = io.values_read - io.null_count; + *out_spaced_values_to_write = io.values_read; + *null_count = io.null_count; + } + + std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array, + int64_t new_null_count) { + if (bits_buffer_ == nullptr) { + return array; + } + std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers; + buffers[0] = bits_buffer_; + DCHECK(array->num_fields() == 0); + return arrow::MakeArray(std::make_shared<ArrayData>( + array->type(), array->length(), std::move(buffers), new_null_count)); Review comment: Agree, looks like:https://issues.apache.org/jira/browse/ARROW-7071 might be it? ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,11 +1046,29 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the + // leaf. + bool leaf_nulls_are_canonical = + (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && + array_nullable; + bool maybe_has_nulls = nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical); Review comment: yeah, I renamed maybe_has_nulls to maybe_has_parent_nulls which is hopefully clearer? 
Happy to pick another name that makes sense. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the + // leaf. + bool leaf_nulls_are_canonical = + (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && + array_nullable; + bool maybe_parent_nulls = + nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical); Review comment: it is not. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the + // leaf. 
+ bool leaf_nulls_are_canonical = + (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && + array_nullable; + bool maybe_parent_nulls = + nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical); + if (maybe_parent_nulls) { + ARROW_ASSIGN_OR_RAISE( + bits_buffer_, + arrow::AllocateResizableBuffer( + BitUtil::BytesForBits(properties_->write_batch_size()), ctx->memory_pool)); + bits_buffer_->ZeroPadding(); + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); Review comment: This line should be removed. But above, yes, we do allocate a new buffer for each WriteArrow call. I think the lifetime of this object might only span one WriteArrow call. Internally there is a concept of batching, and the allocation should only happen once here for each of those batches. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1130,37 +1188,61 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return values_to_write; } + void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size, + int64_t* out_values_to_write, + int64_t* out_spaced_values_to_write, + int64_t* null_count) { + if (bits_buffer_ == nullptr) { + if (!level_info_.HasNullableValues()) { + *out_values_to_write = batch_size; + *out_spaced_values_to_write = batch_size; + *null_count = 0; + } else { + for (int x = 0; x < batch_size; x++) { + *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0; + *out_spaced_values_to_write += + def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; + } + *null_count = *out_values_to_write - *out_spaced_values_to_write; + } + return; + } + // Shrink to fit possible causes another allocation, and would only be necessary + // on the last batch. 
+ int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size); + if (new_bitmap_size != bits_buffer_->size()) { + PARQUET_THROW_NOT_OK( + bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false)); + bits_buffer_->ZeroPadding(); + } + internal::ValidityBitmapInputOutput io; + io.valid_bits = bits_buffer_->mutable_data(); + io.values_read_upper_bound = batch_size; + internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io); Review comment: you understand correctly. I didn't want to push def/rep levels further down the stack but that is a possibility. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the + // leaf. + bool leaf_nulls_are_canonical = + (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && + array_nullable; Review comment: it is the leaf, will do some renaming to make this clearer. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); Review comment: will rename. 
########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the Review comment: will do. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the Review comment: done. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the + // leaf. 
+ bool leaf_nulls_are_canonical = + (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && + array_nullable; + bool maybe_parent_nulls = + nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical); Review comment: nested is actually unnecessary. I've removed it. The only thing that matters is whether the column is nullable according to columninfo and it isn't the only nullable column. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org