felipecrv commented on code in PR #41700:
URL: https://github.com/apache/arrow/pull/41700#discussion_r1723503934
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -715,12 +853,247 @@ class TakeMetaFunction : public MetaFunction {
return Status::NotImplemented(
"Unsupported types for take operation: "
"values=",
- args[0].ToString(), "indices=", args[1].ToString());
+ args[0].ToString(), ", indices=", args[1].ToString());
}
};
// ----------------------------------------------------------------------
+/// \brief Prepare the output array like ExecuteArrayKernel::PrepareOutput()
+std::shared_ptr<ArrayData> PrepareOutput(const ExecBatch& batch, int64_t
length) {
+ DCHECK_EQ(batch.length, length);
+ auto out = std::make_shared<ArrayData>(batch.values[0].type(), length);
+ out->buffers.resize(batch.values[0].type()->layout().buffers.size());
+ return out;
+}
+
+Result<std::shared_ptr<Array>> ChunkedArrayAsArray(
+ const std::shared_ptr<ChunkedArray>& values, MemoryPool* pool) {
+ switch (values->num_chunks()) {
+ case 0:
+ return MakeArrayOfNull(values->type(), /*length=*/0, pool);
+ case 1:
+ return values->chunk(0);
+ default:
+ return Concatenate(values->chunks(), pool);
+ }
+}
+
+Status CallAAAKernel(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+ std::shared_ptr<ArrayData> values,
+ std::shared_ptr<ArrayData> indices, Datum* out) {
+ int64_t batch_length = values->length;
+ std::vector<Datum> args = {std::move(values), std::move(indices)};
+ ExecBatch array_array_batch(std::move(args), batch_length);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+ ExecSpan exec_span{array_array_batch};
+ ExecResult result;
+ result.value = out->array();
+ RETURN_NOT_OK(take_aaa_exec(ctx, exec_span, &result));
+ DCHECK(result.is_array_data());
+ out->value = result.array_data();
+ return Status::OK();
+}
+
+Status CallCAAKernel(VectorKernel::ChunkedExec take_caa_exec, KernelContext*
ctx,
+ std::shared_ptr<ChunkedArray> values,
+ std::shared_ptr<ArrayData> indices, Datum* out) {
+ int64_t batch_length = values->length();
+ std::vector<Datum> args = {std::move(values), std::move(indices)};
+ ExecBatch chunked_array_array_batch(std::move(args), batch_length);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+ return take_caa_exec(ctx, chunked_array_array_batch, out);
+}
+
+Status TakeACCChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+ const ExecBatch& batch, Datum* out) {
+ auto& values = batch.values[0].array();
+ auto& indices = batch.values[1].chunked_array();
+ auto num_chunks = indices->num_chunks();
+ std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+ for (int i = 0; i < num_chunks; i++) {
+ // Take with that indices chunk
+ auto& indices_chunk = indices->chunk(i)->data();
+ Datum result = PrepareOutput(batch, values->length);
+ RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values, indices_chunk,
&result));
+ new_chunks[i] = MakeArray(result.array());
+ }
+ out->value = std::make_shared<ChunkedArray>(std::move(new_chunks),
values->type);
+ return Status::OK();
+}
+
+/// \brief Generic (slower) VectorKernel::exec_chunked (`CA->C`, `CC->C`, and `AC->C`).
+///
+/// This function concatenates the chunks of the values and then calls the `AA->A` take
+/// kernel to handle the `CA->C` cases. The ArrayData returned by the `AA->A` kernel is
+/// converted to a ChunkedArray with a single chunk to honor the `CA->C` contract.
+///
+/// For `CC->C` cases, it concatenates the chunks of the values and calls the `AA->A` take
+/// kernel for each chunk of the indices, producing a new chunked array with the same
+/// shape as the indices.
+///
+/// `AC->C` cases are trivially delegated to TakeACCChunkedExec without any concatenation.
+///
+/// \param take_aaa_exec The `AA->A` take kernel to use.
+Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext*
ctx,
+ const ExecBatch& batch, Datum* out) {
+ const auto& args = batch.values;
+ if (args[0].kind() == Datum::CHUNKED_ARRAY) {
+ auto& values_chunked = args[0].chunked_array();
+ ARROW_ASSIGN_OR_RAISE(auto values_array,
+ ChunkedArrayAsArray(values_chunked,
ctx->memory_pool()));
+ if (args[1].kind() == Datum::ARRAY) {
+ // CA->C
+ auto& indices = args[1].array();
+ DCHECK_EQ(values_array->length(), batch.length);
+ {
+ // AA->A
+ RETURN_NOT_OK(
+ CallAAAKernel(take_aaa_exec, ctx, values_array->data(), indices,
out));
+ out->value = std::make_shared<ChunkedArray>(MakeArray(out->array()));
+ }
+ return Status::OK();
+ } else if (args[1].kind() == Datum::CHUNKED_ARRAY) {
+ // CC->C
+ const auto& indices = args[1].chunked_array();
+ std::vector<std::shared_ptr<Array>> new_chunks;
+ for (int i = 0; i < indices->num_chunks(); i++) {
+ // AA->A
+ auto& indices_chunk = indices->chunk(i)->data();
+ Datum result = PrepareOutput(batch, values_array->length());
+ RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values_array->data(),
+ indices_chunk, &result));
+ new_chunks.push_back(MakeArray(result.array()));
+ }
+ DCHECK(out->is_array());
+ out->value =
+ std::make_shared<ChunkedArray>(std::move(new_chunks),
values_chunked->type());
+ return Status::OK();
+ }
+ } else {
+ // VectorKernel::exec_chunked are only called when at least one of the inputs is
+ // chunked, so we should be able to assume that args[1] is a chunked array when
+ // everything is wired up correctly.
+ if (args[1].kind() == Datum::CHUNKED_ARRAY) {
+ // AC->C
+ return TakeACCChunkedExec(take_aaa_exec, ctx, batch, out);
+ } else {
+ DCHECK(false) << "Unexpected kind for array_take's exec_chunked kernel:
values="
+ << args[0].ToString() << ", indices=" <<
args[1].ToString();
+ }
+ }
+ return Status::NotImplemented(
+ "Unsupported kinds for 'array_take', try using 'take': "
+ "values=",
+ args[0].ToString(), ", indices=", args[1].ToString());
+}
+
+template <ArrayKernelExec kTakeAAAExec>
+struct GenericTakeChunkedExecFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ return GenericTakeChunkedExec(kTakeAAAExec, ctx, batch, out);
+ }
+};
+
+/// \brief Specialized (faster) VectorKernel::exec_chunked (`CA->C`, `CC->C`, `AC->C`).
+///
+/// This function doesn't ever need to concatenate the chunks of the values, so it can be
+/// more efficient than GenericTakeChunkedExec that can only delegate to the `AA->A` take
+/// kernels.
+///
+/// For `CA->C` cases, it can call the `CA->A` take kernel directly [1] and trivially
+/// convert the result to a ChunkedArray of a single chunk to honor the `CA->C` contract.
+///
+/// For `CC->C` cases it can call the `CA->A` take kernel for each chunk of the indices to
+/// get each chunk that becomes the ChunkedArray output.
+///
+/// `AC->C` cases are trivially delegated to TakeACCChunkedExec.
+///
+/// \param take_aaa_exec The `AA->A` take kernel to use.
+Status SpecialTakeChunkedExec(const ArrayKernelExec take_aaa_exec,
+ VectorKernel::ChunkedExec take_caa_exec,
KernelContext* ctx,
+ const ExecBatch& batch, Datum* out) {
+ Datum result = PrepareOutput(batch, batch.length);
+ auto* pool = ctx->memory_pool();
+ const auto& args = batch.values;
+ if (args[0].kind() == Datum::CHUNKED_ARRAY) {
+ auto& values_chunked = args[0].chunked_array();
+ std::shared_ptr<Array> single_chunk = nullptr;
+ if (values_chunked->num_chunks() == 0 || values_chunked->length() == 0) {
Review Comment:
Nothing necessarily prevents chunked arrays of many empty chunks, does it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]