emkornfield commented on a change in pull request #8219: URL: https://github.com/apache/arrow/pull/8219#discussion_r492465434
########## File path: cpp/src/parquet/arrow/path_internal.cc ########## @@ -871,6 +877,8 @@ class MultipathLevelBuilderImpl : public MultipathLevelBuilder { std::move(write_leaf_callback)); } + bool Nested() const override { return !data_->child_data.empty(); } Review comment: done. ########## File path: cpp/src/parquet/arrow/writer.cc ########## @@ -134,15 +134,14 @@ class ArrowColumnWriterV2 { std::shared_ptr<Array> values_array = result.leaf_array->Slice(range.start, range.Size()); - return column_writer->WriteArrow(result.def_levels, result.rep_levels, - result.def_rep_level_count, *values_array, - ctx); + PARQUET_CATCH_AND_RETURN(column_writer->WriteArrow( + result.def_levels, result.rep_levels, result.def_rep_level_count, + *values_array, ctx, level_builder->Nested(), result.leaf_is_nullable)); Review comment: done. I suppose it is too late to revisit this? Perhaps provide status/result-returning methods in one PR and then deprecate the exception-throwing ones? ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return values_to_write; } + void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size, + int64_t* out_values_to_write, + int64_t* out_spaced_values_to_write, + int64_t* null_count) { + if (bits_buffer_ == nullptr) { + if (!level_info_.HasNullableValues()) { + *out_values_to_write = batch_size; + *out_spaced_values_to_write = batch_size; + *null_count = 0; + } else { + for (int x = 0; x < batch_size; x++) { + *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0; + *out_spaced_values_to_write += + def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; + } + *null_count = *out_values_to_write - *out_spaced_values_to_write; + } + return; + } + // Shrink to fit possible causes another allocation, and would only be necessary + // on the last batch. 
+ int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size); + if (new_bitmap_size != bits_buffer_->size()) { + PARQUET_THROW_NOT_OK( + bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false)); + bits_buffer_->ZeroPadding(); + } + internal::ValidityBitmapInputOutput io; + io.valid_bits = bits_buffer_->mutable_data(); + io.values_read_upper_bound = batch_size; + internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io); + *out_values_to_write = io.values_read - io.null_count; + *out_spaced_values_to_write = io.values_read; + *null_count = io.null_count; + } + + std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array, Review comment: done. ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return values_to_write; } + void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size, + int64_t* out_values_to_write, + int64_t* out_spaced_values_to_write, + int64_t* null_count) { + if (bits_buffer_ == nullptr) { + if (!level_info_.HasNullableValues()) { + *out_values_to_write = batch_size; + *out_spaced_values_to_write = batch_size; + *null_count = 0; + } else { + for (int x = 0; x < batch_size; x++) { + *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0; + *out_spaced_values_to_write += + def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; + } + *null_count = *out_values_to_write - *out_spaced_values_to_write; + } + return; + } + // Shrink to fit possible causes another allocation, and would only be necessary + // on the last batch. 
+ int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size); + if (new_bitmap_size != bits_buffer_->size()) { + PARQUET_THROW_NOT_OK( + bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false)); + bits_buffer_->ZeroPadding(); + } + internal::ValidityBitmapInputOutput io; + io.valid_bits = bits_buffer_->mutable_data(); + io.values_read_upper_bound = batch_size; + internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io); + *out_values_to_write = io.values_read - io.null_count; + *out_spaced_values_to_write = io.values_read; + *null_count = io.null_count; + } + + std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array, + int64_t new_null_count) { + if (bits_buffer_ == nullptr) { + return array; + } + std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers; + buffers[0] = bits_buffer_; + DCHECK(array->num_fields() == 0); + return arrow::MakeArray(std::make_shared<ArrayData>( + array->type(), array->length(), std::move(buffers), new_null_count)); Review comment: Agree, looks like https://issues.apache.org/jira/browse/ARROW-7071 might be it? ########## File path: cpp/src/parquet/column_writer.cc ########## @@ -1009,11 +1046,29 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, - ArrowWriteContext* ctx) override { + ArrowWriteContext* ctx, bool nested, bool array_nullable) override { + bool leaf_is_not_nullable = !level_info_.HasNullableValues(); + // Leaf nulls are canonical when there is only a single null element and it is at the + // leaf. + bool leaf_nulls_are_canonical = + (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) && + array_nullable; + bool maybe_has_nulls = nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical); Review comment: Yeah, I renamed maybe_has_nulls to maybe_has_parent_nulls, which is hopefully clearer? 
Happy to pick another name that makes sense. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org