pitrou commented on code in PR #41700:
URL: https://github.com/apache/arrow/pull/41700#discussion_r1723426802
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -715,12 +853,247 @@ class TakeMetaFunction : public MetaFunction {
return Status::NotImplemented(
"Unsupported types for take operation: "
"values=",
- args[0].ToString(), "indices=", args[1].ToString());
+ args[0].ToString(), ", indices=", args[1].ToString());
}
};
// ----------------------------------------------------------------------
+/// \brief Prepare the output array like ExecuteArrayKernel::PrepareOutput()
+std::shared_ptr<ArrayData> PrepareOutput(const ExecBatch& batch, int64_t
length) {
+ DCHECK_EQ(batch.length, length);
+ auto out = std::make_shared<ArrayData>(batch.values[0].type(), length);
+ out->buffers.resize(batch.values[0].type()->layout().buffers.size());
+ return out;
+}
+
+Result<std::shared_ptr<Array>> ChunkedArrayAsArray(
+ const std::shared_ptr<ChunkedArray>& values, MemoryPool* pool) {
+ switch (values->num_chunks()) {
+ case 0:
+ return MakeArrayOfNull(values->type(), /*length=*/0, pool);
+ case 1:
+ return values->chunk(0);
+ default:
+ return Concatenate(values->chunks(), pool);
+ }
+}
+
+Status CallAAAKernel(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+ std::shared_ptr<ArrayData> values,
+ std::shared_ptr<ArrayData> indices, Datum* out) {
+ int64_t batch_length = values->length;
+ std::vector<Datum> args = {std::move(values), std::move(indices)};
+ ExecBatch array_array_batch(std::move(args), batch_length);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+ ExecSpan exec_span{array_array_batch};
+ ExecResult result;
+ result.value = out->array();
+ RETURN_NOT_OK(take_aaa_exec(ctx, exec_span, &result));
+ DCHECK(result.is_array_data());
+ out->value = result.array_data();
+ return Status::OK();
+}
+
+Status CallCAAKernel(VectorKernel::ChunkedExec take_caa_exec, KernelContext*
ctx,
+ std::shared_ptr<ChunkedArray> values,
+ std::shared_ptr<ArrayData> indices, Datum* out) {
+ int64_t batch_length = values->length();
+ std::vector<Datum> args = {std::move(values), std::move(indices)};
+ ExecBatch chunked_array_array_batch(std::move(args), batch_length);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+ return take_caa_exec(ctx, chunked_array_array_batch, out);
+}
+
+Status TakeACCChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+ const ExecBatch& batch, Datum* out) {
+ auto& values = batch.values[0].array();
+ auto& indices = batch.values[1].chunked_array();
+ auto num_chunks = indices->num_chunks();
+ std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+ for (int i = 0; i < num_chunks; i++) {
+ // Take with that indices chunk
+ auto& indices_chunk = indices->chunk(i)->data();
+ Datum result = PrepareOutput(batch, values->length);
+ RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values, indices_chunk,
&result));
+ new_chunks[i] = MakeArray(result.array());
+ }
+ out->value = std::make_shared<ChunkedArray>(std::move(new_chunks),
values->type);
+ return Status::OK();
+}
+
+/// \brief Generic (slower) VectorKernel::exec_chunked (`CA->C`, `CC->C`, and
`AC->C`).
+///
+/// This function concatenates the chunks of the values and then calls the
`AA->A` take
+/// kernel to handle the `CA->C` cases. The ArrayData returned by the `AA->A`
kernel is
+/// converted to a ChunkedArray with a single chunk to honor the `CA->C`
contract.
+///
+/// For `CC->C` cases, it concatenates the chunks of the values and calls the
`AA->A` take
+/// kernel for each chunk of the indices, producing a new chunked array with
the same
+/// shape as the indices.
+///
+/// `AC->C` cases are trivially delegated to TakeACCChunkedExec without any
concatenation.
+///
+/// \param take_aaa_exec The `AA->A` take kernel to use.
+Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext*
ctx,
+ const ExecBatch& batch, Datum* out) {
+ const auto& args = batch.values;
+ if (args[0].kind() == Datum::CHUNKED_ARRAY) {
+ auto& values_chunked = args[0].chunked_array();
Review Comment:
Can we settle on `const auto&` unless it's necessary to get a mutable ref?
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -395,10 +526,45 @@ struct FixedWidthTakeImpl {
out_arr->null_count = out_arr->length - valid_count;
return Status::OK();
}
+
+ static Status ChunkedExec(KernelContext* ctx, const ChunkedArray& values,
+ const ArraySpan& indices, ArrayData* out_arr,
+ int64_t factor) {
+ const bool out_has_validity = values.null_count() > 0 ||
indices.MayHaveNulls();
+
+ ChunkedFixedWidthValuesSpan chunked_values{values};
+ ResolvedIndicesState resolved_idx;
+ RETURN_NOT_OK(resolved_idx.InitWithIndices<IndexCType>(
+ /*chunks=*/values.chunks(), /*idx_length=*/indices.length,
+ /*idx=*/indices.GetValues<IndexCType>(1), ctx->memory_pool()));
+
+ int64_t valid_count = 0;
+ arrow::internal::GatherFromChunks<kValueWidthInBits, IndexCType,
WithFactor::value>
Review Comment:
As a further optimization, perhaps we should test whether `chunk_index_vec`
contains a single distinct chunk index, in which case we could similarly
fall back on ArraySpan-take (I'm not sure how expensive the check would be,
though).
Probably for another PR/issue, if any.
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -395,10 +526,45 @@ struct FixedWidthTakeImpl {
out_arr->null_count = out_arr->length - valid_count;
return Status::OK();
}
+
+ static Status ChunkedExec(KernelContext* ctx, const ChunkedArray& values,
+ const ArraySpan& indices, ArrayData* out_arr,
+ int64_t factor) {
+ const bool out_has_validity = values.null_count() > 0 ||
indices.MayHaveNulls();
Review Comment:
Or it seems this is actually handled at a higher level, in the kernel exec
scaffolding?
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -715,12 +853,247 @@ class TakeMetaFunction : public MetaFunction {
return Status::NotImplemented(
"Unsupported types for take operation: "
"values=",
- args[0].ToString(), "indices=", args[1].ToString());
+ args[0].ToString(), ", indices=", args[1].ToString());
}
};
// ----------------------------------------------------------------------
+/// \brief Prepare the output array like ExecuteArrayKernel::PrepareOutput()
+std::shared_ptr<ArrayData> PrepareOutput(const ExecBatch& batch, int64_t
length) {
+ DCHECK_EQ(batch.length, length);
+ auto out = std::make_shared<ArrayData>(batch.values[0].type(), length);
+ out->buffers.resize(batch.values[0].type()->layout().buffers.size());
+ return out;
+}
+
+Result<std::shared_ptr<Array>> ChunkedArrayAsArray(
+ const std::shared_ptr<ChunkedArray>& values, MemoryPool* pool) {
+ switch (values->num_chunks()) {
+ case 0:
+ return MakeArrayOfNull(values->type(), /*length=*/0, pool);
+ case 1:
+ return values->chunk(0);
+ default:
+ return Concatenate(values->chunks(), pool);
+ }
+}
+
+Status CallAAAKernel(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+ std::shared_ptr<ArrayData> values,
+ std::shared_ptr<ArrayData> indices, Datum* out) {
+ int64_t batch_length = values->length;
+ std::vector<Datum> args = {std::move(values), std::move(indices)};
+ ExecBatch array_array_batch(std::move(args), batch_length);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+ ExecSpan exec_span{array_array_batch};
+ ExecResult result;
+ result.value = out->array();
+ RETURN_NOT_OK(take_aaa_exec(ctx, exec_span, &result));
+ DCHECK(result.is_array_data());
+ out->value = result.array_data();
+ return Status::OK();
+}
+
+Status CallCAAKernel(VectorKernel::ChunkedExec take_caa_exec, KernelContext*
ctx,
+ std::shared_ptr<ChunkedArray> values,
+ std::shared_ptr<ArrayData> indices, Datum* out) {
+ int64_t batch_length = values->length();
+ std::vector<Datum> args = {std::move(values), std::move(indices)};
+ ExecBatch chunked_array_array_batch(std::move(args), batch_length);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+ return take_caa_exec(ctx, chunked_array_array_batch, out);
+}
+
+Status TakeACCChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+ const ExecBatch& batch, Datum* out) {
+ auto& values = batch.values[0].array();
+ auto& indices = batch.values[1].chunked_array();
+ auto num_chunks = indices->num_chunks();
+ std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+ for (int i = 0; i < num_chunks; i++) {
+ // Take with that indices chunk
+ auto& indices_chunk = indices->chunk(i)->data();
+ Datum result = PrepareOutput(batch, values->length);
+ RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values, indices_chunk,
&result));
+ new_chunks[i] = MakeArray(result.array());
+ }
+ out->value = std::make_shared<ChunkedArray>(std::move(new_chunks),
values->type);
+ return Status::OK();
+}
+
+/// \brief Generic (slower) VectorKernel::exec_chunked (`CA->C`, `CC->C`, and
`AC->C`).
+///
+/// This function concatenates the chunks of the values and then calls the
`AA->A` take
+/// kernel to handle the `CA->C` cases. The ArrayData returned by the `AA->A`
kernel is
+/// converted to a ChunkedArray with a single chunk to honor the `CA->C`
contract.
+///
+/// For `CC->C` cases, it concatenates the chunks of the values and calls the
`AA->A` take
+/// kernel for each chunk of the indices, producing a new chunked array with
the same
+/// shape as the indices.
+///
+/// `AC->C` cases are trivially delegated to TakeACCChunkedExec without any
concatenation.
+///
+/// \param take_aaa_exec The `AA->A` take kernel to use.
+Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext*
ctx,
+ const ExecBatch& batch, Datum* out) {
+ const auto& args = batch.values;
+ if (args[0].kind() == Datum::CHUNKED_ARRAY) {
+ auto& values_chunked = args[0].chunked_array();
+ ARROW_ASSIGN_OR_RAISE(auto values_array,
+ ChunkedArrayAsArray(values_chunked,
ctx->memory_pool()));
+ if (args[1].kind() == Datum::ARRAY) {
+ // CA->C
+ auto& indices = args[1].array();
+ DCHECK_EQ(values_array->length(), batch.length);
+ {
+ // AA->A
+ RETURN_NOT_OK(
+ CallAAAKernel(take_aaa_exec, ctx, values_array->data(), indices,
out));
+ out->value = std::make_shared<ChunkedArray>(MakeArray(out->array()));
+ }
+ return Status::OK();
+ } else if (args[1].kind() == Datum::CHUNKED_ARRAY) {
+ // CC->C
+ const auto& indices = args[1].chunked_array();
+ std::vector<std::shared_ptr<Array>> new_chunks;
+ for (int i = 0; i < indices->num_chunks(); i++) {
+ // AA->A
+ auto& indices_chunk = indices->chunk(i)->data();
+ Datum result = PrepareOutput(batch, values_array->length());
+ RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values_array->data(),
+ indices_chunk, &result));
+ new_chunks.push_back(MakeArray(result.array()));
+ }
+ DCHECK(out->is_array());
+ out->value =
+ std::make_shared<ChunkedArray>(std::move(new_chunks),
values_chunked->type());
+ return Status::OK();
+ }
+ } else {
+ // VectorKernel::exec_chunked are only called when at least one of the
inputs is
+ // chunked, so we should be able to assume that args[1] is a chunked array
when
+ // everything is wired up correctly.
+ if (args[1].kind() == Datum::CHUNKED_ARRAY) {
+ // AC->C
+ return TakeACCChunkedExec(take_aaa_exec, ctx, batch, out);
+ } else {
+ DCHECK(false) << "Unexpected kind for array_take's exec_chunked kernel:
values="
+ << args[0].ToString() << ", indices=" <<
args[1].ToString();
+ }
+ }
+ return Status::NotImplemented(
+ "Unsupported kinds for 'array_take', try using 'take': "
+ "values=",
+ args[0].ToString(), ", indices=", args[1].ToString());
+}
+
+template <ArrayKernelExec kTakeAAAExec>
+struct GenericTakeChunkedExecFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ return GenericTakeChunkedExec(kTakeAAAExec, ctx, batch, out);
+ }
+};
+
+/// \brief Specialized (faster) VectorKernel::exec_chunked (`CA->C`, `CC->C`,
`AC->C`).
+///
+/// This function doesn't ever need to concatenate the chunks of the values,
so it can be
+/// more efficient than GenericTakeChunkedExec that can only delegate to the
`AA->A` take
+/// kernels.
+///
+/// For `CA->C` cases, it can call the `CA->A` take kernel directly [1] and
trivially
+/// convert the result to a ChunkedArray of a single chunk to honor the
`CA->C` contract.
+///
+/// For `CC->C` cases it can call the `CA->A` take kernel for each chunk of
the indices to
+/// get each chunk that becomes the ChunkedArray output.
+///
+/// `AC->C` cases are trivially delegated to TakeACCChunkedExec.
+///
+/// \param take_aaa_exec The `AA->A` take kernel to use.
+Status SpecialTakeChunkedExec(const ArrayKernelExec take_aaa_exec,
+ VectorKernel::ChunkedExec take_caa_exec,
KernelContext* ctx,
+ const ExecBatch& batch, Datum* out) {
+ Datum result = PrepareOutput(batch, batch.length);
+ auto* pool = ctx->memory_pool();
+ const auto& args = batch.values;
+ if (args[0].kind() == Datum::CHUNKED_ARRAY) {
+ auto& values_chunked = args[0].chunked_array();
+ std::shared_ptr<Array> single_chunk = nullptr;
+ if (values_chunked->num_chunks() == 0 || values_chunked->length() == 0) {
Review Comment:
The disjunction is not necessary, is it? This can be reduced to
```suggestion
if (values_chunked->length() == 0) {
```
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -736,22 +1109,40 @@ void
PopulateTakeKernels(std::vector<SelectionKernelData>* out) {
auto take_indices = match::Integer();
*out = {
- {InputType(match::Primitive()), take_indices, FixedWidthTakeExec},
- {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec},
- {InputType(match::LargeBinaryLike()), take_indices,
LargeVarBinaryTakeExec},
- {InputType(match::FixedSizeBinaryLike()), take_indices,
FixedWidthTakeExec},
- {InputType(null()), take_indices, NullTakeExec},
- {InputType(Type::DICTIONARY), take_indices, DictionaryTake},
- {InputType(Type::EXTENSION), take_indices, ExtensionTake},
- {InputType(Type::LIST), take_indices, ListTakeExec},
- {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec},
- {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec},
- {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec},
- {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec},
- {InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec},
- {InputType(Type::SPARSE_UNION), take_indices, SparseUnionTakeExec},
- {InputType(Type::STRUCT), take_indices, StructTakeExec},
- {InputType(Type::MAP), take_indices, MapTakeExec},
+ {InputType(match::Primitive()), take_indices, FixedWidthTakeExec,
+ SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
+ FixedWidthTakeChunkedExec>::Exec},
+ {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec,
+ GenericTakeChunkedExecFunctor<VarBinaryTakeExec>::Exec},
+ {InputType(match::LargeBinaryLike()), take_indices,
LargeVarBinaryTakeExec,
+ GenericTakeChunkedExecFunctor<LargeVarBinaryTakeExec>::Exec},
+ {InputType(match::FixedSizeBinaryLike()), take_indices,
FixedWidthTakeExec,
+ SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
+ FixedWidthTakeChunkedExec>::Exec},
+ {InputType(null()), take_indices, NullTakeExec,
+ GenericTakeChunkedExecFunctor<NullTakeExec>::Exec},
+ {InputType(Type::DICTIONARY), take_indices, DictionaryTake,
+ GenericTakeChunkedExecFunctor<DictionaryTake>::Exec},
+ {InputType(Type::EXTENSION), take_indices, ExtensionTake,
+ GenericTakeChunkedExecFunctor<ExtensionTake>::Exec},
+ {InputType(Type::LIST), take_indices, ListTakeExec,
+ GenericTakeChunkedExecFunctor<ListTakeExec>::Exec},
+ {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec,
+ GenericTakeChunkedExecFunctor<LargeListTakeExec>::Exec},
+ {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec,
+ GenericTakeChunkedExecFunctor<ListViewTakeExec>::Exec},
+ {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec,
+ GenericTakeChunkedExecFunctor<LargeListViewTakeExec>::Exec},
+ {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec,
+ GenericTakeChunkedExecFunctor<FSLTakeExec>::Exec},
Review Comment:
We may need a more powerful type matcher, but ideally fixed size lists of
fixed width types should go through the special chunked exec function?
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -736,22 +1109,40 @@ void
PopulateTakeKernels(std::vector<SelectionKernelData>* out) {
auto take_indices = match::Integer();
*out = {
- {InputType(match::Primitive()), take_indices, FixedWidthTakeExec},
- {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec},
- {InputType(match::LargeBinaryLike()), take_indices,
LargeVarBinaryTakeExec},
- {InputType(match::FixedSizeBinaryLike()), take_indices,
FixedWidthTakeExec},
- {InputType(null()), take_indices, NullTakeExec},
- {InputType(Type::DICTIONARY), take_indices, DictionaryTake},
- {InputType(Type::EXTENSION), take_indices, ExtensionTake},
- {InputType(Type::LIST), take_indices, ListTakeExec},
- {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec},
- {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec},
- {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec},
- {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec},
- {InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec},
- {InputType(Type::SPARSE_UNION), take_indices, SparseUnionTakeExec},
- {InputType(Type::STRUCT), take_indices, StructTakeExec},
- {InputType(Type::MAP), take_indices, MapTakeExec},
+ {InputType(match::Primitive()), take_indices, FixedWidthTakeExec,
+ SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
+ FixedWidthTakeChunkedExec>::Exec},
+ {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec,
+ GenericTakeChunkedExecFunctor<VarBinaryTakeExec>::Exec},
+ {InputType(match::LargeBinaryLike()), take_indices,
LargeVarBinaryTakeExec,
+ GenericTakeChunkedExecFunctor<LargeVarBinaryTakeExec>::Exec},
+ {InputType(match::FixedSizeBinaryLike()), take_indices,
FixedWidthTakeExec,
+ SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
+ FixedWidthTakeChunkedExec>::Exec},
+ {InputType(null()), take_indices, NullTakeExec,
+ GenericTakeChunkedExecFunctor<NullTakeExec>::Exec},
+ {InputType(Type::DICTIONARY), take_indices, DictionaryTake,
+ GenericTakeChunkedExecFunctor<DictionaryTake>::Exec},
+ {InputType(Type::EXTENSION), take_indices, ExtensionTake,
+ GenericTakeChunkedExecFunctor<ExtensionTake>::Exec},
+ {InputType(Type::LIST), take_indices, ListTakeExec,
+ GenericTakeChunkedExecFunctor<ListTakeExec>::Exec},
+ {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec,
+ GenericTakeChunkedExecFunctor<LargeListTakeExec>::Exec},
+ {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec,
+ GenericTakeChunkedExecFunctor<ListViewTakeExec>::Exec},
+ {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec,
+ GenericTakeChunkedExecFunctor<LargeListViewTakeExec>::Exec},
+ {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec,
+ GenericTakeChunkedExecFunctor<FSLTakeExec>::Exec},
Review Comment:
Perhaps open a GH issue for this.
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -357,15 +481,22 @@ template <typename IndexCType, typename
ValueBitWidthConstant,
struct FixedWidthTakeImpl {
static constexpr int kValueWidthInBits = ValueBitWidthConstant::value;
- static Status Exec(KernelContext* ctx, const ArraySpan& values,
+ static Status Exec(KernelContext* ctx, const ValuesSpan& values,
const ArraySpan& indices, ArrayData* out_arr, int64_t
factor) {
#ifndef NDEBUG
- int64_t bit_width = util::FixedWidthInBits(*values.type);
+ int64_t bit_width = util::FixedWidthInBits(*values.type());
DCHECK(WithFactor::value || (kValueWidthInBits == bit_width && factor ==
1));
DCHECK(!WithFactor::value ||
(factor > 0 && kValueWidthInBits == 8 && // factors are used with
bytes
static_cast<int64_t>(factor * kValueWidthInBits) == bit_width));
#endif
+ return values.is_chunked()
+ ? ChunkedExec(ctx, values.chunked_array(), indices, out_arr,
factor)
+ : Exec(ctx, values.array(), indices, out_arr, factor);
+ }
+
+ static Status Exec(KernelContext* ctx, const ArraySpan& values,
Review Comment:
Perhaps call this `ContiguousExec` similar to `ChunkedExec` below?
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -736,22 +1109,40 @@ void
PopulateTakeKernels(std::vector<SelectionKernelData>* out) {
auto take_indices = match::Integer();
*out = {
- {InputType(match::Primitive()), take_indices, FixedWidthTakeExec},
- {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec},
- {InputType(match::LargeBinaryLike()), take_indices,
LargeVarBinaryTakeExec},
- {InputType(match::FixedSizeBinaryLike()), take_indices,
FixedWidthTakeExec},
- {InputType(null()), take_indices, NullTakeExec},
- {InputType(Type::DICTIONARY), take_indices, DictionaryTake},
- {InputType(Type::EXTENSION), take_indices, ExtensionTake},
- {InputType(Type::LIST), take_indices, ListTakeExec},
- {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec},
- {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec},
- {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec},
- {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec},
- {InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec},
- {InputType(Type::SPARSE_UNION), take_indices, SparseUnionTakeExec},
- {InputType(Type::STRUCT), take_indices, StructTakeExec},
- {InputType(Type::MAP), take_indices, MapTakeExec},
+ {InputType(match::Primitive()), take_indices, FixedWidthTakeExec,
+ SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
+ FixedWidthTakeChunkedExec>::Exec},
+ {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec,
+ GenericTakeChunkedExecFunctor<VarBinaryTakeExec>::Exec},
+ {InputType(match::LargeBinaryLike()), take_indices,
LargeVarBinaryTakeExec,
+ GenericTakeChunkedExecFunctor<LargeVarBinaryTakeExec>::Exec},
+ {InputType(match::FixedSizeBinaryLike()), take_indices,
FixedWidthTakeExec,
+ SpecialTakeChunkedExecFunctor<FixedWidthTakeExec,
+ FixedWidthTakeChunkedExec>::Exec},
+ {InputType(null()), take_indices, NullTakeExec,
+ GenericTakeChunkedExecFunctor<NullTakeExec>::Exec},
Review Comment:
This use case is probably not very important, but we should perhaps open a
low-priority issue for optimizing the null-take kernel :-)
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -326,6 +327,129 @@ namespace {
using TakeState = OptionsWrapper<TakeOptions>;
+class ValuesSpan {
+ private:
+ const std::shared_ptr<ChunkedArray> chunked_ = nullptr;
+ const ArraySpan chunk0_; // first chunk or the whole array
+
+ public:
+ explicit ValuesSpan(const std::shared_ptr<ChunkedArray> values)
+ : chunked_(std::move(values)), chunk0_{*values->chunk(0)->data()} {
+ DCHECK(chunked_);
+ DCHECK_GT(chunked_->num_chunks(), 0);
+ }
+
+ explicit ValuesSpan(const ArraySpan& values) : chunk0_(values) {}
+
+ bool is_chunked() const { return chunked_ != nullptr; }
+
+ const ChunkedArray& chunked_array() const {
+ DCHECK(is_chunked());
+ return *chunked_;
+ }
+
+ const ArraySpan& chunk0() const { return chunk0_; }
+
+ const ArraySpan& array() const {
+ DCHECK(!is_chunked());
+ return chunk0_;
+ }
+
+ const DataType* type() const { return chunk0_.type; }
+
+ int64_t length() const { return is_chunked() ? chunked_->length() :
array().length; }
+
+ bool MayHaveNulls() const {
+ return is_chunked() ? chunked_->null_count() != 0 : array().MayHaveNulls();
+ }
+};
+
+struct ChunkedFixedWidthValuesSpan {
+ private:
+ // src_residual_bit_offsets_[i] is used to store the bit offset of the first
byte (0-7)
+ // in src_chunks_[i] iff kValueWidthInBits == 1.
+ std::vector<int> src_residual_bit_offsets;
+ // Pre-computed pointers to the start of the values in each chunk.
+ std::vector<const uint8_t*> src_chunks;
+
+ public:
+ explicit ChunkedFixedWidthValuesSpan(const ChunkedArray& values) {
+ const bool chunk_values_are_bit_sized = values.type()->id() == Type::BOOL;
+ DCHECK_EQ(chunk_values_are_bit_sized,
util::FixedWidthInBytes(*values.type()) == -1);
+ if (chunk_values_are_bit_sized) {
+ src_residual_bit_offsets.resize(values.num_chunks());
+ }
+ src_chunks.resize(values.num_chunks());
+
+ for (int i = 0; i < values.num_chunks(); ++i) {
+ const ArraySpan chunk{*values.chunk(i)->data()};
+ DCHECK(util::IsFixedWidthLike(chunk));
+
+ auto offset_pointer = util::OffsetPointerOfFixedBitWidthValues(chunk);
+ if (chunk_values_are_bit_sized) {
+ src_residual_bit_offsets[i] = offset_pointer.first;
+ } else {
+ DCHECK_EQ(offset_pointer.first, 0);
+ }
+ src_chunks[i] = offset_pointer.second;
+ }
+ }
+
+ ~ChunkedFixedWidthValuesSpan() = default;
Review Comment:
Is this actually necessary?
##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -395,10 +526,45 @@ struct FixedWidthTakeImpl {
out_arr->null_count = out_arr->length - valid_count;
return Status::OK();
}
+
+ static Status ChunkedExec(KernelContext* ctx, const ChunkedArray& values,
+ const ArraySpan& indices, ArrayData* out_arr,
+ int64_t factor) {
+ const bool out_has_validity = values.null_count() > 0 ||
indices.MayHaveNulls();
Review Comment:
As an optimization, do we want to fall back on ArraySpan-take if
`values.num_chunks() == 1`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]