This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f552989381fe [SPARK-54587][PYTHON] Consolidate all runner_conf related code
f552989381fe is described below

commit f552989381fe878db9e6b5a0159dd059ba0b7766
Author: Tian Gao <[email protected]>
AuthorDate: Fri Dec 5 11:43:39 2025 +0800

    [SPARK-54587][PYTHON] Consolidate all runner_conf related code
    
    ### What changes were proposed in this pull request?
    
    Refactor runner_conf in `worker.py`. Put all the related logic together so the code using it can just access its attributes.
    
    ### Why are the changes needed?
    
    This is part of the effort to consolidate the worker protocol. We need to clean up the code a bit first.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `pyspark-sql` passes locally. The rest we need to see CI results.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53315 from gaogaotiantian/consolidate-runner-conf.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/worker.py | 421 +++++++++++++++++++++--------------------------
 1 file changed, 190 insertions(+), 231 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 0c3fadbc6a9f..c49b633b9408 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -111,6 +111,82 @@ except Exception:
     has_memory_profiler = False
 
 
+class RunnerConf:
+    def __init__(self, infile=None):
+        self._conf = {}
+        if infile is not None:
+            self.load(infile)
+
+    def load(self, infile):
+        num_conf = read_int(infile)
+        for i in range(num_conf):
+            k = utf8_deserializer.loads(infile)
+            v = utf8_deserializer.loads(infile)
+            self._conf[k] = v
+
+    def get(self, key: str, default=""):
+        val = self._conf.get(key, default)
+        if isinstance(val, str):
+            return val.lower()
+        return val
+
+    @property
+    def assign_cols_by_name(self) -> bool:
+        return (
+            
self.get("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", 
"true")
+            == "true"
+        )
+
+    @property
+    def use_large_var_types(self) -> bool:
+        return self.get("spark.sql.execution.arrow.useLargeVarTypes", "false") 
== "true"
+
+    @property
+    def use_legacy_pandas_udf_conversion(self) -> bool:
+        return (
+            
self.get("spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled", 
"false")
+            == "true"
+        )
+
+    @property
+    def use_legacy_pandas_udtf_conversion(self) -> bool:
+        return (
+            
self.get("spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled", 
"false")
+            == "true"
+        )
+
+    @property
+    def binary_as_bytes(self) -> bool:
+        return self.get("spark.sql.execution.pyspark.binaryAsBytes", "true") 
== "true"
+
+    @property
+    def safecheck(self) -> bool:
+        return 
self.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false") == 
"true"
+
+    @property
+    def int_to_decimal_coercion_enabled(self) -> bool:
+        return (
+            
self.get("spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", 
"false")
+            == "true"
+        )
+
+    @property
+    def timezone(self) -> Optional[str]:
+        return self.get("spark.sql.session.timeZone", None)
+
+    @property
+    def arrow_max_records_per_batch(self) -> int:
+        return int(self.get("spark.sql.execution.arrow.maxRecordsPerBatch", 
10000))
+
+    @property
+    def arrow_max_bytes_per_batch(self) -> int:
+        return int(self.get("spark.sql.execution.arrow.maxBytesPerBatch", 
2**31 - 1))
+
+    @property
+    def arrow_concurrency_level(self) -> int:
+        return 
int(self.get("spark.sql.execution.pythonUDF.arrow.concurrency.level", -1))
+
+
 def report_times(outfile, boot, init, finish):
     write_int(SpecialLengths.TIMING_DATA, outfile)
     write_long(int(1000 * boot), outfile)
@@ -137,7 +213,7 @@ def wrap_scalar_pandas_udf(f, args_offsets, kwargs_offsets, 
return_type, runner_
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result_type(result):
@@ -177,7 +253,7 @@ def wrap_scalar_arrow_udf(f, args_offsets, kwargs_offsets, 
return_type, runner_c
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result_type(result):
@@ -214,7 +290,7 @@ def wrap_scalar_arrow_udf(f, args_offsets, kwargs_offsets, 
return_type, runner_c
 
 
 def wrap_arrow_batch_udf(f, args_offsets, kwargs_offsets, return_type, 
runner_conf):
-    if use_legacy_pandas_udf_conversion(runner_conf):
+    if runner_conf.use_legacy_pandas_udf_conversion:
         return wrap_arrow_batch_udf_legacy(
             f, args_offsets, kwargs_offsets, return_type, runner_conf
         )
@@ -233,7 +309,7 @@ def wrap_arrow_batch_udf_arrow(f, args_offsets, 
kwargs_offsets, return_type, run
         zero_arg_exec = True
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     if zero_arg_exec:
@@ -246,14 +322,12 @@ def wrap_arrow_batch_udf_arrow(f, args_offsets, 
kwargs_offsets, return_type, run
         def get_args(*args: list):
             return zip(*args)
 
-    if "spark.sql.execution.pythonUDF.arrow.concurrency.level" in runner_conf:
+    if runner_conf.arrow_concurrency_level > 0:
         from concurrent.futures import ThreadPoolExecutor
 
-        c = 
int(runner_conf["spark.sql.execution.pythonUDF.arrow.concurrency.level"])
-
         @fail_on_stopiteration
         def evaluate(*args):
-            with ThreadPoolExecutor(max_workers=c) as pool:
+            with 
ThreadPoolExecutor(max_workers=runner_conf.arrow_concurrency_level) as pool:
                 """
                 Takes list of Python objects and returns tuple of
                 (results, arrow_return_type, return_type).
@@ -298,7 +372,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, 
kwargs_offsets, return_type, ru
         zero_arg_exec = True
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     # "result_func" ensures the result of a Python UDF to be consistent 
with/without Arrow
@@ -322,14 +396,12 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, 
kwargs_offsets, return_type, ru
         def get_args(*args: pd.Series):
             return zip(*args)
 
-    if "spark.sql.execution.pythonUDF.arrow.concurrency.level" in runner_conf:
+    if runner_conf.arrow_concurrency_level > 0:
         from concurrent.futures import ThreadPoolExecutor
 
-        c = 
int(runner_conf["spark.sql.execution.pythonUDF.arrow.concurrency.level"])
-
         @fail_on_stopiteration
         def evaluate(*args: pd.Series) -> pd.Series:
-            with ThreadPoolExecutor(max_workers=c) as pool:
+            with 
ThreadPoolExecutor(max_workers=runner_conf.arrow_concurrency_level) as pool:
                 return pd.Series(
                     list(pool.map(lambda row: result_func(func(*row)), 
get_args(*args)))
                 )
@@ -360,7 +432,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, 
kwargs_offsets, return_type, ru
 
 def wrap_pandas_batch_iter_udf(f, return_type, runner_conf):
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
     iter_type_label = "pandas.DataFrame" if type(return_type) == StructType 
else "pandas.Series"
 
@@ -460,7 +532,7 @@ def verify_pandas_result(result, return_type, 
assign_cols_by_name, truncate_retu
 
 def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result(result):
@@ -495,7 +567,7 @@ def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
 
 def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result(result):
@@ -529,9 +601,7 @@ def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
 
 
 def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):
-    _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
-    if _assign_cols_by_name:
+    if runner_conf.assign_cols_by_name:
         expected_cols_and_types = {
             col.name: to_arrow_type(col.dataType) for col in return_type.fields
         }
@@ -548,7 +618,7 @@ def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
             key = tuple(c[0] for c in key_table.columns)
             result = f(key, left_value_table, right_value_table)
 
-        verify_arrow_table(result, _assign_cols_by_name, 
expected_cols_and_types)
+        verify_arrow_table(result, runner_conf.assign_cols_by_name, 
expected_cols_and_types)
 
         return result.to_batches()
 
@@ -556,9 +626,6 @@ def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
 
 
 def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, runner_conf):
-    _use_large_var_types = use_large_var_types(runner_conf)
-    _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
     def wrapped(left_key_series, left_value_series, right_key_series, 
right_value_series):
         import pandas as pd
 
@@ -572,12 +639,12 @@ def wrap_cogrouped_map_pandas_udf(f, return_type, 
argspec, runner_conf):
             key = tuple(s[0] for s in key_series)
             result = f(key, left_df, right_df)
         verify_pandas_result(
-            result, return_type, _assign_cols_by_name, 
truncate_return_schema=False
+            result, return_type, runner_conf.assign_cols_by_name, 
truncate_return_schema=False
         )
 
         return result
 
-    arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), 
arrow_return_type)]
 
 
@@ -675,9 +742,7 @@ def verify_arrow_batch(batch, assign_cols_by_name, 
expected_cols_and_types):
 def wrap_grouped_map_arrow_udf(f, return_type, argspec, runner_conf):
     import pyarrow as pa
 
-    _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
-    if _assign_cols_by_name:
+    if runner_conf.assign_cols_by_name:
         expected_cols_and_types = {
             col.name: to_arrow_type(col.dataType) for col in return_type.fields
         }
@@ -694,18 +759,16 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
             key = tuple(c[0] for c in key_batch.columns)
             result = f(key, value_table)
 
-        verify_arrow_table(result, _assign_cols_by_name, 
expected_cols_and_types)
+        verify_arrow_table(result, runner_conf.assign_cols_by_name, 
expected_cols_and_types)
 
         yield from result.to_batches()
 
-    arrow_return_type = to_arrow_type(return_type, 
use_large_var_types(runner_conf))
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda k, v: (wrapped(k, v), arrow_return_type)
 
 
 def wrap_grouped_map_arrow_iter_udf(f, return_type, argspec, runner_conf):
-    _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
-    if _assign_cols_by_name:
+    if runner_conf.assign_cols_by_name:
         expected_cols_and_types = {
             col.name: to_arrow_type(col.dataType) for col in return_type.fields
         }
@@ -722,19 +785,16 @@ def wrap_grouped_map_arrow_iter_udf(f, return_type, 
argspec, runner_conf):
             result = f(key, value_batches)
 
         def verify_element(batch):
-            verify_arrow_batch(batch, _assign_cols_by_name, 
expected_cols_and_types)
+            verify_arrow_batch(batch, runner_conf.assign_cols_by_name, 
expected_cols_and_types)
             return batch
 
         yield from map(verify_element, result)
 
-    arrow_return_type = to_arrow_type(return_type, 
use_large_var_types(runner_conf))
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda k, v: (wrapped(k, v), arrow_return_type)
 
 
 def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
-    _use_large_var_types = use_large_var_types(runner_conf)
-    _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
     def wrapped(key_series, value_batches):
         import pandas as pd
 
@@ -755,12 +815,12 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
             result = f(key, value_df)
 
         verify_pandas_result(
-            result, return_type, _assign_cols_by_name, 
truncate_return_schema=False
+            result, return_type, runner_conf.assign_cols_by_name, 
truncate_return_schema=False
         )
 
         yield result
 
-    arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
 
     def flatten_wrapper(k, v):
         # Return Iterator[[(df, arrow_type)]] directly
@@ -771,9 +831,6 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
 
 
 def wrap_grouped_map_pandas_iter_udf(f, return_type, argspec, runner_conf):
-    _use_large_var_types = use_large_var_types(runner_conf)
-    _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
     def wrapped(key_series, value_batches):
         import pandas as pd
 
@@ -792,13 +849,13 @@ def wrap_grouped_map_pandas_iter_udf(f, return_type, 
argspec, runner_conf):
 
         def verify_element(df):
             verify_pandas_result(
-                df, return_type, _assign_cols_by_name, 
truncate_return_schema=False
+                df, return_type, runner_conf.assign_cols_by_name, 
truncate_return_schema=False
             )
             return df
 
         yield from map(verify_element, result)
 
-    arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
 
     def flatten_wrapper(k, v):
         # Return Iterator[[(df, arrow_type)]] directly
@@ -817,7 +874,7 @@ def wrap_grouped_transform_with_state_pandas_udf(f, 
return_type, runner_conf):
 
         return result_iter
 
-    arrow_return_type = to_arrow_type(return_type, 
use_large_var_types(runner_conf))
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
 
@@ -836,7 +893,7 @@ def 
wrap_grouped_transform_with_state_pandas_init_state_udf(f, return_type, runn
 
         return result_iter
 
-    arrow_return_type = to_arrow_type(return_type, 
use_large_var_types(runner_conf))
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
 
@@ -849,7 +906,7 @@ def wrap_grouped_transform_with_state_udf(f, return_type, 
runner_conf):
 
         return result_iter
 
-    arrow_return_type = to_arrow_type(return_type, 
use_large_var_types(runner_conf))
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
 
@@ -869,7 +926,7 @@ def wrap_grouped_transform_with_state_init_state_udf(f, 
return_type, runner_conf
 
         return result_iter
 
-    arrow_return_type = to_arrow_type(return_type, 
use_large_var_types(runner_conf))
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
 
@@ -888,7 +945,6 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type, 
runner_conf):
     Along with the returned iterator, the lambda instance will also produce 
the return_type as
     converted to the arrow schema.
     """
-    _use_large_var_types = use_large_var_types(runner_conf)
 
     def wrapped(key_series, value_series_gen, state):
         """
@@ -963,7 +1019,7 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type, 
runner_conf):
             state,
         )
 
-    arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+    arrow_return_type = to_arrow_type(return_type, 
runner_conf.use_large_var_types)
     return lambda k, v, s: [(wrapped(k, v, s), arrow_return_type)]
 
 
@@ -971,7 +1027,7 @@ def wrap_grouped_agg_pandas_udf(f, args_offsets, 
kwargs_offsets, return_type, ru
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -990,7 +1046,7 @@ def wrap_grouped_agg_arrow_udf(f, args_offsets, 
kwargs_offsets, return_type, run
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1009,7 +1065,7 @@ def wrap_grouped_agg_arrow_iter_udf(f, args_offsets, 
kwargs_offsets, return_type
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(batch_iter):
@@ -1075,7 +1131,7 @@ def wrap_unbounded_window_agg_pandas_udf(f, args_offsets, 
kwargs_offsets, return
     # to match window length, where grouped_agg_pandas_udf just returns
     # the scalar value.
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1096,7 +1152,7 @@ def wrap_unbounded_window_agg_arrow_udf(f, args_offsets, 
kwargs_offsets, return_
     # This is similar to wrap_unbounded_window_agg_pandas_udf, the only 
difference
     # is that this function is for arrow udf.
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1117,7 +1173,7 @@ def wrap_bounded_window_agg_pandas_udf(f, args_offsets, 
kwargs_offsets, return_t
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(begin_index, end_index, *series):
@@ -1161,7 +1217,7 @@ def wrap_bounded_window_agg_arrow_udf(f, args_offsets, 
kwargs_offsets, return_ty
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=use_large_var_types(runner_conf)
+        return_type, prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(begin_index, end_index, *series):
@@ -1452,94 +1508,29 @@ def read_single_udf(pickleSer, infile, eval_type, 
runner_conf, udf_index, profil
         raise ValueError("Unknown eval type: {}".format(eval_type))
 
 
-# Used by SQL_GROUPED_MAP_PANDAS_UDF, SQL_GROUPED_MAP_ARROW_UDF,
-# SQL_COGROUPED_MAP_PANDAS_UDF, SQL_COGROUPED_MAP_ARROW_UDF,
-# SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
-# SQL_SCALAR_PANDAS_UDF and SQL_ARROW_BATCHED_UDF when
-# returning StructType
-def assign_cols_by_name(runner_conf):
-    return (
-        runner_conf.get(
-            
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true"
-        ).lower()
-        == "true"
-    )
-
-
-def use_large_var_types(runner_conf):
-    return runner_conf.get("spark.sql.execution.arrow.useLargeVarTypes", 
"false").lower() == "true"
-
-
-def use_legacy_pandas_udf_conversion(runner_conf):
-    return (
-        runner_conf.get(
-            "spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled", 
"false"
-        ).lower()
-        == "true"
-    )
-
-
 # Read and process a serialized user-defined table function (UDTF) from a 
socket.
 # It expects the UDTF to be in a specific format and performs various checks to
 # ensure the UDTF is valid. This function also prepares a mapper function for 
applying
 # the UDTF logic to input rows.
 def read_udtf(pickleSer, infile, eval_type):
-    prefers_large_var_types = False
-    legacy_pandas_conversion = False
-    binary_as_bytes = True
-
     if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF:
-        runner_conf = {}
         # Load conf used for arrow evaluation.
-        num_conf = read_int(infile)
-        for i in range(num_conf):
-            k = utf8_deserializer.loads(infile)
-            v = utf8_deserializer.loads(infile)
-            runner_conf[k] = v
-        prefers_large_var_types = use_large_var_types(runner_conf)
-        legacy_pandas_conversion = (
-            runner_conf.get(
-                
"spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled", "false"
-            ).lower()
-            == "true"
-        )
-        binary_as_bytes = (
-            runner_conf.get("spark.sql.execution.pyspark.binaryAsBytes", 
"true").lower() == "true"
-        )
+        runner_conf = RunnerConf(infile)
         input_types = [
             field.dataType for field in 
_parse_datatype_json_string(utf8_deserializer.loads(infile))
         ]
-        if legacy_pandas_conversion:
+        if runner_conf.use_legacy_pandas_udtf_conversion:
             # NOTE: if timezone is set here, that implies 
respectSessionTimeZone is True
-            safecheck = (
-                runner_conf.get(
-                    "spark.sql.execution.pandas.convertToArrowArraySafely", 
"false"
-                ).lower()
-                == "true"
-            )
-            int_to_decimal_coercion_enabled = (
-                runner_conf.get(
-                    
"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false"
-                ).lower()
-                == "true"
-            )
-            timezone = runner_conf.get("spark.sql.session.timeZone", None)
             ser = ArrowStreamPandasUDTFSerializer(
-                timezone,
-                safecheck,
+                runner_conf.timezone,
+                runner_conf.safecheck,
                 input_types=input_types,
-                
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         else:
             ser = ArrowStreamUDTFSerializer()
     elif eval_type == PythonEvalType.SQL_ARROW_UDTF:
-        runner_conf = {}
-        num_conf = read_int(infile)
-        for i in range(num_conf):
-            k = utf8_deserializer.loads(infile)
-            v = utf8_deserializer.loads(infile)
-            runner_conf[k] = v
-        prefers_large_var_types = use_large_var_types(runner_conf)
+        runner_conf = RunnerConf(infile)
         # Read the table argument offsets
         num_table_arg_offsets = read_int(infile)
         table_arg_offsets = [read_int(infile) for _ in 
range(num_table_arg_offsets)]
@@ -1547,6 +1538,7 @@ def read_udtf(pickleSer, infile, eval_type):
         ser = 
ArrowStreamArrowUDTFSerializer(table_arg_offsets=table_arg_offsets)
     else:
         # Each row is a group so do not batch but send one by one.
+        runner_conf = RunnerConf()
         ser = BatchedSerializer(CPickleSerializer(), 1)
 
     # See 'PythonUDTFRunner.PythonUDFWriterThread.writeCommand'
@@ -2177,13 +2169,16 @@ def read_udtf(pickleSer, infile, eval_type):
 
     check_output_row_against_schema = build_null_checker(return_type)
 
-    if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF and 
legacy_pandas_conversion:
+    if (
+        eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF
+        and runner_conf.use_legacy_pandas_udtf_conversion
+    ):
 
         def wrap_arrow_udtf(f, return_type):
             import pandas as pd
 
             arrow_return_type = to_arrow_type(
-                return_type, prefers_large_types=prefers_large_var_types
+                return_type, 
prefers_large_types=runner_conf.use_large_var_types
             )
             return_type_size = len(return_type)
 
@@ -2301,13 +2296,16 @@ def read_udtf(pickleSer, infile, eval_type):
 
         return mapper, None, ser, ser
 
-    elif eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF and not 
legacy_pandas_conversion:
+    elif (
+        eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF
+        and not runner_conf.use_legacy_pandas_udtf_conversion
+    ):
 
         def wrap_arrow_udtf(f, return_type):
             import pyarrow as pa
 
             arrow_return_type = to_arrow_type(
-                return_type, prefers_large_types=prefers_large_var_types
+                return_type, 
prefers_large_types=runner_conf.use_large_var_types
             )
             return_type_size = len(return_type)
 
@@ -2399,7 +2397,7 @@ def read_udtf(pickleSer, infile, eval_type):
 
                 try:
                     table = LocalDataToArrowConversion.convert(
-                        data, return_type, prefers_large_var_types
+                        data, return_type, runner_conf.use_large_var_types
                     )
                 except PySparkValueError as e:
                     if e.getErrorClass() == "AXIS_LENGTH_MISMATCH":
@@ -2451,7 +2449,7 @@ def read_udtf(pickleSer, infile, eval_type):
             try:
                 converters = [
                     ArrowTableToRowsConversion._create_converter(
-                        dt, none_on_identity=True, 
binary_as_bytes=binary_as_bytes
+                        dt, none_on_identity=True, 
binary_as_bytes=runner_conf.binary_as_bytes
                     )
                     for dt in input_types
                 ]
@@ -2482,7 +2480,7 @@ def read_udtf(pickleSer, infile, eval_type):
             import pyarrow as pa
 
             arrow_return_type = to_arrow_type(
-                return_type, 
prefers_large_types=use_large_var_types(runner_conf)
+                return_type, 
prefers_large_types=runner_conf.use_large_var_types
             )
             return_type_size = len(return_type)
 
@@ -2689,8 +2687,6 @@ def read_udtf(pickleSer, infile, eval_type):
 
 
 def read_udfs(pickleSer, infile, eval_type):
-    runner_conf = {}
-
     state_server_port = None
     key_schema = None
     if eval_type in (
@@ -2719,11 +2715,7 @@ def read_udfs(pickleSer, infile, eval_type):
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
     ):
         # Load conf used for pandas_udf evaluation
-        num_conf = read_int(infile)
-        for i in range(num_conf):
-            k = utf8_deserializer.loads(infile)
-            v = utf8_deserializer.loads(infile)
-            runner_conf[k] = v
+        runner_conf = RunnerConf(infile)
 
         state_object_schema = None
         if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
@@ -2740,126 +2732,86 @@ def read_udfs(pickleSer, infile, eval_type):
             key_schema = StructType.fromJson(json.loads(utf8_deserializer.loads(infile)))
 
         # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
-        timezone = runner_conf.get("spark.sql.session.timeZone", None)
-        prefers_large_var_types = use_large_var_types(runner_conf)
-        safecheck = (
-            runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower()
-            == "true"
-        )
-        int_to_decimal_coercion_enabled = (
-            runner_conf.get(
-                "spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false"
-            ).lower()
-            == "true"
-        )
-        binary_as_bytes = (
-            runner_conf.get("spark.sql.execution.pyspark.binaryAsBytes", "true").lower() == "true"
-        )
-        _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
         if (
             eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
             or eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF
         ):
-            ser = GroupArrowUDFSerializer(_assign_cols_by_name)
+            ser = GroupArrowUDFSerializer(runner_conf.assign_cols_by_name)
         elif eval_type == PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF:
-            ser = ArrowStreamAggArrowIterUDFSerializer(timezone, True, _assign_cols_by_name, True)
+            ser = ArrowStreamAggArrowIterUDFSerializer(
+                runner_conf.timezone, True, runner_conf.assign_cols_by_name, True
+            )
         elif eval_type in (
             PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
             PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
         ):
-            ser = ArrowStreamAggArrowUDFSerializer(timezone, True, _assign_cols_by_name, True)
+            ser = ArrowStreamAggArrowUDFSerializer(
+                runner_conf.timezone, True, runner_conf.assign_cols_by_name, True
+            )
         elif eval_type in (
             PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
             PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
         ):
             ser = ArrowStreamAggPandasUDFSerializer(
-                timezone, safecheck, _assign_cols_by_name, int_to_decimal_coercion_enabled
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
+                runner_conf.int_to_decimal_coercion_enabled,
             )
         elif (
             eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
             or eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF
         ):
             ser = GroupPandasUDFSerializer(
-                timezone, safecheck, _assign_cols_by_name, int_to_decimal_coercion_enabled
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
+                runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
-            ser = CogroupArrowUDFSerializer(_assign_cols_by_name)
+            ser = CogroupArrowUDFSerializer(runner_conf.assign_cols_by_name)
         elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
             ser = CogroupPandasUDFSerializer(
-                timezone,
-                safecheck,
-                _assign_cols_by_name,
-                int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
+                int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
                 arrow_cast=True,
             )
         elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-            arrow_max_records_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
-            )
-            arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
             ser = ApplyInPandasWithStateSerializer(
-                timezone,
-                safecheck,
-                _assign_cols_by_name,
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
                 state_object_schema,
-                arrow_max_records_per_batch,
-                prefers_large_var_types,
-                int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                runner_conf.arrow_max_records_per_batch,
+                runner_conf.use_large_var_types,
+                int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF:
-            arrow_max_records_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
-            )
-            arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
-            arrow_max_bytes_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxBytesPerBatch", 2**31 - 1
-            )
-            arrow_max_bytes_per_batch = int(arrow_max_bytes_per_batch)
-
             ser = TransformWithStateInPandasSerializer(
-                timezone,
-                safecheck,
-                _assign_cols_by_name,
-                arrow_max_records_per_batch,
-                arrow_max_bytes_per_batch,
-                int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
+                runner_conf.arrow_max_records_per_batch,
+                runner_conf.arrow_max_bytes_per_batch,
+                int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF:
-            arrow_max_records_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
-            )
-            arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
-            arrow_max_bytes_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxBytesPerBatch", 2**31 - 1
-            )
-            arrow_max_bytes_per_batch = int(arrow_max_bytes_per_batch)
-
             ser = TransformWithStateInPandasInitStateSerializer(
-                timezone,
-                safecheck,
-                _assign_cols_by_name,
-                arrow_max_records_per_batch,
-                arrow_max_bytes_per_batch,
-                int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
+                runner_conf.arrow_max_records_per_batch,
+                runner_conf.arrow_max_bytes_per_batch,
+                int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
-            arrow_max_records_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
-            )
-            arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
-            ser = TransformWithStateInPySparkRowSerializer(arrow_max_records_per_batch)
+            ser = TransformWithStateInPySparkRowSerializer(runner_conf.arrow_max_records_per_batch)
         elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
-            arrow_max_records_per_batch = runner_conf.get(
-                "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
+            ser = TransformWithStateInPySparkRowInitStateSerializer(
+                runner_conf.arrow_max_records_per_batch
             )
-            arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
-            ser = TransformWithStateInPySparkRowInitStateSerializer(arrow_max_records_per_batch)
         elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
             ser = ArrowStreamUDFSerializer()
         elif eval_type in (
@@ -2867,16 +2819,22 @@ def read_udfs(pickleSer, infile, eval_type):
             PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
         ):
             # Arrow cast and safe check are always enabled
-            ser = ArrowStreamArrowUDFSerializer(timezone, True, _assign_cols_by_name, True)
+            ser = ArrowStreamArrowUDFSerializer(
+                runner_conf.timezone, True, runner_conf.assign_cols_by_name, True
+            )
         elif (
             eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
-            and not use_legacy_pandas_udf_conversion(runner_conf)
+            and not runner_conf.use_legacy_pandas_udf_conversion
         ):
             input_types = [
                 f.dataType for f in _parse_datatype_json_string(utf8_deserializer.loads(infile))
             ]
             ser = ArrowBatchUDFSerializer(
-                timezone, safecheck, input_types, 
int_to_decimal_coercion_enabled, binary_as_bytes
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                input_types,
+                runner_conf.int_to_decimal_coercion_enabled,
+                runner_conf.binary_as_bytes,
             )
         else:
             # Scalar Pandas UDF handles struct type arguments as pandas 
DataFrames instead of
@@ -2899,17 +2857,18 @@ def read_udfs(pickleSer, infile, eval_type):
             )
 
             ser = ArrowStreamPandasUDFSerializer(
-                timezone,
-                safecheck,
-                _assign_cols_by_name,
+                runner_conf.timezone,
+                runner_conf.safecheck,
+                runner_conf.assign_cols_by_name,
                 df_for_struct,
                 struct_in_pandas,
                 ndarray_as_list,
                 True,
                 input_types,
-                int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
     else:
+        runner_conf = RunnerConf()
         batch_size = int(os.environ.get("PYTHON_UDF_BATCH_SIZE", "100"))
         ser = BatchedSerializer(CPickleSerializer(), batch_size)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to