This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 323ca112ce0d95f970d7c0f64d907def01074f94
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Jul 27 00:29:41 2025 +0200

    SEDONA-738 Fix unit tests.
---
 sedonaworker/worker.py | 126 +++++--------------------------------------------
 1 file changed, 12 insertions(+), 114 deletions(-)

diff --git a/sedonaworker/worker.py b/sedonaworker/worker.py
index 42fb20beb3..365561f0a6 100644
--- a/sedonaworker/worker.py
+++ b/sedonaworker/worker.py
@@ -204,8 +204,6 @@ def read_udfs(pickleSer, infile, eval_type):
 
         if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
             ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name(runner_conf))
-        elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
-            ser = ArrowStreamUDFSerializer()
         else:
             # Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
             # pandas Series. See SPARK-27240.
@@ -300,118 +298,18 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # profiling is not supported for UDF
         return func, None, ser, ser
-
-    def extract_key_value_indexes(grouped_arg_offsets):
-        """
-        Helper function to extract the key and value indexes from arg_offsets for the grouped and
-        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
-
-        Parameters
-        ----------
-        grouped_arg_offsets:  list
-            List containing the key and value indexes of columns of the
-            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
-            number of DataFrames.  Each group has the following format:
-                group[0]: length of group
-                group[1]: length of key indexes
-                group[2.. group[1] +2]: key attributes
-                group[group[1] +3 group[0]]: value attributes
-        """
-        parsed = []
-        idx = 0
-        while idx < len(grouped_arg_offsets):
-            offsets_len = grouped_arg_offsets[idx]
-            idx += 1
-            offsets = grouped_arg_offsets[idx : idx + offsets_len]
-            split_index = offsets[0] + 1
-            offset_keys = offsets[1:split_index]
-            offset_values = offsets[split_index:]
-            parsed.append([offset_keys, offset_values])
-            idx += offsets_len
-        return parsed
-
-    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        # We assume there is only one UDF here because grouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
-
-        # See FlatMapGroupsInPandasExec for how arg_offsets are used to
-        # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        parsed_offsets = extract_key_value_indexes(arg_offsets)
-
-        # Create function like this:
-        #   mapper a: f([a[0]], [a[0], a[1]])
-        def mapper(a):
-            keys = [a[o] for o in parsed_offsets[0][0]]
-            vals = [a[o] for o in parsed_offsets[0][1]]
-            return f(keys, vals)
-
-    elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-        # We assume there is only one UDF here because grouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
-
-        # See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are used to
-        # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        parsed_offsets = extract_key_value_indexes(arg_offsets)
-
-        def mapper(a):
-            """
-            The function receives (iterator of data, state) and performs extraction of key and
-            value from the data, with retaining lazy evaluation.
-
-            See `load_stream` in `ApplyInPandasWithStateSerializer` for more details on the input
-            and see `wrap_grouped_map_pandas_udf_with_state` for more details on how output will
-            be used.
-            """
-            from itertools import tee
-
-            state = a[1]
-            data_gen = (x[0] for x in a[0])
-
-            # We know there should be at least one item in the iterator/generator.
-            # We want to peek the first element to construct the key, hence applying
-            # tee to construct the key while we retain another iterator/generator
-            # for values.
-            keys_gen, values_gen = tee(data_gen)
-            keys_elem = next(keys_gen)
-            keys = [keys_elem[o] for o in parsed_offsets[0][0]]
-
-            # This must be generator comprehension - do not materialize.
-            vals = ([x[o] for o in parsed_offsets[0][1]] for x in values_gen)
-
-            return f(keys, vals, state)
-
-    elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
-        # We assume there is only one UDF here because cogrouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
-
-        parsed_offsets = extract_key_value_indexes(arg_offsets)
-
-        def mapper(a):
-            df1_keys = [a[0][o] for o in parsed_offsets[0][0]]
-            df1_vals = [a[0][o] for o in parsed_offsets[0][1]]
-            df2_keys = [a[1][o] for o in parsed_offsets[1][0]]
-            df2_vals = [a[1][o] for o in parsed_offsets[1][1]]
-            return f(df1_keys, df1_vals, df2_keys, df2_vals)
-
-    else:
-        udfs = []
-        for i in range(num_udfs):
-            udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
-
-        def mapper(a):
-            result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
-            # In the special case of a single UDF this will return a single result rather
-            # than a tuple of results; this is the format that the JVM side expects.
-            if len(result) == 1:
-                return result[0]
-            else:
-                return result
+    udfs = []
+    for i in range(num_udfs):
+        udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
+
+    def mapper(a):
+        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
+        # In the special case of a single UDF this will return a single result rather
+        # than a tuple of results; this is the format that the JVM side expects.
+        if len(result) == 1:
+            return result[0]
+        else:
+            return result
 
     def func(_, it):
         return map(mapper, it)
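
For context on the removed `extract_key_value_indexes` helper: the grouped
arg-offsets it parsed pack, for each DataFrame, a group of the form
[group length, key count, key indexes..., value indexes...]. A minimal
standalone sketch of that parsing, with an illustrative function name that is
not part of the patch:

    def parse_grouped_arg_offsets(grouped_arg_offsets):
        # Each group: [group_len, key_count, *key_indexes, *value_indexes].
        parsed = []
        idx = 0
        while idx < len(grouped_arg_offsets):
            offsets_len = grouped_arg_offsets[idx]  # group[0]: length of group
            idx += 1
            offsets = grouped_arg_offsets[idx : idx + offsets_len]
            split_index = offsets[0] + 1  # offsets[0] is the key count
            parsed.append([offsets[1:split_index], offsets[split_index:]])
            idx += offsets_len
        return parsed

    # One DataFrame with key column 5 and value columns 0 and 1: the group
    # length 4 covers the key-count field, one key index, and two value indexes.
    assert parse_grouped_arg_offsets([4, 1, 5, 0, 1]) == [[[5], [0, 1]]]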
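
The removed stateful mapper used itertools.tee to peek the first batch for the
grouping key while keeping value extraction lazy. A self-contained sketch of
that pattern, with illustrative names:

    from itertools import tee

    def peek_keys_lazily(data_gen, key_offsets, value_offsets):
        # Duplicate the generator: read one element from the first copy to
        # build the key; the second copy stays lazy and, as in the original,
        # still yields the peeked element among the values.
        keys_gen, values_gen = tee(data_gen)
        first = next(keys_gen)
        keys = [first[o] for o in key_offsets]
        vals = ([x[o] for o in value_offsets] for x in values_gen)
        return keys, vals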
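
The retained mapper evaluates each UDF against its own argument offsets and
unwraps a lone result so the JVM side receives a bare value instead of a
1-tuple. A quick illustration with plain Python callables and made-up data:

    udfs = [((0,), lambda x: x + 1), ((0, 1), lambda x, y: x * y)]
    row = [10, 20]
    result = tuple(f(*[row[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
    assert result == (11, 200)  # with a single UDF, result[0] would be returned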
