pitrou commented on code in PR #43912:
URL: https://github.com/apache/arrow/pull/43912#discussion_r1763463071
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -18,10 +18,13 @@
#pragma once
#include <cstdint>
+#include <iostream>
Review Comment:
Why?
##########
cpp/src/arrow/acero/hash_join_node_test.cc:
##########
@@ -514,7 +514,7 @@ std::vector<std::shared_ptr<Array>> GenRandomUniqueRecords(
val_types.push_back(result[i]->type());
}
RowEncoder encoder;
- encoder.Init(val_types, ctx);
+ auto s = encoder.Init(val_types, ctx);
Review Comment:
Please at least use `DCHECK_OK`.
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+ static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports
ListType");
+ using Offset = typename ListType::offset_type;
+
+ ListKeyEncoder(std::shared_ptr<DataType> self_type,
+ std::shared_ptr<DataType> element_type,
+ std::shared_ptr<KeyEncoder> element_encoder)
+ : self_type_(std::move(self_type)),
+ element_type_(std::move(element_type)),
+ element_encoder_(std::move(element_encoder)) {}
+
+ void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ // AddLength for each list
+ std::vector<int32_t> child_lengthes;
+ int32_t index{0};
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ VisitBitBlocksVoid(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
Review Comment:
It's used.
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+ static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports
ListType");
+ using Offset = typename ListType::offset_type;
+
+ ListKeyEncoder(std::shared_ptr<DataType> self_type,
+ std::shared_ptr<DataType> element_type,
+ std::shared_ptr<KeyEncoder> element_encoder)
+ : self_type_(std::move(self_type)),
+ element_type_(std::move(element_type)),
+ element_encoder_(std::move(element_encoder)) {}
+
+ void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ // AddLength for each list
+ std::vector<int32_t> child_lengthes;
+ int32_t index{0};
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ VisitBitBlocksVoid(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
+ child_lengthes.clear();
Review Comment:
No need to call `clear` as you're calling `resize` below.
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+ static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports
ListType");
+ using Offset = typename ListType::offset_type;
+
+ ListKeyEncoder(std::shared_ptr<DataType> self_type,
+ std::shared_ptr<DataType> element_type,
+ std::shared_ptr<KeyEncoder> element_encoder)
+ : self_type_(std::move(self_type)),
+ element_type_(std::move(element_type)),
+ element_encoder_(std::move(element_encoder)) {}
+
+ void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ // AddLength for each list
+ std::vector<int32_t> child_lengthes;
+ int32_t index{0};
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ VisitBitBlocksVoid(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
+ child_lengthes.clear();
+ Offset list_length = offsets[i + 1] - offsets[i];
+ if (list_length == 0) {
+ lengths[index] += kExtraByteForNull + sizeof(Offset);
+ ++index;
+ return;
+ }
+ child_lengthes.resize(list_length, 0);
+ tmp_child_data.SetSlice(offsets[i], list_length);
+ this->element_encoder_->AddLength(ExecValue{tmp_child_data},
list_length,
+ child_lengthes.data());
+ lengths[index] += kExtraByteForNull + sizeof(Offset);
+ for (int32_t j = 0; j < list_length; j++) {
+ lengths[index] += child_lengthes[j];
+ }
+ ++index;
+ },
+ [&]() {
+ lengths[index] = kExtraByteForNull + sizeof(Offset);
+ ++index;
+ });
+ } else {
+ const auto& list_scalar = data.scalar_as<BaseListScalar>();
+ int32_t accum_length = 0;
+ // Counting the size of the encoded list if the list is valid
+ if (list_scalar.is_valid && list_scalar.value->length() > 0) {
+ auto element_count = static_cast<int32_t>(list_scalar.value->length());
+ // Counting the size of the encoded list
+ std::vector<int32_t> child_lengthes(element_count, 0);
+
this->element_encoder_->AddLength(ExecValue{*list_scalar.value->data()},
+ element_count,
child_lengthes.data());
+ for (int32_t i = 0; i < element_count; i++) {
+ accum_length += child_lengthes[i];
+ }
+ }
+ for (int64_t i = 0; i < batch_length; i++) {
+ lengths[i] += kExtraByteForNull + sizeof(Offset) + accum_length;
+ }
+ }
+ }
+
+ void AddLengthNull(int32_t* length) override {
+ *length += kExtraByteForNull + sizeof(Offset);
+ }
+
+ Status Encode(const ExecValue& data, int64_t batch_length,
+ uint8_t** encoded_bytes) override {
+ auto handle_null_value = [&encoded_bytes]() {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kNullByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(0));
+ encoded_ptr += sizeof(Offset);
+ };
+ auto handle_valid_value = [&encoded_bytes,
+ this](const ArraySpan& child_array) -> Status {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kValidByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(child_array.length));
+ encoded_ptr += sizeof(Offset);
+ // handling the child data
+ for (int64_t i = 0; i < child_array.length; i++) {
+ ArraySpan tmp_child_data(child_array);
+ tmp_child_data.SetSlice(child_array.offset + i, 1);
+ RETURN_NOT_OK(
+ this->element_encoder_->Encode(ExecValue{tmp_child_data}, 1,
&encoded_ptr));
+ }
+ return Status::OK();
+ };
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ RETURN_NOT_OK(VisitBitBlocks(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
Review Comment:
It's used.
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
Review Comment:
Yes, please do. It would be nice to move most of the contents of this file
into the corresponding `.cc`
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
Review Comment:
Can you add a comment explaining what the encoding looks like?
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+ static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports
ListType");
+ using Offset = typename ListType::offset_type;
+
+ ListKeyEncoder(std::shared_ptr<DataType> self_type,
+ std::shared_ptr<DataType> element_type,
+ std::shared_ptr<KeyEncoder> element_encoder)
+ : self_type_(std::move(self_type)),
+ element_type_(std::move(element_type)),
+ element_encoder_(std::move(element_encoder)) {}
+
+ void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ // AddLength for each list
+ std::vector<int32_t> child_lengthes;
Review Comment:
```suggestion
std::vector<int32_t> child_lengths;
```
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+ static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports
ListType");
+ using Offset = typename ListType::offset_type;
+
+ ListKeyEncoder(std::shared_ptr<DataType> self_type,
+ std::shared_ptr<DataType> element_type,
+ std::shared_ptr<KeyEncoder> element_encoder)
+ : self_type_(std::move(self_type)),
+ element_type_(std::move(element_type)),
+ element_encoder_(std::move(element_encoder)) {}
+
+ void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ // AddLength for each list
+ std::vector<int32_t> child_lengthes;
+ int32_t index{0};
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ VisitBitBlocksVoid(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
+ child_lengthes.clear();
+ Offset list_length = offsets[i + 1] - offsets[i];
+ if (list_length == 0) {
+ lengths[index] += kExtraByteForNull + sizeof(Offset);
+ ++index;
+ return;
+ }
+ child_lengthes.resize(list_length, 0);
+ tmp_child_data.SetSlice(offsets[i], list_length);
+ this->element_encoder_->AddLength(ExecValue{tmp_child_data},
list_length,
+ child_lengthes.data());
+ lengths[index] += kExtraByteForNull + sizeof(Offset);
+ for (int32_t j = 0; j < list_length; j++) {
+ lengths[index] += child_lengthes[j];
+ }
+ ++index;
+ },
+ [&]() {
+ lengths[index] = kExtraByteForNull + sizeof(Offset);
+ ++index;
+ });
+ } else {
+ const auto& list_scalar = data.scalar_as<BaseListScalar>();
+ int32_t accum_length = 0;
+ // Counting the size of the encoded list if the list is valid
+ if (list_scalar.is_valid && list_scalar.value->length() > 0) {
+ auto element_count = static_cast<int32_t>(list_scalar.value->length());
+ // Counting the size of the encoded list
+ std::vector<int32_t> child_lengthes(element_count, 0);
+
this->element_encoder_->AddLength(ExecValue{*list_scalar.value->data()},
+ element_count,
child_lengthes.data());
+ for (int32_t i = 0; i < element_count; i++) {
+ accum_length += child_lengthes[i];
+ }
+ }
+ for (int64_t i = 0; i < batch_length; i++) {
+ lengths[i] += kExtraByteForNull + sizeof(Offset) + accum_length;
+ }
+ }
+ }
+
+ void AddLengthNull(int32_t* length) override {
+ *length += kExtraByteForNull + sizeof(Offset);
+ }
+
+ Status Encode(const ExecValue& data, int64_t batch_length,
+ uint8_t** encoded_bytes) override {
+ auto handle_null_value = [&encoded_bytes]() {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kNullByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(0));
+ encoded_ptr += sizeof(Offset);
+ };
+ auto handle_valid_value = [&encoded_bytes,
+ this](const ArraySpan& child_array) -> Status {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kValidByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(child_array.length));
+ encoded_ptr += sizeof(Offset);
+ // handling the child data
+ for (int64_t i = 0; i < child_array.length; i++) {
+ ArraySpan tmp_child_data(child_array);
+ tmp_child_data.SetSlice(child_array.offset + i, 1);
+ RETURN_NOT_OK(
+ this->element_encoder_->Encode(ExecValue{tmp_child_data}, 1,
&encoded_ptr));
+ }
+ return Status::OK();
+ };
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ RETURN_NOT_OK(VisitBitBlocks(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
+ Offset list_length = offsets[i + 1] - offsets[i];
+ tmp_child_data.SetSlice(offsets[i], list_length);
+ return handle_valid_value(tmp_child_data);
+ },
+ [&]() {
+ handle_null_value();
+ return Status::OK();
+ }));
+ } else {
+ const auto& list_scalar = data.scalar_as<BaseListScalar>();
+ if (list_scalar.is_valid) {
+ ArraySpan span(*list_scalar.value->data());
+ for (int64_t i = 0; i < batch_length; i++) {
+ RETURN_NOT_OK(handle_valid_value(span));
+ }
+ } else {
+ for (int64_t i = 0; i < batch_length; i++) {
+ handle_null_value();
+ }
Review Comment:
You could instead call `handle_valid_value` or `handle_null_value` once and
then `memcpy` the result `batch_length - 1` times.
##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+ static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports
ListType");
+ using Offset = typename ListType::offset_type;
+
+ ListKeyEncoder(std::shared_ptr<DataType> self_type,
+ std::shared_ptr<DataType> element_type,
+ std::shared_ptr<KeyEncoder> element_encoder)
+ : self_type_(std::move(self_type)),
+ element_type_(std::move(element_type)),
+ element_encoder_(std::move(element_encoder)) {}
+
+ void AddLength(const ExecValue& data, int64_t batch_length, int32_t*
lengths) override {
+ if (data.is_array()) {
+ ARROW_DCHECK_EQ(data.array.length, batch_length);
+ const uint8_t* validity = data.array.buffers[0].data;
+ const auto* offsets = data.array.GetValues<Offset>(1);
+ // AddLength for each list
+ std::vector<int32_t> child_lengthes;
+ int32_t index{0};
+ ArraySpan tmp_child_data(data.array.child_data[0]);
+ VisitBitBlocksVoid(
+ validity, data.array.offset, data.array.length,
+ [&](int64_t i) {
+ ARROW_UNUSED(i);
+ child_lengthes.clear();
+ Offset list_length = offsets[i + 1] - offsets[i];
+ if (list_length == 0) {
+ lengths[index] += kExtraByteForNull + sizeof(Offset);
+ ++index;
+ return;
+ }
+ child_lengthes.resize(list_length, 0);
+ tmp_child_data.SetSlice(offsets[i], list_length);
+ this->element_encoder_->AddLength(ExecValue{tmp_child_data},
list_length,
+ child_lengthes.data());
+ lengths[index] += kExtraByteForNull + sizeof(Offset);
+ for (int32_t j = 0; j < list_length; j++) {
+ lengths[index] += child_lengthes[j];
+ }
+ ++index;
+ },
+ [&]() {
+ lengths[index] = kExtraByteForNull + sizeof(Offset);
+ ++index;
+ });
+ } else {
+ const auto& list_scalar = data.scalar_as<BaseListScalar>();
+ int32_t accum_length = 0;
+ // Counting the size of the encoded list if the list is valid
+ if (list_scalar.is_valid && list_scalar.value->length() > 0) {
+ auto element_count = static_cast<int32_t>(list_scalar.value->length());
+ // Counting the size of the encoded list
+ std::vector<int32_t> child_lengthes(element_count, 0);
+
this->element_encoder_->AddLength(ExecValue{*list_scalar.value->data()},
+ element_count,
child_lengthes.data());
+ for (int32_t i = 0; i < element_count; i++) {
+ accum_length += child_lengthes[i];
+ }
+ }
+ for (int64_t i = 0; i < batch_length; i++) {
+ lengths[i] += kExtraByteForNull + sizeof(Offset) + accum_length;
+ }
+ }
+ }
+
+ void AddLengthNull(int32_t* length) override {
+ *length += kExtraByteForNull + sizeof(Offset);
+ }
+
+ Status Encode(const ExecValue& data, int64_t batch_length,
+ uint8_t** encoded_bytes) override {
+ auto handle_null_value = [&encoded_bytes]() {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kNullByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(0));
+ encoded_ptr += sizeof(Offset);
+ };
+ auto handle_valid_value = [&encoded_bytes,
+ this](const ArraySpan& child_array) -> Status {
+ auto& encoded_ptr = *encoded_bytes++;
+ *encoded_ptr++ = kValidByte;
+ util::SafeStore(encoded_ptr, static_cast<Offset>(child_array.length));
+ encoded_ptr += sizeof(Offset);
+ // handling the child data
+ for (int64_t i = 0; i < child_array.length; i++) {
+ ArraySpan tmp_child_data(child_array);
+ tmp_child_data.SetSlice(child_array.offset + i, 1);
+ RETURN_NOT_OK(
+ this->element_encoder_->Encode(ExecValue{tmp_child_data}, 1,
&encoded_ptr));
+ }
Review Comment:
Why do it one element at a time? Why not instead:
```suggestion
RETURN_NOT_OK(
this->element_encoder_->Encode(ExecValue{child_array},
child_array.length, &encoded_ptr));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]