This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 44d3f763c0 GH-43758: [C++] Compute: More comment in RowEncoder (#43763)
44d3f763c0 is described below
commit 44d3f763c083280d2480a735a9ab45243af48232
Author: mwish <[email protected]>
AuthorDate: Mon Sep 2 23:06:10 2024 +0800
GH-43758: [C++] Compute: More comment in RowEncoder (#43763)
### Rationale for this change
Some comments for RowEncoder
### What changes are included in this PR?
Some comments for RowEncoder
### Are these changes tested?
Covered by existing
### Are there any user-facing changes?
no
* GitHub Issue: #43758
Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Signed-off-by: mwish <[email protected]>
---
cpp/src/arrow/compute/light_array_internal.h | 6 +-
cpp/src/arrow/compute/row/row_encoder_internal.cc | 56 ++++----
cpp/src/arrow/compute/row/row_encoder_internal.h | 154 ++++++++++++++++++----
cpp/src/arrow/compute/row/row_internal.h | 2 +-
4 files changed, 155 insertions(+), 63 deletions(-)
diff --git a/cpp/src/arrow/compute/light_array_internal.h
b/cpp/src/arrow/compute/light_array_internal.h
index b8e48f096b..5adb06e540 100644
--- a/cpp/src/arrow/compute/light_array_internal.h
+++ b/cpp/src/arrow/compute/light_array_internal.h
@@ -65,12 +65,12 @@ struct ARROW_EXPORT KeyColumnMetadata {
/// If this is true the column will have a validity buffer and
/// a data buffer and the third buffer will be unused.
bool is_fixed_length;
- /// \brief True if this column is the null type
+ /// \brief True if this column is the null type(NA).
bool is_null_type;
/// \brief The number of bytes for each item
///
/// Zero has a special meaning, indicating a bit vector with one bit per
value if it
- /// isn't a null type column.
+ /// isn't a null type column. Generally, this means that the column is a
boolean type.
///
/// For a varying-length binary column this represents the number of bytes
per offset.
uint32_t fixed_length;
@@ -405,7 +405,7 @@ class ARROW_EXPORT ExecBatchBuilder {
int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }
- static int num_rows_max() { return 1 << kLogNumRows; }
+ static constexpr int num_rows_max() { return 1 << kLogNumRows; }
private:
static constexpr int kLogNumRows = 15;
diff --git a/cpp/src/arrow/compute/row/row_encoder_internal.cc
b/cpp/src/arrow/compute/row/row_encoder_internal.cc
index 414cc6793a..0965e4e8f9 100644
--- a/cpp/src/arrow/compute/row/row_encoder_internal.cc
+++ b/cpp/src/arrow/compute/row/row_encoder_internal.cc
@@ -145,41 +145,37 @@ void FixedWidthKeyEncoder::AddLengthNull(int32_t* length)
{
Status FixedWidthKeyEncoder::Encode(const ExecValue& data, int64_t
batch_length,
uint8_t** encoded_bytes) {
+ auto handle_next_valid_value = [&](std::string_view bytes) {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kValidByte;
+ memcpy(encoded_ptr, bytes.data(), byte_width_);
+ encoded_ptr += byte_width_;
+ };
+ auto handle_next_null_value = [&] {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kNullByte;
+ memset(encoded_ptr, 0, byte_width_);
+ encoded_ptr += byte_width_;
+ };
if (data.is_array()) {
ArraySpan viewed = data.array;
+ // The original type might not be FixedSizeBinaryType, but it would
+ // treat the input as binary data.
auto view_ty = fixed_size_binary(byte_width_);
viewed.type = view_ty.get();
- VisitArraySpanInline<FixedSizeBinaryType>(
- viewed,
- [&](std::string_view bytes) {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kValidByte;
- memcpy(encoded_ptr, bytes.data(), byte_width_);
- encoded_ptr += byte_width_;
- },
- [&] {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kNullByte;
- memset(encoded_ptr, 0, byte_width_);
- encoded_ptr += byte_width_;
- });
+ VisitArraySpanInline<FixedSizeBinaryType>(viewed, handle_next_valid_value,
+ handle_next_null_value);
} else {
const auto& scalar =
data.scalar_as<arrow::internal::PrimitiveScalarBase>();
if (scalar.is_valid) {
- const std::string_view data = scalar.view();
- DCHECK_EQ(data.size(), static_cast<size_t>(byte_width_));
+ const std::string_view scalar_data = scalar.view();
+ DCHECK_EQ(scalar_data.size(), static_cast<size_t>(byte_width_));
for (int64_t i = 0; i < batch_length; i++) {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kValidByte;
- memcpy(encoded_ptr, data.data(), data.size());
- encoded_ptr += byte_width_;
+ handle_next_valid_value(scalar_data);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kNullByte;
- memset(encoded_ptr, 0, byte_width_);
- encoded_ptr += byte_width_;
+ handle_next_null_value();
}
}
}
@@ -267,11 +263,11 @@ void RowEncoder::Init(const std::vector<TypeHolder>&
column_types, ExecContext*
for (size_t i = 0; i < column_types.size(); ++i) {
const bool is_extension = column_types[i].id() == Type::EXTENSION;
- const TypeHolder& type = is_extension
- ?
arrow::internal::checked_pointer_cast<ExtensionType>(
- column_types[i].GetSharedPtr())
- ->storage_type()
- : column_types[i];
+ const TypeHolder& type =
+ is_extension
+ ? arrow::internal::checked_cast<const
ExtensionType*>(column_types[i].type)
+ ->storage_type()
+ : column_types[i];
if (is_extension) {
extension_types_[i] =
arrow::internal::checked_pointer_cast<ExtensionType>(
@@ -379,7 +375,7 @@ Result<ExecBatch> RowEncoder::Decode(int64_t num_rows,
const int32_t* row_ids) {
ARROW_ASSIGN_OR_RAISE(out.values[i], ::arrow::internal::GetArrayView(
column_array_data,
extension_types_[i]))
} else {
- out.values[i] = column_array_data;
+ out.values[i] = std::move(column_array_data);
}
}
diff --git a/cpp/src/arrow/compute/row/row_encoder_internal.h
b/cpp/src/arrow/compute/row/row_encoder_internal.h
index 60eb14af50..4d6cc34af2 100644
--- a/cpp/src/arrow/compute/row/row_encoder_internal.h
+++ b/cpp/src/arrow/compute/row/row_encoder_internal.h
@@ -38,16 +38,41 @@ struct ARROW_EXPORT KeyEncoder {
virtual ~KeyEncoder() = default;
+ // Increment the values in the lengths array by the length of the encoded
key for the
+ // corresponding value in the given column.
+ //
+ // Generally if Encoder is for a fixed-width type, the length of the encoded
key
+ // would add ExtraByteForNull + byte_width.
+ // If Encoder is for a variable-width type, the length would add
ExtraByteForNull +
+ // sizeof(Offset) + buffer_size.
+ // If Encoder is for null type, the length would add 0.
virtual void AddLength(const ExecValue& value, int64_t batch_length,
int32_t* lengths) = 0;
+ // Increment the length by the length of an encoded null value.
+ // It's a special case for AddLength like `AddLength(Null-Scalar, 1,
lengths)`.
virtual void AddLengthNull(int32_t* length) = 0;
+ // Encode the column into the encoded_bytes, which is an array of pointers
to each row
+ // buffer.
+ //
+ // If value is an array, the array-size should be batch_length.
+ // If value is a scalar, the value would repeat batch_length times.
+ // NB: The pointers in the encoded_bytes will be advanced as values are
encoded into them.
virtual Status Encode(const ExecValue&, int64_t batch_length,
uint8_t** encoded_bytes) = 0;
+ // Encode a null value into the encoded_bytes, which is an array of pointers
to each row
+ // buffer.
+ //
+ // It's a special case for Encode like `Encode(Null-Scalar, 1,
encoded_bytes)`.
+ // NB: The pointers in the encoded_bytes will be advanced as values are
encoded into them.
virtual void EncodeNull(uint8_t** encoded_bytes) = 0;
+ // Decode the encoded key from the encoded_bytes, which is an array of
pointers to each
+ // row buffer, into an ArrayData.
+ //
+ // NB: The pointers in the encoded_bytes will be advanced as values are
decoded from them.
virtual Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes,
int32_t length,
MemoryPool*) = 0;
@@ -94,7 +119,7 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
MemoryPool* pool) override;
std::shared_ptr<DataType> type_;
- int byte_width_;
+ const int byte_width_;
};
struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder {
@@ -118,6 +143,7 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {
void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
if (data.is_array()) {
int64_t i = 0;
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
@@ -142,41 +168,34 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {
Status Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) override {
+ auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kValidByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
+ encoded_ptr += sizeof(Offset);
+ memcpy(encoded_ptr, bytes.data(), bytes.size());
+ encoded_ptr += bytes.size();
+ };
+ auto handle_next_null_value = [&encoded_bytes]() {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kNullByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(0));
+ encoded_ptr += sizeof(Offset);
+ };
if (data.is_array()) {
- VisitArraySpanInline<T>(
- data.array,
- [&](std::string_view bytes) {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kValidByte;
- util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
- encoded_ptr += sizeof(Offset);
- memcpy(encoded_ptr, bytes.data(), bytes.size());
- encoded_ptr += bytes.size();
- },
- [&] {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kNullByte;
- util::SafeStore(encoded_ptr, static_cast<Offset>(0));
- encoded_ptr += sizeof(Offset);
- });
+ DCHECK_EQ(data.length(), batch_length);
+ VisitArraySpanInline<T>(data.array, handle_next_valid_value,
+ handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<BaseBinaryScalar>();
if (scalar.is_valid) {
- const auto& bytes = *scalar.value;
+ const auto bytes = std::string_view{*scalar.value};
for (int64_t i = 0; i < batch_length; i++) {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kValidByte;
- util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
- encoded_ptr += sizeof(Offset);
- memcpy(encoded_ptr, bytes.data(), bytes.size());
- encoded_ptr += bytes.size();
+ handle_next_valid_value(bytes);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
- auto& encoded_ptr = *encoded_bytes++;
- *encoded_ptr++ = kNullByte;
- util::SafeStore(encoded_ptr, static_cast<Offset>(0));
- encoded_ptr += sizeof(Offset);
+ handle_next_null_value();
}
}
}
@@ -250,6 +269,68 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+/// RowEncoder encodes ExecSpan to a variable length byte sequence
+/// created by concatenating the encoded form of each column. The encoding
+/// for each column depends on its data type.
+///
+/// This is used to encode columns into row-major format, which will be
+/// beneficial for grouping and joining operations.
+///
+/// Unlike DuckDB and arrow-rs, currently this row format cannot help
+/// with sorting because the encoded rows are not comparable.
+///
+/// # Key Column Encoding
+///
+/// The row format is composed of the KeyColumn encodings for each column,
+/// and each column is encoded as follows:
+/// 1. A null byte for each column, indicating whether the column is null.
+/// "1" for null, "0" for non-null.
+/// 2. The "fixed width" encoding for the column, it would exist whether
+/// the column is null or not.
+/// 3. The "variable payload" encoding for the column, it would exist only
+/// for non-null string/binary columns.
+/// For string/binary columns, the length of the payload is in
+/// "fixed width" part, and the binary contents are in the
+/// "variable payload" part.
+/// 4. Specially, if all columns in a row are null, the caller may decide
+/// to refer to kRowIdForNulls instead of actually encoding/decoding
+/// it using any KeyEncoder. See the comment for encoded_nulls_.
+///
+/// The endianness of the encoded bytes is platform-dependent.
+///
+/// ## Null Type
+///
+/// Null Type is a special case, it doesn't occupy any space in the
+/// encoded row.
+///
+/// ## Fixed Width Type
+///
+/// Fixed Width Type is encoded as a fixed-width byte sequence. For example:
+/// ```
+/// Int8: 5, null, 6
+/// ```
+/// Would be encoded as [0 5], [1 0], [0 6].
+///
+/// ### Dictionary Type
+///
+/// Dictionary Type is encoded as a fixed-width byte sequence using
+/// dictionary indices, the dictionary should be identical for all
+/// rows.
+///
+/// ## Variable Width Type
+///
+/// Variable Width Type is encoded as:
+/// [null byte, variable-byte length, variable bytes]. For example:
+///
+/// String "abc" Would be encoded as:
+/// 0 ( 1 byte for not null) + 3 ( 4 bytes for length ) + "abc" (payload)
+///
+/// Null string Would be encoded as:
+/// 1 ( 1 byte for null) + 0 ( 4 bytes for length )
+///
+/// # Row Encoding
+///
+/// The row format is the concatenation of the encodings of each column.
class ARROW_EXPORT RowEncoder {
public:
static constexpr int kRowIdForNulls() { return -1; }
@@ -259,6 +340,9 @@ class ARROW_EXPORT RowEncoder {
Status EncodeAndAppend(const ExecSpan& batch);
Result<ExecBatch> Decode(int64_t num_rows, const int32_t* row_ids);
+ // Returns the encoded representation of the row at index i.
+ // If i is kRowIdForNulls, it returns the pre-encoded all-nulls
+ // row.
inline std::string encoded_row(int32_t i) const {
if (i == kRowIdForNulls()) {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
@@ -270,14 +354,26 @@ class ARROW_EXPORT RowEncoder {
}
int32_t num_rows() const {
- return offsets_.size() == 0 ? 0 : static_cast<int32_t>(offsets_.size() -
1);
+ return offsets_.empty() ? 0 : static_cast<int32_t>(offsets_.size() - 1);
}
private:
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
+ // offsets_ vector stores the starting position (offset) of each encoded row
+ // within the bytes_ vector. This allows for quick access to individual rows.
+ //
+ // The size would be num_rows + 1 if not empty, the last element is the total
+ // length of the bytes_ vector.
std::vector<int32_t> offsets_;
+ // The encoded bytes of all non "kRowIdForNulls" rows.
std::vector<uint8_t> bytes_;
+ // A pre-encoded constant row with all its columns being null. Useful when
+ // the caller is certain that an entire row is null and then uses
kRowIdForNulls
+ // to refer to it.
+ //
+ // EncodeAndAppend would never append this row, but encoded_row and Decode
would
+ // return this row when kRowIdForNulls is passed.
std::vector<uint8_t> encoded_nulls_;
std::vector<std::shared_ptr<ExtensionType>> extension_types_;
};
diff --git a/cpp/src/arrow/compute/row/row_internal.h
b/cpp/src/arrow/compute/row/row_internal.h
index 094a9c31ef..3ab86fd1fc 100644
--- a/cpp/src/arrow/compute/row/row_internal.h
+++ b/cpp/src/arrow/compute/row/row_internal.h
@@ -38,7 +38,7 @@ struct ARROW_EXPORT RowTableMetadata {
/// For a fixed-length binary row, common size of rows in bytes,
/// rounded up to the multiple of alignment.
///
- /// For a varying-length binary, size of all encoded fixed-length key
columns,
+ /// For a varying-length binary row, size of all encoded fixed-length key
columns,
/// including lengths of varying-length columns, rounded up to the multiple
of string
/// alignment.
uint32_t fixed_length;