This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f552989381fe [SPARK-54587][PYTHON] Consolidate all runner_conf related
code
f552989381fe is described below
commit f552989381fe878db9e6b5a0159dd059ba0b7766
Author: Tian Gao <[email protected]>
AuthorDate: Fri Dec 5 11:43:39 2025 +0800
[SPARK-54587][PYTHON] Consolidate all runner_conf related code
### What changes were proposed in this pull request?
Refactor runner_conf in `worker.py`. Put all the related logic together so
the code using it can just access its attributes.
### Why are the changes needed?
This is part of the effort to consolidate the worker protocol. We need to
clean up the code a bit first.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`pyspark-sql` passes locally. For the remaining modules, we need to check the CI results.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53315 from gaogaotiantian/consolidate-runner-conf.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/worker.py | 421 +++++++++++++++++++++--------------------------
1 file changed, 190 insertions(+), 231 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 0c3fadbc6a9f..c49b633b9408 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -111,6 +111,82 @@ except Exception:
has_memory_profiler = False
class RunnerConf:
    """Worker-side view of the runner configuration sent by the JVM.

    The JVM driver serializes a set of Spark SQL conf key/value pairs onto
    the worker socket; this class deserializes them once and exposes the
    values the Python worker cares about as typed properties, so call sites
    never have to repeat conf keys, defaults, or string parsing.
    """

    def __init__(self, infile=None):
        # Raw conf values exactly as received (Spark conf key -> string).
        self._conf = {}
        if infile is not None:
            self.load(infile)

    def load(self, infile):
        """Read the conf map from ``infile``: an int count followed by that
        many UTF-8 (key, value) string pairs."""
        num_conf = read_int(infile)
        for _ in range(num_conf):
            k = utf8_deserializer.loads(infile)
            v = utf8_deserializer.loads(infile)
            self._conf[k] = v

    def get(self, key: str, default=""):
        """Return the raw conf value for ``key``, or ``default`` if unset.

        The value is returned verbatim — NOT lower-cased.  Lower-casing here
        would corrupt case-sensitive values such as
        ``spark.sql.session.timeZone`` (e.g. ``"America/Los_Angeles"``).
        Case-insensitive boolean parsing lives in ``_get_bool`` instead.
        """
        return self._conf.get(key, default)

    def _get_bool(self, key: str, default: str) -> bool:
        # Spark boolean confs arrive as the strings "true"/"false"; accept
        # any capitalization to match the previous `.lower() == "true"` logic.
        return str(self._conf.get(key, default)).lower() == "true"

    @property
    def assign_cols_by_name(self) -> bool:
        # Used by grouped/cogrouped map UDFs and StructType-returning
        # scalar UDFs when matching result columns to the schema.
        return self._get_bool(
            "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true"
        )

    @property
    def use_large_var_types(self) -> bool:
        return self._get_bool("spark.sql.execution.arrow.useLargeVarTypes", "false")

    @property
    def use_legacy_pandas_udf_conversion(self) -> bool:
        return self._get_bool(
            "spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled", "false"
        )

    @property
    def use_legacy_pandas_udtf_conversion(self) -> bool:
        return self._get_bool(
            "spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled", "false"
        )

    @property
    def binary_as_bytes(self) -> bool:
        return self._get_bool("spark.sql.execution.pyspark.binaryAsBytes", "true")

    @property
    def safecheck(self) -> bool:
        return self._get_bool("spark.sql.execution.pandas.convertToArrowArraySafely", "false")

    @property
    def int_to_decimal_coercion_enabled(self) -> bool:
        return self._get_bool(
            "spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false"
        )

    @property
    def timezone(self) -> Optional[str]:
        # None when respectSessionTimeZone is disabled on the JVM side.
        # Returned verbatim: timezone IDs are case-sensitive.
        return self.get("spark.sql.session.timeZone", None)

    @property
    def arrow_max_records_per_batch(self) -> int:
        return int(self.get("spark.sql.execution.arrow.maxRecordsPerBatch", 10000))

    @property
    def arrow_max_bytes_per_batch(self) -> int:
        return int(self.get("spark.sql.execution.arrow.maxBytesPerBatch", 2**31 - 1))

    @property
    def arrow_concurrency_level(self) -> int:
        # -1 means "not configured"; callers check `> 0` before enabling
        # the thread pool for scalar Arrow UDF evaluation.
        return int(self.get("spark.sql.execution.pythonUDF.arrow.concurrency.level", -1))
+
+
def report_times(outfile, boot, init, finish):
write_int(SpecialLengths.TIMING_DATA, outfile)
write_long(int(1000 * boot), outfile)
@@ -137,7 +213,7 @@ def wrap_scalar_pandas_udf(f, args_offsets, kwargs_offsets,
return_type, runner_
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets,
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def verify_result_type(result):
@@ -177,7 +253,7 @@ def wrap_scalar_arrow_udf(f, args_offsets, kwargs_offsets,
return_type, runner_c
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets,
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def verify_result_type(result):
@@ -214,7 +290,7 @@ def wrap_scalar_arrow_udf(f, args_offsets, kwargs_offsets,
return_type, runner_c
def wrap_arrow_batch_udf(f, args_offsets, kwargs_offsets, return_type,
runner_conf):
- if use_legacy_pandas_udf_conversion(runner_conf):
+ if runner_conf.use_legacy_pandas_udf_conversion:
return wrap_arrow_batch_udf_legacy(
f, args_offsets, kwargs_offsets, return_type, runner_conf
)
@@ -233,7 +309,7 @@ def wrap_arrow_batch_udf_arrow(f, args_offsets,
kwargs_offsets, return_type, run
zero_arg_exec = True
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
if zero_arg_exec:
@@ -246,14 +322,12 @@ def wrap_arrow_batch_udf_arrow(f, args_offsets,
kwargs_offsets, return_type, run
def get_args(*args: list):
return zip(*args)
- if "spark.sql.execution.pythonUDF.arrow.concurrency.level" in runner_conf:
+ if runner_conf.arrow_concurrency_level > 0:
from concurrent.futures import ThreadPoolExecutor
- c =
int(runner_conf["spark.sql.execution.pythonUDF.arrow.concurrency.level"])
-
@fail_on_stopiteration
def evaluate(*args):
- with ThreadPoolExecutor(max_workers=c) as pool:
+ with
ThreadPoolExecutor(max_workers=runner_conf.arrow_concurrency_level) as pool:
"""
Takes list of Python objects and returns tuple of
(results, arrow_return_type, return_type).
@@ -298,7 +372,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets,
kwargs_offsets, return_type, ru
zero_arg_exec = True
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
# "result_func" ensures the result of a Python UDF to be consistent
with/without Arrow
@@ -322,14 +396,12 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets,
kwargs_offsets, return_type, ru
def get_args(*args: pd.Series):
return zip(*args)
- if "spark.sql.execution.pythonUDF.arrow.concurrency.level" in runner_conf:
+ if runner_conf.arrow_concurrency_level > 0:
from concurrent.futures import ThreadPoolExecutor
- c =
int(runner_conf["spark.sql.execution.pythonUDF.arrow.concurrency.level"])
-
@fail_on_stopiteration
def evaluate(*args: pd.Series) -> pd.Series:
- with ThreadPoolExecutor(max_workers=c) as pool:
+ with
ThreadPoolExecutor(max_workers=runner_conf.arrow_concurrency_level) as pool:
return pd.Series(
list(pool.map(lambda row: result_func(func(*row)),
get_args(*args)))
)
@@ -360,7 +432,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets,
kwargs_offsets, return_type, ru
def wrap_pandas_batch_iter_udf(f, return_type, runner_conf):
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
iter_type_label = "pandas.DataFrame" if type(return_type) == StructType
else "pandas.Series"
@@ -460,7 +532,7 @@ def verify_pandas_result(result, return_type,
assign_cols_by_name, truncate_retu
def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def verify_result(result):
@@ -495,7 +567,7 @@ def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def verify_result(result):
@@ -529,9 +601,7 @@ def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
- if _assign_cols_by_name:
+ if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
col.name: to_arrow_type(col.dataType) for col in return_type.fields
}
@@ -548,7 +618,7 @@ def wrap_cogrouped_map_arrow_udf(f, return_type, argspec,
runner_conf):
key = tuple(c[0] for c in key_table.columns)
result = f(key, left_value_table, right_value_table)
- verify_arrow_table(result, _assign_cols_by_name,
expected_cols_and_types)
+ verify_arrow_table(result, runner_conf.assign_cols_by_name,
expected_cols_and_types)
return result.to_batches()
@@ -556,9 +626,6 @@ def wrap_cogrouped_map_arrow_udf(f, return_type, argspec,
runner_conf):
def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, runner_conf):
- _use_large_var_types = use_large_var_types(runner_conf)
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
def wrapped(left_key_series, left_value_series, right_key_series,
right_value_series):
import pandas as pd
@@ -572,12 +639,12 @@ def wrap_cogrouped_map_pandas_udf(f, return_type,
argspec, runner_conf):
key = tuple(s[0] for s in key_series)
result = f(key, left_df, right_df)
verify_pandas_result(
- result, return_type, _assign_cols_by_name,
truncate_return_schema=False
+ result, return_type, runner_conf.assign_cols_by_name,
truncate_return_schema=False
)
return result
- arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr),
arrow_return_type)]
@@ -675,9 +742,7 @@ def verify_arrow_batch(batch, assign_cols_by_name,
expected_cols_and_types):
def wrap_grouped_map_arrow_udf(f, return_type, argspec, runner_conf):
import pyarrow as pa
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
- if _assign_cols_by_name:
+ if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
col.name: to_arrow_type(col.dataType) for col in return_type.fields
}
@@ -694,18 +759,16 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec,
runner_conf):
key = tuple(c[0] for c in key_batch.columns)
result = f(key, value_table)
- verify_arrow_table(result, _assign_cols_by_name,
expected_cols_and_types)
+ verify_arrow_table(result, runner_conf.assign_cols_by_name,
expected_cols_and_types)
yield from result.to_batches()
- arrow_return_type = to_arrow_type(return_type,
use_large_var_types(runner_conf))
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda k, v: (wrapped(k, v), arrow_return_type)
def wrap_grouped_map_arrow_iter_udf(f, return_type, argspec, runner_conf):
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
- if _assign_cols_by_name:
+ if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
col.name: to_arrow_type(col.dataType) for col in return_type.fields
}
@@ -722,19 +785,16 @@ def wrap_grouped_map_arrow_iter_udf(f, return_type,
argspec, runner_conf):
result = f(key, value_batches)
def verify_element(batch):
- verify_arrow_batch(batch, _assign_cols_by_name,
expected_cols_and_types)
+ verify_arrow_batch(batch, runner_conf.assign_cols_by_name,
expected_cols_and_types)
return batch
yield from map(verify_element, result)
- arrow_return_type = to_arrow_type(return_type,
use_large_var_types(runner_conf))
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda k, v: (wrapped(k, v), arrow_return_type)
def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
- _use_large_var_types = use_large_var_types(runner_conf)
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
def wrapped(key_series, value_batches):
import pandas as pd
@@ -755,12 +815,12 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec,
runner_conf):
result = f(key, value_df)
verify_pandas_result(
- result, return_type, _assign_cols_by_name,
truncate_return_schema=False
+ result, return_type, runner_conf.assign_cols_by_name,
truncate_return_schema=False
)
yield result
- arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
def flatten_wrapper(k, v):
# Return Iterator[[(df, arrow_type)]] directly
@@ -771,9 +831,6 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec,
runner_conf):
def wrap_grouped_map_pandas_iter_udf(f, return_type, argspec, runner_conf):
- _use_large_var_types = use_large_var_types(runner_conf)
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
def wrapped(key_series, value_batches):
import pandas as pd
@@ -792,13 +849,13 @@ def wrap_grouped_map_pandas_iter_udf(f, return_type,
argspec, runner_conf):
def verify_element(df):
verify_pandas_result(
- df, return_type, _assign_cols_by_name,
truncate_return_schema=False
+ df, return_type, runner_conf.assign_cols_by_name,
truncate_return_schema=False
)
return df
yield from map(verify_element, result)
- arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
def flatten_wrapper(k, v):
# Return Iterator[[(df, arrow_type)]] directly
@@ -817,7 +874,7 @@ def wrap_grouped_transform_with_state_pandas_udf(f,
return_type, runner_conf):
return result_iter
- arrow_return_type = to_arrow_type(return_type,
use_large_var_types(runner_conf))
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -836,7 +893,7 @@ def
wrap_grouped_transform_with_state_pandas_init_state_udf(f, return_type, runn
return result_iter
- arrow_return_type = to_arrow_type(return_type,
use_large_var_types(runner_conf))
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -849,7 +906,7 @@ def wrap_grouped_transform_with_state_udf(f, return_type,
runner_conf):
return result_iter
- arrow_return_type = to_arrow_type(return_type,
use_large_var_types(runner_conf))
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -869,7 +926,7 @@ def wrap_grouped_transform_with_state_init_state_udf(f,
return_type, runner_conf
return result_iter
- arrow_return_type = to_arrow_type(return_type,
use_large_var_types(runner_conf))
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -888,7 +945,6 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type,
runner_conf):
Along with the returned iterator, the lambda instance will also produce
the return_type as
converted to the arrow schema.
"""
- _use_large_var_types = use_large_var_types(runner_conf)
def wrapped(key_series, value_series_gen, state):
"""
@@ -963,7 +1019,7 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type,
runner_conf):
state,
)
- arrow_return_type = to_arrow_type(return_type, _use_large_var_types)
+ arrow_return_type = to_arrow_type(return_type,
runner_conf.use_large_var_types)
return lambda k, v, s: [(wrapped(k, v, s), arrow_return_type)]
@@ -971,7 +1027,7 @@ def wrap_grouped_agg_pandas_udf(f, args_offsets,
kwargs_offsets, return_type, ru
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets,
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -990,7 +1046,7 @@ def wrap_grouped_agg_arrow_udf(f, args_offsets,
kwargs_offsets, return_type, run
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets,
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1009,7 +1065,7 @@ def wrap_grouped_agg_arrow_iter_udf(f, args_offsets,
kwargs_offsets, return_type
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets,
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(batch_iter):
@@ -1075,7 +1131,7 @@ def wrap_unbounded_window_agg_pandas_udf(f, args_offsets,
kwargs_offsets, return
# to match window length, where grouped_agg_pandas_udf just returns
# the scalar value.
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1096,7 +1152,7 @@ def wrap_unbounded_window_agg_arrow_udf(f, args_offsets,
kwargs_offsets, return_
# This is similar to wrap_unbounded_window_agg_pandas_udf, the only
difference
# is that this function is for arrow udf.
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1117,7 +1173,7 @@ def wrap_bounded_window_agg_pandas_udf(f, args_offsets,
kwargs_offsets, return_t
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:],
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(begin_index, end_index, *series):
@@ -1161,7 +1217,7 @@ def wrap_bounded_window_agg_arrow_udf(f, args_offsets,
kwargs_offsets, return_ty
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:],
kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=use_large_var_types(runner_conf)
+ return_type, prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(begin_index, end_index, *series):
@@ -1452,94 +1508,29 @@ def read_single_udf(pickleSer, infile, eval_type,
runner_conf, udf_index, profil
raise ValueError("Unknown eval type: {}".format(eval_type))
-# Used by SQL_GROUPED_MAP_PANDAS_UDF, SQL_GROUPED_MAP_ARROW_UDF,
-# SQL_COGROUPED_MAP_PANDAS_UDF, SQL_COGROUPED_MAP_ARROW_UDF,
-# SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
-# SQL_SCALAR_PANDAS_UDF and SQL_ARROW_BATCHED_UDF when
-# returning StructType
-def assign_cols_by_name(runner_conf):
- return (
- runner_conf.get(
-
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true"
- ).lower()
- == "true"
- )
-
-
-def use_large_var_types(runner_conf):
- return runner_conf.get("spark.sql.execution.arrow.useLargeVarTypes",
"false").lower() == "true"
-
-
-def use_legacy_pandas_udf_conversion(runner_conf):
- return (
- runner_conf.get(
- "spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled",
"false"
- ).lower()
- == "true"
- )
-
-
# Read and process a serialized user-defined table function (UDTF) from a
socket.
# It expects the UDTF to be in a specific format and performs various checks to
# ensure the UDTF is valid. This function also prepares a mapper function for
applying
# the UDTF logic to input rows.
def read_udtf(pickleSer, infile, eval_type):
- prefers_large_var_types = False
- legacy_pandas_conversion = False
- binary_as_bytes = True
-
if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF:
- runner_conf = {}
# Load conf used for arrow evaluation.
- num_conf = read_int(infile)
- for i in range(num_conf):
- k = utf8_deserializer.loads(infile)
- v = utf8_deserializer.loads(infile)
- runner_conf[k] = v
- prefers_large_var_types = use_large_var_types(runner_conf)
- legacy_pandas_conversion = (
- runner_conf.get(
-
"spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled", "false"
- ).lower()
- == "true"
- )
- binary_as_bytes = (
- runner_conf.get("spark.sql.execution.pyspark.binaryAsBytes",
"true").lower() == "true"
- )
+ runner_conf = RunnerConf(infile)
input_types = [
field.dataType for field in
_parse_datatype_json_string(utf8_deserializer.loads(infile))
]
- if legacy_pandas_conversion:
+ if runner_conf.use_legacy_pandas_udtf_conversion:
# NOTE: if timezone is set here, that implies
respectSessionTimeZone is True
- safecheck = (
- runner_conf.get(
- "spark.sql.execution.pandas.convertToArrowArraySafely",
"false"
- ).lower()
- == "true"
- )
- int_to_decimal_coercion_enabled = (
- runner_conf.get(
-
"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false"
- ).lower()
- == "true"
- )
- timezone = runner_conf.get("spark.sql.session.timeZone", None)
ser = ArrowStreamPandasUDTFSerializer(
- timezone,
- safecheck,
+ runner_conf.timezone,
+ runner_conf.safecheck,
input_types=input_types,
-
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
else:
ser = ArrowStreamUDTFSerializer()
elif eval_type == PythonEvalType.SQL_ARROW_UDTF:
- runner_conf = {}
- num_conf = read_int(infile)
- for i in range(num_conf):
- k = utf8_deserializer.loads(infile)
- v = utf8_deserializer.loads(infile)
- runner_conf[k] = v
- prefers_large_var_types = use_large_var_types(runner_conf)
+ runner_conf = RunnerConf(infile)
# Read the table argument offsets
num_table_arg_offsets = read_int(infile)
table_arg_offsets = [read_int(infile) for _ in
range(num_table_arg_offsets)]
@@ -1547,6 +1538,7 @@ def read_udtf(pickleSer, infile, eval_type):
ser =
ArrowStreamArrowUDTFSerializer(table_arg_offsets=table_arg_offsets)
else:
# Each row is a group so do not batch but send one by one.
+ runner_conf = RunnerConf()
ser = BatchedSerializer(CPickleSerializer(), 1)
# See 'PythonUDTFRunner.PythonUDFWriterThread.writeCommand'
@@ -2177,13 +2169,16 @@ def read_udtf(pickleSer, infile, eval_type):
check_output_row_against_schema = build_null_checker(return_type)
- if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF and
legacy_pandas_conversion:
+ if (
+ eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF
+ and runner_conf.use_legacy_pandas_udtf_conversion
+ ):
def wrap_arrow_udtf(f, return_type):
import pandas as pd
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=prefers_large_var_types
+ return_type,
prefers_large_types=runner_conf.use_large_var_types
)
return_type_size = len(return_type)
@@ -2301,13 +2296,16 @@ def read_udtf(pickleSer, infile, eval_type):
return mapper, None, ser, ser
- elif eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF and not
legacy_pandas_conversion:
+ elif (
+ eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF
+ and not runner_conf.use_legacy_pandas_udtf_conversion
+ ):
def wrap_arrow_udtf(f, return_type):
import pyarrow as pa
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=prefers_large_var_types
+ return_type,
prefers_large_types=runner_conf.use_large_var_types
)
return_type_size = len(return_type)
@@ -2399,7 +2397,7 @@ def read_udtf(pickleSer, infile, eval_type):
try:
table = LocalDataToArrowConversion.convert(
- data, return_type, prefers_large_var_types
+ data, return_type, runner_conf.use_large_var_types
)
except PySparkValueError as e:
if e.getErrorClass() == "AXIS_LENGTH_MISMATCH":
@@ -2451,7 +2449,7 @@ def read_udtf(pickleSer, infile, eval_type):
try:
converters = [
ArrowTableToRowsConversion._create_converter(
- dt, none_on_identity=True,
binary_as_bytes=binary_as_bytes
+ dt, none_on_identity=True,
binary_as_bytes=runner_conf.binary_as_bytes
)
for dt in input_types
]
@@ -2482,7 +2480,7 @@ def read_udtf(pickleSer, infile, eval_type):
import pyarrow as pa
arrow_return_type = to_arrow_type(
- return_type,
prefers_large_types=use_large_var_types(runner_conf)
+ return_type,
prefers_large_types=runner_conf.use_large_var_types
)
return_type_size = len(return_type)
@@ -2689,8 +2687,6 @@ def read_udtf(pickleSer, infile, eval_type):
def read_udfs(pickleSer, infile, eval_type):
- runner_conf = {}
-
state_server_port = None
key_schema = None
if eval_type in (
@@ -2719,11 +2715,7 @@ def read_udfs(pickleSer, infile, eval_type):
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
):
# Load conf used for pandas_udf evaluation
- num_conf = read_int(infile)
- for i in range(num_conf):
- k = utf8_deserializer.loads(infile)
- v = utf8_deserializer.loads(infile)
- runner_conf[k] = v
+ runner_conf = RunnerConf(infile)
state_object_schema = None
if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
@@ -2740,126 +2732,86 @@ def read_udfs(pickleSer, infile, eval_type):
key_schema =
StructType.fromJson(json.loads(utf8_deserializer.loads(infile)))
# NOTE: if timezone is set here, that implies respectSessionTimeZone
is True
- timezone = runner_conf.get("spark.sql.session.timeZone", None)
- prefers_large_var_types = use_large_var_types(runner_conf)
- safecheck = (
-
runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely",
"false").lower()
- == "true"
- )
- int_to_decimal_coercion_enabled = (
- runner_conf.get(
-
"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false"
- ).lower()
- == "true"
- )
- binary_as_bytes = (
- runner_conf.get("spark.sql.execution.pyspark.binaryAsBytes",
"true").lower() == "true"
- )
- _assign_cols_by_name = assign_cols_by_name(runner_conf)
-
if (
eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
or eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF
):
- ser = GroupArrowUDFSerializer(_assign_cols_by_name)
+ ser = GroupArrowUDFSerializer(runner_conf.assign_cols_by_name)
elif eval_type == PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF:
- ser = ArrowStreamAggArrowIterUDFSerializer(timezone, True,
_assign_cols_by_name, True)
+ ser = ArrowStreamAggArrowIterUDFSerializer(
+ runner_conf.timezone, True, runner_conf.assign_cols_by_name,
True
+ )
elif eval_type in (
PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
):
- ser = ArrowStreamAggArrowUDFSerializer(timezone, True,
_assign_cols_by_name, True)
+ ser = ArrowStreamAggArrowUDFSerializer(
+ runner_conf.timezone, True, runner_conf.assign_cols_by_name,
True
+ )
elif eval_type in (
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
):
ser = ArrowStreamAggPandasUDFSerializer(
- timezone, safecheck, _assign_cols_by_name,
int_to_decimal_coercion_enabled
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
+ runner_conf.int_to_decimal_coercion_enabled,
)
elif (
eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
or eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF
):
ser = GroupPandasUDFSerializer(
- timezone, safecheck, _assign_cols_by_name,
int_to_decimal_coercion_enabled
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
+ runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
- ser = CogroupArrowUDFSerializer(_assign_cols_by_name)
+ ser = CogroupArrowUDFSerializer(runner_conf.assign_cols_by_name)
elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
ser = CogroupPandasUDFSerializer(
- timezone,
- safecheck,
- _assign_cols_by_name,
-
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
arrow_cast=True,
)
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
- arrow_max_records_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
- )
- arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
ser = ApplyInPandasWithStateSerializer(
- timezone,
- safecheck,
- _assign_cols_by_name,
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
state_object_schema,
- arrow_max_records_per_batch,
- prefers_large_var_types,
-
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ runner_conf.arrow_max_records_per_batch,
+ runner_conf.use_large_var_types,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF:
- arrow_max_records_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
- )
- arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
- arrow_max_bytes_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxBytesPerBatch", 2**31 - 1
- )
- arrow_max_bytes_per_batch = int(arrow_max_bytes_per_batch)
-
ser = TransformWithStateInPandasSerializer(
- timezone,
- safecheck,
- _assign_cols_by_name,
- arrow_max_records_per_batch,
- arrow_max_bytes_per_batch,
-
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
+ runner_conf.arrow_max_records_per_batch,
+ runner_conf.arrow_max_bytes_per_batch,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type ==
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF:
- arrow_max_records_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
- )
- arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
- arrow_max_bytes_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxBytesPerBatch", 2**31 - 1
- )
- arrow_max_bytes_per_batch = int(arrow_max_bytes_per_batch)
-
ser = TransformWithStateInPandasInitStateSerializer(
- timezone,
- safecheck,
- _assign_cols_by_name,
- arrow_max_records_per_batch,
- arrow_max_bytes_per_batch,
-
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
+ runner_conf.arrow_max_records_per_batch,
+ runner_conf.arrow_max_bytes_per_batch,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
elif eval_type ==
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
- arrow_max_records_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
- )
- arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
- ser =
TransformWithStateInPySparkRowSerializer(arrow_max_records_per_batch)
+ ser =
TransformWithStateInPySparkRowSerializer(runner_conf.arrow_max_records_per_batch)
elif eval_type ==
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
- arrow_max_records_per_batch = runner_conf.get(
- "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
+ ser = TransformWithStateInPySparkRowInitStateSerializer(
+ runner_conf.arrow_max_records_per_batch
)
- arrow_max_records_per_batch = int(arrow_max_records_per_batch)
-
- ser =
TransformWithStateInPySparkRowInitStateSerializer(arrow_max_records_per_batch)
elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
ser = ArrowStreamUDFSerializer()
elif eval_type in (
@@ -2867,16 +2819,22 @@ def read_udfs(pickleSer, infile, eval_type):
PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
):
# Arrow cast and safe check are always enabled
- ser = ArrowStreamArrowUDFSerializer(timezone, True,
_assign_cols_by_name, True)
+ ser = ArrowStreamArrowUDFSerializer(
+ runner_conf.timezone, True, runner_conf.assign_cols_by_name,
True
+ )
elif (
eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
- and not use_legacy_pandas_udf_conversion(runner_conf)
+ and not runner_conf.use_legacy_pandas_udf_conversion
):
input_types = [
f.dataType for f in
_parse_datatype_json_string(utf8_deserializer.loads(infile))
]
ser = ArrowBatchUDFSerializer(
- timezone, safecheck, input_types,
int_to_decimal_coercion_enabled, binary_as_bytes
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ input_types,
+ runner_conf.int_to_decimal_coercion_enabled,
+ runner_conf.binary_as_bytes,
)
else:
# Scalar Pandas UDF handles struct type arguments as pandas
DataFrames instead of
@@ -2899,17 +2857,18 @@ def read_udfs(pickleSer, infile, eval_type):
)
ser = ArrowStreamPandasUDFSerializer(
- timezone,
- safecheck,
- _assign_cols_by_name,
+ runner_conf.timezone,
+ runner_conf.safecheck,
+ runner_conf.assign_cols_by_name,
df_for_struct,
struct_in_pandas,
ndarray_as_list,
True,
input_types,
-
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
)
else:
+ runner_conf = RunnerConf()
batch_size = int(os.environ.get("PYTHON_UDF_BATCH_SIZE", "100"))
ser = BatchedSerializer(CPickleSerializer(), batch_size)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]