westonpace commented on a change in pull request #12590: URL: https://github.com/apache/arrow/pull/12590#discussion_r831714645
########## File path: cpp/examples/arrow/aggregate_example.cc ########## @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example showcases various ways to work with Datasets. It's +// intended to be paired with the documentation. Review comment: ```suggestion ``` ########## File path: cpp/src/arrow/python/udf.h ########## @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "arrow/python/platform.h" + +#include <cstdint> +#include <memory> + +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/function.h" +#include "arrow/compute/registry.h" +#include "arrow/datum.h" +#include "arrow/util/cpu_info.h" +#include "arrow/util/logging.h" + +#include "arrow/python/common.h" +#include "arrow/python/pyarrow.h" +#include "arrow/python/visibility.h" + +namespace cp = arrow::compute; + +namespace arrow { + +namespace py { + +#define DECLARE_CALL_UDF(TYPE_NAME, FUNCTION_SUFFIX, CONVERT_SUFFIX) \ + ARROW_PYTHON_EXPORT Status exec_function_##FUNCTION_SUFFIX(const cp::ExecBatch&, \ + PyObject*, int, Datum*); + +DECLARE_CALL_UDF(Scalar, scalar, scalar) +DECLARE_CALL_UDF(Array, array, make_array) Review comment: This macro isn't saving us much at all. Can we do away with it? ########## File path: cpp/src/arrow/python/udf.h ########## @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "arrow/python/platform.h" + +#include <cstdint> +#include <memory> + +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/function.h" +#include "arrow/compute/registry.h" +#include "arrow/datum.h" +#include "arrow/util/cpu_info.h" +#include "arrow/util/logging.h" + +#include "arrow/python/common.h" +#include "arrow/python/pyarrow.h" +#include "arrow/python/visibility.h" + +namespace cp = arrow::compute; + +namespace arrow { + +namespace py { + +#define DECLARE_CALL_UDF(TYPE_NAME, FUNCTION_SUFFIX, CONVERT_SUFFIX) \ + ARROW_PYTHON_EXPORT Status exec_function_##FUNCTION_SUFFIX(const cp::ExecBatch&, \ + PyObject*, int, Datum*); + +DECLARE_CALL_UDF(Scalar, scalar, scalar) +DECLARE_CALL_UDF(Array, array, make_array) + +#undef DECLARE_CALL_UDF + +class ARROW_PYTHON_EXPORT UdfBuilder { Review comment: I think I'd rather see an options object (`UdfOptions`) and a function `Status RegisterScalarUdf(PyObject* function, const UdfOptions& udf_options);` The options object could then be a plain struct with public fields. ########## File path: cpp/src/arrow/python/udf.cc ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace cp = arrow::compute; + +namespace arrow { + +namespace py { + +#define DEFINE_CALL_UDF(TYPE_NAME, FUNCTION_SUFFIX, CONVERT_SUFFIX) \ Review comment: This is clever but we generally try to avoid macros when possible (this is to allow you to write the function once instead of two very similar versions right?) as they make reading the code a bit more difficult. I'm a little on the fence regarding this macro because we only have two invocations. CC @pitrou for a second opinion ########## File path: python/pyarrow/_compute.pyx ########## @@ -2179,3 +2314,205 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef CFunctionDoc _make_function_doc(func_doc): + """ + Helper function to generate the FunctionDoc + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + c_bool c_options_required + if isinstance(func_doc, dict): + if func_doc["summary"] and isinstance(func_doc["summary"], str): + f_doc.summary = func_doc["summary"].encode() + else: + raise ValueError("key `summary` cannot be None") + + if func_doc["description"] and isinstance(func_doc["description"], str): + f_doc.description = func_doc["description"].encode() + else: + raise ValueError("key `description` cannot be None") + + if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list): + for arg_name in func_doc["arg_names"]: + if isinstance(arg_name, str): + c_arg_names.push_back(arg_name.encode()) + else: + raise ValueError( + "key `arg_names` must be a list of strings") + f_doc.arg_names = c_arg_names + else: + raise ValueError("key `arg_names` cannot be None") + + if func_doc["options_class"] 
and isinstance(func_doc["options_class"], str): + f_doc.options_class = func_doc["options_class"].encode() + else: + raise ValueError("key `options_class` cannot be None") + + if isinstance(func_doc["options_required"], bool): + c_options_required = func_doc["options_required"] + f_doc.options_required = c_options_required + else: + raise ValueError("key `options_required` must be bool") + + return f_doc + else: + raise TypeError(f"func_doc must be a dictionary") + + +cdef class UDFError(Exception): + cdef dict __dict__ + + def __init__(self, message='', extra_info=b''): + super().__init__(message) + self.extra_info = tobytes(extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +cdef class UDFRegistrationError(UDFError): + + def __init__(self, message='', extra_info=b''): + super().__init__(message, extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF Registration error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +def register_function(func_name, arity, function_doc, in_types, + out_type, callback, mem_allocation="no_preallocate", + null_handling="computed_no_preallocate"): + """ + Register a user-defined-function (function) + + Parameters + ---------- + + func_name: str + function name + arity: Arity + arity of the function + function_doc: dict + a dictionary object with keys + ("summary", + "description", + "arg_names", + "options_class", (not supported yet) + "options_required" (not supported yet) + ) + in_types: List[InputType] + list of InputType objects which defines the input + types for the function + out_type: DataType + output type of the function + callback: callable + user defined function + mem_allocation: str + memory allocation mode + "preallocate" or "no_preallocate" Review comment: What does this mean? 
########## File path: python/pyarrow/_compute.pyx ########## @@ -2179,3 +2314,205 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef CFunctionDoc _make_function_doc(func_doc): + """ + Helper function to generate the FunctionDoc + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + c_bool c_options_required + if isinstance(func_doc, dict): + if func_doc["summary"] and isinstance(func_doc["summary"], str): + f_doc.summary = func_doc["summary"].encode() + else: + raise ValueError("key `summary` cannot be None") + + if func_doc["description"] and isinstance(func_doc["description"], str): + f_doc.description = func_doc["description"].encode() + else: + raise ValueError("key `description` cannot be None") + + if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list): + for arg_name in func_doc["arg_names"]: + if isinstance(arg_name, str): + c_arg_names.push_back(arg_name.encode()) + else: + raise ValueError( + "key `arg_names` must be a list of strings") + f_doc.arg_names = c_arg_names + else: + raise ValueError("key `arg_names` cannot be None") + + if func_doc["options_class"] and isinstance(func_doc["options_class"], str): + f_doc.options_class = func_doc["options_class"].encode() + else: + raise ValueError("key `options_class` cannot be None") + + if isinstance(func_doc["options_required"], bool): + c_options_required = func_doc["options_required"] + f_doc.options_required = c_options_required + else: + raise ValueError("key `options_required` must be bool") + + return f_doc + else: + raise TypeError(f"func_doc must be a dictionary") + + +cdef class UDFError(Exception): + cdef dict __dict__ + + def __init__(self, message='', extra_info=b''): + super().__init__(message) + self.extra_info = tobytes(extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF error: {}".format(str(self))) + return 
CStatus_UnknownError(message) + + +cdef class UDFRegistrationError(UDFError): + + def __init__(self, message='', extra_info=b''): + super().__init__(message, extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF Registration error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +def register_function(func_name, arity, function_doc, in_types, + out_type, callback, mem_allocation="no_preallocate", + null_handling="computed_no_preallocate"): + """ + Register a user-defined-function (function) + + Parameters + ---------- + + func_name: str + function name + arity: Arity + arity of the function + function_doc: dict + a dictionary object with keys + ("summary", + "description", + "arg_names", + "options_class", (not supported yet) + "options_required" (not supported yet) + ) + in_types: List[InputType] + list of InputType objects which defines the input + types for the function + out_type: DataType + output type of the function + callback: callable + user defined function + mem_allocation: str + memory allocation mode + "preallocate" or "no_preallocate" + null_handling: str + null handling mode + one of "intersect", "computed_preallocate", + "computed_no_preallocate", + "output_not_null" Review comment: What does this mean? ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +// Demonstrate registering an user-defined Arrow compute function outside of the Arrow Review comment: ```suggestion // Demonstrate registering a user-defined Arrow compute function outside of the Arrow ``` ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> Review comment: Same comment as before. It seems we have too many `#include` directives. ########## File path: cpp/examples/arrow/aggregate_example.cc ########## @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example showcases various ways to work with Datasets. It's +// intended to be paired with the documentation.
+ +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + Review comment: We aren't stellar at IWYU but we should try and maintain it for our example code. I think we also need... ``` #include <type_traits> // for std::enable_if #include <vector> // for std::vector ``` ########## File path: cpp/src/arrow/python/udf.cc ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace cp = arrow::compute; + +namespace arrow { + +namespace py { + +#define DEFINE_CALL_UDF(TYPE_NAME, FUNCTION_SUFFIX, CONVERT_SUFFIX) \ + Status exec_function_##FUNCTION_SUFFIX(const cp::ExecBatch& batch, PyObject* function, \ + int num_args, Datum* out) { \ + std::shared_ptr<TYPE_NAME> c_res_data; \ + PyObject* arg_tuple = PyTuple_New(num_args); \ + for (int arg_id = 0; arg_id < num_args; arg_id++) { \ + if (!batch[arg_id].is_##FUNCTION_SUFFIX()) { \ + return Status::Invalid("Input type and data type doesn't match"); \ + } \ + auto c_data = batch[arg_id].CONVERT_SUFFIX(); \ + PyObject* data = wrap_##FUNCTION_SUFFIX(c_data); \ + PyTuple_SetItem(arg_tuple, arg_id, data); \ + } \ + PyObject* result = PyObject_CallObject(function, arg_tuple); \ + if (result == NULL) { \ + return Status::ExecutionError("Error occured in computation"); \ + } \ + auto res = unwrap_##FUNCTION_SUFFIX(result); \ + if (!res.status().ok()) { \ + return res.status(); \ + } \ + c_res_data = res.ValueOrDie(); \ + auto datum = new Datum(c_res_data); \ + *out = *datum; \ + return Status::OK(); \ + } + +DEFINE_CALL_UDF(Scalar, scalar, scalar) +DEFINE_CALL_UDF(Array, array, make_array) + +#undef DEFINE_CALL_UDF + +Status VerifyArityAndInput(cp::Arity arity, const cp::ExecBatch& batch) { + bool match = (uint64_t)arity.num_args == batch.values.size(); + if (!match) { + return Status::Invalid( + "Function Arity and Input data shape doesn't match, expceted {}"); Review comment: ```suggestion "Function Arity and Input data shape doesn't match, expected {}"); ``` ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +// Demonstrate registering an user-defined Arrow compute function outside of the Arrow +// source tree + +namespace cp = ::arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +template <typename TYPE, + typename = typename std::enable_if<arrow::is_number_type<TYPE>::value | + arrow::is_boolean_type<TYPE>::value | + arrow::is_temporal_type<TYPE>::value>::type> +arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample( + const std::vector<typename TYPE::c_type>& values) { + using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType; + using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType; + 
ARROW_BUILDER_TYPE builder; + ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); + std::shared_ptr<ARROW_ARRAY_TYPE> array; + ARROW_RETURN_NOT_OK(builder.AppendValues(values)); + ARROW_RETURN_NOT_OK(builder.Finish(&array)); + return array; +} + +arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch( + const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) { + std::shared_ptr<arrow::RecordBatch> record_batch; + ARROW_ASSIGN_OR_RAISE(auto struct_result, + arrow::StructArray::Make(array_vector, field_vector)); + return record_batch->FromStructArray(struct_result); +} + +arrow::Result<std::shared_ptr<arrow::Table>> GetTable() { + std::shared_ptr<arrow::Table> table; + + auto field_vector = { + arrow::field("a", arrow::int64()), arrow::field("x", arrow::int64()), + arrow::field("y", arrow::int64()), arrow::field("z", arrow::int64()), + arrow::field("b", arrow::boolean())}; + + ARROW_ASSIGN_OR_RAISE(auto int_array, + GetArrayDataSample<arrow::Int64Type>({1, 2, 3, 4, 5, 6})); + ARROW_ASSIGN_OR_RAISE(auto x, + GetArrayDataSample<arrow::Int64Type>({21, 22, 23, 24, 25, 26})); + ARROW_ASSIGN_OR_RAISE(auto y, + GetArrayDataSample<arrow::Int64Type>({31, 32, 33, 34, 35, 36})); + ARROW_ASSIGN_OR_RAISE(auto z, + GetArrayDataSample<arrow::Int64Type>({41, 42, 43, 44, 45, 46})); + ARROW_ASSIGN_OR_RAISE(auto bool_array, GetArrayDataSample<arrow::BooleanType>( + {false, true, false, true, true, false})); + + auto schema = arrow::schema(field_vector); + auto data_vector = {int_array, x, y, z, bool_array}; + + table = arrow::Table::Make(schema, data_vector, 6); + + return table; +} + +class UDFOptionsType : public cp::FunctionOptionsType { + const char* type_name() const override { return "UDFOptionsType"; } + std::string Stringify(const cp::FunctionOptions&) const override { + return "UDFOptionsType"; + } + bool Compare(const cp::FunctionOptions&, const cp::FunctionOptions&) const override { + return true; + } + 
std::unique_ptr<cp::FunctionOptions> Copy(const cp::FunctionOptions&) const override; +}; + +cp::FunctionOptionsType* GetUDFOptionsType() { + static UDFOptionsType options_type; + return &options_type; +} + +class UDFOptions : public cp::FunctionOptions { + public: + UDFOptions() : cp::FunctionOptions(GetUDFOptionsType()) {} +}; + +std::unique_ptr<cp::FunctionOptions> UDFOptionsType::Copy( + const cp::FunctionOptions&) const { + return std::unique_ptr<cp::FunctionOptions>(new UDFOptions()); +} + +class ExampleNodeOptions : public cp::ExecNodeOptions {}; + +// a basic ExecNode which ignores all input batches +class ExampleNode : public cp::ExecNode { + public: + ExampleNode(ExecNode* input, const ExampleNodeOptions&) + : ExecNode(/*plan=*/input->plan(), /*inputs=*/{input}, + /*input_labels=*/{"ignored"}, + /*output_schema=*/input->output_schema(), /*num_outputs=*/1) {} + + const char* kind_name() const override { return "ExampleNode"; } + + arrow::Status StartProducing() override { + outputs_[0]->InputFinished(this, 0); + return arrow::Status::OK(); + } + + void ResumeProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } + void StopProducing() override { inputs_[0]->StopProducing(); } + + void InputReceived(ExecNode* input, cp::ExecBatch batch) override {} + void ErrorReceived(ExecNode* input, arrow::Status error) override {} + void InputFinished(ExecNode* input, int total_batches) override {} + + arrow::Future<> finished() override { return inputs_[0]->finished(); } +}; + +arrow::Result<cp::ExecNode*> ExampleExecNodeFactory(cp::ExecPlan* plan, + std::vector<cp::ExecNode*> inputs, + const cp::ExecNodeOptions& options) { + const auto& example_options = + arrow::internal::checked_cast<const ExampleNodeOptions&>(options); + + return plan->EmplaceNode<ExampleNode>(inputs[0], example_options); +} + +const cp::FunctionDoc func_doc{ + 
"User-defined-function usage to demonstrate registering an out-of-tree function", + "returns x + y + z", + {"x", "y", "z"}, + "UDFOptions"}; + +arrow::Status Execute() { + const std::string name = "x+x"; + auto func = std::make_shared<cp::ScalarFunction>(name, cp::Arity::Ternary(), &func_doc); + + auto exec_func = [](cp::KernelContext* ctx, const cp::ExecBatch& batch, Review comment: Can you make this an actual function instead of a lambda? ########## File path: cpp/examples/arrow/aggregate_example.cc ########## @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example showcases various ways to work with Datasets. It's +// intended to be paired with the documentation. 
+ +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +namespace cp = arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +template <typename TYPE, + typename = typename std::enable_if<arrow::is_number_type<TYPE>::value | + arrow::is_boolean_type<TYPE>::value | + arrow::is_temporal_type<TYPE>::value>::type> +arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample( + const std::vector<typename TYPE::c_type>& values) { + using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType; + using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType; + ARROW_BUILDER_TYPE builder; + ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); + std::shared_ptr<ARROW_ARRAY_TYPE> array; + ARROW_RETURN_NOT_OK(builder.AppendValues(values)); + ARROW_RETURN_NOT_OK(builder.Finish(&array)); + return array; +} + +arrow::Result<std::shared_ptr<arrow::Table>> GetTable() { + std::shared_ptr<arrow::Table> table; + + auto field_vector = {arrow::field("a", arrow::int64()), + arrow::field("b", arrow::boolean()), + arrow::field("c", arrow::int64())}; + ARROW_ASSIGN_OR_RAISE(auto int_array, + GetArrayDataSample<arrow::Int64Type>({0, 1, 2, 0, 4, 1, 0, 5})); + ARROW_ASSIGN_OR_RAISE(auto bool_array, + GetArrayDataSample<arrow::BooleanType>( + {false, true, false, true, true, false, true, false})); + ARROW_ASSIGN_OR_RAISE(auto data_array, GetArrayDataSample<arrow::Int64Type>( + {10, 11, 12, 
10, 11, 11, 10, 15})); + + auto schema = arrow::schema(field_vector); + auto data_vector = {int_array, bool_array, data_array}; + + table = arrow::Table::Make(schema, data_vector, 8); + + return table; +} + +arrow::Status DoAggregate() { + auto maybe_plan = cp::ExecPlan::Make(); + ABORT_ON_FAILURE(maybe_plan.status()); + auto plan = maybe_plan.ValueOrDie(); + cp::ExecContext exec_context(arrow::default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); + + ARROW_ASSIGN_OR_RAISE(auto table, GetTable()); + + std::cout << "Source Table" << std::endl; + + std::cout << table->ToString() << std::endl; + + std::shared_ptr<arrow::Table> out; + cp::CountOptions options(cp::CountOptions::ONLY_VALID); + auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", &options}}, + /*targets=*/{"c"}, + /*names=*/{"count(c)"}, + /*keys=*/{}}; + auto schema = arrow::schema({arrow::field("count(c)", arrow::int64())}); + + ABORT_ON_FAILURE(cp::Declaration::Sequence( + { + {"table_source", cp::TableSourceNodeOptions{table, 2}}, + {"aggregate", aggregate_options}, + {"table_sink", cp::TableSinkNodeOptions{&out, schema}}, + }) + .AddToPlan(plan.get()) + .status()); + + ARROW_RETURN_NOT_OK(plan->StartProducing()); + + std::cout << "Output Table Data : " << std::endl; + std::cout << out->ToString() << std::endl; + + auto future = plan->finished(); + + return future.status(); +} + +int main(int argc, char** argv) { Review comment: How is this example different from `SourceScalarAggregateSinkExample`? I don't know that it needs to be, but I'm wondering about the motivation here.
########## File path: python/pyarrow/_compute.pyx ########## @@ -199,6 +202,135 @@ FunctionDoc = namedtuple( "options_required")) +cdef wrap_arity(const CArity c_arity): + """ + Wrap a C++ Arity in an Arity object + """ + cdef Arity arity = Arity.__new__(Arity) + arity.init(c_arity) + return arity + + +cdef wrap_input_type(const CInputType c_input_type): + """ + Wrap a C++ InputType in an InputType object + """ + cdef InputType input_type = InputType.__new__(InputType) + input_type.init(c_input_type) + return input_type + + +cdef class InputType(_Weakrefable): + """ + An interface for defining input-types for streaming execution engine + applications. + """ + + def __init__(self): + raise TypeError("Cannot use constructor to initialize InputType") + + cdef void init(self, const CInputType &input_type): + self.input_type = input_type + + @staticmethod + def scalar(data_type): + """ + create a scalar input type of the given data type + + Parameter + --------- + data_type: DataType + + Examples + -------- + + >>> import pyarrow as pa + >>> from pyarrow.compute import InputType + >>> in_type = InputType.scalar(pa.int32()) + <pyarrow._compute.InputType object at 0x1029fdcb0> + """ + cdef: + shared_ptr[CDataType] c_data_type + CInputType c_input_type + c_data_type = pyarrow_unwrap_data_type(data_type) + c_input_type = CInputType.Scalar(c_data_type) + return wrap_input_type(c_input_type) + + @staticmethod + def array(data_type): + """ + create an array input type of the given data type + + Parameter + --------- + data_type: DataType + + Examples + -------- + + >>> import pyarrow as pa + >>> from pyarrow.compute import InputType + >>> in_type = InputType.array(pa.int32()) + <pyarrow._compute.InputType object at 0x102ba4850> + """ + cdef: + shared_ptr[CDataType] c_data_type + CInputType c_input_type + c_data_type = pyarrow_unwrap_data_type(data_type) + c_input_type = CInputType.Array(c_data_type) + return wrap_input_type(c_input_type) + + +cdef class Arity(_Weakrefable): 
Review comment: Why can't arity just be an integer? ########## File path: python/examples/udf/udf_example.py ########## @@ -0,0 +1,322 @@ +from typing import List +import pyarrow as pa +from pyarrow import compute as pc +from pyarrow.compute import register_function +from pyarrow.compute import Arity, InputType + + +def get_function_doc(summary: str, desc: str, arg_names: List[str], + options_class: str, options_required: bool = False): + func_doc = {} + func_doc["summary"] = summary + func_doc["description"] = desc + func_doc["arg_names"] = arg_names + func_doc["options_class"] = options_class + func_doc["options_required"] = False + return func_doc + + +""" +Array Usage +""" + +# Example 1: Array Unary +print("=" * 80) +print("Example 1: Array Unary") +print("=" * 80) + + +def add_constant(array): + return pc.call_function("add", [array, 1]) + + +func_name_1 = "py_add_func" +arity_1 = Arity.unary() +in_types_1 = [InputType.array(pa.int64())] +out_type_1 = pa.int64() +doc_1 = get_function_doc("add function", "test add function", + ["value"], "None") Review comment: ```suggestion doc_1 = get_function_doc("add one function", "adds one to values", ["value"], "None") ``` Let's pretend this is a real case instead of "test add function" to make the intent of the field clearer. ########## File path: cpp/examples/arrow/aggregate_example.cc ########## @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review comment: Since this PR is for scalar UDFs, could we add this example in a separate PR? 
########## File path: python/examples/udf/udf_example.py ########## @@ -0,0 +1,322 @@ +from typing import List +import pyarrow as pa +from pyarrow import compute as pc +from pyarrow.compute import register_function +from pyarrow.compute import Arity, InputType + + +def get_function_doc(summary: str, desc: str, arg_names: List[str], + options_class: str, options_required: bool = False): + func_doc = {} + func_doc["summary"] = summary + func_doc["description"] = desc + func_doc["arg_names"] = arg_names + func_doc["options_class"] = options_class + func_doc["options_required"] = False + return func_doc + + +""" +Array Usage +""" + +# Example 1: Array Unary +print("=" * 80) +print("Example 1: Array Unary") +print("=" * 80) + + +def add_constant(array): + return pc.call_function("add", [array, 1]) Review comment: ```suggestion def add_one(array): return pc.call_function("add", [array, 1]) ``` ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +// Demonstrate registering an user-defined Arrow compute function outside of the Arrow +// source tree + +namespace cp = ::arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +template <typename TYPE, + typename = typename std::enable_if<arrow::is_number_type<TYPE>::value | + arrow::is_boolean_type<TYPE>::value | + arrow::is_temporal_type<TYPE>::value>::type> +arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample( + const std::vector<typename TYPE::c_type>& values) { + using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType; + using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType; + ARROW_BUILDER_TYPE builder; + ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); + std::shared_ptr<ARROW_ARRAY_TYPE> array; + ARROW_RETURN_NOT_OK(builder.AppendValues(values)); + ARROW_RETURN_NOT_OK(builder.Finish(&array)); + return array; +} + +arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch( + const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) { + std::shared_ptr<arrow::RecordBatch> record_batch; + ARROW_ASSIGN_OR_RAISE(auto struct_result, + arrow::StructArray::Make(array_vector, field_vector)); + return record_batch->FromStructArray(struct_result); +} + +arrow::Result<std::shared_ptr<arrow::Table>> GetTable() { + std::shared_ptr<arrow::Table> table; + 
+ auto field_vector = { + arrow::field("a", arrow::int64()), arrow::field("x", arrow::int64()), + arrow::field("y", arrow::int64()), arrow::field("z", arrow::int64()), + arrow::field("b", arrow::boolean())}; + + ARROW_ASSIGN_OR_RAISE(auto int_array, + GetArrayDataSample<arrow::Int64Type>({1, 2, 3, 4, 5, 6})); + ARROW_ASSIGN_OR_RAISE(auto x, + GetArrayDataSample<arrow::Int64Type>({21, 22, 23, 24, 25, 26})); + ARROW_ASSIGN_OR_RAISE(auto y, + GetArrayDataSample<arrow::Int64Type>({31, 32, 33, 34, 35, 36})); + ARROW_ASSIGN_OR_RAISE(auto z, + GetArrayDataSample<arrow::Int64Type>({41, 42, 43, 44, 45, 46})); + ARROW_ASSIGN_OR_RAISE(auto bool_array, GetArrayDataSample<arrow::BooleanType>( + {false, true, false, true, true, false})); + + auto schema = arrow::schema(field_vector); + auto data_vector = {int_array, x, y, z, bool_array}; + + table = arrow::Table::Make(schema, data_vector, 6); + + return table; +} + +class UDFOptionsType : public cp::FunctionOptionsType { + const char* type_name() const override { return "UDFOptionsType"; } + std::string Stringify(const cp::FunctionOptions&) const override { + return "UDFOptionsType"; + } + bool Compare(const cp::FunctionOptions&, const cp::FunctionOptions&) const override { + return true; + } + std::unique_ptr<cp::FunctionOptions> Copy(const cp::FunctionOptions&) const override; +}; + +cp::FunctionOptionsType* GetUDFOptionsType() { + static UDFOptionsType options_type; + return &options_type; +} + +class UDFOptions : public cp::FunctionOptions { + public: + UDFOptions() : cp::FunctionOptions(GetUDFOptionsType()) {} +}; + +std::unique_ptr<cp::FunctionOptions> UDFOptionsType::Copy( + const cp::FunctionOptions&) const { + return std::unique_ptr<cp::FunctionOptions>(new UDFOptions()); +} + +class ExampleNodeOptions : public cp::ExecNodeOptions {}; + +// a basic ExecNode which ignores all input batches +class ExampleNode : public cp::ExecNode { + public: + ExampleNode(ExecNode* input, const ExampleNodeOptions&) + : 
ExecNode(/*plan=*/input->plan(), /*inputs=*/{input}, + /*input_labels=*/{"ignored"}, + /*output_schema=*/input->output_schema(), /*num_outputs=*/1) {} + + const char* kind_name() const override { return "ExampleNode"; } + + arrow::Status StartProducing() override { + outputs_[0]->InputFinished(this, 0); + return arrow::Status::OK(); + } + + void ResumeProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } + void StopProducing() override { inputs_[0]->StopProducing(); } + + void InputReceived(ExecNode* input, cp::ExecBatch batch) override {} + void ErrorReceived(ExecNode* input, arrow::Status error) override {} + void InputFinished(ExecNode* input, int total_batches) override {} + + arrow::Future<> finished() override { return inputs_[0]->finished(); } +}; + +arrow::Result<cp::ExecNode*> ExampleExecNodeFactory(cp::ExecPlan* plan, + std::vector<cp::ExecNode*> inputs, + const cp::ExecNodeOptions& options) { + const auto& example_options = + arrow::internal::checked_cast<const ExampleNodeOptions&>(options); + + return plan->EmplaceNode<ExampleNode>(inputs[0], example_options); +} + +const cp::FunctionDoc func_doc{ + "User-defined-function usage to demonstrate registering an out-of-tree function", + "returns x + y + z", + {"x", "y", "z"}, + "UDFOptions"}; + +arrow::Status Execute() { + const std::string name = "x+x"; Review comment: Why `x+x` and not `x+y+z`? Or maybe `add_three` or something, so that the difference between the projected field name (the key in your expression dictionary passed into project) and the function name (what is passed into `call`) is obvious. 
########## File path: python/pyarrow/_compute.pyx ########## @@ -479,6 +611,9 @@ cdef class FunctionRegistry(_Weakrefable): func = GetResultValue(self.registry.GetFunction(c_name)) return wrap_function(func) + def register_function(self, name, arity, input_types, output_type, function_kind): + pass + Review comment: What's this? ########## File path: python/pyarrow/_compute.pyx ########## @@ -2179,3 +2314,205 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef CFunctionDoc _make_function_doc(func_doc): + """ + Helper function to generate the FunctionDoc + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + c_bool c_options_required + if isinstance(func_doc, dict): + if func_doc["summary"] and isinstance(func_doc["summary"], str): + f_doc.summary = func_doc["summary"].encode() + else: + raise ValueError("key `summary` cannot be None") + + if func_doc["description"] and isinstance(func_doc["description"], str): + f_doc.description = func_doc["description"].encode() + else: + raise ValueError("key `description` cannot be None") + + if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list): + for arg_name in func_doc["arg_names"]: + if isinstance(arg_name, str): + c_arg_names.push_back(arg_name.encode()) + else: + raise ValueError( + "key `arg_names` must be a list of strings") + f_doc.arg_names = c_arg_names + else: + raise ValueError("key `arg_names` cannot be None") + + if func_doc["options_class"] and isinstance(func_doc["options_class"], str): + f_doc.options_class = func_doc["options_class"].encode() + else: + raise ValueError("key `options_class` cannot be None") + + if isinstance(func_doc["options_required"], bool): + c_options_required = func_doc["options_required"] + f_doc.options_required = c_options_required + else: + raise ValueError("key `options_required` must be bool") + + return f_doc + else: + raise 
TypeError(f"func_doc must be a dictionary") + + +cdef class UDFError(Exception): + cdef dict __dict__ + + def __init__(self, message='', extra_info=b''): + super().__init__(message) + self.extra_info = tobytes(extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +cdef class UDFRegistrationError(UDFError): + + def __init__(self, message='', extra_info=b''): + super().__init__(message, extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF Registration error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +def register_function(func_name, arity, function_doc, in_types, + out_type, callback, mem_allocation="no_preallocate", + null_handling="computed_no_preallocate"): + """ + Register a user-defined-function (function) Review comment: ```suggestion Register a user-defined-function ``` What's `(function)`? ########## File path: python/examples/udf/udf_example.py ########## @@ -0,0 +1,322 @@ +from typing import List +import pyarrow as pa +from pyarrow import compute as pc +from pyarrow.compute import register_function +from pyarrow.compute import Arity, InputType + + +def get_function_doc(summary: str, desc: str, arg_names: List[str], + options_class: str, options_required: bool = False): + func_doc = {} + func_doc["summary"] = summary + func_doc["description"] = desc + func_doc["arg_names"] = arg_names + func_doc["options_class"] = options_class + func_doc["options_required"] = False + return func_doc + + +""" +Array Usage +""" + +# Example 1: Array Unary +print("=" * 80) +print("Example 1: Array Unary") +print("=" * 80) + + +def add_constant(array): + return pc.call_function("add", [array, 1]) + + +func_name_1 = "py_add_func" Review comment: ```suggestion func_name_1 = "py_add_one_func" ``` ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +// Demonstrate registering an user-defined Arrow compute function outside of the Arrow +// source tree + +namespace cp = ::arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +template <typename TYPE, + typename = typename std::enable_if<arrow::is_number_type<TYPE>::value | + arrow::is_boolean_type<TYPE>::value | + arrow::is_temporal_type<TYPE>::value>::type> +arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample( + const std::vector<typename TYPE::c_type>& values) { + using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType; + using ARROW_BUILDER_TYPE = typename 
arrow::TypeTraits<TYPE>::BuilderType; + ARROW_BUILDER_TYPE builder; + ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); + std::shared_ptr<ARROW_ARRAY_TYPE> array; + ARROW_RETURN_NOT_OK(builder.AppendValues(values)); + ARROW_RETURN_NOT_OK(builder.Finish(&array)); + return array; +} + +arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch( + const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) { + std::shared_ptr<arrow::RecordBatch> record_batch; + ARROW_ASSIGN_OR_RAISE(auto struct_result, + arrow::StructArray::Make(array_vector, field_vector)); + return record_batch->FromStructArray(struct_result); +} Review comment: Is this used? ########## File path: python/pyarrow/_compute.pyx ########## @@ -2179,3 +2314,205 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef CFunctionDoc _make_function_doc(func_doc): + """ + Helper function to generate the FunctionDoc + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + c_bool c_options_required + if isinstance(func_doc, dict): + if func_doc["summary"] and isinstance(func_doc["summary"], str): + f_doc.summary = func_doc["summary"].encode() + else: + raise ValueError("key `summary` cannot be None") + + if func_doc["description"] and isinstance(func_doc["description"], str): + f_doc.description = func_doc["description"].encode() + else: + raise ValueError("key `description` cannot be None") + + if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list): + for arg_name in func_doc["arg_names"]: + if isinstance(arg_name, str): + c_arg_names.push_back(arg_name.encode()) + else: + raise ValueError( + "key `arg_names` must be a list of strings") + f_doc.arg_names = c_arg_names + else: + raise ValueError("key `arg_names` cannot be None") + + if func_doc["options_class"] and isinstance(func_doc["options_class"], str): + f_doc.options_class = 
func_doc["options_class"].encode() + else: + raise ValueError("key `options_class` cannot be None") + + if isinstance(func_doc["options_required"], bool): + c_options_required = func_doc["options_required"] + f_doc.options_required = c_options_required + else: + raise ValueError("key `options_required` must be bool") + + return f_doc + else: + raise TypeError(f"func_doc must be a dictionary") + + +cdef class UDFError(Exception): + cdef dict __dict__ + + def __init__(self, message='', extra_info=b''): + super().__init__(message) + self.extra_info = tobytes(extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +cdef class UDFRegistrationError(UDFError): + + def __init__(self, message='', extra_info=b''): + super().__init__(message, extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF Registration error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +def register_function(func_name, arity, function_doc, in_types, + out_type, callback, mem_allocation="no_preallocate", + null_handling="computed_no_preallocate"): + """ + Register a user-defined-function (function) + + Parameters + ---------- + + func_name: str + function name Review comment: Does this need to be unique? Can I register multiple functions with different arity / in_types but the same name? ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +// Demonstrate registering an user-defined Arrow compute function outside of the Arrow +// source tree + +namespace cp = ::arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +template <typename TYPE, + typename = typename std::enable_if<arrow::is_number_type<TYPE>::value | + arrow::is_boolean_type<TYPE>::value | + arrow::is_temporal_type<TYPE>::value>::type> +arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample( + const std::vector<typename TYPE::c_type>& values) { + using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType; + using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType; + ARROW_BUILDER_TYPE builder; + ARROW_RETURN_NOT_OK(builder.Reserve(values.size())); + std::shared_ptr<ARROW_ARRAY_TYPE> array; + ARROW_RETURN_NOT_OK(builder.AppendValues(values)); + ARROW_RETURN_NOT_OK(builder.Finish(&array)); + return array; +} + 
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch( + const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) { + std::shared_ptr<arrow::RecordBatch> record_batch; + ARROW_ASSIGN_OR_RAISE(auto struct_result, + arrow::StructArray::Make(array_vector, field_vector)); + return record_batch->FromStructArray(struct_result); +} + +arrow::Result<std::shared_ptr<arrow::Table>> GetTable() { + std::shared_ptr<arrow::Table> table; + + auto field_vector = { + arrow::field("a", arrow::int64()), arrow::field("x", arrow::int64()), + arrow::field("y", arrow::int64()), arrow::field("z", arrow::int64()), + arrow::field("b", arrow::boolean())}; + + ARROW_ASSIGN_OR_RAISE(auto int_array, + GetArrayDataSample<arrow::Int64Type>({1, 2, 3, 4, 5, 6})); + ARROW_ASSIGN_OR_RAISE(auto x, + GetArrayDataSample<arrow::Int64Type>({21, 22, 23, 24, 25, 26})); + ARROW_ASSIGN_OR_RAISE(auto y, + GetArrayDataSample<arrow::Int64Type>({31, 32, 33, 34, 35, 36})); + ARROW_ASSIGN_OR_RAISE(auto z, + GetArrayDataSample<arrow::Int64Type>({41, 42, 43, 44, 45, 46})); + ARROW_ASSIGN_OR_RAISE(auto bool_array, GetArrayDataSample<arrow::BooleanType>( + {false, true, false, true, true, false})); + + auto schema = arrow::schema(field_vector); + auto data_vector = {int_array, x, y, z, bool_array}; + + table = arrow::Table::Make(schema, data_vector, 6); + + return table; +} + +class UDFOptionsType : public cp::FunctionOptionsType { + const char* type_name() const override { return "UDFOptionsType"; } + std::string Stringify(const cp::FunctionOptions&) const override { + return "UDFOptionsType"; + } + bool Compare(const cp::FunctionOptions&, const cp::FunctionOptions&) const override { + return true; + } + std::unique_ptr<cp::FunctionOptions> Copy(const cp::FunctionOptions&) const override; Review comment: Where's the implementation? 
########## File path: python/examples/udf/udf_example.py ########## @@ -0,0 +1,322 @@ +from typing import List +import pyarrow as pa +from pyarrow import compute as pc +from pyarrow.compute import register_function +from pyarrow.compute import Arity, InputType + + +def get_function_doc(summary: str, desc: str, arg_names: List[str], + options_class: str, options_required: bool = False): + func_doc = {} + func_doc["summary"] = summary + func_doc["description"] = desc + func_doc["arg_names"] = arg_names + func_doc["options_class"] = options_class + func_doc["options_required"] = False + return func_doc + + +""" +Array Usage +""" + +# Example 1: Array Unary +print("=" * 80) +print("Example 1: Array Unary") +print("=" * 80) + + +def add_constant(array): + return pc.call_function("add", [array, 1]) + + +func_name_1 = "py_add_func" +arity_1 = Arity.unary() Review comment: Can we use scopes or something to get rid of all these variable suffixes? E.g. ``` def array_unary_example(): func_name = ... arity = ... ``` ########## File path: python/pyarrow/public-api.pxi ########## @@ -25,11 +25,9 @@ from pyarrow.includes.libarrow cimport (CArray, CDataType, CField, # You cannot assign something to a dereferenced pointer in Cython thus these # methods don't use Status to indicate a successful operation. - Review comment: Let's avoid formatting changes to unrelated files ########## File path: python/pyarrow/tests/test_udf.py ########## @@ -0,0 +1,289 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import List + +import pytest + +import pyarrow as pa +from pyarrow import compute as pc +from pyarrow.compute import register_function +from pyarrow.compute import Arity, InputType + + +def get_function_doc(summary: str, desc: str, arg_names: List[str], + options_class: str, options_required: bool = False): + func_doc = {} + func_doc["summary"] = summary + func_doc["description"] = desc + func_doc["arg_names"] = arg_names + func_doc["options_class"] = options_class + func_doc["options_required"] = False + return func_doc + +# scalar unary function data + + +unary_doc = get_function_doc("add function", + "test add function", + ["scalar1"], + "None") + + +def unary_function(scalar1): Review comment: We should eventually add tests for negative cases as well (missing arguments, invalid values, etc.) 
########## File path: python/pyarrow/_compute.pyx ########## @@ -2179,3 +2314,205 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef CFunctionDoc _make_function_doc(func_doc): + """ + Helper function to generate the FunctionDoc + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + c_bool c_options_required + if isinstance(func_doc, dict): + if func_doc["summary"] and isinstance(func_doc["summary"], str): + f_doc.summary = func_doc["summary"].encode() + else: + raise ValueError("key `summary` cannot be None") + + if func_doc["description"] and isinstance(func_doc["description"], str): + f_doc.description = func_doc["description"].encode() + else: + raise ValueError("key `description` cannot be None") + + if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list): + for arg_name in func_doc["arg_names"]: + if isinstance(arg_name, str): + c_arg_names.push_back(arg_name.encode()) + else: + raise ValueError( + "key `arg_names` must be a list of strings") + f_doc.arg_names = c_arg_names + else: + raise ValueError("key `arg_names` cannot be None") + + if func_doc["options_class"] and isinstance(func_doc["options_class"], str): + f_doc.options_class = func_doc["options_class"].encode() + else: + raise ValueError("key `options_class` cannot be None") + + if isinstance(func_doc["options_required"], bool): + c_options_required = func_doc["options_required"] + f_doc.options_required = c_options_required + else: + raise ValueError("key `options_required` must be bool") + + return f_doc + else: + raise TypeError(f"func_doc must be a dictionary") + + +cdef class UDFError(Exception): + cdef dict __dict__ + + def __init__(self, message='', extra_info=b''): + super().__init__(message) + self.extra_info = tobytes(extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF error: {}".format(str(self))) + return 
CStatus_UnknownError(message) + + +cdef class UDFRegistrationError(UDFError): + + def __init__(self, message='', extra_info=b''): + super().__init__(message, extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF Registration error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +def register_function(func_name, arity, function_doc, in_types, + out_type, callback, mem_allocation="no_preallocate", + null_handling="computed_no_preallocate"): + """ + Register a user-defined-function (function) + + Parameters + ---------- + + func_name: str + function name + arity: Arity + arity of the function + function_doc: dict + a dictionary object with keys + ("summary", + "description", + "arg_names", + "options_class", (not supported yet) + "options_required" (not supported yet) + ) + in_types: List[InputType] + list of InputType objects which defines the input + types for the function + out_type: DataType + output type of the function + callback: callable + user defined function Review comment: What does this function get passed in? What does it need to return? We can find this information out from the examples but this seems a little sparse. ########## File path: cpp/examples/arrow/aggregate_example.cc ########## @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example showcases various ways to work with Datasets. It's +// intended to be paired with the documentation. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> Review comment: Seems odd that I would still need to `#include` this many files. I think it should be sufficient to do: ``` #include <arrow/api.h> #include <arrow/compute/api.h> #include <arrow/compute/exec/exec_plan.h> // ARROW-15263 ``` ########## File path: cpp/examples/arrow/udf_example.cc ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/datum.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> +#include <arrow/util/vector.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + Review comment: ``` #include <vector> // std::vector #include <string> // std::string #include <utility> // std::move ``` ########## File path: python/pyarrow/_compute.pyx ########## @@ -2179,3 +2314,205 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef CFunctionDoc _make_function_doc(func_doc): + """ + Helper function to generate the FunctionDoc + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + c_bool c_options_required + if isinstance(func_doc, dict): + if func_doc["summary"] and isinstance(func_doc["summary"], str): + f_doc.summary = func_doc["summary"].encode() + else: + raise ValueError("key `summary` cannot be None") + + if func_doc["description"] and isinstance(func_doc["description"], str): + f_doc.description = func_doc["description"].encode() + else: + raise ValueError("key `description` cannot be None") + + if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list): + for arg_name in func_doc["arg_names"]: + if isinstance(arg_name, str): + c_arg_names.push_back(arg_name.encode()) + else: + raise ValueError( + "key `arg_names` must be a list of strings") + f_doc.arg_names = c_arg_names + else: + raise ValueError("key `arg_names` cannot be None") + + if func_doc["options_class"] and isinstance(func_doc["options_class"], str): + f_doc.options_class = func_doc["options_class"].encode() + else: + raise ValueError("key 
`options_class` cannot be None") + + if isinstance(func_doc["options_required"], bool): + c_options_required = func_doc["options_required"] + f_doc.options_required = c_options_required + else: + raise ValueError("key `options_required` must be bool") + + return f_doc + else: + raise TypeError(f"func_doc must be a dictionary") + + +cdef class UDFError(Exception): + cdef dict __dict__ + + def __init__(self, message='', extra_info=b''): + super().__init__(message) + self.extra_info = tobytes(extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +cdef class UDFRegistrationError(UDFError): + + def __init__(self, message='', extra_info=b''): + super().__init__(message, extra_info) + + cdef CStatus to_status(self): + message = tobytes("UDF Registration error: {}".format(str(self))) + return CStatus_UnknownError(message) + + +def register_function(func_name, arity, function_doc, in_types, + out_type, callback, mem_allocation="no_preallocate", + null_handling="computed_no_preallocate"): + """ + Register a user-defined-function (function) + + Parameters + ---------- + + func_name: str + function name + arity: Arity + arity of the function + function_doc: dict + a dictionary object with keys + ("summary", + "description", + "arg_names", + "options_class", (not supported yet) + "options_required" (not supported yet) Review comment: It's ok to do this in pieces but can we just leave these keys out instead of saying "not supported yet"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
