This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 93acd1fc3ff7 [SPARK-55473][PYTHON] Replace itertools.tee with chain in applyInPandasWithState
93acd1fc3ff7 is described below
commit 93acd1fc3ff7732d58b7195fd4e7d2c9679babcd
Author: Yicong Huang <[email protected]>
AuthorDate: Wed Feb 11 10:03:49 2026 +0900
[SPARK-55473][PYTHON] Replace itertools.tee with chain in applyInPandasWithState
### What changes were proposed in this pull request?
Replace `itertools.tee` with `itertools.chain` in the `applyInPandasWithState` mapper function to eliminate the unnecessary buffering that `tee` introduces.
### Why are the changes needed?
`tee` buffers elements consumed by one iterator until the other catches up, so this change reduces memory overhead for large groups.
### Does this PR introduce any user-facing changes?
No.
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
No.
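
A minimal standalone sketch (not the Spark code itself) of the two patterns this change contrasts: peeking at the first element via `tee`, which keeps a buffer between the two iterators, versus consuming the first element and stitching it back onto the front with `chain`:

```python
from itertools import chain, tee


def peek_with_tee(data_gen):
    # tee-based peek: both iterators share an internal buffer, so
    # elements read from keys_gen are retained until values_gen
    # consumes them as well.
    keys_gen, values_gen = tee(data_gen)
    first = next(keys_gen)
    return first, values_gen


def peek_with_chain(data_gen):
    # chain-based peek: consume the first element directly, then
    # prepend it so downstream code still sees the full stream.
    # No buffering beyond the single peeked element.
    first = next(data_gen)
    return first, chain([first], data_gen)


key1, vals1 = peek_with_tee(iter([(1, "a"), (2, "b")]))
key2, vals2 = peek_with_chain(iter([(1, "a"), (2, "b")]))
```

Both variants yield the same key and the same value stream; the difference is that `chain` avoids `tee`'s buffer entirely.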
Closes #54250 from Yicong-Huang/SPARK-55473/optimize/remove-itertools-tee-applyInPandasWithState.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/worker.py | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index ef2e830216ca..62a7d87f0224 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3142,21 +3142,18 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
and see `wrap_grouped_map_pandas_udf_with_state` for more details on how output will
be used.
"""
- from itertools import tee
+ from itertools import chain
state = a[1]
data_gen = (x[0] for x in a[0])
# We know there should be at least one item in the iterator/generator.
- # We want to peek the first element to construct the key, hence applying
- # tee to construct the key while we retain another iterator/generator
- # for values.
- keys_gen, values_gen = tee(data_gen)
- keys_elem = next(keys_gen)
- keys = [keys_elem[o] for o in parsed_offsets[0][0]]
+ # Consume the first element to extract keys
+ first_elem = next(data_gen)
+ keys = [first_elem[o] for o in parsed_offsets[0][0]]
# This must be generator comprehension - do not materialize.
- vals = ([x[o] for o in parsed_offsets[0][1]] for x in values_gen)
+ vals = ([x[o] for o in parsed_offsets[0][1]] for x in chain([first_elem], data_gen))
return f(keys, vals, state)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]