pitrou commented on code in PR #38252:
URL: https://github.com/apache/arrow/pull/38252#discussion_r1363423518


##########
cpp/src/arrow/array/util.cc:
##########
@@ -347,215 +349,234 @@ static Result<std::shared_ptr<Scalar>> 
MakeScalarForRunEndValue(
   return std::make_shared<Int64Scalar>(run_end);
 }
 
-// get the maximum buffer length required, then allocate a single zeroed buffer
-// to use anywhere a buffer is required
 class NullArrayFactory {
  public:
-  struct GetBufferLength {
-    GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
-        : type_(*type), length_(length), 
buffer_length_(bit_util::BytesForBits(length)) {}
-
-    Result<int64_t> Finish() && {
-      RETURN_NOT_OK(VisitTypeInline(type_, this));
-      return buffer_length_;
-    }
-
-    template <typename T, typename = 
decltype(TypeTraits<T>::bytes_required(0))>
-    Status Visit(const T&) {
-      return MaxOf(TypeTraits<T>::bytes_required(length_));
-    }
-
-    template <typename T>
-    enable_if_var_size_list<T, Status> Visit(const T& type) {
-      // values array may be empty, but there must be at least one offset of 0
-      RETURN_NOT_OK(MaxOf(sizeof(typename T::offset_type) * (length_ + 1)));
-      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
-      return Status::OK();
-    }
-
-    template <typename T>
-    enable_if_base_binary<T, Status> Visit(const T&) {
-      // values buffer may be empty, but there must be at least one offset of 0
-      return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
-    }
-
-    Status Visit(const FixedSizeListType& type) {
-      return MaxOf(GetBufferLength(type.value_type(), type.list_size() * 
length_));
-    }
-
-    Status Visit(const FixedSizeBinaryType& type) {
-      return MaxOf(type.byte_width() * length_);
-    }
-
-    Status Visit(const StructType& type) {
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
-      }
-      return Status::OK();
-    }
-
-    Status Visit(const SparseUnionType& type) {
-      // type codes
-      RETURN_NOT_OK(MaxOf(length_));
-      // will create children of the same length as the union
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
-      }
-      return Status::OK();
-    }
-
-    Status Visit(const DenseUnionType& type) {
-      // type codes
-      RETURN_NOT_OK(MaxOf(length_));
-      // offsets
-      RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_));
-      // will create children of length 1
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), 1)));
-      }
-      return Status::OK();
-    }
+  // For most types, every buffer in an entirely null array will contain 
nothing but
+  // zeroes. For arrays of such types, we can allocate a single buffer and use 
that in
+  // every position of the array data. The first stage of visitation handles 
assessment
+  // of this buffer's size, the second uses the resulting buffer to build the 
null array.
+  //
+  // The first stage may not allocate from the MemoryPool or raise a failing 
status.
+  //
+  // In the second stage, `zero_buffer_` has been allocated and `out_` has:
+  // - type = type_
+  // - length = length_
+  // - null_count = length_ unless current output may have direct nulls,
+  //                0 otherwise
+  // - offset = 0
+  // - buffers = []
+  // - child_data = [nullptr] * type.num_fields()
+  // - dictionary = nullptr
+  bool presizing_zero_buffer_;
 
-    Status Visit(const DictionaryType& type) {
-      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
-      return MaxOf(GetBufferLength(type.index_type(), length_));
-    }
+  NullArrayFactory(const std::shared_ptr<DataType>& type, bool nullable, 
int64_t length)
+      : presizing_zero_buffer_{true},
+        type_{type},
+        nullable_{nullable},
+        length_{length},
+        zero_buffer_length_{MayHaveDirectNulls() ? 
bit_util::BytesForBits(length) : 0} {}
 
-    Status Visit(const RunEndEncodedType& type) {
-      // RunEndEncodedType has no buffers, only child arrays
-      buffer_length_ = 0;
-      return Status::OK();
-    }
+  NullArrayFactory(const std::shared_ptr<DataType>& type, bool nullable, 
int64_t length,
+                   const std::shared_ptr<Buffer>& zero_buffer, MemoryPool* 
pool)
+      : presizing_zero_buffer_{false},
+        type_{type},
+        nullable_{nullable},
+        length_{length},
+        zero_buffer_length_{MayHaveDirectNulls() ? 
bit_util::BytesForBits(length) : 0},
+        zero_buffer_{&zero_buffer},
+        pool_{pool} {}
 
-    Status Visit(const ExtensionType& type) {
-      // XXX is an extension array's length always == storage length
-      return MaxOf(GetBufferLength(type.storage_type(), length_));
-    }
+  template <typename... Args>
+  explicit NullArrayFactory(const std::shared_ptr<Field>& field, const 
Args&... args)
+      : NullArrayFactory{field->type(), field->nullable(), args...} {}
 
-    Status Visit(const DataType& type) {
-      return Status::NotImplemented("construction of all-null ", type);
-    }
+  bool MayHaveDirectNulls() const {
+    if (type_->storage_id() == Type::NA) return true;
+    return nullable_ && internal::HasValidityBitmap(type_->storage_id());
+  }
 
-   private:
-    Status MaxOf(GetBufferLength&& other) {
-      ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, std::move(other).Finish());
-      return MaxOf(buffer_length);
-    }
+  void ZeroBufferMustBeAtLeast(int64_t length) {
+    DCHECK(presizing_zero_buffer_);
+    zero_buffer_length_ = std::max(zero_buffer_length_, length);
+  }
 
-    Status MaxOf(int64_t buffer_length) {
-      if (buffer_length > buffer_length_) {
-        buffer_length_ = buffer_length;
-      }
-      return Status::OK();
-    }
+  std::shared_ptr<Buffer> GetValidityBitmap() const {
+    DCHECK(!presizing_zero_buffer_);
+    return MayHaveDirectNulls() ? *zero_buffer_ : nullptr;
+  }
 
-    const DataType& type_;
-    int64_t length_, buffer_length_;
-  };
+  static int64_t GetZeroBufferLength(const std::shared_ptr<DataType>& type, 
bool nullable,
+                                     int64_t length) {
+    NullArrayFactory factory{type, nullable, length};
+    DCHECK_OK(VisitTypeInline(*type, &factory));
+    return factory.zero_buffer_length_;
+  }
 
-  NullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type,
-                   int64_t length)
-      : pool_(pool), type_(type), length_(length) {}
+  static int64_t GetZeroBufferLength(const std::shared_ptr<Field>& field,
+                                     int64_t length) {
+    return GetZeroBufferLength(field->type(), field->nullable(), length);
+  }
 
-  Status CreateBuffer() {
-    if (type_->id() == Type::RUN_END_ENCODED) {
-      buffer_ = NULLPTR;
-      return Status::OK();
+  Status Visit(const NullType&) {
+    if (presizing_zero_buffer_) {
+      // null needs no buffers; don't touch the zero buffer size
+    } else {
+      out_->buffers = {nullptr};
     }
-    ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
-                          GetBufferLength(type_, length_).Finish());
-    ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
-    std::memset(buffer_->mutable_data(), 0, buffer_->size());
     return Status::OK();
   }
 
-  Result<std::shared_ptr<ArrayData>> Create() {
-    if (buffer_ == nullptr) {
-      RETURN_NOT_OK(CreateBuffer());
+  Status Visit(const BooleanType& type) {
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(bit_util::BytesForBits(length_));
+      return Status::OK();
     }
-    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
-    auto buffer_slice =
-        buffer_ ? SliceBuffer(buffer_, 0, bit_util::BytesForBits(length_)) : 
NULLPTR;
-    out_ = ArrayData::Make(type_, length_, {std::move(buffer_slice)}, 
child_data, length_,
-                           0);
-    RETURN_NOT_OK(VisitTypeInline(*type_, this));
-    return out_;
-  }
-
-  Status Visit(const NullType&) {
-    out_->buffers.resize(1, nullptr);
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
     return Status::OK();
   }
 
-  Status Visit(const FixedWidthType&) {
-    out_->buffers.resize(2, buffer_);
+  Status Visit(const FixedWidthType& type) {
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(type.byte_width() * length_);
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
     return Status::OK();
   }
 
   template <typename T>
   enable_if_base_binary<T, Status> Visit(const T&) {
-    out_->buffers.resize(3, buffer_);
+    if (presizing_zero_buffer_) {
+      // values buffer may be empty, but there must be at least one offset of 0
+      ZeroBufferMustBeAtLeast(sizeof(typename T::offset_type) * (length_ + 1));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_, *zero_buffer_};
     return Status::OK();
   }
 
   template <typename T>
   enable_if_var_size_list<T, Status> Visit(const T& type) {
-    out_->buffers.resize(2, buffer_);
-    ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(type, 0, 
/*length=*/0));
-    return Status::OK();
+    if (presizing_zero_buffer_) {
+      // values array may be empty, but there must be at least one offset of 0
+      ZeroBufferMustBeAtLeast(sizeof(typename T::offset_type) * (length_ + 1));
+      ZeroBufferMustBeAtLeast(GetZeroBufferLength(type.value_field(), 0));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
+    return CreateChild(0, /*length=*/0);
   }
 
   Status Visit(const FixedSizeListType& type) {
-    ARROW_ASSIGN_OR_RAISE(out_->child_data[0],
-                          CreateChild(type, 0, length_ * type.list_size()));
-    return Status::OK();
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(
+          GetZeroBufferLength(type.value_field(), type.list_size() * length_));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap()};
+    return CreateChild(0, type.list_size() * length_);
   }
 
   Status Visit(const StructType& type) {
-    for (int i = 0; i < type_->num_fields(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, 
length_));
+    if (presizing_zero_buffer_) {
+      for (const auto& child : type.fields()) {
+        ZeroBufferMustBeAtLeast(GetZeroBufferLength(child, length_));
+      }
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap()};
+    for (int i = 0; i < type.num_fields(); ++i) {
+      RETURN_NOT_OK(CreateChild(i, length_));
     }
     return Status::OK();
   }
 
+  static Result<int8_t> GetIdOfFirstNullableUnionMember(const UnionType& type) 
{
+    for (auto [field, id] : Zip(type.fields(), type.type_codes())) {
+      if (field->nullable()) return id;
+    }
+    return Status::Invalid("Cannot produce an array of null ", type,
+                           " because no child field is nullable");
+  }
+
   Status Visit(const UnionType& type) {
-    out_->buffers.resize(2);
+    // For sparse unions, we create children with the same length as the 
parent.
+    //
+    // For dense unions, we create children with length 1 and have offsets 
which always
+    // refer to the first first slot from one child.

Review Comment:
   ```suggestion
       // refer to the first slot from one child.
   ```



##########
cpp/src/arrow/array/util.cc:
##########
@@ -347,215 +349,234 @@ static Result<std::shared_ptr<Scalar>> 
MakeScalarForRunEndValue(
   return std::make_shared<Int64Scalar>(run_end);
 }
 
-// get the maximum buffer length required, then allocate a single zeroed buffer
-// to use anywhere a buffer is required
 class NullArrayFactory {
  public:
-  struct GetBufferLength {
-    GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
-        : type_(*type), length_(length), 
buffer_length_(bit_util::BytesForBits(length)) {}
-
-    Result<int64_t> Finish() && {
-      RETURN_NOT_OK(VisitTypeInline(type_, this));
-      return buffer_length_;
-    }
-
-    template <typename T, typename = 
decltype(TypeTraits<T>::bytes_required(0))>
-    Status Visit(const T&) {
-      return MaxOf(TypeTraits<T>::bytes_required(length_));
-    }
-
-    template <typename T>
-    enable_if_var_size_list<T, Status> Visit(const T& type) {
-      // values array may be empty, but there must be at least one offset of 0
-      RETURN_NOT_OK(MaxOf(sizeof(typename T::offset_type) * (length_ + 1)));
-      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
-      return Status::OK();
-    }
-
-    template <typename T>
-    enable_if_base_binary<T, Status> Visit(const T&) {
-      // values buffer may be empty, but there must be at least one offset of 0
-      return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
-    }
-
-    Status Visit(const FixedSizeListType& type) {
-      return MaxOf(GetBufferLength(type.value_type(), type.list_size() * 
length_));
-    }
-
-    Status Visit(const FixedSizeBinaryType& type) {
-      return MaxOf(type.byte_width() * length_);
-    }
-
-    Status Visit(const StructType& type) {
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
-      }
-      return Status::OK();
-    }
-
-    Status Visit(const SparseUnionType& type) {
-      // type codes
-      RETURN_NOT_OK(MaxOf(length_));
-      // will create children of the same length as the union
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
-      }
-      return Status::OK();
-    }
-
-    Status Visit(const DenseUnionType& type) {
-      // type codes
-      RETURN_NOT_OK(MaxOf(length_));
-      // offsets
-      RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_));
-      // will create children of length 1
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), 1)));
-      }
-      return Status::OK();
-    }
+  // For most types, every buffer in an entirely null array will contain 
nothing but
+  // zeroes. For arrays of such types, we can allocate a single buffer and use 
that in
+  // every position of the array data. The first stage of visitation handles 
assessment
+  // of this buffer's size, the second uses the resulting buffer to build the 
null array.
+  //
+  // The first stage may not allocate from the MemoryPool or raise a failing 
status.
+  //
+  // In the second stage, `zero_buffer_` has been allocated and `out_` has:
+  // - type = type_
+  // - length = length_
+  // - null_count = length_ unless current output may have direct nulls,
+  //                0 otherwise
+  // - offset = 0
+  // - buffers = []
+  // - child_data = [nullptr] * type.num_fields()
+  // - dictionary = nullptr
+  bool presizing_zero_buffer_;
 
-    Status Visit(const DictionaryType& type) {
-      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
-      return MaxOf(GetBufferLength(type.index_type(), length_));
-    }
+  NullArrayFactory(const std::shared_ptr<DataType>& type, bool nullable, 
int64_t length)
+      : presizing_zero_buffer_{true},
+        type_{type},
+        nullable_{nullable},
+        length_{length},
+        zero_buffer_length_{MayHaveDirectNulls() ? 
bit_util::BytesForBits(length) : 0} {}
 
-    Status Visit(const RunEndEncodedType& type) {
-      // RunEndEncodedType has no buffers, only child arrays
-      buffer_length_ = 0;
-      return Status::OK();
-    }
+  NullArrayFactory(const std::shared_ptr<DataType>& type, bool nullable, 
int64_t length,
+                   const std::shared_ptr<Buffer>& zero_buffer, MemoryPool* 
pool)
+      : presizing_zero_buffer_{false},
+        type_{type},
+        nullable_{nullable},
+        length_{length},
+        zero_buffer_length_{MayHaveDirectNulls() ? 
bit_util::BytesForBits(length) : 0},
+        zero_buffer_{&zero_buffer},
+        pool_{pool} {}
 
-    Status Visit(const ExtensionType& type) {
-      // XXX is an extension array's length always == storage length
-      return MaxOf(GetBufferLength(type.storage_type(), length_));
-    }
+  template <typename... Args>
+  explicit NullArrayFactory(const std::shared_ptr<Field>& field, const 
Args&... args)
+      : NullArrayFactory{field->type(), field->nullable(), args...} {}
 
-    Status Visit(const DataType& type) {
-      return Status::NotImplemented("construction of all-null ", type);
-    }
+  bool MayHaveDirectNulls() const {
+    if (type_->storage_id() == Type::NA) return true;
+    return nullable_ && internal::HasValidityBitmap(type_->storage_id());
+  }
 
-   private:
-    Status MaxOf(GetBufferLength&& other) {
-      ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, std::move(other).Finish());
-      return MaxOf(buffer_length);
-    }
+  void ZeroBufferMustBeAtLeast(int64_t length) {
+    DCHECK(presizing_zero_buffer_);
+    zero_buffer_length_ = std::max(zero_buffer_length_, length);
+  }
 
-    Status MaxOf(int64_t buffer_length) {
-      if (buffer_length > buffer_length_) {
-        buffer_length_ = buffer_length;
-      }
-      return Status::OK();
-    }
+  std::shared_ptr<Buffer> GetValidityBitmap() const {
+    DCHECK(!presizing_zero_buffer_);
+    return MayHaveDirectNulls() ? *zero_buffer_ : nullptr;
+  }
 
-    const DataType& type_;
-    int64_t length_, buffer_length_;
-  };
+  static int64_t GetZeroBufferLength(const std::shared_ptr<DataType>& type, 
bool nullable,
+                                     int64_t length) {
+    NullArrayFactory factory{type, nullable, length};
+    DCHECK_OK(VisitTypeInline(*type, &factory));
+    return factory.zero_buffer_length_;
+  }
 
-  NullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type,
-                   int64_t length)
-      : pool_(pool), type_(type), length_(length) {}
+  static int64_t GetZeroBufferLength(const std::shared_ptr<Field>& field,
+                                     int64_t length) {
+    return GetZeroBufferLength(field->type(), field->nullable(), length);
+  }
 
-  Status CreateBuffer() {
-    if (type_->id() == Type::RUN_END_ENCODED) {
-      buffer_ = NULLPTR;
-      return Status::OK();
+  Status Visit(const NullType&) {
+    if (presizing_zero_buffer_) {
+      // null needs no buffers; don't touch the zero buffer size
+    } else {
+      out_->buffers = {nullptr};
     }
-    ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
-                          GetBufferLength(type_, length_).Finish());
-    ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
-    std::memset(buffer_->mutable_data(), 0, buffer_->size());
     return Status::OK();
   }
 
-  Result<std::shared_ptr<ArrayData>> Create() {
-    if (buffer_ == nullptr) {
-      RETURN_NOT_OK(CreateBuffer());
+  Status Visit(const BooleanType& type) {
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(bit_util::BytesForBits(length_));
+      return Status::OK();
     }
-    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
-    auto buffer_slice =
-        buffer_ ? SliceBuffer(buffer_, 0, bit_util::BytesForBits(length_)) : 
NULLPTR;
-    out_ = ArrayData::Make(type_, length_, {std::move(buffer_slice)}, 
child_data, length_,
-                           0);
-    RETURN_NOT_OK(VisitTypeInline(*type_, this));
-    return out_;
-  }
-
-  Status Visit(const NullType&) {
-    out_->buffers.resize(1, nullptr);
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
     return Status::OK();
   }
 
-  Status Visit(const FixedWidthType&) {
-    out_->buffers.resize(2, buffer_);
+  Status Visit(const FixedWidthType& type) {
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(type.byte_width() * length_);
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
     return Status::OK();
   }
 
   template <typename T>
   enable_if_base_binary<T, Status> Visit(const T&) {
-    out_->buffers.resize(3, buffer_);
+    if (presizing_zero_buffer_) {
+      // values buffer may be empty, but there must be at least one offset of 0
+      ZeroBufferMustBeAtLeast(sizeof(typename T::offset_type) * (length_ + 1));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_, *zero_buffer_};
     return Status::OK();
   }
 
   template <typename T>
   enable_if_var_size_list<T, Status> Visit(const T& type) {
-    out_->buffers.resize(2, buffer_);
-    ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(type, 0, 
/*length=*/0));
-    return Status::OK();
+    if (presizing_zero_buffer_) {
+      // values array may be empty, but there must be at least one offset of 0
+      ZeroBufferMustBeAtLeast(sizeof(typename T::offset_type) * (length_ + 1));
+      ZeroBufferMustBeAtLeast(GetZeroBufferLength(type.value_field(), 0));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
+    return CreateChild(0, /*length=*/0);
   }
 
   Status Visit(const FixedSizeListType& type) {
-    ARROW_ASSIGN_OR_RAISE(out_->child_data[0],
-                          CreateChild(type, 0, length_ * type.list_size()));
-    return Status::OK();
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(
+          GetZeroBufferLength(type.value_field(), type.list_size() * length_));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap()};
+    return CreateChild(0, type.list_size() * length_);
   }
 
   Status Visit(const StructType& type) {
-    for (int i = 0; i < type_->num_fields(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, 
length_));
+    if (presizing_zero_buffer_) {
+      for (const auto& child : type.fields()) {
+        ZeroBufferMustBeAtLeast(GetZeroBufferLength(child, length_));
+      }
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap()};
+    for (int i = 0; i < type.num_fields(); ++i) {
+      RETURN_NOT_OK(CreateChild(i, length_));
     }
     return Status::OK();
   }
 
+  static Result<int8_t> GetIdOfFirstNullableUnionMember(const UnionType& type) 
{
+    for (auto [field, id] : Zip(type.fields(), type.type_codes())) {
+      if (field->nullable()) return id;
+    }
+    return Status::Invalid("Cannot produce an array of null ", type,
+                           " because no child field is nullable");
+  }
+
   Status Visit(const UnionType& type) {
-    out_->buffers.resize(2);
+    // For sparse unions, we create children with the same length as the 
parent.
+    //
+    // For dense unions, we create children with length 1 and have offsets 
which always
+    // refer to the first first slot from one child.
+    int64_t child_length = type.mode() == UnionMode::SPARSE ? length_ : 1;
+
+    if (presizing_zero_buffer_) {
+      // type codes
+      ZeroBufferMustBeAtLeast(length_);
 
-    // First buffer is always null
-    out_->buffers[0] = nullptr;
+      if (type.mode() == UnionMode::DENSE) {
+        // offsets
+        ZeroBufferMustBeAtLeast(sizeof(int32_t) * length_);
+      }
 
-    out_->buffers[1] = buffer_;
-    // buffer_ is zeroed, but 0 may not be a valid type code
-    if (type.type_codes()[0] != 0) {
-      ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_));
-      std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], 
length_);
+      for (const auto& child : type.fields()) {
+        ZeroBufferMustBeAtLeast(GetZeroBufferLength(child, child_length));
+      }
+      return Status::OK();
     }
 
-    // For sparse unions, we now create children with the same length as the
-    // parent
-    int64_t child_length = length_;
-    if (type.mode() == UnionMode::DENSE) {
-      // For dense unions, we set the offsets to all zero and create children
-      // with length 1
-      out_->buffers.resize(3);
-      out_->buffers[2] = buffer_;
+    // The validity bitmap is always absent for unions
+    out_->buffers = {nullptr};
+
+    // Next is the type ids buffer. We may not be able to use zero_buffer_
+    // for this since 0 may not be a valid type id.
+    ARROW_ASSIGN_OR_RAISE(int8_t first_nullable_id,
+                          GetIdOfFirstNullableUnionMember(type));
+    if (first_nullable_id == 0) {
+      out_->buffers.push_back(*zero_buffer_);
+    } else {
+      ARROW_ASSIGN_OR_RAISE(auto type_ids, AllocateBuffer(length_, pool_));
+      std::memset(type_ids->mutable_data(), first_nullable_id, length_);
+      out_->buffers.push_back(std::move(type_ids));
+    }
 
-      child_length = 1;
+    if (type.mode() == UnionMode::DENSE) {
+      out_->buffers.push_back(*zero_buffer_);
     }
-    for (int i = 0; i < type_->num_fields(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, 
child_length));
+
+    for (int i = 0; i < type.num_fields(); ++i) {
+      RETURN_NOT_OK(CreateChild(i, child_length));
     }
     return Status::OK();
   }
 
   Status Visit(const DictionaryType& type) {
-    out_->buffers.resize(2, buffer_);
-    ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, 
MakeArrayOfNull(type.value_type(), 0));
-    out_->dictionary = typed_null_dict->data();
+    // The dictionary's indices are non-nullable; we can still create an array

Review Comment:
   ```suggestion
       // If the dictionary's indices are non-nullable, we can still create an 
array
   ```



##########
cpp/src/arrow/array/array_test.cc:
##########
@@ -438,6 +439,18 @@ TEST_F(TestArray, TestMakeArrayOfNull) {
       }
     }
   }
+
+  auto req = [](auto type) { return field("", std::move(type), 
/*nullable=*/false); };
+
+  // union with no nullable fields cannot represent a null

Review Comment:
   What would happen with length 0 and length > 1 in the checks below? Perhaps 
also test that.



##########
cpp/src/arrow/array/validate.cc:
##########
@@ -272,26 +272,16 @@ struct ValidateArrayImpl {
                              ") multiplied by the value size (", list_size, 
")");
     }
 
-    const Status child_valid = RecurseInto(values);
-    if (!child_valid.ok()) {
-      return Status::Invalid("Fixed size list child array invalid: ",
-                             child_valid.ToString());
-    }
-
+    RETURN_NOT_OK(RecurseIntoField(0, "Fixed size list child array"));
     return Status::OK();
   }
 
   Status Visit(const StructType& type) {
     for (int i = 0; i < type.num_fields(); ++i) {
-      const auto& field_data = *data.child_data[i];
-
       // Validate child first, to catch nonsensical length / offset etc.
-      const Status field_valid = RecurseInto(field_data);
-      if (!field_valid.ok()) {
-        return Status::Invalid("Struct child array #", i,
-                               " invalid: ", field_valid.ToString());
-      }
+      RETURN_NOT_OK(RecurseIntoField(0, "Struct child array #", i));

Review Comment:
   ```suggestion
         RETURN_NOT_OK(RecurseIntoField(i, "Struct child array #", i));
   ```



##########
cpp/src/arrow/array/util.cc:
##########
@@ -566,33 +587,67 @@ class NullArrayFactory {
       ARROW_ASSIGN_OR_RAISE(run_ends, MakeArrayFromScalar(*run_end_scalar, 1, 
pool_));
       ARROW_ASSIGN_OR_RAISE(values, MakeArrayOfNull(type.value_type(), 1, 
pool_));
     }
-    out_->child_data[0] = run_ends->data();
-    out_->child_data[1] = values->data();
+    out_->child_data = {run_ends->data(), values->data()};
     return Status::OK();
   }
 
   Status Visit(const ExtensionType& type) {
-    out_->child_data.resize(type.storage_type()->num_fields());
-    RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(
+          GetZeroBufferLength(type.storage_type(), nullable_, length_));
+      return Status::OK();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto out,
+                          CreateRelated(type.storage_type(), nullable_, 
length_));
+    *out_ = std::move(*out);

Review Comment:
   This looks weird. Is it guaranteed that the raw pointer `out_` is/will be 
tracked by a `shared_ptr` at some point? Can't we have `out_` be a `shared_ptr` 
itself?



##########
cpp/src/arrow/array/validate.cc:
##########
@@ -440,9 +422,28 @@ struct ValidateArrayImpl {
     return data.buffers[index] != nullptr && data.buffers[index]->address() != 
0;
   }
 
-  Status RecurseInto(const ArrayData& related_data) {
-    ValidateArrayImpl impl{related_data, full_validation};
-    return impl.Validate();
+  template <typename... FieldDescription>
+  Status RecurseIntoField(int field_index, const FieldDescription&... 
description) {
+    const auto& related_data = *data.child_data[field_index];
+
+    if (!data.type->storage_type_ref().field(field_index)->nullable()) {
+      if (related_data.null_count != 0 && related_data.null_count != 
kUnknownNullCount) {

Review Comment:
   If full validation is enabled, we could probably enable null count 
computation here.



##########
cpp/src/arrow/array/util.cc:
##########
@@ -566,33 +587,67 @@ class NullArrayFactory {
       ARROW_ASSIGN_OR_RAISE(run_ends, MakeArrayFromScalar(*run_end_scalar, 1, 
pool_));
       ARROW_ASSIGN_OR_RAISE(values, MakeArrayOfNull(type.value_type(), 1, 
pool_));
     }
-    out_->child_data[0] = run_ends->data();
-    out_->child_data[1] = values->data();
+    out_->child_data = {run_ends->data(), values->data()};
     return Status::OK();
   }
 
   Status Visit(const ExtensionType& type) {
-    out_->child_data.resize(type.storage_type()->num_fields());
-    RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(
+          GetZeroBufferLength(type.storage_type(), nullable_, length_));
+      return Status::OK();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto out,
+                          CreateRelated(type.storage_type(), nullable_, 
length_));
+    *out_ = std::move(*out);
+    out_->type = type_;
     return Status::OK();
   }
 
   Status Visit(const DataType& type) {
     return Status::NotImplemented("construction of all-null ", type);
   }
 
-  Result<std::shared_ptr<ArrayData>> CreateChild(const DataType& type, int i,
-                                                 int64_t length) {
-    NullArrayFactory child_factory(pool_, type.field(i)->type(), length);
-    child_factory.buffer_ = buffer_;
-    return child_factory.Create();
+  Result<std::shared_ptr<ArrayData>> Create() && {
+    DCHECK(!presizing_zero_buffer_);
+    auto out = std::make_shared<ArrayData>();
+    out_ = out.get();
+    out_->type = type_;
+    out_->length = length_;
+    out_->null_count = MayHaveDirectNulls() ? length_ : 0;
+    out_->offset = 0;
+    out_->buffers = {};
+    out_->child_data.resize(type_->storage_type_ref().num_fields());
+    out_->dictionary = nullptr;
+    RETURN_NOT_OK(VisitTypeInline(*type_, this));
+    return out;
+  }
+
+  Status CreateChild(int i, int64_t length) {
+    DCHECK(!presizing_zero_buffer_);
+    const auto& field = type_->storage_type_ref().field(i);
+    ARROW_ASSIGN_OR_RAISE(out_->child_data[i],
+                          CreateRelated(field->type(), field->nullable(), 
length));
+    return Status::OK();
+  }
+
+  Result<std::shared_ptr<ArrayData>> CreateRelated(
+      const std::shared_ptr<DataType>& related_type, bool nullable, int64_t 
length) {
+    return NullArrayFactory{related_type, nullable, length, *zero_buffer_, 
pool_}
+        .Create();
   }
 
-  MemoryPool* pool_;
   const std::shared_ptr<DataType>& type_;
+  bool nullable_;
   int64_t length_;
-  std::shared_ptr<ArrayData> out_;
-  std::shared_ptr<Buffer> buffer_;
+
+  int64_t zero_buffer_length_ = 0;
+
+  const std::shared_ptr<Buffer>* zero_buffer_;
+  MemoryPool* pool_;
+
+  ArrayData* out_;

Review Comment:
   This feels a bit fragile...



##########
cpp/src/arrow/extension_type.h:
##########
@@ -45,6 +45,9 @@ class ARROW_EXPORT ExtensionType : public DataType {
   /// \brief The type of array used to represent this extension type's data
   const std::shared_ptr<DataType>& storage_type() const { return 
storage_type_; }
 
+  /// \brief Return a reference to the storage type
+  const DataType& storage_type_ref() const override { return *storage_type_; }

Review Comment:
   I'm in favor of making `storage_type()` virtual.
   
   



##########
cpp/src/arrow/array/array_test.cc:
##########
@@ -410,6 +410,7 @@ static std::vector<std::shared_ptr<DataType>> 
TestArrayUtilitiesAgainstTheseType
 
 TEST_F(TestArray, TestMakeArrayOfNull) {
   for (int64_t length : {0, 1, 16, 133}) {
+    ARROW_SCOPED_TRACE("length = ", length);
     for (auto type : TestArrayUtilitiesAgainstTheseTypes()) {

Review Comment:
   It seems `TestArrayUtilitiesAgainstTheseTypes` doesn't include an empty 
struct, can we add one there? (or if it fails other tests, test 
`MakeArrayOfNull` specifically with an empty struct)



##########
cpp/src/arrow/array/util.cc:
##########
@@ -347,215 +349,234 @@ static Result<std::shared_ptr<Scalar>> 
MakeScalarForRunEndValue(
   return std::make_shared<Int64Scalar>(run_end);
 }
 
-// get the maximum buffer length required, then allocate a single zeroed buffer
-// to use anywhere a buffer is required
 class NullArrayFactory {
  public:
-  struct GetBufferLength {
-    GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
-        : type_(*type), length_(length), 
buffer_length_(bit_util::BytesForBits(length)) {}
-
-    Result<int64_t> Finish() && {
-      RETURN_NOT_OK(VisitTypeInline(type_, this));
-      return buffer_length_;
-    }
-
-    template <typename T, typename = 
decltype(TypeTraits<T>::bytes_required(0))>
-    Status Visit(const T&) {
-      return MaxOf(TypeTraits<T>::bytes_required(length_));
-    }
-
-    template <typename T>
-    enable_if_var_size_list<T, Status> Visit(const T& type) {
-      // values array may be empty, but there must be at least one offset of 0
-      RETURN_NOT_OK(MaxOf(sizeof(typename T::offset_type) * (length_ + 1)));
-      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
-      return Status::OK();
-    }
-
-    template <typename T>
-    enable_if_base_binary<T, Status> Visit(const T&) {
-      // values buffer may be empty, but there must be at least one offset of 0
-      return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
-    }
-
-    Status Visit(const FixedSizeListType& type) {
-      return MaxOf(GetBufferLength(type.value_type(), type.list_size() * 
length_));
-    }
-
-    Status Visit(const FixedSizeBinaryType& type) {
-      return MaxOf(type.byte_width() * length_);
-    }
-
-    Status Visit(const StructType& type) {
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
-      }
-      return Status::OK();
-    }
-
-    Status Visit(const SparseUnionType& type) {
-      // type codes
-      RETURN_NOT_OK(MaxOf(length_));
-      // will create children of the same length as the union
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
-      }
-      return Status::OK();
-    }
-
-    Status Visit(const DenseUnionType& type) {
-      // type codes
-      RETURN_NOT_OK(MaxOf(length_));
-      // offsets
-      RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_));
-      // will create children of length 1
-      for (const auto& child : type.fields()) {
-        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), 1)));
-      }
-      return Status::OK();
-    }
+  // For most types, every buffer in an entirely null array will contain 
nothing but
+  // zeroes. For arrays of such types, we can allocate a single buffer and use 
that in
+  // every position of the array data. The first stage of visitation handles 
assessment
+  // of this buffer's size, the second uses the resulting buffer to build the 
null array.
+  //
+  // The first stage may not allocate from the MemoryPool or raise a failing 
status.
+  //
+  // In the second stage, `zero_buffer_` has been allocated and `out_` has:
+  // - type = type_
+  // - length = length_
+  // - null_count = length_ unless current output may have direct nulls,
+  //                0 otherwise
+  // - offset = 0
+  // - buffers = []
+  // - child_data = [nullptr] * type.num_fields()
+  // - dictionary = nullptr
+  bool presizing_zero_buffer_;
 
-    Status Visit(const DictionaryType& type) {
-      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
-      return MaxOf(GetBufferLength(type.index_type(), length_));
-    }
+  NullArrayFactory(const std::shared_ptr<DataType>& type, bool nullable, 
int64_t length)
+      : presizing_zero_buffer_{true},
+        type_{type},
+        nullable_{nullable},
+        length_{length},
+        zero_buffer_length_{MayHaveDirectNulls() ? 
bit_util::BytesForBits(length) : 0} {}
 
-    Status Visit(const RunEndEncodedType& type) {
-      // RunEndEncodedType has no buffers, only child arrays
-      buffer_length_ = 0;
-      return Status::OK();
-    }
+  NullArrayFactory(const std::shared_ptr<DataType>& type, bool nullable, 
int64_t length,
+                   const std::shared_ptr<Buffer>& zero_buffer, MemoryPool* 
pool)
+      : presizing_zero_buffer_{false},
+        type_{type},
+        nullable_{nullable},
+        length_{length},
+        zero_buffer_length_{MayHaveDirectNulls() ? 
bit_util::BytesForBits(length) : 0},
+        zero_buffer_{&zero_buffer},
+        pool_{pool} {}
 
-    Status Visit(const ExtensionType& type) {
-      // XXX is an extension array's length always == storage length
-      return MaxOf(GetBufferLength(type.storage_type(), length_));
-    }
+  template <typename... Args>
+  explicit NullArrayFactory(const std::shared_ptr<Field>& field, const 
Args&... args)
+      : NullArrayFactory{field->type(), field->nullable(), args...} {}
 
-    Status Visit(const DataType& type) {
-      return Status::NotImplemented("construction of all-null ", type);
-    }
+  bool MayHaveDirectNulls() const {
+    if (type_->storage_id() == Type::NA) return true;
+    return nullable_ && internal::HasValidityBitmap(type_->storage_id());
+  }
 
-   private:
-    Status MaxOf(GetBufferLength&& other) {
-      ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, std::move(other).Finish());
-      return MaxOf(buffer_length);
-    }
+  void ZeroBufferMustBeAtLeast(int64_t length) {
+    DCHECK(presizing_zero_buffer_);
+    zero_buffer_length_ = std::max(zero_buffer_length_, length);
+  }
 
-    Status MaxOf(int64_t buffer_length) {
-      if (buffer_length > buffer_length_) {
-        buffer_length_ = buffer_length;
-      }
-      return Status::OK();
-    }
+  std::shared_ptr<Buffer> GetValidityBitmap() const {
+    DCHECK(!presizing_zero_buffer_);
+    return MayHaveDirectNulls() ? *zero_buffer_ : nullptr;
+  }
 
-    const DataType& type_;
-    int64_t length_, buffer_length_;
-  };
+  static int64_t GetZeroBufferLength(const std::shared_ptr<DataType>& type, 
bool nullable,
+                                     int64_t length) {
+    NullArrayFactory factory{type, nullable, length};
+    DCHECK_OK(VisitTypeInline(*type, &factory));
+    return factory.zero_buffer_length_;
+  }
 
-  NullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type,
-                   int64_t length)
-      : pool_(pool), type_(type), length_(length) {}
+  static int64_t GetZeroBufferLength(const std::shared_ptr<Field>& field,
+                                     int64_t length) {
+    return GetZeroBufferLength(field->type(), field->nullable(), length);
+  }
 
-  Status CreateBuffer() {
-    if (type_->id() == Type::RUN_END_ENCODED) {
-      buffer_ = NULLPTR;
-      return Status::OK();
+  Status Visit(const NullType&) {
+    if (presizing_zero_buffer_) {
+      // null needs no buffers; don't touch the zero buffer size
+    } else {
+      out_->buffers = {nullptr};
     }
-    ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
-                          GetBufferLength(type_, length_).Finish());
-    ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
-    std::memset(buffer_->mutable_data(), 0, buffer_->size());
     return Status::OK();
   }
 
-  Result<std::shared_ptr<ArrayData>> Create() {
-    if (buffer_ == nullptr) {
-      RETURN_NOT_OK(CreateBuffer());
+  Status Visit(const BooleanType& type) {
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(bit_util::BytesForBits(length_));
+      return Status::OK();
     }
-    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
-    auto buffer_slice =
-        buffer_ ? SliceBuffer(buffer_, 0, bit_util::BytesForBits(length_)) : 
NULLPTR;
-    out_ = ArrayData::Make(type_, length_, {std::move(buffer_slice)}, 
child_data, length_,
-                           0);
-    RETURN_NOT_OK(VisitTypeInline(*type_, this));
-    return out_;
-  }
-
-  Status Visit(const NullType&) {
-    out_->buffers.resize(1, nullptr);
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
     return Status::OK();
   }
 
-  Status Visit(const FixedWidthType&) {
-    out_->buffers.resize(2, buffer_);
+  Status Visit(const FixedWidthType& type) {
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(type.byte_width() * length_);
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
     return Status::OK();
   }
 
   template <typename T>
   enable_if_base_binary<T, Status> Visit(const T&) {
-    out_->buffers.resize(3, buffer_);
+    if (presizing_zero_buffer_) {
+      // values buffer may be empty, but there must be at least one offset of 0
+      ZeroBufferMustBeAtLeast(sizeof(typename T::offset_type) * (length_ + 1));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_, *zero_buffer_};
     return Status::OK();
   }
 
   template <typename T>
   enable_if_var_size_list<T, Status> Visit(const T& type) {
-    out_->buffers.resize(2, buffer_);
-    ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(type, 0, 
/*length=*/0));
-    return Status::OK();
+    if (presizing_zero_buffer_) {
+      // values array may be empty, but there must be at least one offset of 0
+      ZeroBufferMustBeAtLeast(sizeof(typename T::offset_type) * (length_ + 1));
+      ZeroBufferMustBeAtLeast(GetZeroBufferLength(type.value_field(), 0));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap(), *zero_buffer_};
+    return CreateChild(0, /*length=*/0);
   }
 
   Status Visit(const FixedSizeListType& type) {
-    ARROW_ASSIGN_OR_RAISE(out_->child_data[0],
-                          CreateChild(type, 0, length_ * type.list_size()));
-    return Status::OK();
+    if (presizing_zero_buffer_) {
+      ZeroBufferMustBeAtLeast(
+          GetZeroBufferLength(type.value_field(), type.list_size() * length_));
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap()};
+    return CreateChild(0, type.list_size() * length_);
   }
 
   Status Visit(const StructType& type) {
-    for (int i = 0; i < type_->num_fields(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, 
length_));
+    if (presizing_zero_buffer_) {
+      for (const auto& child : type.fields()) {
+        ZeroBufferMustBeAtLeast(GetZeroBufferLength(child, length_));
+      }
+      return Status::OK();
+    }
+    out_->buffers = {GetValidityBitmap()};
+    for (int i = 0; i < type.num_fields(); ++i) {
+      RETURN_NOT_OK(CreateChild(i, length_));
     }
     return Status::OK();
   }
 
+  static Result<int8_t> GetIdOfFirstNullableUnionMember(const UnionType& type) 
{
+    for (auto [field, id] : Zip(type.fields(), type.type_codes())) {
+      if (field->nullable()) return id;
+    }
+    return Status::Invalid("Cannot produce an array of null ", type,
+                           " because no child field is nullable");
+  }
+
   Status Visit(const UnionType& type) {
-    out_->buffers.resize(2);
+    // For sparse unions, we create children with the same length as the 
parent.
+    //
+    // For dense unions, we create children with length 1 and have offsets 
which always
+    // refer to the first first slot from one child.
+    int64_t child_length = type.mode() == UnionMode::SPARSE ? length_ : 1;
+
+    if (presizing_zero_buffer_) {
+      // type codes
+      ZeroBufferMustBeAtLeast(length_);
 
-    // First buffer is always null
-    out_->buffers[0] = nullptr;
+      if (type.mode() == UnionMode::DENSE) {
+        // offsets
+        ZeroBufferMustBeAtLeast(sizeof(int32_t) * length_);
+      }
 
-    out_->buffers[1] = buffer_;
-    // buffer_ is zeroed, but 0 may not be a valid type code
-    if (type.type_codes()[0] != 0) {
-      ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_));
-      std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], 
length_);
+      for (const auto& child : type.fields()) {
+        ZeroBufferMustBeAtLeast(GetZeroBufferLength(child, child_length));
+      }
+      return Status::OK();
     }
 
-    // For sparse unions, we now create children with the same length as the
-    // parent
-    int64_t child_length = length_;
-    if (type.mode() == UnionMode::DENSE) {
-      // For dense unions, we set the offsets to all zero and create children
-      // with length 1
-      out_->buffers.resize(3);
-      out_->buffers[2] = buffer_;
+    // The validity bitmap is always absent for unions
+    out_->buffers = {nullptr};
+
+    // Next is the type ids buffer. We may not be able to use zero_buffer_
+    // for this since 0 may not be a valid type id.
+    ARROW_ASSIGN_OR_RAISE(int8_t first_nullable_id,
+                          GetIdOfFirstNullableUnionMember(type));
+    if (first_nullable_id == 0) {
+      out_->buffers.push_back(*zero_buffer_);
+    } else {
+      ARROW_ASSIGN_OR_RAISE(auto type_ids, AllocateBuffer(length_, pool_));
+      std::memset(type_ids->mutable_data(), first_nullable_id, length_);
+      out_->buffers.push_back(std::move(type_ids));
+    }
 
-      child_length = 1;
+    if (type.mode() == UnionMode::DENSE) {
+      out_->buffers.push_back(*zero_buffer_);
     }
-    for (int i = 0; i < type_->num_fields(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, 
child_length));
+
+    for (int i = 0; i < type.num_fields(); ++i) {
+      RETURN_NOT_OK(CreateChild(i, child_length));
     }
     return Status::OK();
   }
 
   Status Visit(const DictionaryType& type) {
-    out_->buffers.resize(2, buffer_);
-    ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, 
MakeArrayOfNull(type.value_type(), 0));
-    out_->dictionary = typed_null_dict->data();
+    // The dictionary's indices are non-nullable; we can still create an array
+    // by creating a dictionary which contains a single null.

Review Comment:
   ```suggestion
       // by creating a dictionary which contains a single null.
       // All indices will be zero regardless.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to