pitrou commented on a change in pull request #7240: URL: https://github.com/apache/arrow/pull/7240#discussion_r428679922
########## File path: cpp/src/arrow/compute/registry.cc ########## @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/registry.h" + +#include <algorithm> +#include <memory> +#include <mutex> +#include <unordered_map> + +#include "arrow/compute/function.h" +#include "arrow/compute/registry_internal.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +class FunctionRegistry::FunctionRegistryImpl { + public: + Status AddFunction(std::shared_ptr<const Function> function, bool allow_overwrite) { + std::lock_guard<std::mutex> mutation_guard(lock_); + + const std::string& name = function->name(); + auto it = name_to_function_.find(name); + if (it != name_to_function_.end() && !allow_overwrite) { + return Status::KeyError("Already have a function registered with name: ", name); + } + name_to_function_[name] = std::move(function); + return Status::OK(); + } + + Result<std::shared_ptr<const Function>> GetFunction(const std::string& name) const { + auto it = name_to_function_.find(name); + if (it == name_to_function_.end()) { + return Status::KeyError("No function registered with name: ", name); + } + return it->second; + } + + 
std::vector<std::string> GetFunctionNames() const { + std::vector<std::string> results; + for (auto it : name_to_function_) { + results.push_back(it.first); + } + std::sort(results.begin(), results.end()); + return results; + } + + int num_functions() const { return static_cast<int>(name_to_function_.size()); } + + private: + std::mutex lock_; + std::unordered_map<std::string, std::shared_ptr<const Function>> name_to_function_; +}; + +std::unique_ptr<FunctionRegistry> FunctionRegistry::Make() { + return std::unique_ptr<FunctionRegistry>(new FunctionRegistry()); +} + +FunctionRegistry::FunctionRegistry() { impl_.reset(new FunctionRegistryImpl()); } + +FunctionRegistry::~FunctionRegistry() {} + +Status FunctionRegistry::AddFunction(std::shared_ptr<const Function> function, + bool allow_overwrite) { + return impl_->AddFunction(std::move(function), allow_overwrite); +} + +Result<std::shared_ptr<const Function>> FunctionRegistry::GetFunction( + const std::string& name) const { + return impl_->GetFunction(name); +} + +std::vector<std::string> FunctionRegistry::GetFunctionNames() const { + return impl_->GetFunctionNames(); +} + +int FunctionRegistry::num_functions() const { return impl_->num_functions(); } + +static std::unique_ptr<FunctionRegistry> g_registry; +static std::once_flag func_registry_initialized; + +namespace internal { + +static void CreateBuiltInRegistry() { + g_registry = FunctionRegistry::Make(); + + // Scalar functions + RegisterScalarArithmetic(g_registry.get()); + RegisterScalarBoolean(g_registry.get()); + RegisterScalarComparison(g_registry.get()); + RegisterScalarSetLookup(g_registry.get()); + + // Aggregate functions + RegisterScalarAggregateBasic(g_registry.get()); + + // Vector functions + RegisterVectorFilter(g_registry.get()); + RegisterVectorHash(g_registry.get()); + RegisterVectorSort(g_registry.get()); + RegisterVectorTake(g_registry.get()); +} + +} // namespace internal + +FunctionRegistry* GetFunctionRegistry() { + 
std::call_once(func_registry_initialized, internal::CreateBuiltInRegistry); Review comment: Or simply ```c++ static auto g_registry = internal::CreateBuiltInRegistry(); ``` ########## File path: cpp/src/arrow/compute/options.h ########## @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <utility> + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class DataType; + +namespace compute { + +struct ARROW_EXPORT FunctionOptions {}; + +struct ARROW_EXPORT CastOptions : public FunctionOptions { Review comment: I don't really understand why the various options classes are exposed here. Shouldn't they be in the respective `.h` files? ########## File path: cpp/src/arrow/compute/options.h ########## @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <utility> + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class DataType; + +namespace compute { + +struct ARROW_EXPORT FunctionOptions {}; + +struct ARROW_EXPORT CastOptions : public FunctionOptions { + CastOptions() + : allow_int_overflow(false), + allow_time_truncate(false), + allow_time_overflow(false), + allow_decimal_truncate(false), + allow_float_truncate(false), + allow_invalid_utf8(false) {} + + explicit CastOptions(bool safe) + : allow_int_overflow(!safe), + allow_time_truncate(!safe), + allow_time_overflow(!safe), + allow_decimal_truncate(!safe), + allow_float_truncate(!safe), + allow_invalid_utf8(!safe) {} + + static CastOptions Safe() { return CastOptions(true); } + + static CastOptions Unsafe() { return CastOptions(false); } + + // Type being casted to. May be passed separate to eager function + // compute::Cast + std::shared_ptr<DataType> to_type; Review comment: It's weird to have this as part of the cast options. ########## File path: cpp/src/arrow/compute/kernels/codegen_internal.h ########## @@ -0,0 +1,648 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/scalar.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/string_view.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::BitmapReader; +using internal::FirstTimeBitmapWriter; +using internal::GenerateBitsUnrolled; + +namespace compute { + +#ifdef ARROW_EXTRA_ERROR_CONTEXT + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + _st.AddContextLine(__FILE__, __LINE__, #expr); \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#else + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#endif // ARROW_EXTRA_ERROR_CONTEXT + +// A kernel that exposes Call methods that handles iteration over ArrayData +// inputs itself +// + +constexpr int kValidity = 0; +constexpr int kBinaryOffsets = 1; +constexpr int kPrimitiveData = 1; +constexpr int kBinaryData = 2; + +// ---------------------------------------------------------------------- +// Iteration / value access utilities + +template <typename T, typename R = void> +using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value && + !is_boolean_type<T>::value, R>; + +template <typename T, typename Enable = void> +struct 
CodegenTraits; + +template <typename T> +struct CodegenTraits<T, enable_if_has_c_type<T>> { + using value_type = typename T::c_type; +}; + +template <typename T> +struct CodegenTraits<T, enable_if_base_binary<T>> { + using value_type = util::string_view; +}; + +template <typename Type, typename Enable = void> +struct ArrayIterator; + +template <typename Type> +struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> { + using T = typename Type::c_type; + const T* values; + ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {} + T operator()() { return *values++; } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_boolean<Type>> { + BitmapReader reader; + ArrayIterator(const ArrayData& data) + : reader(data.buffers[1]->data(), data.offset, data.length) {} + bool operator()() { + bool out = reader.IsSet(); + reader.Next(); + return out; + } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_base_binary<Type>> { + int64_t position = 0; + typename TypeTraits<Type>::ArrayType arr; + ArrayIterator(const ArrayData& data) + : arr(data.Copy()) {} + util::string_view operator()() { return arr.GetView(position++); } +}; + +template <typename Type, typename Enable = void> +struct UnboxScalar; + +template <typename Type> +struct UnboxScalar<Type, enable_if_has_c_type<Type>> { + using ScalarType = typename TypeTraits<Type>::ScalarType; + static typename Type::c_type Unbox(const Datum& datum) { + return datum.scalar_as<ScalarType>().value; + } +}; + +template <typename Type> +struct UnboxScalar<Type, enable_if_base_binary<Type>> { + static util::string_view Unbox(const Datum& datum) { + return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value); + } +}; + +template <typename Type, typename Enable = void> +struct GetValueType; + +template <typename Type> +struct GetValueType<Type, enable_if_has_c_type<Type>> { + using T = typename Type::c_type; +}; + +template <typename Type> +struct GetValueType< + Type, 
enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value || + is_fixed_size_binary_type<Type>::value>> { + using T = util::string_view; +}; + +// ---------------------------------------------------------------------- +// Generate an array kernel given template classes + +void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out); + +void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec, + const ExecBatch& batch, Datum* out); + +// ---------------------------------------------------------------------- +// Boolean data utilities + +// ---------------------------------------------------------------------- +// Template kernel exec function generators + +template <typename T> +void Extend(const std::vector<T>& values, std::vector<T>* out) { + for (const auto& t : values) { + out->push_back(t); + } +} + +const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes(); +const std::vector<std::shared_ptr<DataType>>& SignedIntTypes(); +const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes(); +const std::vector<std::shared_ptr<DataType>>& IntTypes(); +const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes(); + +// Number types without boolean +const std::vector<std::shared_ptr<DataType>>& NumericTypes(); + +// Temporal types including time and timestamps for each unit +const std::vector<std::shared_ptr<DataType>>& TemporalTypes(); + +// Integer, floating point, base binary, and temporal +const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes(); + +namespace codegen { + +struct SimpleExec { + // Operator must implement + // + // static void Call(KernelContext*, const ArrayData& in, ArrayData* out) + template <typename Operator> + static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch[0].kind() == Datum::SCALAR) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } else if (batch.length > 0) { + Operator::Call(ctx, *batch[0].array(), out->mutable_array()); + } + } + + // 
Operator must implement + // + // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1, + // ArrayData* out) + template <typename Operator> + static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } else if (batch.length > 0) { + Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array()); + } + } +}; + +// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction +struct ScalarPrimitiveExec { Review comment: It's difficult to understand why there's `ScalarPrimitiveExec` as well as `ScalarUnary` and other stuff. ########## File path: cpp/src/arrow/compute/kernels/scalar_cast_internal.h ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> +#include <vector> + +#include "arrow/builder.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/cast_internal.h" +#include "arrow/compute/kernels/common.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace internal { + +template <typename OutType, typename InType, typename Enable = void> +struct CastFunctor {}; + +// No-op functor for identity casts +template <typename O, typename I> +struct CastFunctor< + O, I, enable_if_t<std::is_same<O, I>::value && is_parameter_free_type<I>::value>> { + static void Exec(KernelContext*, const ExecBatch&, Datum*) {} +}; + +void CastFromExtension(KernelContext* ctx, const ExecBatch& batch, Datum* out); + +// ---------------------------------------------------------------------- +// Dictionary to other things + +template <typename T, typename IndexType, typename Enable = void> +struct FromDictVisitor {}; + +// Visitor for Dict<FixedSizeBinaryType> +template <typename T, typename IndexType> +struct FromDictVisitor<T, IndexType, enable_if_fixed_size_binary<T>> { + using ArrayType = typename TypeTraits<T>::ArrayType; + + FromDictVisitor(KernelContext* ctx, const ArrayType& dictionary, ArrayData* output) + : dictionary_(dictionary), + byte_width_(dictionary.byte_width()), + out_(output->buffers[1]->mutable_data() + byte_width_ * output->offset) {} + + Status Init() { return Status::OK(); } + + Status VisitNull() { + memset(out_, 0, byte_width_); + out_ += byte_width_; + return Status::OK(); + } + + Status VisitValue(typename IndexType::c_type dict_index) { + const uint8_t* value = dictionary_.Value(dict_index); + memcpy(out_, value, byte_width_); + out_ += byte_width_; + return Status::OK(); + } + + Status Finish() { return Status::OK(); } + + const ArrayType& dictionary_; + int32_t byte_width_; + uint8_t* out_; +}; + +// Visitor for Dict<BinaryType> +template <typename T, typename IndexType> +struct FromDictVisitor<T, IndexType, 
enable_if_base_binary<T>> { + using ArrayType = typename TypeTraits<T>::ArrayType; + + FromDictVisitor(KernelContext* ctx, const ArrayType& dictionary, ArrayData* output) + : ctx_(ctx), dictionary_(dictionary), output_(output) {} + + Status Init() { + RETURN_NOT_OK(MakeBuilder(ctx_->memory_pool(), output_->type, &builder_)); + binary_builder_ = checked_cast<BinaryBuilder*>(builder_.get()); + return Status::OK(); + } + + Status VisitNull() { return binary_builder_->AppendNull(); } + + Status VisitValue(typename IndexType::c_type dict_index) { + return binary_builder_->Append(dictionary_.GetView(dict_index)); + } + + Status Finish() { + std::shared_ptr<Array> plain_array; + RETURN_NOT_OK(binary_builder_->Finish(&plain_array)); + output_->buffers = plain_array->data()->buffers; + return Status::OK(); + } + + KernelContext* ctx_; + const ArrayType& dictionary_; + ArrayData* output_; + std::unique_ptr<ArrayBuilder> builder_; + BinaryBuilder* binary_builder_; +}; + +// Visitor for Dict<NumericType | TemporalType> +template <typename T, typename IndexType> +struct FromDictVisitor< + T, IndexType, enable_if_t<is_number_type<T>::value || is_temporal_type<T>::value>> { + using ArrayType = typename TypeTraits<T>::ArrayType; + + using value_type = typename T::c_type; + + FromDictVisitor(KernelContext* ctx, const ArrayType& dictionary, ArrayData* output) + : dictionary_(dictionary), out_(output->GetMutableValues<value_type>(1)) {} + + Status Init() { return Status::OK(); } + + Status VisitNull() { + *out_++ = value_type{}; // Zero-initialize + return Status::OK(); + } + + Status VisitValue(typename IndexType::c_type dict_index) { + *out_++ = dictionary_.Value(dict_index); + return Status::OK(); + } + + Status Finish() { return Status::OK(); } + + const ArrayType& dictionary_; + value_type* out_; +}; + +template <typename T> +struct FromDictUnpackHelper { + using ArrayType = typename TypeTraits<T>::ArrayType; + + template <typename IndexType> + void Unpack(KernelContext* ctx, 
const ArrayData& indices, const ArrayType& dictionary, + ArrayData* output) { + FromDictVisitor<T, IndexType> visitor{ctx, dictionary, output}; + KERNEL_ABORT_IF_ERROR(ctx, visitor.Init()); + KERNEL_ABORT_IF_ERROR(ctx, ArrayDataVisitor<IndexType>::Visit(indices, &visitor)); + KERNEL_ABORT_IF_ERROR(ctx, visitor.Finish()); + } +}; + +// Dispatch dictionary casts to UnpackHelper +template <typename T> +struct FromDictionaryCast { + using ArrayType = typename TypeTraits<T>::ArrayType; + + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const ArrayData& input = *batch[0].array(); + ArrayData* output = out->mutable_array(); + + const DictionaryType& type = checked_cast<const DictionaryType&>(*input.type); + const Array& dictionary = *input.dictionary; + const DataType& values_type = *dictionary.type(); + + // ARROW-7077 + if (!values_type.Equals(*output->type)) { + ctx->SetStatus(Status::Invalid("Cannot unpack dictionary of type ", type.ToString(), + " to type ", output->type->ToString())); + return; + } + + FromDictUnpackHelper<T> unpack_helper; + switch (type.index_type()->id()) { + case Type::INT8: + unpack_helper.template Unpack<Int8Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + case Type::INT16: + unpack_helper.template Unpack<Int16Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + case Type::INT32: + unpack_helper.template Unpack<Int32Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + case Type::INT64: + unpack_helper.template Unpack<Int64Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + default: + ctx->SetStatus( + Status::TypeError("Invalid index type: ", type.index_type()->ToString())); + break; + } + } +}; + +template <> +struct FromDictionaryCast<NullType> { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ArrayData* output = out->mutable_array(); + output->buffers = 
{nullptr}; + output->null_count = batch.length; + } +}; + +template <> +struct FromDictionaryCast<BooleanType> { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {} +}; + +template <typename T> +struct FromNullCast { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ArrayData* output = out->mutable_array(); + std::shared_ptr<Array> nulls; + Status s = MakeArrayOfNull(output->type, batch.length).Value(&nulls); + KERNEL_ABORT_IF_ERROR(ctx, s); + out->value = nulls->data(); + } +}; + +// Adds a cast function where the functor is defined and the input and output +// types have a type_singleton +template <typename InType, typename OutType> +void AddSimpleCast(InputType in_ty, OutputType out_ty, CastFunction* func) { + DCHECK_OK(func->AddKernel(InType::type_id, {in_ty}, out_ty, + CastFunctor<OutType, InType>::Exec)); +} + +void ZeroCopyCastExec(KernelContext* ctx, const ExecBatch& batch, Datum* out); + +void AddZeroCopyCast(InputType in_type, OutputType out_type, CastFunction* func); + +// OutputType::Resolver that returns a descr with the shape of the input +// argument and the type from CastOptions +Result<ValueDescr> ResolveOutputFromOptions(KernelContext* ctx, + const std::vector<ValueDescr>& args); + +template <typename T, typename Enable = void> +struct MaybeAddFromDictionary { + static void Add(const OutputType& out_ty, CastFunction* func) {} +}; + +template <typename T> +struct MaybeAddFromDictionary< + T, enable_if_t<!is_boolean_type<T>::value && !is_nested_type<T>::value && + !std::is_same<DictionaryType, T>::value>> { + static void Add(const OutputType& out_ty, CastFunction* func) { + // Dictionary unpacking not implemented for boolean or nested types Review comment: High-level question: isn't `Cast<ValueType>(Dict(indices, dictionary))` the same as `Take(dictionary, indices)`? So the Take implementation could ideally be reused without additional hassle. 
########## File path: cpp/src/arrow/compute/kernels/scalar_boolean.cc ########## @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernels/common.h" +#include "arrow/util/bit_util.h" + +namespace arrow { +namespace compute { + +namespace { + +enum class ResolveNull { KLEENE_LOGIC, PROPAGATE }; Review comment: Still used? ########## File path: cpp/src/arrow/compute/kernels/scalar_cast_internal.h ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <vector> + +#include "arrow/builder.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/cast_internal.h" +#include "arrow/compute/kernels/common.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace internal { + +template <typename OutType, typename InType, typename Enable = void> +struct CastFunctor {}; + +// No-op functor for identity casts +template <typename O, typename I> +struct CastFunctor< + O, I, enable_if_t<std::is_same<O, I>::value && is_parameter_free_type<I>::value>> { + static void Exec(KernelContext*, const ExecBatch&, Datum*) {} +}; + +void CastFromExtension(KernelContext* ctx, const ExecBatch& batch, Datum* out); + +// ---------------------------------------------------------------------- +// Dictionary to other things + +template <typename T, typename IndexType, typename Enable = void> +struct FromDictVisitor {}; + +// Visitor for Dict<FixedSizeBinaryType> +template <typename T, typename IndexType> +struct FromDictVisitor<T, IndexType, enable_if_fixed_size_binary<T>> { + using ArrayType = typename TypeTraits<T>::ArrayType; + + FromDictVisitor(KernelContext* ctx, const ArrayType& dictionary, ArrayData* output) + : dictionary_(dictionary), + byte_width_(dictionary.byte_width()), + out_(output->buffers[1]->mutable_data() + byte_width_ * output->offset) {} + + Status Init() { return Status::OK(); } + + Status VisitNull() { + memset(out_, 0, byte_width_); + out_ += byte_width_; + return Status::OK(); + } + + Status VisitValue(typename IndexType::c_type dict_index) { + const uint8_t* value = dictionary_.Value(dict_index); + memcpy(out_, value, byte_width_); + out_ += byte_width_; + return Status::OK(); + } + + Status Finish() { return Status::OK(); } + + const ArrayType& dictionary_; + int32_t byte_width_; + uint8_t* out_; +}; + +// Visitor for 
Dict<BinaryType> +template <typename T, typename IndexType> +struct FromDictVisitor<T, IndexType, enable_if_base_binary<T>> { + using ArrayType = typename TypeTraits<T>::ArrayType; + + FromDictVisitor(KernelContext* ctx, const ArrayType& dictionary, ArrayData* output) + : ctx_(ctx), dictionary_(dictionary), output_(output) {} + + Status Init() { + RETURN_NOT_OK(MakeBuilder(ctx_->memory_pool(), output_->type, &builder_)); + binary_builder_ = checked_cast<BinaryBuilder*>(builder_.get()); + return Status::OK(); + } + + Status VisitNull() { return binary_builder_->AppendNull(); } + + Status VisitValue(typename IndexType::c_type dict_index) { + return binary_builder_->Append(dictionary_.GetView(dict_index)); + } + + Status Finish() { + std::shared_ptr<Array> plain_array; + RETURN_NOT_OK(binary_builder_->Finish(&plain_array)); + output_->buffers = plain_array->data()->buffers; + return Status::OK(); + } + + KernelContext* ctx_; + const ArrayType& dictionary_; + ArrayData* output_; + std::unique_ptr<ArrayBuilder> builder_; + BinaryBuilder* binary_builder_; +}; + +// Visitor for Dict<NumericType | TemporalType> +template <typename T, typename IndexType> +struct FromDictVisitor< + T, IndexType, enable_if_t<is_number_type<T>::value || is_temporal_type<T>::value>> { + using ArrayType = typename TypeTraits<T>::ArrayType; + + using value_type = typename T::c_type; + + FromDictVisitor(KernelContext* ctx, const ArrayType& dictionary, ArrayData* output) + : dictionary_(dictionary), out_(output->GetMutableValues<value_type>(1)) {} + + Status Init() { return Status::OK(); } + + Status VisitNull() { + *out_++ = value_type{}; // Zero-initialize + return Status::OK(); + } + + Status VisitValue(typename IndexType::c_type dict_index) { + *out_++ = dictionary_.Value(dict_index); + return Status::OK(); + } + + Status Finish() { return Status::OK(); } + + const ArrayType& dictionary_; + value_type* out_; +}; + +template <typename T> +struct FromDictUnpackHelper { + using ArrayType = 
typename TypeTraits<T>::ArrayType; + + template <typename IndexType> + void Unpack(KernelContext* ctx, const ArrayData& indices, const ArrayType& dictionary, + ArrayData* output) { + FromDictVisitor<T, IndexType> visitor{ctx, dictionary, output}; + KERNEL_ABORT_IF_ERROR(ctx, visitor.Init()); + KERNEL_ABORT_IF_ERROR(ctx, ArrayDataVisitor<IndexType>::Visit(indices, &visitor)); + KERNEL_ABORT_IF_ERROR(ctx, visitor.Finish()); + } +}; + +// Dispatch dictionary casts to UnpackHelper +template <typename T> +struct FromDictionaryCast { + using ArrayType = typename TypeTraits<T>::ArrayType; + + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const ArrayData& input = *batch[0].array(); + ArrayData* output = out->mutable_array(); + + const DictionaryType& type = checked_cast<const DictionaryType&>(*input.type); + const Array& dictionary = *input.dictionary; + const DataType& values_type = *dictionary.type(); + + // ARROW-7077 + if (!values_type.Equals(*output->type)) { + ctx->SetStatus(Status::Invalid("Cannot unpack dictionary of type ", type.ToString(), + " to type ", output->type->ToString())); + return; + } + + FromDictUnpackHelper<T> unpack_helper; + switch (type.index_type()->id()) { + case Type::INT8: + unpack_helper.template Unpack<Int8Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + case Type::INT16: + unpack_helper.template Unpack<Int16Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + case Type::INT32: + unpack_helper.template Unpack<Int32Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + case Type::INT64: + unpack_helper.template Unpack<Int64Type>( + ctx, input, static_cast<const ArrayType&>(dictionary), output); + break; + default: + ctx->SetStatus( + Status::TypeError("Invalid index type: ", type.index_type()->ToString())); + break; + } + } +}; + +template <> +struct FromDictionaryCast<NullType> { + static void Exec(KernelContext* ctx, 
const ExecBatch& batch, Datum* out) { + ArrayData* output = out->mutable_array(); + output->buffers = {nullptr}; + output->null_count = batch.length; + } +}; + +template <> +struct FromDictionaryCast<BooleanType> { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {} +}; + +template <typename T> +struct FromNullCast { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ArrayData* output = out->mutable_array(); + std::shared_ptr<Array> nulls; + Status s = MakeArrayOfNull(output->type, batch.length).Value(&nulls); + KERNEL_ABORT_IF_ERROR(ctx, s); + out->value = nulls->data(); + } +}; + +// Adds a cast function where the functor is defined and the input and output +// types have a type_singleton +template <typename InType, typename OutType> +void AddSimpleCast(InputType in_ty, OutputType out_ty, CastFunction* func) { + DCHECK_OK(func->AddKernel(InType::type_id, {in_ty}, out_ty, + CastFunctor<OutType, InType>::Exec)); +} + +void ZeroCopyCastExec(KernelContext* ctx, const ExecBatch& batch, Datum* out); + +void AddZeroCopyCast(InputType in_type, OutputType out_type, CastFunction* func); + +// OutputType::Resolver that returns a descr with the shape of the input +// argument and the type from CastOptions +Result<ValueDescr> ResolveOutputFromOptions(KernelContext* ctx, + const std::vector<ValueDescr>& args); + +template <typename T, typename Enable = void> +struct MaybeAddFromDictionary { + static void Add(const OutputType& out_ty, CastFunction* func) {} +}; + +template <typename T> +struct MaybeAddFromDictionary< + T, enable_if_t<!is_boolean_type<T>::value && !is_nested_type<T>::value && + !std::is_same<DictionaryType, T>::value>> { + static void Add(const OutputType& out_ty, CastFunction* func) { + // Dictionary unpacking not implemented for boolean or nested types + DCHECK_OK(func->AddKernel(Type::DICTIONARY, {InputType::Array(Type::DICTIONARY)}, + out_ty, FromDictionaryCast<T>::Exec)); + } +}; + +template <typename 
OutType> +void AddCommonCasts(OutputType out_ty, CastFunction* func) { + // From null to this type + DCHECK_OK(func->AddKernel(Type::NA, {InputType::Array(null())}, out_ty, Review comment: If `AddKernel` can fail then we should propagate the errors everywhere, IMHO. ########## File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc ########## @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernels/common.h" + +namespace arrow { +namespace compute { + +struct Add { + template <typename OUT, typename ARG0, typename ARG1> + static constexpr OUT Call(KernelContext*, ARG0 left, ARG1 right) { + return left + right; + } +}; + +namespace codegen { + +template <typename Op> +void MakeBinaryFunction(std::string name, FunctionRegistry* registry) { + auto func = std::make_shared<ScalarFunction>(name, /*arity=*/2); + for (const std::shared_ptr<DataType>& ty : NumericTypes()) { + DCHECK_OK(func->AddKernel({InputType::Array(ty), InputType::Array(ty)}, ty, + ScalarNumericEqualTypes::Binary<Op>(*ty))); + } + DCHECK_OK(registry->AddFunction(std::move(func))); Review comment: We should really propagate errors... 
########## File path: cpp/src/arrow/compute/kernels/codegen_internal.cc ########## @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernels/codegen_internal.h" + +#include <cstdint> +#include <memory> +#include <mutex> +#include <vector> + +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ctx->SetStatus(Status::NotImplemented("This kernel is malformed")); +} + +void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec, + const ExecBatch& batch, Datum* out) { + ExecBatch flipped_batch = batch; + Datum tmp = flipped_batch.values[0]; + flipped_batch.values[0] = flipped_batch.values[1]; + flipped_batch.values[1] = tmp; + exec(ctx, flipped_batch, out); +} + +std::vector<std::shared_ptr<DataType>> g_signed_int_types; +std::vector<std::shared_ptr<DataType>> g_unsigned_int_types; +std::vector<std::shared_ptr<DataType>> g_int_types; +std::vector<std::shared_ptr<DataType>> g_floating_types; +std::vector<std::shared_ptr<DataType>> g_numeric_types; +std::vector<std::shared_ptr<DataType>> g_base_binary_types; +std::vector<std::shared_ptr<DataType>> 
g_temporal_types; +std::vector<std::shared_ptr<DataType>> g_primitive_types; +static std::once_flag codegen_static_initialized; + +static void InitStaticData() { + // Signed int types + g_signed_int_types.push_back(int8()); + g_signed_int_types.push_back(int16()); + g_signed_int_types.push_back(int32()); + g_signed_int_types.push_back(int64()); + + // Unsigned int types + g_unsigned_int_types.push_back(uint8()); + g_unsigned_int_types.push_back(uint16()); + g_unsigned_int_types.push_back(uint32()); + g_unsigned_int_types.push_back(uint64()); + + // All int types + Extend(g_unsigned_int_types, &g_int_types); + Extend(g_signed_int_types, &g_int_types); + + // Floating point types + g_floating_types.push_back(float32()); + g_floating_types.push_back(float64()); + + // Numeric types + Extend(g_int_types, &g_numeric_types); + Extend(g_floating_types, &g_numeric_types); + + // Temporal types + g_temporal_types.push_back(date32()); + g_temporal_types.push_back(date64()); + g_temporal_types.push_back(time32(TimeUnit::SECOND)); + g_temporal_types.push_back(time32(TimeUnit::MILLI)); + g_temporal_types.push_back(time64(TimeUnit::MICRO)); + g_temporal_types.push_back(time64(TimeUnit::NANO)); + g_temporal_types.push_back(timestamp(TimeUnit::SECOND)); + g_temporal_types.push_back(timestamp(TimeUnit::MILLI)); + g_temporal_types.push_back(timestamp(TimeUnit::MICRO)); + g_temporal_types.push_back(timestamp(TimeUnit::NANO)); + + // Base binary types (without FixedSizeBinary) + g_base_binary_types.push_back(binary()); + g_base_binary_types.push_back(utf8()); + g_base_binary_types.push_back(large_binary()); + g_base_binary_types.push_back(large_utf8()); + + // Non-parametric, non-nested types. 
This also DOES NOT include + // + // * Decimal + // * Fixed Size Binary + g_primitive_types.push_back(null()); + g_primitive_types.push_back(boolean()); + Extend(g_numeric_types, &g_primitive_types); + Extend(g_temporal_types, &g_primitive_types); + Extend(g_base_binary_types, &g_primitive_types); +} + +const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes() { + std::call_once(codegen_static_initialized, InitStaticData); + return g_base_binary_types; +} + +const std::vector<std::shared_ptr<DataType>>& SignedIntTypes() { + std::call_once(codegen_static_initialized, InitStaticData); Review comment: How about: ```c++ static std::vector<std::shared_ptr<DataType>> g_signed_int_types{int8(), int16(), int32(), int64()}; ``` ########## File path: cpp/src/arrow/compute/function.h ########## @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle + +#pragma once + +#include <string> +#include <utility> +#include <vector> + +#include "arrow/compute/kernel.h" +#include "arrow/compute/options.h" // IWYU pragma: keep +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +struct ValueDescr; + +namespace compute { + +/// \brief Contains the number of required arguments for the function +struct ARROW_EXPORT FunctionArity { + static FunctionArity Nullary() { return FunctionArity(0, false); } + static FunctionArity Unary() { return FunctionArity(1, false); } + static FunctionArity Binary() { return FunctionArity(2, false); } + static FunctionArity Ternary() { return FunctionArity(3, false); } + static FunctionArity Varargs(int min_args = 1) { return FunctionArity(min_args, true); } + + FunctionArity(int num_args, bool is_varargs = false) // NOLINT implicit conversion + : num_args(num_args), is_varargs(is_varargs) {} + + /// The number of required arguments (or the minimum number for varargs + /// functions) + int num_args; + + /// If true, then the num_args is the minimum number of required arguments + bool is_varargs = false; +}; + +/// \brief Base class for function containers that are capable of dispatch to +/// kernel implementations +class ARROW_EXPORT Function { + public: + /// \brief The kind of function, which indicates in what contexts it is + /// valid for use + enum Kind { + /// A function that performs scalar data operations on whole arrays of + /// data. Can generally process Array or Scalar values. The size of the + /// output will be the same as the size (or broadcasted size, in the case + /// of mixing Array and Scalar inputs) of the input. + SCALAR, + + /// A function with array input and output whose behavior depends on the + /// values of the entire arrays passed, rather than the value of each scalar + /// value. 
+ VECTOR, + + /// A function that computes scalar summary statistics from array input. + SCALAR_AGGREGATE + }; + + virtual ~Function() = default; + + /// \brief The name of the kernel. The registry enforces uniqueness of names + const std::string& name() const { return name_; } + + /// \brief The kind of kernel, which indicates in what contexts it is valid + /// for use + Function::Kind kind() const { return kind_; } + + /// \brief Contains the number of arguments the function requires + const FunctionArity& arity() const { return arity_; } + + /// \brief Returns the number of registered kernels for this function + virtual int num_kernels() const = 0; + + /// \brief Convenience for invoking a function with kernel dispatch and + /// memory allocation details taken care of + Result<Datum> Execute(const std::vector<Datum>& args, const FunctionOptions* options, + ExecContext* ctx = NULLPTR) const; + + protected: + Function(std::string name, Function::Kind kind, const FunctionArity& arity) + : name_(std::move(name)), kind_(kind), arity_(arity) {} + std::string name_; + Function::Kind kind_; + FunctionArity arity_; +}; + +namespace detail { + +template <typename KernelType> +class FunctionImpl : public Function { + public: + /// \brief Return vector of all available kernels for this function + const std::vector<KernelType>& kernels() const { return kernels_; } + + int num_kernels() const override { return static_cast<int>(kernels_.size()); } + + protected: + FunctionImpl(std::string name, Function::Kind kind, const FunctionArity& arity) + : Function(std::move(name), kind, arity) {} + + std::vector<KernelType> kernels_; +}; + +} // namespace detail + +/// \brief A function that executes elementwise operations on arrays or +/// scalars, and therefore whose results generally do not depend on the order +/// of the values in the arguments. Accepts and returns arrays that are all of +/// the same size. 
These functions roughly correspond to the functions used in +/// SQL expressions. +class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl<ScalarKernel> { + public: + using KernelType = ScalarKernel; + + ScalarFunction(std::string name, const FunctionArity& arity) + : detail::FunctionImpl<ScalarKernel>(std::move(name), Function::SCALAR, arity) {} + + /// \brief Add a simple kernel (function implementation) with given + /// input/output types, no required state initialization, preallocation for + /// fixed-width types, and default null handling (intersect validity bitmaps + /// of inputs) + Status AddKernel(std::vector<InputType> in_types, OutputType out_type, + ArrayKernelExec exec, KernelInit init = NULLPTR); + + /// \brief Add a kernel (function implementation). Returns error if fails + /// to match the other parameters of the function + Status AddKernel(ScalarKernel kernel); + + /// \brief Return the first kernel that can execute the function given the + /// exact argument types (without implicit type casts or scalar->array + /// promotions) + virtual Result<const ScalarKernel*> DispatchExact( Review comment: Why `virtual`? Are there subclasses? ########## File path: cpp/src/arrow/compute/kernels/codegen_internal.h ########## @@ -0,0 +1,648 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/scalar.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/string_view.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::BitmapReader; +using internal::FirstTimeBitmapWriter; +using internal::GenerateBitsUnrolled; + +namespace compute { + +#ifdef ARROW_EXTRA_ERROR_CONTEXT + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ Review comment: `KERNEL_RETURN_IF_ERROR`? It doesn't abort... ########## File path: cpp/src/arrow/compute/kernels/codegen_internal.h ########## @@ -0,0 +1,648 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/scalar.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/string_view.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::BitmapReader; +using internal::FirstTimeBitmapWriter; +using internal::GenerateBitsUnrolled; + +namespace compute { + +#ifdef ARROW_EXTRA_ERROR_CONTEXT + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + _st.AddContextLine(__FILE__, __LINE__, #expr); \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#else + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#endif // ARROW_EXTRA_ERROR_CONTEXT + +// A kernel that exposes Call methods that handles iteration over ArrayData +// inputs itself +// + +constexpr int kValidity = 0; +constexpr int kBinaryOffsets = 1; +constexpr int kPrimitiveData = 1; +constexpr int kBinaryData = 2; + +// ---------------------------------------------------------------------- +// Iteration / value access utilities + +template <typename T, typename R = void> +using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value && + !is_boolean_type<T>::value, R>; + +template <typename T, typename Enable = void> +struct CodegenTraits; + +template <typename T> +struct CodegenTraits<T, enable_if_has_c_type<T>> { + using value_type = typename T::c_type; +}; + +template <typename T> +struct CodegenTraits<T, enable_if_base_binary<T>> { + using value_type = util::string_view; +}; + +template <typename Type, typename Enable = void> +struct ArrayIterator; + +template <typename Type> +struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> { 
+ using T = typename Type::c_type; + const T* values; + ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {} + T operator()() { return *values++; } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_boolean<Type>> { + BitmapReader reader; + ArrayIterator(const ArrayData& data) + : reader(data.buffers[1]->data(), data.offset, data.length) {} + bool operator()() { + bool out = reader.IsSet(); + reader.Next(); + return out; + } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_base_binary<Type>> { + int64_t position = 0; + typename TypeTraits<Type>::ArrayType arr; + ArrayIterator(const ArrayData& data) + : arr(data.Copy()) {} Review comment: Why the copy? ########## File path: cpp/src/arrow/compute/kernel.cc ########## @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernel.h" + +#include <cstddef> +#include <memory> +#include <sstream> +#include <string> + +#include "arrow/buffer.h" +#include "arrow/compute/exec.h" +#include "arrow/result.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/hashing.h" Review comment: Required? This pulls a lot of stuff. 
########## File path: cpp/src/arrow/compute/kernels/codegen_internal.h ########## @@ -0,0 +1,648 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/scalar.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/string_view.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::BitmapReader; +using internal::FirstTimeBitmapWriter; +using internal::GenerateBitsUnrolled; + +namespace compute { + +#ifdef ARROW_EXTRA_ERROR_CONTEXT + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + _st.AddContextLine(__FILE__, __LINE__, #expr); \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#else + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#endif // ARROW_EXTRA_ERROR_CONTEXT + +// A kernel that exposes Call methods that handles iteration over ArrayData +// 
inputs itself +// + +constexpr int kValidity = 0; +constexpr int kBinaryOffsets = 1; +constexpr int kPrimitiveData = 1; +constexpr int kBinaryData = 2; + +// ---------------------------------------------------------------------- +// Iteration / value access utilities + +template <typename T, typename R = void> +using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value && + !is_boolean_type<T>::value, R>; + +template <typename T, typename Enable = void> +struct CodegenTraits; + +template <typename T> +struct CodegenTraits<T, enable_if_has_c_type<T>> { + using value_type = typename T::c_type; +}; + +template <typename T> +struct CodegenTraits<T, enable_if_base_binary<T>> { + using value_type = util::string_view; +}; + +template <typename Type, typename Enable = void> +struct ArrayIterator; + +template <typename Type> +struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> { + using T = typename Type::c_type; + const T* values; + ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {} + T operator()() { return *values++; } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_boolean<Type>> { + BitmapReader reader; + ArrayIterator(const ArrayData& data) + : reader(data.buffers[1]->data(), data.offset, data.length) {} + bool operator()() { + bool out = reader.IsSet(); + reader.Next(); + return out; + } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_base_binary<Type>> { + int64_t position = 0; + typename TypeTraits<Type>::ArrayType arr; + ArrayIterator(const ArrayData& data) + : arr(data.Copy()) {} + util::string_view operator()() { return arr.GetView(position++); } +}; + +template <typename Type, typename Enable = void> +struct UnboxScalar; + +template <typename Type> +struct UnboxScalar<Type, enable_if_has_c_type<Type>> { + using ScalarType = typename TypeTraits<Type>::ScalarType; + static typename Type::c_type Unbox(const Datum& datum) { + return datum.scalar_as<ScalarType>().value; + } 
+}; + +template <typename Type> +struct UnboxScalar<Type, enable_if_base_binary<Type>> { + static util::string_view Unbox(const Datum& datum) { + return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value); + } +}; + +template <typename Type, typename Enable = void> +struct GetValueType; + +template <typename Type> +struct GetValueType<Type, enable_if_has_c_type<Type>> { + using T = typename Type::c_type; +}; + +template <typename Type> +struct GetValueType< + Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value || + is_fixed_size_binary_type<Type>::value>> { + using T = util::string_view; +}; + +// ---------------------------------------------------------------------- +// Generate an array kernel given template classes + +void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out); + +void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec, + const ExecBatch& batch, Datum* out); + +// ---------------------------------------------------------------------- +// Boolean data utilities + +// ---------------------------------------------------------------------- +// Template kernel exec function generators + +template <typename T> +void Extend(const std::vector<T>& values, std::vector<T>* out) { + for (const auto& t : values) { + out->push_back(t); + } +} + +const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes(); +const std::vector<std::shared_ptr<DataType>>& SignedIntTypes(); +const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes(); +const std::vector<std::shared_ptr<DataType>>& IntTypes(); +const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes(); + +// Number types without boolean +const std::vector<std::shared_ptr<DataType>>& NumericTypes(); + +// Temporal types including time and timestamps for each unit +const std::vector<std::shared_ptr<DataType>>& TemporalTypes(); + +// Integer, floating point, base binary, and temporal +const std::vector<std::shared_ptr<DataType>>& 
PrimitiveTypes(); + +namespace codegen { + +struct SimpleExec { + // Operator must implement + // + // static void Call(KernelContext*, const ArrayData& in, ArrayData* out) + template <typename Operator> + static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch[0].kind() == Datum::SCALAR) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } else if (batch.length > 0) { + Operator::Call(ctx, *batch[0].array(), out->mutable_array()); + } + } + + // Operator must implement + // + // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1, + // ArrayData* out) + template <typename Operator> + static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } else if (batch.length > 0) { + Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array()); + } + } +}; + +// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction +struct ScalarPrimitiveExec { + template <typename Op, typename OutType, typename Arg0Type> + static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + using OUT = typename OutType::c_type; + using ARG0 = typename Arg0Type::c_type; + + if (batch[0].kind() == Datum::SCALAR) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } else { + ArrayData* out_arr = out->mutable_array(); + auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData); + auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData); + for (int64_t i = 0; i < batch.length; ++i) { + *out_data++ = Op::template Call<OUT, ARG0>(ctx, *arg0_data++); + } + } + } + + template <typename Op, typename OutType, typename Arg0Type, typename Arg1Type> + static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + using OUT = typename OutType::c_type; + using ARG0 = typename Arg0Type::c_type; + using ARG1 = typename 
Arg1Type::c_type; + + if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } else { + ArrayData* out_arr = out->mutable_array(); + auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData); + auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData); + auto arg1_data = batch[1].array()->GetValues<ARG1>(kPrimitiveData); + for (int64_t i = 0; i < batch.length; ++i) { + *out_data++ = Op::template Call<OUT, ARG0, ARG1>(ctx, *arg0_data++, *arg1_data++); + } + } + } +}; + +template <typename Type, typename Enable = void> +struct OutputAdapter; + +template <typename Type> +struct OutputAdapter<Type, enable_if_boolean<Type>> { + template <typename Generator> + static void Write(KernelContext*, Datum* out, Generator&& generator) { + ArrayData* out_arr = out->mutable_array(); + auto out_bitmap = out_arr->buffers[1]->mutable_data(); + GenerateBitsUnrolled(out_bitmap, out_arr->offset, out_arr->length, + std::forward<Generator>(generator)); + } +}; + +template <typename Type> +struct OutputAdapter<Type, enable_if_has_c_type_not_boolean<Type>> { + template <typename Generator> + static void Write(KernelContext*, Datum* out, Generator&& generator) { + ArrayData* out_arr = out->mutable_array(); + auto out_data = out_arr->GetMutableValues<typename Type::c_type>(kPrimitiveData); + // TODO: Is this as fast as a more explicitly inlined function? 
+ for (int64_t i = 0 ; i < out_arr->length; ++i) { + *out_data++ = generator(); + } + } +}; + +template <typename Type> +struct OutputAdapter<Type, enable_if_base_binary<Type>> { + template <typename Generator> + static void Write(KernelContext* ctx, Datum* out, Generator&& generator) { + ctx->SetStatus(Status::NotImplemented("NYI")); + } +}; + +template <typename OutType, typename Arg0Type, typename Op> +struct ScalarUnary { + using OutScalar = typename TypeTraits<OutType>::ScalarType; + + using OUT = typename CodegenTraits<OutType>::value_type; + using ARG0 = typename CodegenTraits<Arg0Type>::value_type; + + static void Array(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ArrayIterator<Arg0Type> arg0(*batch[0].array()); + OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT { + return Op::template Call<OUT, ARG0>(ctx, arg0()); + }); + } + + static void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch[0].scalar()->is_valid) { + ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]); + out->value = std::make_shared<OutScalar>(Op::template Call<OUT, ARG0>(ctx, arg0), + out->type()); + } else { + out->value = MakeNullScalar(batch[0].type()); + } + } + + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + if (batch[0].kind() == Datum::ARRAY) { + return Array(ctx, batch, out); + } else { + return Scalar(ctx, batch, out); + } + } +}; + +// Applies a scalar operation with state on the null-null values of a single Review comment: "non-null"? ########## File path: cpp/src/arrow/compute/kernels/CMakeLists.txt ########## @@ -15,37 +15,41 @@ # specific language governing permissions and limitations # under the License. 
-arrow_install_all_headers("arrow/compute/kernels") - -add_arrow_compute_test(boolean_test) -add_arrow_compute_test(cast_test) -add_arrow_compute_test(hash_test) -add_arrow_compute_test(isin_test) -add_arrow_compute_test(match_test) -add_arrow_compute_test(sort_to_indices_test) -add_arrow_compute_test(nth_to_indices_test) -add_arrow_compute_test(util_internal_test) -add_arrow_compute_test(add_test) +# ---------------------------------------------------------------------- +# Scalar kernels -# Aggregates -add_arrow_compute_test(aggregate_test) +add_arrow_compute_test(scalar_test + SOURCES + scalar_arithmetic_test.cc + scalar_boolean_test.cc + scalar_cast_test.cc + scalar_compare_test.cc + scalar_set_lookup_test.cc) -# Comparison -add_arrow_compute_test(compare_test) +# add_arrow_compute_test(cast_test) -# Selection -add_arrow_compute_test(take_test) -add_arrow_compute_test(filter_test) +add_arrow_benchmark(scalar_compare_benchmark PREFIX "arrow-compute") -add_arrow_benchmark(sort_to_indices_benchmark PREFIX "arrow-compute") -add_arrow_benchmark(nth_to_indices_benchmark PREFIX "arrow-compute") +# ---------------------------------------------------------------------- +# Vector kernels -# Aggregates -add_arrow_benchmark(aggregate_benchmark PREFIX "arrow-compute") +add_arrow_compute_test(vector_test + SOURCES + vector_filter_test.cc + vector_hash_test.cc + vector_take_test.cc + vector_sort_test.cc) + +# add_arrow_benchmark(vector_hash_benchmark PREFIX "arrow-compute") +# add_arrow_benchmark(vector_sort_benchmark PREFIX "arrow-compute") +# add_arrow_benchmark(vector_partition_benchmark PREFIX "arrow-compute") +# add_arrow_benchmark(vector_filter_benchmark PREFIX "arrow-compute")a +# add_arrow_benchmark(vector_take_benchmark PREFIX "arrow-compute") Review comment: Should the benchmarks be re-enabled? ########## File path: cpp/src/arrow/compute/kernel.h ########## @@ -15,295 +15,517 @@ // specific language governing permissions and limitations // under the License. 
+// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle + #pragma once +#include <cstdint> +#include <functional> #include <memory> +#include <string> #include <utility> #include <vector> -#include "arrow/array.h" -#include "arrow/record_batch.h" -#include "arrow/scalar.h" -#include "arrow/table.h" -#include "arrow/util/macros.h" -#include "arrow/util/memory.h" -#include "arrow/util/variant.h" // IWYU pragma: export +#include "arrow/compute/exec.h" +#include "arrow/datum.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" #include "arrow/util/visibility.h" namespace arrow { + +class Buffer; +struct Datum; + namespace compute { -class FunctionContext; +struct FunctionOptions; -/// \class OpKernel -/// \brief Base class for operator kernels -/// -/// Note to implementors: -/// Operator kernels are intended to be the lowest level of an analytics/compute -/// engine. They will generally not be exposed directly to end-users. Instead -/// they will be wrapped by higher level constructs (e.g. top-level functions -/// or physical execution plan nodes). These higher level constructs are -/// responsible for user input validation and returning the appropriate -/// error Status. -/// -/// Due to this design, implementations of Call (the execution -/// method on subclasses) should use assertions (i.e. DCHECK) to double-check -/// parameter arguments when in higher level components returning an -/// InvalidArgument error might be more appropriate. -/// -class ARROW_EXPORT OpKernel { +/// \brief Base class for opaque kernel-specific state. 
For example, if there +/// is some kind of initialization required +struct KernelState { + virtual ~KernelState() = default; +}; + +/// \brief Context/state for the execution of a particular kernel +class ARROW_EXPORT KernelContext { public: - virtual ~OpKernel() = default; - /// \brief EXPERIMENTAL The output data type of the kernel - /// \return the output type - virtual std::shared_ptr<DataType> out_type() const = 0; + explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {} + + /// \brief Allocate buffer from the context's memory pool + Result<std::shared_ptr<Buffer>> Allocate(int64_t nbytes); + + /// \brief Allocate buffer for bitmap from the context's memory pool + Result<std::shared_ptr<Buffer>> AllocateBitmap(int64_t num_bits); + + /// \brief Indicate that an error has occurred, to be checked by a exec caller + /// \param[in] status a Status instance + /// + /// \note Will not overwrite a prior set Status, so we will have the first + /// error that occurred until ExecContext::ResetStatus is called + void SetStatus(const Status& status); + + /// \brief Clear any error status + void ResetStatus(); + + /// \brief Return true if an error has occurred + bool HasError() const { return !status_.ok(); } + + /// \brief Return the current status of the context + const Status& status() const { return status_; } + + // For passing kernel state to + void SetState(KernelState* state) { state_ = state; } + + KernelState* state() { return state_; } + + /// \brief Common state related to function execution + ExecContext* exec_context() { return exec_ctx_; } + + MemoryPool* memory_pool() { return exec_ctx_->memory_pool(); } + + private: + ExecContext* exec_ctx_; + Status status_; + KernelState* state_; }; -struct Datum; -static inline bool CollectionEquals(const std::vector<Datum>& left, - const std::vector<Datum>& right); - -// Datums variants may have a length. This special value indicate that the -// current variant does not have a length. 
-constexpr int64_t kUnknownLength = -1; - -/// \class Datum -/// \brief Variant type for various Arrow C++ data structures -struct ARROW_EXPORT Datum { - enum type { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE, COLLECTION }; - - util::variant<decltype(NULLPTR), std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>, - std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>, - std::shared_ptr<Table>, std::vector<Datum>> - value; - - /// \brief Empty datum, to be populated elsewhere - Datum() : value(NULLPTR) {} - - Datum(const std::shared_ptr<Scalar>& value) // NOLINT implicit conversion - : value(value) {} - Datum(const std::shared_ptr<ArrayData>& value) // NOLINT implicit conversion - : value(value) {} - - Datum(const std::shared_ptr<Array>& value) // NOLINT implicit conversion - : Datum(value ? value->data() : NULLPTR) {} - - Datum(const std::shared_ptr<ChunkedArray>& value) // NOLINT implicit conversion - : value(value) {} - Datum(const std::shared_ptr<RecordBatch>& value) // NOLINT implicit conversion - : value(value) {} - Datum(const std::shared_ptr<Table>& value) // NOLINT implicit conversion - : value(value) {} - Datum(const std::vector<Datum>& value) // NOLINT implicit conversion - : value(value) {} - - // Cast from subtypes of Array to Datum - template <typename T, typename = enable_if_t<std::is_base_of<Array, T>::value>> - Datum(const std::shared_ptr<T>& value) // NOLINT implicit conversion - : Datum(std::shared_ptr<Array>(value)) {} - - // Convenience constructors - explicit Datum(bool value) : value(std::make_shared<BooleanScalar>(value)) {} - explicit Datum(int8_t value) : value(std::make_shared<Int8Scalar>(value)) {} - explicit Datum(uint8_t value) : value(std::make_shared<UInt8Scalar>(value)) {} - explicit Datum(int16_t value) : value(std::make_shared<Int16Scalar>(value)) {} - explicit Datum(uint16_t value) : value(std::make_shared<UInt16Scalar>(value)) {} - explicit Datum(int32_t value) : value(std::make_shared<Int32Scalar>(value)) {} - 
explicit Datum(uint32_t value) : value(std::make_shared<UInt32Scalar>(value)) {} - explicit Datum(int64_t value) : value(std::make_shared<Int64Scalar>(value)) {} - explicit Datum(uint64_t value) : value(std::make_shared<UInt64Scalar>(value)) {} - explicit Datum(float value) : value(std::make_shared<FloatScalar>(value)) {} - explicit Datum(double value) : value(std::make_shared<DoubleScalar>(value)) {} - - ~Datum() {} - - Datum(const Datum& other) noexcept { this->value = other.value; } - - Datum& operator=(const Datum& other) noexcept { - value = other.value; - return *this; - } +#define ARROW_CTX_RETURN_IF_ERROR(CTX) \ + do { \ + if (ARROW_PREDICT_FALSE((CTX)->HasError())) { \ + Status s = (CTX)->status(); \ + (CTX)->ResetStatus(); \ + return s; \ + } \ + } while (0) + +/// A standard function taking zero or more Array/Scalar values and returning +/// Array/Scalar output. May be used for SCALAR and VECTOR kernel kinds. Should +/// write into pre-allocated memory except in cases when a builder +/// (e.g. StringBuilder) must be employed +using ArrayKernelExec = std::function<void(KernelContext*, const ExecBatch&, Datum*)>; + +/// \brief A container to express what kernel argument input types are accepted +class ARROW_EXPORT InputType { + public: + enum Kind { + /// Accept any value type + ANY_TYPE, - // Define move constructor and move assignment, for better performance - Datum(Datum&& other) noexcept : value(std::move(other.value)) {} + /// A fixed arrow::DataType and will only exact match having this exact + /// type (e.g. same TimestampType unit, same decimal scale and precision, + /// or same nested child types + EXACT_TYPE, - Datum& operator=(Datum&& other) noexcept { - value = std::move(other.value); - return *this; - } + /// Any type having the indicated Type::type id. 
For example, accept + /// any Type::LIST or any Type::TIMESTAMP + SAME_TYPE_ID, + }; - Datum::type kind() const { - switch (this->value.index()) { - case 0: - return Datum::NONE; - case 1: - return Datum::SCALAR; - case 2: - return Datum::ARRAY; - case 3: - return Datum::CHUNKED_ARRAY; - case 4: - return Datum::RECORD_BATCH; - case 5: - return Datum::TABLE; - case 6: - return Datum::COLLECTION; - default: - return Datum::NONE; - } - } + InputType(ValueDescr::Shape shape = ValueDescr::ANY) // NOLINT implicit construction + : kind_(ANY_TYPE), shape_(shape) {} - std::shared_ptr<ArrayData> array() const { - return util::get<std::shared_ptr<ArrayData>>(this->value); - } + InputType(std::shared_ptr<DataType> type, + ValueDescr::Shape shape = ValueDescr::ANY) // NOLINT implicit construction + : kind_(EXACT_TYPE), shape_(shape), type_(std::move(type)), type_id_(type_->id()) {} - std::shared_ptr<Array> make_array() const { - return MakeArray(util::get<std::shared_ptr<ArrayData>>(this->value)); - } + InputType(const ValueDescr& descr) // NOLINT implicit construction + : InputType(descr.type, descr.shape) {} - std::shared_ptr<ChunkedArray> chunked_array() const { - return util::get<std::shared_ptr<ChunkedArray>>(this->value); - } + InputType(Type::type type_id, + ValueDescr::Shape shape = ValueDescr::ANY) // NOLINT implicit construction + : kind_(SAME_TYPE_ID), shape_(shape), type_id_(type_id) {} - std::shared_ptr<RecordBatch> record_batch() const { - return util::get<std::shared_ptr<RecordBatch>>(this->value); - } + InputType(const InputType& other) { CopyInto(other); } - std::shared_ptr<Table> table() const { - return util::get<std::shared_ptr<Table>>(this->value); + // Convenience ctors + static InputType Array(std::shared_ptr<DataType> type) { + return InputType(std::move(type), ValueDescr::ARRAY); } - const std::vector<Datum> collection() const { - return util::get<std::vector<Datum>>(this->value); + static InputType Scalar(std::shared_ptr<DataType> type) { + return 
InputType(std::move(type), ValueDescr::SCALAR); } - std::shared_ptr<Scalar> scalar() const { - return util::get<std::shared_ptr<Scalar>>(this->value); - } + static InputType Array(Type::type id) { return InputType(id, ValueDescr::ARRAY); } - bool is_array() const { return this->kind() == Datum::ARRAY; } + static InputType Scalar(Type::type id) { return InputType(id, ValueDescr::SCALAR); } - bool is_arraylike() const { - return this->kind() == Datum::ARRAY || this->kind() == Datum::CHUNKED_ARRAY; - } + void operator=(const InputType& other) { CopyInto(other); } - bool is_scalar() const { return this->kind() == Datum::SCALAR; } + InputType(InputType&& other) { MoveInto(std::forward<InputType>(other)); } - bool is_collection() const { return this->kind() == Datum::COLLECTION; } + void operator=(InputType&& other) { MoveInto(std::forward<InputType>(other)); } - /// \brief The value type of the variant, if any - /// - /// \return nullptr if no type - std::shared_ptr<DataType> type() const { - if (this->kind() == Datum::ARRAY) { - return util::get<std::shared_ptr<ArrayData>>(this->value)->type; - } else if (this->kind() == Datum::CHUNKED_ARRAY) { - return util::get<std::shared_ptr<ChunkedArray>>(this->value)->type(); - } else if (this->kind() == Datum::SCALAR) { - return util::get<std::shared_ptr<Scalar>>(this->value)->type; - } - return NULLPTR; + /// \brief Return true if this type exactly matches another + bool Equals(const InputType& other) const; + + bool operator==(const InputType& other) const { return this->Equals(other); } + + bool operator!=(const InputType& other) const { return !(*this == other); } + + /// \brief Return hash code + uint64_t Hash() const; + + /// \brief Render a human-readable string representation + std::string ToString() const; + + /// \brief Return true if the value matches this argument kind in type + /// and shape + bool Matches(const Datum& value) const; + + /// \brief Return true if the value descriptor matches this argument kind in + 
/// type and shape + bool Matches(const ValueDescr& value) const; + + /// \brief The type matching rule that this InputType uses + Kind kind() const { return kind_; } + + ValueDescr::Shape shape() const { return shape_; } + + /// \brief For ArgKind::EXACT_TYPE, the exact type that this InputType must + /// match. Otherwise this function should not be used + const std::shared_ptr<DataType>& type() const; + + /// \brief For ArgKind::SAME_TYPE_ID, the Type::type that this InputType must + /// match, Otherwise this function should not be used + Type::type type_id() const; + + private: + void CopyInto(const InputType& other) { + this->kind_ = other.kind_; + this->shape_ = other.shape_; + this->type_ = other.type_; + this->type_id_ = other.type_id_; } - /// \brief The value length of the variant, if any - /// - /// \return kUnknownLength if no type - int64_t length() const { - if (this->kind() == Datum::ARRAY) { - return util::get<std::shared_ptr<ArrayData>>(this->value)->length; - } else if (this->kind() == Datum::CHUNKED_ARRAY) { - return util::get<std::shared_ptr<ChunkedArray>>(this->value)->length(); - } else if (this->kind() == Datum::SCALAR) { - return 1; - } - return kUnknownLength; + void MoveInto(InputType&& other) { + this->kind_ = other.kind_; + this->shape_ = other.shape_; + this->type_ = std::move(other.type_); + this->type_id_ = other.type_id_; } - /// \brief The array chunks of the variant, if any - /// - /// \return empty if not arraylike - ArrayVector chunks() const { - if (!this->is_arraylike()) { - return {}; - } - if (this->is_array()) { - return {this->make_array()}; - } - return this->chunked_array()->chunks(); + Kind kind_; + + ValueDescr::Shape shape_ = ValueDescr::ANY; + + // For EXACT_TYPE ArgKind + std::shared_ptr<DataType> type_; + + // For SAME_TYPE_ID ArgKind + Type::type type_id_ = Type::NA; +}; + +/// \brief Container to capture both exact and input-dependent output types +/// +/// The value shape returned by Resolve will be determined by 
broadcasting the +/// shapes of the input arguments, otherwise this is handled by the +/// user-defined resolver function +/// +/// * Any ARRAY shape -> output shape is ARRAY +/// * All SCALAR shapes -> output shape is SCALAR +class ARROW_EXPORT OutputType { + public: + /// \brief An enum indicating whether the value type is an invariant fixed + /// value or one that's computed by a kernel-defined resolver function + enum ResolveKind { FIXED, COMPUTED }; + + /// Type resolution function. Given input types and shapes, return output + /// type and shape. This function SHOULD _not_ be used to check for arity, + /// that SHOULD be performed one or more layers above. May make use of kernel + /// state to know what type to output + using Resolver = + std::function<Result<ValueDescr>(KernelContext*, const std::vector<ValueDescr>&)>; + + OutputType(std::shared_ptr<DataType> type) // NOLINT implicit construction + : kind_(FIXED), type_(std::move(type)) {} + + /// For outputting a particular type and shape + OutputType(ValueDescr descr); // NOLINT implicit construction + + explicit OutputType(Resolver resolver) : kind_(COMPUTED), resolver_(resolver) {} + + OutputType(const OutputType& other) { + this->kind_ = other.kind_; + this->shape_ = other.shape_; + this->type_ = other.type_; + this->resolver_ = other.resolver_; } - bool Equals(const Datum& other) const { - if (this->kind() != other.kind()) return false; - - switch (this->kind()) { - case Datum::NONE: - return true; - case Datum::SCALAR: - return internal::SharedPtrEquals(this->scalar(), other.scalar()); - case Datum::ARRAY: - return internal::SharedPtrEquals(this->make_array(), other.make_array()); - case Datum::CHUNKED_ARRAY: - return internal::SharedPtrEquals(this->chunked_array(), other.chunked_array()); - case Datum::RECORD_BATCH: - return internal::SharedPtrEquals(this->record_batch(), other.record_batch()); - case Datum::TABLE: - return internal::SharedPtrEquals(this->table(), other.table()); - case 
Datum::COLLECTION: - return CollectionEquals(this->collection(), other.collection()); - default: - return false; - } + OutputType(OutputType&& other) { + this->kind_ = other.kind_; + this->type_ = std::move(other.type_); + this->shape_ = other.shape_; + this->resolver_ = other.resolver_; } + + /// \brief Return the shape and type of the expected output value of the + /// kernel given the value descriptors (shapes and types). The resolver may + /// make use of state information kept in the KernelContext + Result<ValueDescr> Resolve(KernelContext* ctx, + const std::vector<ValueDescr>& args) const; + + /// \brief The value type for the FIXED kind rule + const std::shared_ptr<DataType>& type() const; + + /// \brief For use with COMPUTED resolution strategy, the output type depends + /// on the input type. It may be more convenient to invoke this with + /// OutputType::Resolve returned from this method + const Resolver& resolver() const; + + /// \brief Render a human-readable string representation + std::string ToString() const; + + /// \brief Return the kind of type resolution of this output type, whether + /// fixed/invariant or computed by a "user"-defined resolver + ResolveKind kind() const { return kind_; } + + /// \brief If the shape is ANY, then Resolve will compute the shape based on + /// the input arguments + ValueDescr::Shape shape() const { return shape_; } + + private: + ResolveKind kind_; + + // For FIXED resolution + std::shared_ptr<DataType> type_; + + ValueDescr::Shape shape_ = ValueDescr::ANY; + + // For COMPUTED resolution + Resolver resolver_; }; -/// \class UnaryKernel -/// \brief An array-valued function of a single input argument. +/// \brief Holds the input types and output type of the kernel /// -/// Note to implementors: Try to avoid making kernels that allocate memory if -/// the output size is a deterministic function of the Input Datum's metadata. 
-/// Instead separate the logic of the kernel and allocations necessary into -/// two different kernels. Some reusable kernels that allocate buffers -/// and delegate computation to another kernel are available in util-internal.h. -class ARROW_EXPORT UnaryKernel : public OpKernel { +/// Varargs functions should pass a single input type to be used to validate +/// the the input types of a function invocation +class ARROW_EXPORT KernelSignature { public: - /// \brief Executes the kernel. - /// - /// \param[in] ctx The function context for the kernel - /// \param[in] input The kernel input data - /// \param[out] out The output of the function. Each implementation of this - /// function might assume different things about the existing contents of out - /// (e.g. which buffers are preallocated). In the future it is expected that - /// there will be a more generic mechanism for understanding the necessary - /// contracts. - virtual Status Call(FunctionContext* ctx, const Datum& input, Datum* out) = 0; + KernelSignature(std::vector<InputType> in_types, OutputType out_type, + bool is_varargs = false); + + /// \brief Convenience ctor since make_shared can be awkward + static std::shared_ptr<KernelSignature> Make(std::vector<InputType> in_types, + OutputType out_type, + bool is_varargs = false); + + /// \brief Return true if the signature if compatible with the list of input + /// value descriptors + bool MatchesInputs(const std::vector<ValueDescr>& descriptors) const; + + /// \brief Returns true if the input types of each signature are + /// equal. 
Well-formed functions should have a deterministic output type + /// given input types, but currently it is the responsibility of the + /// developer to ensure this + bool Equals(const KernelSignature& other) const; + + bool operator==(const KernelSignature& other) const { return this->Equals(other); } + + bool operator!=(const KernelSignature& other) const { return !(*this == other); } + + /// \brief Compute a hash code for the signature + int64_t Hash() const; + + const std::vector<InputType>& in_types() const { return in_types_; } + + const OutputType& out_type() const { return out_type_; } + + /// \brief Render a human-readable string representation + std::string ToString() const; + + bool is_varargs() const { return is_varargs_; } + + private: + std::vector<InputType> in_types_; + OutputType out_type_; + bool is_varargs_; + + // For caching the hash code after it's computed the first time + mutable int64_t hash_code_; }; -/// \class BinaryKernel -/// \brief An array-valued function of a two input arguments -class ARROW_EXPORT BinaryKernel : public OpKernel { - public: - virtual Status Call(FunctionContext* ctx, const Datum& left, const Datum& right, - Datum* out) = 0; +struct SimdLevel { + enum type { NONE, SSE4_2, AVX, AVX2, AVX512, NEON }; }; -// TODO doxygen 1.8.16 does not like the following code -///@cond INTERNAL +struct NullHandling { + enum type { + /// Compute the output validity bitmap by intersecting the validity bitmaps + /// of the arguments. 
Kernel does not do anything with the bitmap + INTERSECTION, -static inline bool CollectionEquals(const std::vector<Datum>& left, - const std::vector<Datum>& right) { - if (left.size() != right.size()) { - return false; - } + /// Kernel expects a pre-allocated buffer to write the result bitmap into + COMPUTED_PREALLOCATE, - for (size_t i = 0; i < left.size(); i++) { - if (!left[i].Equals(right[i])) { - return false; - } - } - return true; -} + /// Kernel allocates and populates the validity bitmap of the output + COMPUTED_NO_PREALLOCATE, + + /// Output is never null + OUTPUT_NOT_NULL + }; +}; + +struct MemAllocation { + enum type { + // For data types that support pre-allocation (fixed-type), the kernel + // expects to be provided pre-allocated memory to write + // into. Non-fixed-width must always allocate their own memory but perhaps + // not their validity bitmaps. The allocation made for the same length as + // the execution batch, so vector kernels yielding differently sized output + // should not use this + PREALLOCATE, + + // The kernel does its own memory allocation + NO_PREALLOCATE + }; +}; + +struct Kernel; + +struct KernelInitArgs { + const Kernel* kernel; + const std::vector<ValueDescr>& inputs; + const FunctionOptions* options; +}; + +// Kernel initializer (context, argument descriptors, options) +using KernelInit = + std::function<std::unique_ptr<KernelState>(KernelContext*, const KernelInitArgs&)>; + +/// \brief Base type for kernels. 
Contains the function signature and +/// optionally the state initialization function, along with some common +/// attributes +struct Kernel { + Kernel() {} + + Kernel(std::shared_ptr<KernelSignature> sig, KernelInit init) + : signature(std::move(sig)), init(init) {} + + Kernel(std::vector<InputType> in_types, OutputType out_type, KernelInit init) + : Kernel(KernelSignature::Make(std::move(in_types), out_type), init) {} + + std::shared_ptr<KernelSignature> signature; + + /// \brief Create a new KernelState for invocations of this kernel, e.g. to + /// set up any options or state relevant for execution. May be nullptr + KernelInit init; -///@endcond + // Does execution benefit from parallelization (splitting large chunks into + // smaller chunks and using multiple threads). Some vector kernels may + // require single-threaded execution. + bool parallelizable = true; + + SimdLevel::type simd_level = SimdLevel::NONE; Review comment: Is this actually used? ########## File path: cpp/src/arrow/compute/kernels/codegen_internal.cc ########## @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/compute/kernels/codegen_internal.h" + +#include <cstdint> +#include <memory> +#include <mutex> +#include <vector> + +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + ctx->SetStatus(Status::NotImplemented("This kernel is malformed")); +} + +void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec, + const ExecBatch& batch, Datum* out) { + ExecBatch flipped_batch = batch; + Datum tmp = flipped_batch.values[0]; + flipped_batch.values[0] = flipped_batch.values[1]; + flipped_batch.values[1] = tmp; Review comment: `std::swap(flipped_batch.values[0], flipped_batch.values[1])`? ########## File path: cpp/src/arrow/compute/kernels/codegen_internal.h ########## @@ -0,0 +1,648 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/scalar.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/string_view.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::BitmapReader; +using internal::FirstTimeBitmapWriter; +using internal::GenerateBitsUnrolled; + +namespace compute { + +#ifdef ARROW_EXTRA_ERROR_CONTEXT + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + _st.AddContextLine(__FILE__, __LINE__, #expr); \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#else + +#define KERNEL_ABORT_IF_ERROR(ctx, expr) \ + do { \ + Status _st = (expr); \ + if (ARROW_PREDICT_FALSE(!_st.ok())) { \ + ctx->SetStatus(_st); \ + return; \ + } \ + } while (0) + +#endif // ARROW_EXTRA_ERROR_CONTEXT + +// A kernel that exposes Call methods that handles iteration over ArrayData +// inputs itself +// + +constexpr int kValidity = 0; +constexpr int kBinaryOffsets = 1; +constexpr int kPrimitiveData = 1; +constexpr int kBinaryData = 2; + +// ---------------------------------------------------------------------- +// Iteration / value access utilities + +template <typename T, typename R = void> +using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value && + !is_boolean_type<T>::value, R>; + +template <typename T, typename Enable = void> +struct CodegenTraits; + +template <typename T> +struct CodegenTraits<T, enable_if_has_c_type<T>> { + using value_type = typename T::c_type; +}; + +template <typename T> +struct CodegenTraits<T, enable_if_base_binary<T>> { + using value_type = util::string_view; +}; + +template <typename Type, typename Enable = void> +struct ArrayIterator; + +template <typename Type> +struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> { 
+ using T = typename Type::c_type; + const T* values; + ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {} + T operator()() { return *values++; } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_boolean<Type>> { + BitmapReader reader; + ArrayIterator(const ArrayData& data) + : reader(data.buffers[1]->data(), data.offset, data.length) {} + bool operator()() { + bool out = reader.IsSet(); + reader.Next(); + return out; + } +}; + +template <typename Type> +struct ArrayIterator<Type, enable_if_base_binary<Type>> { + int64_t position = 0; + typename TypeTraits<Type>::ArrayType arr; + ArrayIterator(const ArrayData& data) + : arr(data.Copy()) {} + util::string_view operator()() { return arr.GetView(position++); } +}; + +template <typename Type, typename Enable = void> +struct UnboxScalar; + +template <typename Type> +struct UnboxScalar<Type, enable_if_has_c_type<Type>> { + using ScalarType = typename TypeTraits<Type>::ScalarType; + static typename Type::c_type Unbox(const Datum& datum) { + return datum.scalar_as<ScalarType>().value; + } +}; + +template <typename Type> +struct UnboxScalar<Type, enable_if_base_binary<Type>> { + static util::string_view Unbox(const Datum& datum) { + return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value); + } +}; + +template <typename Type, typename Enable = void> +struct GetValueType; + +template <typename Type> +struct GetValueType<Type, enable_if_has_c_type<Type>> { + using T = typename Type::c_type; Review comment: Is it different from `CodegenTraits::value_type`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org