mesejo commented on code in PR #880: URL: https://github.com/apache/datafusion-python/pull/880#discussion_r1777392689
########## examples/python-udwf.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyarrow as pa +import datafusion +from datafusion import udwf, functions as f, col, lit +from datafusion.udf import WindowEvaluator +from datafusion.expr import WindowFrame + +# This example creates five different examples of user defined window functions in order +# to demonstrate the variety of ways a user may need to implement. 
+ + +class ExponentialSmoothDefault(WindowEvaluator): + """Create a running smooth operation across an entire partition at once.""" + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + curr_value = 0.0 + values = values[0] + for idx in range(num_rows): + if idx == 0: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + results.append(curr_value) + + return pa.array(results) + + +class SmoothBoundedFromPreviousRow(WindowEvaluator): + """Smooth over from the previous to current row only.""" + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def supports_bounded_execution(self) -> bool: + return True + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + # Ovrerride the default range of current row since uses_window_frame is False Review Comment: ```suggestion # Override the default range of current row since uses_window_frame is False ``` ########## examples/python-udwf.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one Review Comment: As a side comment: Should we have automated tests of the examples? Nothing fancy; simple execution of the scripts should be enough ########## python/datafusion/tests/test_udwf.py: ########## @@ -0,0 +1,294 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyarrow as pa +import pytest + +from datafusion import SessionContext, column, udwf, lit, functions as f +from datafusion.udf import WindowEvaluator +from datafusion.expr import WindowFrame + + +class ExponentialSmoothDefault(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + curr_value = 0.0 + values = values[0] + for idx in range(num_rows): + if idx == 0: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + results.append(curr_value) + + return pa.array(results) + + +class ExponentialSmoothBounded(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def supports_bounded_execution(self) -> bool: + return True + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + # Ovrerride the default range of current row since uses_window_frame is False Review Comment: ```suggestion # Override the default range of current row since uses_window_frame is False ``` ########## docs/source/user-guide/common-operations/udf-and-udfa.rst: ########## Review Comment: nit: There is a mix of user-defined and user defined (without hyphen) usages ########## docs/source/user-guide/common-operations/udf-and-udfa.rst: ########## @@ -35,14 +48,70 @@ However you can still incorporate your own functions, i.e. 
User-Defined Function ctx = datafusion.SessionContext() batch = pyarrow.RecordBatch.from_arrays( - [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])], + [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])], names=["a", "b"], ) df = ctx.create_dataframe([[batch]], name="batch_array") - df.select(is_null_arr(col("a"))).to_pandas() + df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show() + +In the previous example, we used the fact that pyarrow provides a variety of built in array +functions such as ``is_null()``. There are additional pyarrow +`compute functions <https://arrow.apache.org/docs/python/compute.html>`_ available. When possible, +it is highly recommended to use these functions because they can perform computations without doing +any copy operations from the original arrays. This leads to greatly improved performance. + +If you need to perform an operation in python that is not available with the pyarrow compute +functions, you will need to convert the record batch into python values, perform your operation, +and construct an array. This operation of converting the built in data type of the array into a +python object can be one of the slowest operations in DataFusion, so it should be done sparingly. + +The following example performs the same operation as before with ``is_null`` but demonstrates +converting to Python objects to do the evaluation. + +.. ipython:: python + + import pyarrow + import datafusion + from datafusion import udf, col + + def is_null(array: pyarrow.Array) -> pyarrow.Array: + results = [] Review Comment: ```suggestion return pyarrow.array([value.as_py() is None for value in array]) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org