pitrou commented on a change in pull request #10806:
URL: https://github.com/apache/arrow/pull/10806#discussion_r691418361
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else_benchmark.cc
##########
@@ -180,12 +184,50 @@ static void CaseWhenBench(benchmark::State& state) {
val3->Slice(offset),
val4->Slice(offset)}));
}
- state.SetBytesProcessed(state.iterations() * (len - offset) * sizeof(CType));
+ // Set bytes processed to ~length of output
+ state.SetBytesProcessed(state.iterations() * GetBytesProcessed<Type>::Get(val1));
+ state.SetItemsProcessed(state.iterations() * (len - offset));
+}
+
+static void CaseWhenBenchList(benchmark::State& state) {
+ auto type = list(int64());
+ auto fld = field("", type);
+
+ int64_t len = state.range(0);
+ int64_t offset = state.range(1);
+
+ random::RandomArrayGenerator rand(/*seed=*/0);
+
+ auto cond1 = std::static_pointer_cast<BooleanArray>(
+ rand.ArrayOf(boolean(), len, /*null_probability=*/0.01));
+ auto cond2 = std::static_pointer_cast<BooleanArray>(
+ rand.ArrayOf(boolean(), len, /*null_probability=*/0.01));
+ auto cond3 = std::static_pointer_cast<BooleanArray>(
+ rand.ArrayOf(boolean(), len, /*null_probability=*/0.01));
Review comment:
Am I missing something, or are these 3 arrays not used?
##########
File path: cpp/src/arrow/buffer_builder.h
##########
@@ -350,6 +350,17 @@ class TypedBufferBuilder<bool> {
bit_length_ += num_elements;
}
+ void UnsafeAppend(const uint8_t* bitmap, int64_t offset, int64_t num_elements) {
+ if (num_elements == 0) return;
+ int64_t i = offset;
+ internal::GenerateBitsUnrolled(mutable_data(), bit_length_, num_elements, [&] {
Review comment:
Perhaps use `CopyBitmap` instead? It should be faster on large numbers
of elements.
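A minimal sketch of what that could look like, using `arrow::internal::CopyBitmap` from `arrow/util/bitmap_ops.h` (a suggestion to verify against the current header, not the PR's code):

```cpp
// Sketch: bulk-copy num_elements bits from (bitmap, offset) onto the end of
// this builder's buffer instead of generating them one at a time.
void UnsafeAppend(const uint8_t* bitmap, int64_t offset, int64_t num_elements) {
  if (num_elements == 0) return;
  // CopyBitmap(source, source_offset, length, dest, dest_offset)
  internal::CopyBitmap(bitmap, offset, num_elements, mutable_data(), bit_length_);
  bit_length_ += num_elements;
}
```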
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -1413,6 +1417,588 @@ struct CaseWhenFunctor<NullType> {
}
};
+static Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out) {
+ const auto& conds = checked_cast<const StructScalar&>(*batch.values[0].scalar());
+ Datum result;
+ for (size_t i = 0; i < batch.values.size() - 1; i++) {
+ if (i < conds.value.size()) {
+ const Scalar& cond = *conds.value[i];
+ if (cond.is_valid && internal::UnboxScalar<BooleanType>::Unbox(cond)) {
+ result = batch[i + 1];
+ break;
+ }
+ } else {
+ // ELSE clause
+ result = batch[i + 1];
+ break;
+ }
+ }
+ if (out->is_scalar()) {
+ *out = result.is_scalar() ? result.scalar() : MakeNullScalar(out->type());
+ return Status::OK();
+ }
+ ArrayData* output = out->mutable_array();
+ if (!result.is_value()) {
+ // All conditions false, no 'else' argument
+ ARROW_ASSIGN_OR_RAISE(
+ auto array, MakeArrayOfNull(output->type, batch.length, ctx->memory_pool()));
+ *output = *array->data();
+ } else if (result.is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*result.scalar(), batch.length,
+ ctx->memory_pool()));
+ *output = *array->data();
+ } else {
+ *output = *result.array();
+ }
+ return Status::OK();
+}
+
+template <typename ReserveData, typename AppendScalar, typename AppendArray>
+static Status ExecVarWidthArrayCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out, ReserveData reserve_data,
+ AppendScalar append_scalar,
+ AppendArray append_array) {
+ const auto& conds_array = *batch.values[0].array();
+ ArrayData* output = out->mutable_array();
+ const bool have_else_arg =
+ static_cast<size_t>(conds_array.type->num_fields()) < (batch.values.size() - 1);
+ std::unique_ptr<ArrayBuilder> raw_builder;
+ RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), out->type(), &raw_builder));
+ RETURN_NOT_OK(raw_builder->Reserve(batch.length));
+ RETURN_NOT_OK(reserve_data(raw_builder.get()));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ int64_t selected = have_else_arg ? static_cast<int64_t>(batch.values.size() - 1) : -1;
+ for (int64_t arg = 0; static_cast<size_t>(arg) < conds_array.child_data.size();
+ arg++) {
+ const ArrayData& cond_array = *conds_array.child_data[arg];
+ if ((!cond_array.buffers[0] ||
+ BitUtil::GetBit(cond_array.buffers[0]->data(),
+ conds_array.offset + cond_array.offset + row)) &&
+ BitUtil::GetBit(cond_array.buffers[1]->data(),
+ conds_array.offset + cond_array.offset + row)) {
+ selected = arg + 1;
+ break;
+ }
+ }
+ if (selected < 0) {
+ RETURN_NOT_OK(raw_builder->AppendNull());
+ continue;
+ }
+ const Datum& source = batch.values[selected];
+ if (source.is_scalar()) {
+ const auto& scalar = *source.scalar();
+ if (!scalar.is_valid) {
+ RETURN_NOT_OK(raw_builder->AppendNull());
+ } else {
+ RETURN_NOT_OK(append_scalar(raw_builder.get(), scalar));
+ }
+ } else {
+ const auto& array = source.array();
+ if (!array->buffers[0] ||
+ BitUtil::GetBit(array->buffers[0]->data(), array->offset + row)) {
+ RETURN_NOT_OK(append_array(raw_builder.get(), array, row));
+ } else {
+ RETURN_NOT_OK(raw_builder->AppendNull());
+ }
+ }
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto temp_output, raw_builder->Finish());
+ *output = *temp_output->data();
+ return Status::OK();
+}
+
+template <typename Type>
+struct CaseWhenFunctor<Type, enable_if_base_binary<Type>> {
+ using offset_type = typename Type::offset_type;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (batch[0].null_count() > 0) {
+ return Status::Invalid("cond struct must not have outer nulls");
+ }
+ if (batch[0].is_scalar()) {
+ return ExecVarWidthScalarCaseWhen(ctx, batch, out);
+ }
+ return ExecArray(ctx, batch, out);
+ }
+
+ static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ return ExecVarWidthArrayCaseWhen(
+ ctx, batch, out,
+ // ReserveData
+ [&](ArrayBuilder* raw_builder) {
+ int64_t reservation = 0;
+ for (size_t arg = 1; arg < batch.values.size(); arg++) {
+ auto source = batch.values[arg];
+ if (source.is_scalar()) {
+ const auto& scalar =
+ checked_cast<const BaseBinaryScalar&>(*source.scalar());
+ if (!scalar.value) continue;
+ reservation =
+ std::max<int64_t>(reservation, batch.length * scalar.value->size());
+ } else {
+ const auto& array = *source.array();
+ const auto& offsets = array.GetValues<offset_type>(1);
+ reservation =
+ std::max<int64_t>(reservation, offsets[array.length] - offsets[0]);
+ }
+ }
+ // checked_cast works since (Large)StringBuilder <: (Large)BinaryBuilder
+ return checked_cast<BuilderType*>(raw_builder)->ReserveData(reservation);
+ },
+ // AppendScalar
+ [](ArrayBuilder* raw_builder, const Scalar& raw_scalar) {
+ const auto& scalar = checked_cast<const BaseBinaryScalar&>(raw_scalar);
+ return checked_cast<BuilderType*>(raw_builder)
+ ->Append(scalar.value->data(),
+ static_cast<offset_type>(scalar.value->size()));
+ },
+ // AppendArray
+ [](ArrayBuilder* raw_builder, const std::shared_ptr<ArrayData>& array,
+ const int64_t row) {
+ const offset_type* offsets = array->GetValues<offset_type>(1);
+ return checked_cast<BuilderType*>(raw_builder)
+ ->Append(array->buffers[2]->data() + offsets[row],
+ offsets[row + 1] - offsets[row]);
+ });
+ }
+};
+
+// Given an array and a builder, append a slice of the array to the builder
+using ArrayAppenderFunc = std::function<Status(
+ ArrayBuilder*, const std::shared_ptr<ArrayData>&, int64_t, int64_t)>;
+
+static Status GetValueAppenders(const DataType& type, ArrayAppenderFunc* array_appender);
Review comment:
Why not return `Result<ArrayAppenderFunc>`?
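For illustration, a sketch of the `Result`-returning shape (the call site shown is hypothetical):

```cpp
// Sketch: return the appender directly instead of through an out-parameter.
Result<ArrayAppenderFunc> GetValueAppenders(const DataType& type);

// A call site would then read:
// ARROW_ASSIGN_OR_RAISE(ArrayAppenderFunc appender, GetValueAppenders(*out->type()));
```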
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -1413,6 +1417,588 @@ struct CaseWhenFunctor<NullType> {
}
};
+static Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out) {
+ const auto& conds = checked_cast<const StructScalar&>(*batch.values[0].scalar());
+ Datum result;
+ for (size_t i = 0; i < batch.values.size() - 1; i++) {
+ if (i < conds.value.size()) {
+ const Scalar& cond = *conds.value[i];
+ if (cond.is_valid && internal::UnboxScalar<BooleanType>::Unbox(cond)) {
+ result = batch[i + 1];
+ break;
+ }
+ } else {
+ // ELSE clause
+ result = batch[i + 1];
+ break;
+ }
+ }
+ if (out->is_scalar()) {
+ *out = result.is_scalar() ? result.scalar() : MakeNullScalar(out->type());
+ return Status::OK();
+ }
+ ArrayData* output = out->mutable_array();
+ if (!result.is_value()) {
+ // All conditions false, no 'else' argument
+ ARROW_ASSIGN_OR_RAISE(
+ auto array, MakeArrayOfNull(output->type, batch.length, ctx->memory_pool()));
+ *output = *array->data();
+ } else if (result.is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*result.scalar(), batch.length,
+ ctx->memory_pool()));
+ *output = *array->data();
+ } else {
+ *output = *result.array();
+ }
+ return Status::OK();
+}
+
+template <typename ReserveData, typename AppendScalar, typename AppendArray>
+static Status ExecVarWidthArrayCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out, ReserveData reserve_data,
+ AppendScalar append_scalar,
+ AppendArray append_array) {
+ const auto& conds_array = *batch.values[0].array();
+ ArrayData* output = out->mutable_array();
+ const bool have_else_arg =
+ static_cast<size_t>(conds_array.type->num_fields()) < (batch.values.size() - 1);
+ std::unique_ptr<ArrayBuilder> raw_builder;
+ RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), out->type(), &raw_builder));
+ RETURN_NOT_OK(raw_builder->Reserve(batch.length));
+ RETURN_NOT_OK(reserve_data(raw_builder.get()));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ int64_t selected = have_else_arg ? static_cast<int64_t>(batch.values.size() - 1) : -1;
+ for (int64_t arg = 0; static_cast<size_t>(arg) < conds_array.child_data.size();
+ arg++) {
+ const ArrayData& cond_array = *conds_array.child_data[arg];
+ if ((!cond_array.buffers[0] ||
+ BitUtil::GetBit(cond_array.buffers[0]->data(),
+ conds_array.offset + cond_array.offset + row)) &&
+ BitUtil::GetBit(cond_array.buffers[1]->data(),
+ conds_array.offset + cond_array.offset + row)) {
+ selected = arg + 1;
+ break;
+ }
+ }
+ if (selected < 0) {
+ RETURN_NOT_OK(raw_builder->AppendNull());
+ continue;
+ }
+ const Datum& source = batch.values[selected];
+ if (source.is_scalar()) {
+ const auto& scalar = *source.scalar();
+ if (!scalar.is_valid) {
+ RETURN_NOT_OK(raw_builder->AppendNull());
+ } else {
+ RETURN_NOT_OK(append_scalar(raw_builder.get(), scalar));
+ }
+ } else {
+ const auto& array = source.array();
+ if (!array->buffers[0] ||
+ BitUtil::GetBit(array->buffers[0]->data(), array->offset + row)) {
+ RETURN_NOT_OK(append_array(raw_builder.get(), array, row));
+ } else {
+ RETURN_NOT_OK(raw_builder->AppendNull());
+ }
+ }
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto temp_output, raw_builder->Finish());
+ *output = *temp_output->data();
+ return Status::OK();
+}
+
+template <typename Type>
+struct CaseWhenFunctor<Type, enable_if_base_binary<Type>> {
+ using offset_type = typename Type::offset_type;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (batch[0].null_count() > 0) {
+ return Status::Invalid("cond struct must not have outer nulls");
+ }
+ if (batch[0].is_scalar()) {
+ return ExecVarWidthScalarCaseWhen(ctx, batch, out);
+ }
+ return ExecArray(ctx, batch, out);
+ }
+
+ static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ return ExecVarWidthArrayCaseWhen(
+ ctx, batch, out,
+ // ReserveData
+ [&](ArrayBuilder* raw_builder) {
+ int64_t reservation = 0;
+ for (size_t arg = 1; arg < batch.values.size(); arg++) {
+ auto source = batch.values[arg];
+ if (source.is_scalar()) {
+ const auto& scalar =
+ checked_cast<const BaseBinaryScalar&>(*source.scalar());
+ if (!scalar.value) continue;
+ reservation =
+ std::max<int64_t>(reservation, batch.length * scalar.value->size());
+ } else {
+ const auto& array = *source.array();
+ const auto& offsets = array.GetValues<offset_type>(1);
+ reservation =
+ std::max<int64_t>(reservation, offsets[array.length] - offsets[0]);
+ }
+ }
+ // checked_cast works since (Large)StringBuilder <: (Large)BinaryBuilder
+ return checked_cast<BuilderType*>(raw_builder)->ReserveData(reservation);
+ },
+ // AppendScalar
+ [](ArrayBuilder* raw_builder, const Scalar& raw_scalar) {
+ const auto& scalar = checked_cast<const BaseBinaryScalar&>(raw_scalar);
+ return checked_cast<BuilderType*>(raw_builder)
+ ->Append(scalar.value->data(),
+ static_cast<offset_type>(scalar.value->size()));
+ },
+ // AppendArray
+ [](ArrayBuilder* raw_builder, const std::shared_ptr<ArrayData>& array,
+ const int64_t row) {
+ const offset_type* offsets = array->GetValues<offset_type>(1);
+ return checked_cast<BuilderType*>(raw_builder)
+ ->Append(array->buffers[2]->data() + offsets[row],
+ offsets[row + 1] - offsets[row]);
+ });
+ }
+};
+
+// Given an array and a builder, append a slice of the array to the builder
+using ArrayAppenderFunc = std::function<Status(
+ ArrayBuilder*, const std::shared_ptr<ArrayData>&, int64_t, int64_t)>;
+
+static Status GetValueAppenders(const DataType& type, ArrayAppenderFunc* array_appender);
+
+struct GetAppenders {
Review comment:
Hmm... this functionality may be useful elsewhere (and actually, perhaps
it can already be reused in other places).
Why not add a virtual method to `ArrayBuilder` (e.g. `virtual
AppendArraySlice(...)`) instead of keeping this private?
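One possible shape for such a hook (a sketch only; the method name and the NotImplemented default are assumptions, not existing API):

```cpp
class ArrayBuilder {
 public:
  // ...
  // Sketch: append `length` values of `array` starting at `offset`, letting
  // each concrete builder supply an efficient typed implementation.
  virtual Status AppendArraySlice(const ArrayData& array, int64_t offset,
                                  int64_t length) {
    return Status::NotImplemented("AppendArraySlice for builder of type ", *type());
  }
};
```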
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -1413,6 +1417,588 @@ struct CaseWhenFunctor<NullType> {
}
};
+static Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out) {
+ const auto& conds = checked_cast<const StructScalar&>(*batch.values[0].scalar());
+ Datum result;
+ for (size_t i = 0; i < batch.values.size() - 1; i++) {
+ if (i < conds.value.size()) {
+ const Scalar& cond = *conds.value[i];
+ if (cond.is_valid && internal::UnboxScalar<BooleanType>::Unbox(cond)) {
+ result = batch[i + 1];
+ break;
+ }
+ } else {
+ // ELSE clause
+ result = batch[i + 1];
+ break;
+ }
+ }
+ if (out->is_scalar()) {
+ *out = result.is_scalar() ? result.scalar() : MakeNullScalar(out->type());
+ return Status::OK();
+ }
+ ArrayData* output = out->mutable_array();
+ if (!result.is_value()) {
+ // All conditions false, no 'else' argument
+ ARROW_ASSIGN_OR_RAISE(
+ auto array, MakeArrayOfNull(output->type, batch.length, ctx->memory_pool()));
+ *output = *array->data();
+ } else if (result.is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*result.scalar(), batch.length,
+ ctx->memory_pool()));
+ *output = *array->data();
+ } else {
+ *output = *result.array();
+ }
+ return Status::OK();
+}
+
+template <typename ReserveData, typename AppendScalar, typename AppendArray>
+static Status ExecVarWidthArrayCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out, ReserveData reserve_data,
+ AppendScalar append_scalar,
+ AppendArray append_array) {
+ const auto& conds_array = *batch.values[0].array();
+ ArrayData* output = out->mutable_array();
+ const bool have_else_arg =
+ static_cast<size_t>(conds_array.type->num_fields()) < (batch.values.size() - 1);
+ std::unique_ptr<ArrayBuilder> raw_builder;
+ RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), out->type(), &raw_builder));
+ RETURN_NOT_OK(raw_builder->Reserve(batch.length));
+ RETURN_NOT_OK(reserve_data(raw_builder.get()));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ int64_t selected = have_else_arg ? static_cast<int64_t>(batch.values.size() - 1) : -1;
+ for (int64_t arg = 0; static_cast<size_t>(arg) < conds_array.child_data.size();
+ arg++) {
+ const ArrayData& cond_array = *conds_array.child_data[arg];
+ if ((!cond_array.buffers[0] ||
+ BitUtil::GetBit(cond_array.buffers[0]->data(),
+ conds_array.offset + cond_array.offset + row)) &&
+ BitUtil::GetBit(cond_array.buffers[1]->data(),
+ conds_array.offset + cond_array.offset + row)) {
+ selected = arg + 1;
+ break;
+ }
+ }
+ if (selected < 0) {
+ RETURN_NOT_OK(raw_builder->AppendNull());
Review comment:
Can use `UnsafeAppendNull` since you reserved enough space at the start
(same below).
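For example (a sketch; it assumes the builder in use exposes a public `UnsafeAppendNull()`, otherwise the checked `AppendNull()` has to stay):

```cpp
// Sketch: Reserve(batch.length) already ran, so the capacity check inside
// AppendNull() is redundant for these slots.
if (selected < 0) {
  raw_builder->UnsafeAppendNull();
  continue;
}
```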
##########
File path: cpp/src/arrow/buffer_builder.h
##########
@@ -350,6 +350,17 @@ class TypedBufferBuilder<bool> {
bit_length_ += num_elements;
}
+ void UnsafeAppend(const uint8_t* bitmap, int64_t offset, int64_t num_elements) {
Review comment:
Add a docstring/comment so that the difference with the previous
overload is clear?
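For instance (suggested wording only):

```cpp
/// \brief Append `num_elements` bits from a packed bitmap, starting at bit
/// `offset`, without checking for capacity.
///
/// Unlike UnsafeAppend(const uint8_t* bytes, int64_t num_elements), which reads
/// one byte per appended element, this overload reads packed bits.
void UnsafeAppend(const uint8_t* bitmap, int64_t offset, int64_t num_elements);
```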
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -1413,6 +1417,588 @@ struct CaseWhenFunctor<NullType> {
}
};
+static Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
Review comment:
No need for `static` as this should be all in the anonymous namespace.
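i.e. (sketch):

```cpp
namespace {

// Inside the anonymous namespace the definition already has internal linkage,
// so the `static` keyword is redundant.
Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
                                  Datum* out);

}  // namespace
```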
##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -442,9 +442,18 @@ class NullArrayFactory {
// First buffer is always null
out_->buffers[0] = nullptr;
- // Type codes are all zero, so we can use buffer_ which has had it's memory
- // zeroed
out_->buffers[1] = buffer_;
+ // buffer_ is zeroed, but 0 may not be a valid type code
+ if (type.type_codes()[0] != 0) {
+ std::memset(buffer_->mutable_data(), type.type_codes()[0], buffer_->size());
Review comment:
Hmm, we should do the converse here: keep `buffer_` zeroed and create a
new buffer for type codes.
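Roughly like this (a sketch; it assumes `NullArrayFactory`'s `pool_` and `length_` members and uses `AllocateBuffer` from `arrow/buffer.h`):

```cpp
// Sketch: keep buffer_ zeroed (it is shared with child data) and only allocate
// a dedicated type-codes buffer when 0 is not a valid code.
if (type.type_codes()[0] != 0) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> type_codes,
                        AllocateBuffer(length_, pool_));
  std::memset(type_codes->mutable_data(), type.type_codes()[0], type_codes->size());
  out_->buffers[1] = std::move(type_codes);
} else {
  out_->buffers[1] = buffer_;
}
```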
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -1413,6 +1417,588 @@ struct CaseWhenFunctor<NullType> {
}
};
+static Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out) {
+ const auto& conds = checked_cast<const StructScalar&>(*batch.values[0].scalar());
+ Datum result;
+ for (size_t i = 0; i < batch.values.size() - 1; i++) {
+ if (i < conds.value.size()) {
+ const Scalar& cond = *conds.value[i];
+ if (cond.is_valid && internal::UnboxScalar<BooleanType>::Unbox(cond)) {
+ result = batch[i + 1];
+ break;
+ }
+ } else {
+ // ELSE clause
+ result = batch[i + 1];
+ break;
+ }
+ }
+ if (out->is_scalar()) {
+ *out = result.is_scalar() ? result.scalar() : MakeNullScalar(out->type());
Review comment:
Should we add `DCHECK(result.is_scalar() || result.kind() ==
Datum::NONE)`?
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -1413,6 +1417,588 @@ struct CaseWhenFunctor<NullType> {
}
};
+static Status ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch,
+ Datum* out) {
Review comment:
Why not return `Result<Datum>`?
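i.e., a sketch of the signature change (the output type and shape would then have to be passed in explicitly instead of being read from `out`):

```cpp
// Sketch: Result-returning variant of the all-scalar-conditions path.
Result<Datum> ExecVarWidthScalarCaseWhen(KernelContext* ctx, const ExecBatch& batch);
```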