emkornfield commented on a change in pull request #8219:
URL: https://github.com/apache/arrow/pull/8219#discussion_r492461706



##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -2360,6 +2361,49 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) {
       3);
 }
 
+TEST(ArrowReadWrite, DisagreeingValidityBitmap) {}
+
+TEST(ArrowReadWrite, NestedRequiredField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32(), 
/*nullable=*/false);
+  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 
5, 7, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = std::make_shared<ArrayData>(
+      struct_field->type(), /*length=*/8,
+      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
+      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});
+  CheckSimpleRoundtrip(
+      ::arrow::Table::Make(
+          ::arrow::schema({struct_field}),
+          
{std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}),
+      /*row_group_size=*/8);
+}
+
+TEST(ArrowReadWrite, NestedNullableField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32());
+  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, null, 2, 
null, 4, 5, null, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = std::make_shared<ArrayData>(
+      struct_field->type(), /*length=*/8,
+      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
+      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});

Review comment:
       thanks, I somehow keep forgetting this.

##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -2360,6 +2361,49 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) {
       3);
 }
 
+TEST(ArrowReadWrite, DisagreeingValidityBitmap) {}
+
+TEST(ArrowReadWrite, NestedRequiredField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32(), 
/*nullable=*/false);
+  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 
5, 7, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = std::make_shared<ArrayData>(
+      struct_field->type(), /*length=*/8,
+      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
+      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});
+  CheckSimpleRoundtrip(
+      ::arrow::Table::Make(
+          ::arrow::schema({struct_field}),
+          
{std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}),
+      /*row_group_size=*/8);

Review comment:
       it turns out this could be simplified as well, so I don't think a helper 
function is necessary.

##########
File path: cpp/src/parquet/arrow/path_internal.cc
##########
@@ -871,6 +877,8 @@ class MultipathLevelBuilderImpl : public 
MultipathLevelBuilder {
                      std::move(write_leaf_callback));
   }
 
+  bool Nested() const override { return !data_->child_data.empty(); }

Review comment:
       done.

##########
File path: cpp/src/parquet/arrow/writer.cc
##########
@@ -134,15 +134,14 @@ class ArrowColumnWriterV2 {
               std::shared_ptr<Array> values_array =
                   result.leaf_array->Slice(range.start, range.Size());
 
-              return column_writer->WriteArrow(result.def_levels, 
result.rep_levels,
-                                               result.def_rep_level_count, 
*values_array,
-                                               ctx);
+              PARQUET_CATCH_AND_RETURN(column_writer->WriteArrow(
+                  result.def_levels, result.rep_levels, 
result.def_rep_level_count,
+                  *values_array, ctx, level_builder->Nested(), 
result.leaf_is_nullable));

Review comment:
       done.  I suppose it is too late to revisit this?  Perhaps provide 
status/result-returning methods in one PR and then deprecate the 
exception-throwing ones?

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     return values_to_write;
   }
 
+  void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t 
batch_size,
+                                  int64_t* out_values_to_write,
+                                  int64_t* out_spaced_values_to_write,
+                                  int64_t* null_count) {
+    if (bits_buffer_ == nullptr) {
+      if (!level_info_.HasNullableValues()) {
+        *out_values_to_write = batch_size;
+        *out_spaced_values_to_write = batch_size;
+        *null_count = 0;
+      } else {
+        for (int x = 0; x < batch_size; x++) {
+          *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 
0;
+          *out_spaced_values_to_write +=
+              def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
+        }
+        *null_count = *out_values_to_write - *out_spaced_values_to_write;
+      }
+      return;
+    }
+    // Shrink to fit possible causes another allocation, and would only be 
necessary
+    // on the last batch.
+    int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size);
+    if (new_bitmap_size != bits_buffer_->size()) {
+      PARQUET_THROW_NOT_OK(
+          bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false));
+      bits_buffer_->ZeroPadding();
+    }
+    internal::ValidityBitmapInputOutput io;
+    io.valid_bits = bits_buffer_->mutable_data();
+    io.values_read_upper_bound = batch_size;
+    internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io);
+    *out_values_to_write = io.values_read - io.null_count;
+    *out_spaced_values_to_write = io.values_read;
+    *null_count = io.null_count;
+  }
+
+  std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array,

Review comment:
       done.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     return values_to_write;
   }
 
+  void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t 
batch_size,
+                                  int64_t* out_values_to_write,
+                                  int64_t* out_spaced_values_to_write,
+                                  int64_t* null_count) {
+    if (bits_buffer_ == nullptr) {
+      if (!level_info_.HasNullableValues()) {
+        *out_values_to_write = batch_size;
+        *out_spaced_values_to_write = batch_size;
+        *null_count = 0;
+      } else {
+        for (int x = 0; x < batch_size; x++) {
+          *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 
0;
+          *out_spaced_values_to_write +=
+              def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
+        }
+        *null_count = *out_values_to_write - *out_spaced_values_to_write;
+      }
+      return;
+    }
+    // Shrink to fit possible causes another allocation, and would only be 
necessary
+    // on the last batch.
+    int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size);
+    if (new_bitmap_size != bits_buffer_->size()) {
+      PARQUET_THROW_NOT_OK(
+          bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false));
+      bits_buffer_->ZeroPadding();
+    }
+    internal::ValidityBitmapInputOutput io;
+    io.valid_bits = bits_buffer_->mutable_data();
+    io.values_read_upper_bound = batch_size;
+    internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io);
+    *out_values_to_write = io.values_read - io.null_count;
+    *out_spaced_values_to_write = io.values_read;
+    *null_count = io.null_count;
+  }
+
+  std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array,
+                                          int64_t new_null_count) {
+    if (bits_buffer_ == nullptr) {
+      return array;
+    }
+    std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers;
+    buffers[0] = bits_buffer_;
+    DCHECK(array->num_fields() == 0);
+    return arrow::MakeArray(std::make_shared<ArrayData>(
+        array->type(), array->length(), std::move(buffers), new_null_count));

Review comment:
       Agreed, it looks like https://issues.apache.org/jira/browse/ARROW-7071 
might be it?

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,11 +1046,29 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the
+    // leaf.
+    bool leaf_nulls_are_canonical =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) 
&&
+        array_nullable;
+    bool maybe_has_nulls = nested && !(leaf_is_not_nullable || 
leaf_nulls_are_canonical);

Review comment:
       yeah, I renamed maybe_has_nulls to maybe_has_parent_nulls which is 
hopefully clearer?  Happy to pick another name that makes sense.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the
+    // leaf.
+    bool leaf_nulls_are_canonical =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) 
&&
+        array_nullable;
+    bool maybe_parent_nulls =
+        nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);

Review comment:
       it is not.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the
+    // leaf.
+    bool leaf_nulls_are_canonical =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) 
&&
+        array_nullable;
+    bool maybe_parent_nulls =
+        nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);
+    if (maybe_parent_nulls) {
+      ARROW_ASSIGN_OR_RAISE(
+          bits_buffer_,
+          arrow::AllocateResizableBuffer(
+              BitUtil::BytesForBits(properties_->write_batch_size()), 
ctx->memory_pool));
+      bits_buffer_->ZeroPadding();
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));

Review comment:
       This line should be removed.  But above, yes, we do allocate a new 
buffer for each WriteArrow call.  I think this object might only be used for 
one WriteArrow call over its lifetime.  Internally there is a concept of 
batching, and the allocation should only happen once here for each of those 
batches.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1130,37 +1188,61 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     return values_to_write;
   }
 
+  void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t 
batch_size,
+                                  int64_t* out_values_to_write,
+                                  int64_t* out_spaced_values_to_write,
+                                  int64_t* null_count) {
+    if (bits_buffer_ == nullptr) {
+      if (!level_info_.HasNullableValues()) {
+        *out_values_to_write = batch_size;
+        *out_spaced_values_to_write = batch_size;
+        *null_count = 0;
+      } else {
+        for (int x = 0; x < batch_size; x++) {
+          *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 
0;
+          *out_spaced_values_to_write +=
+              def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
+        }
+        *null_count = *out_values_to_write - *out_spaced_values_to_write;
+      }
+      return;
+    }
+    // Shrink to fit possible causes another allocation, and would only be 
necessary
+    // on the last batch.
+    int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size);
+    if (new_bitmap_size != bits_buffer_->size()) {
+      PARQUET_THROW_NOT_OK(
+          bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false));
+      bits_buffer_->ZeroPadding();
+    }
+    internal::ValidityBitmapInputOutput io;
+    io.valid_bits = bits_buffer_->mutable_data();
+    io.values_read_upper_bound = batch_size;
+    internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io);

Review comment:
       you understand correctly.  I didn't want to push def/rep levels further 
down the stack but that is a possibility.  

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the
+    // leaf.
+    bool leaf_nulls_are_canonical =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) 
&&
+        array_nullable;

Review comment:
       it is the leaf, will do some renaming to make this clearer.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();

Review comment:
       will rename.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the

Review comment:
       will do.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the

Review comment:
       done.

##########
File path: cpp/src/parquet/column_writer.cc
##########
@@ -1009,12 +1046,33 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) 
override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and 
it is at the
+    // leaf.
+    bool leaf_nulls_are_canonical =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) 
&&
+        array_nullable;
+    bool maybe_parent_nulls =
+        nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);

Review comment:
       nested is actually unnecessary.  I've removed it.  The only thing that 
matters is if the column is nullable according to columninfo and it isn't the 
only nullable column.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to