mapleFU commented on code in PR #43912:
URL: https://github.com/apache/arrow/pull/43912#discussion_r1749186953


##########
cpp/src/arrow/compute/row/row_encoder_internal.h:
##########
@@ -269,6 +272,190 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
   }
 };
 
+template <typename ListType>
+struct ARROW_EXPORT ListKeyEncoder : KeyEncoder {
+  static_assert(is_list_like_type<ListType>(), "ListKeyEncoder only supports 
ListType");
+  using Offset = typename ListType::offset_type;
+
+  ListKeyEncoder(std::shared_ptr<DataType> self_type,
+                 std::shared_ptr<DataType> element_type,
+                 std::shared_ptr<KeyEncoder> element_encoder)
+      : self_type_(std::move(self_type)),
+        element_type_(std::move(element_type)),
+        element_encoder_(std::move(element_encoder)) {}
+
+  void AddLength(const ExecValue& data, int64_t batch_length, int32_t* 
lengths) override {
+    if (data.is_array()) {
+      ARROW_DCHECK_EQ(data.array.length, batch_length);
+      const uint8_t* validity = data.array.buffers[0].data;
+      const auto* offsets = data.array.GetValues<Offset>(1);
+      // AddLength for each list
+      std::vector<int32_t> child_lengthes;
+      int32_t index{0};
+      ArraySpan tmp_child_data(data.array.child_data[0]);
+      VisitBitBlocksVoid(
+          validity, data.array.offset, data.array.length,
+          [&](int64_t i) {
+            ARROW_UNUSED(i);
+            child_lengthes.clear();
+            Offset list_length = offsets[i + 1] - offsets[i];
+            if (list_length == 0) {
+              lengths[index] += kExtraByteForNull + sizeof(Offset);
+              ++index;
+              return;
+            }
+            child_lengthes.resize(list_length, 0);
+            tmp_child_data.SetSlice(offsets[i], list_length);
+            this->element_encoder_->AddLength(ExecValue{tmp_child_data}, 
list_length,
+                                              child_lengthes.data());
+            lengths[index] += kExtraByteForNull + sizeof(Offset);
+            for (int32_t j = 0; j < list_length; j++) {
+              lengths[index] += child_lengthes[j];
+            }
+            ++index;
+          },
+          [&]() {
+            lengths[index] = kExtraByteForNull + sizeof(Offset);
+            ++index;
+          });
+    } else {
+      const auto& list_scalar = data.scalar_as<BaseListScalar>();
+      int32_t accum_length = 0;
+      // Counting the size of the encoded list if the list is valid
+      if (list_scalar.is_valid && list_scalar.value->length() > 0) {
+        auto element_count = static_cast<int32_t>(list_scalar.value->length());
+        // Counting the size of the encoded list
+        std::vector<int32_t> child_lengthes(element_count, 0);
+        
this->element_encoder_->AddLength(ExecValue{*list_scalar.value->data()},
+                                          element_count, 
child_lengthes.data());
+        for (int32_t i = 0; i < element_count; i++) {
+          accum_length += child_lengthes[i];
+        }
+      }
+      for (int64_t i = 0; i < batch_length; i++) {
+        lengths[i] += kExtraByteForNull + sizeof(Offset) + accum_length;
+      }
+    }
+  }
+
+  void AddLengthNull(int32_t* length) override {
+    *length += kExtraByteForNull + sizeof(Offset);
+  }
+
+  Status Encode(const ExecValue& data, int64_t batch_length,
+                uint8_t** encoded_bytes) override {
+    auto handle_null_value = [&encoded_bytes]() {
+      auto& encoded_ptr = *encoded_bytes++;
+      *encoded_ptr++ = kNullByte;
+      util::SafeStore(encoded_ptr, static_cast<Offset>(0));
+      encoded_ptr += sizeof(Offset);
+    };
+    auto handle_valid_value = [&encoded_bytes,
+                               this](const ArraySpan& child_array) -> Status {
+      auto& encoded_ptr = *encoded_bytes++;
+      *encoded_ptr++ = kValidByte;
+      util::SafeStore(encoded_ptr, static_cast<Offset>(child_array.length));
+      encoded_ptr += sizeof(Offset);
+      // handling the child data
+      for (int64_t i = 0; i < child_array.length; i++) {
+        ArraySpan tmp_child_data(child_array);
+        tmp_child_data.SetSlice(child_array.offset + i, 1);
+        RETURN_NOT_OK(
+            this->element_encoder_->Encode(ExecValue{tmp_child_data}, 1, 
&encoded_ptr));
+      }
+      return Status::OK();
+    };
+    if (data.is_array()) {
+      ARROW_DCHECK_EQ(data.array.length, batch_length);
+      const uint8_t* validity = data.array.buffers[0].data;
+      const auto* offsets = data.array.GetValues<Offset>(1);
+      ArraySpan tmp_child_data(data.array.child_data[0]);
+      RETURN_NOT_OK(VisitBitBlocks(
+          validity, data.array.offset, data.array.length,
+          [&](int64_t i) {
+            ARROW_UNUSED(i);
+            Offset list_length = offsets[i + 1] - offsets[i];
+            tmp_child_data.SetSlice(offsets[i], list_length);
+            return handle_valid_value(tmp_child_data);
+          },
+          [&]() {
+            handle_null_value();
+            return Status::OK();
+          }));
+    } else {
+      const auto& list_scalar = data.scalar_as<BaseListScalar>();
+      if (list_scalar.is_valid) {
+        ArraySpan span(*list_scalar.value->data());
+        for (int64_t i = 0; i < batch_length; i++) {
+          RETURN_NOT_OK(handle_valid_value(span));
+        }
+      } else {
+        for (int64_t i = 0; i < batch_length; i++) {
+          handle_null_value();
+        }
+      }
+    }
+    return Status::OK();
+  }
+
+  void EncodeNull(uint8_t** encoded_bytes) override {
+    auto& encoded_ptr = *encoded_bytes;
+    *encoded_ptr++ = kNullByte;
+    util::SafeStore(encoded_ptr, static_cast<Offset>(0));
+    encoded_ptr += sizeof(Offset);
+  }
+
+  Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t 
length,
+                                            MemoryPool* pool) override {
+    std::shared_ptr<Buffer> null_buf;
+    int32_t null_count;
+    ARROW_RETURN_NOT_OK(DecodeNulls(pool, length, encoded_bytes, &null_buf, 
&null_count));
+
+    // Build the offsets buffer of the ListArray
+    ARROW_ASSIGN_OR_RAISE(auto offset_buf,
+                          AllocateBuffer(sizeof(Offset) * (1 + length), pool));
+    auto raw_offsets = offset_buf->mutable_span_as<Offset>();
+    Offset element_sum = 0;
+    raw_offsets[0] = 0;
+    std::vector<std::shared_ptr<Array>> child_datas;

Review Comment:
   This is a bit tricky, it always decode 1 from child, so we'll have 
"element-size" `Array` here...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to