westonpace commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r850787225


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduced overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {

Review Comment:
   Can we add constants like:
   
   ```
   KeyColumnArray::kValidityBuffer
   KeyColumnArray::kFixedSizeBuffer
   KeyColumnArray::kVariableLengthBuffer
   ```
   
   It might help for readability.  Or we could split this into three accessors.
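
   For illustration, a minimal sketch of what such constants could look like (the names are the ones proposed above and are not part of the current patch):

   ```
   #include <cstdint>

   // Hypothetical sketch only: buffer-index constants for KeyColumnArray.
   class KeyColumnArray {
    public:
     static constexpr int kValidityBuffer = 0;        // null bitmap
     static constexpr int kFixedSizeBuffer = 1;       // values, or offsets for varbinary
     static constexpr int kVariableLengthBuffer = 2;  // varbinary data

     const uint8_t* data(int i) const { return buffers_[i]; }

    private:
     const uint8_t* buffers_[3] = {nullptr, nullptr, nullptr};
   };

   // A call site would then read:
   //   const uint8_t* validity = array.data(KeyColumnArray::kValidityBuffer);
   ```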



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduced overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If `type` is a dictionary type then this will return the KeyColumnArray for
+/// the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+  ~ResizableArrayData() { Clear(true); }
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+  /// \brief A lightweight descriptor of the data held by this array
+  KeyColumnMetadata column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 then this returns the validity buffer
+  /// If i is 1 then this returns the buffer used for values (if this is a fixed
+  ///           length data type) or offsets (if this is a variable binary type)
+  /// If i is 2 then this returns the buffer used for variable length binary data
+  uint8_t* mutable_data(int i) {
+    return i == 0   ? non_null_buf_->mutable_data()
+           : i == 1 ? fixed_len_buf_->mutable_data()
+                    : var_len_buf_->mutable_data();
+  }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  std::shared_ptr<ResizableBuffer> non_null_buf_;
+  std::shared_ptr<ResizableBuffer> fixed_len_buf_;
+  std::shared_ptr<ResizableBuffer> var_len_buf_;
+};
+
+/// \brief A builder to concatenate batches of data into a larger batch
+///
+/// Will only store num_rows_max() rows
+class ExecBatchBuilder {
+ public:
+  /// \brief Add rows from `source` into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendSelected(const std::shared_ptr<ArrayData>& source,
+                               ResizableArrayData* target, int num_rows_to_append,
+                               const uint16_t* row_ids, MemoryPool* pool);
+
+  /// \brief Add nulls into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendNulls(const std::shared_ptr<DataType>& type,
+                            ResizableArrayData& target, int num_rows_to_append,
+                            MemoryPool* pool);
+
+  /// \brief Add selected rows from `batch`
+  ///
+  /// If `col_ids` is null then `num_cols` should be less than batch.num_values() and
+  /// the first `num_cols` columns of batch will be appended.
+  ///
+  /// All columns in `batch` must have array shape
+  Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append,
+                        const uint16_t* row_ids, int num_cols,
+                        const int* col_ids = NULLPTR);

Review Comment:
   It's a little confusing that we have overloads that store `num_appended` and overloads that do not store `num_appended`.  The latter are also able to exceed `num_rows_max()` I think.  What is the intended use case for the overloads that don't store `num_appended`?
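
   For reference, a hypothetical caller loop for the overload that does report `num_appended`, showing how the `num_rows_max()` cap would typically be consumed (this is only an illustration built on the signatures quoted above, not code from the patch):

   ```
   #include <cstdint>
   #include <vector>

   #include "arrow/compute/exec.h"
   #include "arrow/compute/light_array.h"  // header under review
   #include "arrow/memory_pool.h"
   #include "arrow/status.h"

   using arrow::Status;
   using arrow::compute::ExecBatch;
   using arrow::compute::ExecBatchBuilder;

   // Illustrative only: append `num_rows` selected rows, flushing whenever the
   // builder reports that it could not accept any more input.
   Status AppendAll(arrow::MemoryPool* pool, const ExecBatch& batch, int num_rows,
                    const uint16_t* row_ids, ExecBatchBuilder* builder,
                    std::vector<ExecBatch>* out) {
     while (num_rows > 0) {
       int appended = 0;
       ARROW_RETURN_NOT_OK(builder->AppendSelected(pool, batch, num_rows, row_ids,
                                                   &appended, batch.num_values(),
                                                   /*col_ids=*/nullptr));
       if (appended == 0) {
         // Builder is at num_rows_max(); emit what has accumulated and retry.
         out->push_back(builder->Flush());
         continue;
       }
       row_ids += appended;
       num_rows -= appended;
     }
     return Status::OK();
   }
   ```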



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduced overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If `type` is a dictionary type then this will return the KeyColumnArray for
+/// the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+  ~ResizableArrayData() { Clear(true); }
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+  /// \brief A lightweight descriptor of the data held by this array
+  KeyColumnMetadata column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 then this returns the validity buffer
+  /// If i is 1 then this returns the buffer used for values (if this is a fixed
+  ///           length data type) or offsets (if this is a variable binary type)
+  /// If i is 2 then this returns the buffer used for variable length binary data
+  uint8_t* mutable_data(int i) {
+    return i == 0   ? non_null_buf_->mutable_data()
+           : i == 1 ? fixed_len_buf_->mutable_data()
+                    : var_len_buf_->mutable_data();
+  }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  std::shared_ptr<ResizableBuffer> non_null_buf_;
+  std::shared_ptr<ResizableBuffer> fixed_len_buf_;
+  std::shared_ptr<ResizableBuffer> var_len_buf_;

Review Comment:
   Here we have three explicitly named buffers and for KeyColumnArray we have a fixed-size array of buffers.  We should probably align on a single approach.  Do you have a preference between the two or a reason for them to be different?
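
   One possible way to align the two, sketched only as an illustration (the sketch class, member, and constant names below are hypothetical):

   ```
   #include <array>
   #include <cstdint>
   #include <memory>

   #include "arrow/buffer.h"

   // Hypothetical sketch: keep the owned buffers in a fixed-size array, mirroring
   // KeyColumnArray, while retaining readable names as index constants.
   class ResizableArrayDataSketch {
    public:
     static constexpr int kValidityBuffer = 0;
     static constexpr int kFixedLengthBuffer = 1;    // values or offsets
     static constexpr int kVariableLengthBuffer = 2;

     uint8_t* mutable_data(int i) { return buffers_[i]->mutable_data(); }

    private:
     std::array<std::shared_ptr<arrow::ResizableBuffer>, 3> buffers_;
   };
   ```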



##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* buffer0, uint8_t* buffer1, uint8_t* buffer2,
+                               int bit_offset0, int bit_offset1) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
+                                              int buffer_id_to_replace) const {
+  KeyColumnArray copy = *this;
+  copy.mutable_buffers_[buffer_id_to_replace] =
+      other.mutable_buffers_[buffer_id_to_replace];
+  copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace];
+  if (buffer_id_to_replace < max_buffers_ - 1) {
+    copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace];
+  }
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const {
+  KeyColumnArray copy = *this;
+  copy.metadata_ = metadata;
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
+  KeyColumnArray sliced;
+  sliced.metadata_ = metadata_;
+  sliced.length_ = length;
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  sliced.buffers_[0] =
+      buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.mutable_buffers_[0] =
+      mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8;
+
+  if (fixed_size == 0 && !metadata_.is_null_type) {
+    sliced.buffers_[1] =
+        buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr;
+    sliced.mutable_buffers_[1] = mutable_buffers_[1]
+                                     ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8
+                                     : nullptr;
+    sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8;
+  } else {
+    sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr;
+    sliced.mutable_buffers_[1] =
+        mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr;
+    sliced.bit_offset_[1] = 0;
+  }
+
+  sliced.buffers_[2] = buffers_[2];
+  sliced.mutable_buffers_[2] = mutable_buffers_[2];
+  return sliced;
+}
+
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type) {
+  if (type->id() == Type::DICTIONARY) {
+    auto bit_width =
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
+    ARROW_DCHECK(bit_width % 8 == 0);
+    return KeyColumnMetadata(true, bit_width / 8);
+  }
+  if (type->id() == Type::BOOL) {
+    return KeyColumnMetadata(true, 0);
+  }
+  if (is_fixed_width(type->id())) {
+    return KeyColumnMetadata(
+        true,
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
+  }
+  if (is_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint32_t));
+  }
+  if (is_large_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint64_t));
+  }
+  if (type->id() == Type::NA) {
+    return KeyColumnMetadata(true, 0, true);
+  }
+  // Should not reach this point, caller attempted to create a KeyColumnArray from an
+  // invalid type
+  ARROW_DCHECK(false);
+  return KeyColumnMetadata(true, sizeof(int));
+}
+
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows) {
+  KeyColumnArray column_array = KeyColumnArray(
+      ColumnMetadataFromDataType(array_data->type),
+      array_data->offset + start_row + num_rows,
+      array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
+      array_data->buffers[1]->data(),
+      (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
+          ? array_data->buffers[2]->data()
+          : nullptr);
+  return column_array.Slice(array_data->offset + start_row, num_rows);
+}
+
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_metadatas->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_metadatas)[i] = ColumnMetadataFromDataType(array_data->type);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_arrays->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_arrays)[i] = ColumnArrayFromArrayData(array_data, start_row, num_rows);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length), column_arrays);
+}
+
+void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
+                              MemoryPool* pool, int log_num_rows_min) {
+#ifndef NDEBUG
+  if (num_rows_allocated_ > 0) {
+    ARROW_DCHECK(data_type_ != NULLPTR);
+    KeyColumnMetadata metadata_before = ColumnMetadataFromDataType(data_type_);
+    KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type);
+    ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length &&
+                 metadata_before.fixed_length == metadata_after.fixed_length);
+  }
+#endif
+  Clear(/*release_buffers=*/false);
+  log_num_rows_min_ = log_num_rows_min;
+  data_type_ = data_type;
+  pool_ = pool;
+}
+
+void ResizableArrayData::Clear(bool release_buffers) {
+  num_rows_ = 0;
+  if (release_buffers) {
+    non_null_buf_.reset();
+    fixed_len_buf_.reset();
+    var_len_buf_.reset();
+    num_rows_allocated_ = 0;
+    var_len_buf_size_ = 0;
+  }
+}
+
+Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
+  ARROW_DCHECK(num_rows_new >= 0);
+  if (num_rows_new <= num_rows_allocated_) {
+    num_rows_ = num_rows_new;
+    return Status::OK();
+  }
+
+  int num_rows_allocated_new = 1 << log_num_rows_min_;
+  while (num_rows_allocated_new < num_rows_new) {
+    num_rows_allocated_new *= 2;
+  }
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (fixed_len_buf_ == NULLPTR) {
+    ARROW_DCHECK(non_null_buf_ == NULLPTR && var_len_buf_ == NULLPTR);
+
+    ARROW_ASSIGN_OR_RAISE(
+        non_null_buf_,
+        AllocateResizableBuffer(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
+                pool_));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
+                pool_));
+      }
+    } else {
+      ARROW_ASSIGN_OR_RAISE(
+          fixed_len_buf_,
+          AllocateResizableBuffer(
+              (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(var_len_buf_, AllocateResizableBuffer(
+                                            sizeof(uint64_t) + kNumPaddingBytes, pool_));
+
+    var_len_buf_size_ = sizeof(uint64_t);
+  } else {
+    ARROW_DCHECK(non_null_buf_ != NULLPTR && var_len_buf_ != NULLPTR);
+
+    RETURN_NOT_OK(non_null_buf_->Resize(bit_util::BytesForBits(num_rows_allocated_new) +
+                                        kNumPaddingBytes));
+
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+      } else {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes));
+      }
+    } else {
+      RETURN_NOT_OK(fixed_len_buf_->Resize(
+          (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes));
+    }
+  }
+
+  num_rows_allocated_ = num_rows_allocated_new;
+  num_rows_ = num_rows_new;
+
+  return Status::OK();
+}
+
+Status ResizableArrayData::ResizeVaryingLengthBuffer() {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (!column_metadata.is_fixed_length) {
+    int min_new_size = static_cast<int>(
+        reinterpret_cast<const uint32_t*>(fixed_len_buf_->data())[num_rows_]);
+    ARROW_DCHECK(var_len_buf_size_ > 0);
+    if (var_len_buf_size_ < min_new_size) {
+      int new_size = var_len_buf_size_;
+      while (new_size < min_new_size) {
+        new_size *= 2;
+      }
+      RETURN_NOT_OK(var_len_buf_->Resize(new_size + kNumPaddingBytes));
+      var_len_buf_size_ = new_size;
+    }
+  }
+
+  return Status::OK();
+}
+
+KeyColumnArray ResizableArrayData::column_array() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+  return KeyColumnArray(column_metadata, num_rows_, non_null_buf_->mutable_data(),
+                        fixed_len_buf_->mutable_data(), var_len_buf_->mutable_data());
+}
+
+std::shared_ptr<ArrayData> ResizableArrayData::array_data() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  auto valid_count = arrow::internal::CountSetBits(non_null_buf_->data(), /*offset=*/0,
+                                                   static_cast<int64_t>(num_rows_));
+  int null_count = static_cast<int>(num_rows_) - static_cast<int>(valid_count);
+
+  if (column_metadata.is_fixed_length) {
+    return ArrayData::Make(data_type_, num_rows_, {non_null_buf_, fixed_len_buf_},
+                           null_count);
+  } else {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {non_null_buf_, fixed_len_buf_, var_len_buf_}, null_count);
+  }
+}
+
+int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
+                                    int num_rows, const uint16_t* row_ids,
+                                    int num_tail_bytes_to_skip) {
+#ifndef NDEBUG
+  // Ids must be in non-decreasing order
+  //
+  for (int i = 1; i < num_rows; ++i) {
+    ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]);
+  }
+#endif
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(column->type);
+
+  int num_rows_left = num_rows;
+  int num_bytes_skipped = 0;
+  while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        num_rows_left = std::max(num_rows_left, 8) - 8;
+        ++num_bytes_skipped;
+      } else {
+        --num_rows_left;
+        num_bytes_skipped += column_metadata.fixed_length;
+      }
+    } else {
+      --num_rows_left;
+      int row_id_removed = row_ids[num_rows_left];
+      const uint32_t* offsets =
+          reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
+      num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
+    }
+  }
+
+  return num_rows - num_rows_left;
+}
+
+template <bool OUTPUT_BYTE_ALIGNED>
+void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits,
+                                      int64_t input_bits_offset, uint8_t* output_bits,
+                                      int64_t output_bits_offset, int num_rows,
+                                      const uint16_t* row_ids) {
+  if (!OUTPUT_BYTE_ALIGNED) {
+    ARROW_DCHECK(output_bits_offset % 8 > 0);
+    output_bits[output_bits_offset / 8] &=
+        static_cast<uint8_t>((1 << (output_bits_offset % 8)) - 1);
+  } else {
+    ARROW_DCHECK(output_bits_offset % 8 == 0);
+  }
+  constexpr int unroll = 8;
+  for (int i = 0; i < num_rows / unroll; ++i) {
+    const uint16_t* row_ids_base = row_ids + unroll * i;
+    uint8_t result;
+    result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0;
+    if (OUTPUT_BYTE_ALIGNED) {
+      output_bits[output_bits_offset / 8 + i] = result;
+    } else {
+      output_bits[output_bits_offset / 8 + i] |=
+          static_cast<uint8_t>(result << (output_bits_offset % 8));
+      output_bits[output_bits_offset / 8 + i + 1] =
+          static_cast<uint8_t>(result >> (8 - (output_bits_offset % 8)));
+    }
+  }
+  if (num_rows % unroll > 0) {
+    for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) {
+      bit_util::SetBitTo(output_bits, output_bits_offset + i,
+                         bit_util::GetBit(input_bits, input_bits_offset + row_ids[i]));
+    }
+  }
+}
+
+void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                                   uint8_t* output_bits, int64_t output_bits_offset,
+                                   int num_rows, const uint16_t* row_ids) {
+  if (output_bits_offset % 8 > 0) {
+    CollectBitsImp<false>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                          num_rows, row_ids);
+  } else {
+    CollectBitsImp<true>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                         num_rows, row_ids);
+  }
+}
+
+template <class PROCESS_VALUE_FN>
+void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                             const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type);
+
+  if (!metadata.is_fixed_length) {
+    const uint8_t* ptr_base = column->buffers[2]->data();
+    const uint32_t* offsets =
+        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr = ptr_base + offsets[row_id];
+      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      process_value_fn(i, field_ptr, field_length);
+    }
+  } else {
+    ARROW_DCHECK(metadata.fixed_length > 0);
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr =
+          column->buffers[1]->data() +
+          (column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
+      process_value_fn(i, field_ptr, metadata.fixed_length);
+    }
+  }
+}
+
+Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
+                                        ResizableArrayData* target,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        MemoryPool* pool) {
+  int num_rows_before = target->num_rows();
+  ARROW_DCHECK(num_rows_before >= 0);
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target->num_rows() == 0) {
+    target->Init(source->type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(source->type);
+
+  if (column_metadata.is_fixed_length) {
+    // Fixed length column
+    //
+    uint32_t fixed_length = column_metadata.fixed_length;
+    switch (fixed_length) {
+      case 0:
+        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
+                    num_rows_before, num_rows_to_append, row_ids);
+        break;
+      case 1:
+        Visit(source, num_rows_to_append, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                target->mutable_data(1)[num_rows_before + i] = *ptr;
+              });
+        break;
+      case 2:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint16_t*>(ptr);
+            });
+        break;
+      case 4:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint32_t*>(ptr);
+            });
+        break;
+      case 8:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint64_t*>(ptr);
+            });
+        break;
+      default: {
+        int num_rows_to_process =
+            num_rows_to_append -
+            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+        Visit(source, num_rows_to_process, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                uint64_t* dst = reinterpret_cast<uint64_t*>(
+                    target->mutable_data(1) +
+                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
+                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                for (uint32_t word_id = 0;
+                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
+                     ++word_id) {
+                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+                }
+              });
+        if (num_rows_to_append > num_rows_to_process) {
+          Visit(source, num_rows_to_append - num_rows_to_process,
+                row_ids + num_rows_to_process,
+                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                  uint64_t* dst = reinterpret_cast<uint64_t*>(
+                      target->mutable_data(1) +
+                      static_cast<int64_t>(num_bytes) *
+                          (num_rows_before + num_rows_to_process + i));
+                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                  memcpy(dst, src, num_bytes);
+                });
+        }
+      }
+    }
+  } else {
+    // Varying length column
+    //
+
+    // Step 1: calculate target offsets
+    //
+    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    Visit(source, num_rows_to_append, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            offsets[num_rows_before + i] = num_bytes;
+          });
+    for (int i = 0; i < num_rows_to_append; ++i) {
+      uint32_t length = offsets[num_rows_before + i];
+      offsets[num_rows_before + i] = sum;
+      sum += length;
+    }
+    offsets[num_rows_before + num_rows_to_append] = sum;
+
+    // Step 2: resize output buffers
+    //
+    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());
+
+    // Step 3: copy varying-length data
+    //
+    int num_rows_to_process =
+        num_rows_to_append -
+        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+    Visit(source, num_rows_to_process, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
+                                                        offsets[num_rows_before + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            for (uint32_t word_id = 0;
+                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
+              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+            }
+          });
+    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(
+                target->mutable_data(2) +
+                offsets[num_rows_before + num_rows_to_process + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            memcpy(dst, src, num_bytes);
+          });
+  }
+
+  // Process nulls
+  //
+  if (source->buffers[0] == NULLPTR) {
+    uint8_t* dst = target->mutable_data(0);
+    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
+    for (int i = num_rows_before / 8 + 1;
+         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
+      dst[i] = 0xff;
+    }
+  } else {
+    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
+                num_rows_before, num_rows_to_append, row_ids);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
+                                     ResizableArrayData& target, int num_rows_to_append,
+                                     MemoryPool* pool) {
+  int num_rows_before = target.num_rows();
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target.num_rows() == 0) {
+    target.Init(type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type);
+
+  // Process fixed length buffer
+  //
+  if (column_metadata.is_fixed_length) {
+    uint8_t* dst = target.mutable_data(1);
+    if (column_metadata.fixed_length == 0) {
+      dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+      int64_t offset_begin = num_rows_before / 8 + 1;
+      int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+      if (offset_end > offset_begin) {
+        memset(dst + offset_begin, 0, offset_end - offset_begin);
+      }
+    } else {
+      memset(dst + num_rows_before * static_cast<int64_t>(column_metadata.fixed_length),
+             0, static_cast<int64_t>(column_metadata.fixed_length) * num_rows_to_append);
+    }
+  } else {
+    uint32_t* dst = reinterpret_cast<uint32_t*>(target.mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before];
+    for (int64_t i = num_rows_before; i <= num_rows_after; ++i) {
+      dst[i] = sum;
+    }
+  }
+
+  // Process nulls
+  //
+  uint8_t* dst = target.mutable_data(0);
+  dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+  int64_t offset_begin = num_rows_before / 8 + 1;
+  int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+  if (offset_end > offset_begin) {
+    memset(dst + offset_begin, 0, offset_end - offset_begin);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int num_cols, const int* col_ids) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(num_cols);
+    for (int i = 0; i < num_cols; ++i) {
+      const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+      ARROW_DCHECK(data.is_array());
+      const std::shared_ptr<ArrayData>& array_data = data.array();
+      values_[i].Init(array_data->type, pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    RETURN_NOT_OK(
+        AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int* num_appended, int num_cols,
+                                        const int* col_ids) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendSelected(pool, batch, num_rows_next, row_ids, num_cols, col_ids));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(types.size());
+    for (size_t i = 0; i < types.size(); ++i) {
+      values_[i].Init(types[i], pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append, int* num_appended) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendNulls(pool, types, num_rows_next));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+ExecBatch ExecBatchBuilder::Flush() {
+  ARROW_DCHECK(num_rows() > 0);

Review Comment:
   This feels like it might be a difficult assertion for the caller to maintain.  What is the downside of returning an empty `ExecBatch` in this situation?
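
   For illustration, the guard could look roughly like this (a sketch only; it assumes `Flush()` assembles the output from `values_[i].array_data()` as the rest of this file does):

   ```
   // Hypothetical alternative to the DCHECK: hand back an empty batch when no
   // rows have been accumulated.
   ExecBatch ExecBatchBuilder::Flush() {
     if (num_rows() == 0) {
       return ExecBatch({}, /*length=*/0);
     }
     int64_t length = num_rows();
     std::vector<Datum> values(values_.size());
     for (size_t i = 0; i < values_.size(); ++i) {
       values[i] = values_[i].array_data();
       values_[i].Clear(/*release_buffers=*/true);
     }
     return ExecBatch(std::move(values), length);
   }
   ```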



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
