This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 93acd1fc3ff7 [SPARK-55473][PYTHON] Replace itertools.tee with chain in applyInPandasWithState
93acd1fc3ff7 is described below

commit 93acd1fc3ff7732d58b7195fd4e7d2c9679babcd
Author: Yicong Huang <[email protected]>
AuthorDate: Wed Feb 11 10:03:49 2026 +0900

    [SPARK-55473][PYTHON] Replace itertools.tee with chain in applyInPandasWithState
    
    ### What changes were proposed in this pull request?
    
    Replace `itertools.tee` with `itertools.chain` in the `applyInPandasWithState` mapper function to eliminate unnecessary buffering overhead.
    
    ### Why are the changes needed?
    
    `itertools.tee` can buffer every row that the values iterator has consumed while the key iterator lags behind, so memory overhead can grow with the size of the group. Consuming the first row with `next` and re-attaching it via `itertools.chain` keeps only that single row, reducing memory overhead for large groups.
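    
    A minimal, self-contained sketch of the two patterns (the toy data and variable names below are illustrative, not the actual `worker.py` code):
    
    ```python
    from itertools import chain, tee
    
    rows = iter([("k", 1), ("k", 2), ("k", 3)])  # stand-in for the per-group row generator
    
    # Old pattern: tee() keeps an internal buffer for rows the lagging iterator
    # (keys_gen) has not consumed, so the whole group can end up buffered.
    keys_gen, values_gen = tee(rows)
    key = next(keys_gen)[0]
    values = [v for _, v in values_gen]
    
    # New pattern: consume the first row directly, then stitch it back in front
    # of the remaining rows with chain(); only that single row is held.
    rows = iter([("k", 1), ("k", 2), ("k", 3)])
    first = next(rows)
    key = first[0]
    values = [v for _, v in chain([first], rows)]
    ```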
    
    ### Does this PR introduce any user-facing changes?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54250 from Yicong-Huang/SPARK-55473/optimize/remove-itertools-tee-applyInPandasWithState.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/worker.py | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index ef2e830216ca..62a7d87f0224 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3142,21 +3142,18 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
             and see `wrap_grouped_map_pandas_udf_with_state` for more details on how output will
             be used.
             """
-            from itertools import tee
+            from itertools import chain
 
             state = a[1]
             data_gen = (x[0] for x in a[0])
 
             # We know there should be at least one item in the iterator/generator.
-            # We want to peek the first element to construct the key, hence applying
-            # tee to construct the key while we retain another iterator/generator
-            # for values.
-            keys_gen, values_gen = tee(data_gen)
-            keys_elem = next(keys_gen)
-            keys = [keys_elem[o] for o in parsed_offsets[0][0]]
+            # Consume the first element to extract keys
+            first_elem = next(data_gen)
+            keys = [first_elem[o] for o in parsed_offsets[0][0]]
 
             # This must be generator comprehension - do not materialize.
-            vals = ([x[o] for o in parsed_offsets[0][1]] for x in values_gen)
+            vals = ([x[o] for o in parsed_offsets[0][1]] for x in chain([first_elem], data_gen))
 
             return f(keys, vals, state)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
