Yicong Huang created SPARK-55168:
------------------------------------
Summary: Refactor GroupArrowUDFSerializer to use
ArrowBatchTransformer.flatten_struct
Key: SPARK-55168
URL: https://issues.apache.org/jira/browse/SPARK-55168
Project: Spark
Issue Type: Sub-task
Components: PySpark
Affects Versions: 4.2.0
Reporter: Yicong Huang
Currently, {{GroupArrowUDFSerializer.load_stream}} defines an inline
{{process_group}} function that duplicates the struct-flattening logic already
extracted into {{ArrowBatchTransformer.flatten_struct}}:
{code:python}
# Current code in GroupArrowUDFSerializer.load_stream (lines 1021-1024)
def process_group(batches: "Iterator[pa.RecordBatch]"):
    for batch in batches:
        struct = batch.column(0)
        yield pa.RecordBatch.from_arrays(struct.flatten(),
                                         schema=pa.schema(struct.type))
{code}
This is identical to {{ArrowBatchTransformer.flatten_struct}}. We should reuse
the existing transformer:
{code:python}
# Proposed change
def load_stream(self, stream):
    ...
    if dataframes_in_group == 1:
        batch_iter = map(ArrowBatchTransformer.flatten_struct,
                         ArrowStreamSerializer.load_stream(self, stream))
        yield batch_iter
    ...
{code}
This continues Phase 1 of SPARK-55159 by eliminating duplicated transformation
logic and improving code reuse.
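
To illustrate why swapping the inline generator for {{map}} should not change behavior, here is a minimal sketch in plain Python. It does not use pyarrow or the real PySpark classes: {{flatten_struct}}, {{process_group}} and {{counting_source}} below are hypothetical stand-ins, and a "batch" is modeled as a one-element list holding a dict of field name to values. It only shows that both styles are lazy and yield the same items.

```python
from typing import Dict, Iterator, List

# Hypothetical stand-ins, NOT the real pyarrow/PySpark types:
# a "batch" is a one-element list whose only column is a struct (a dict).
Struct = Dict[str, List[int]]
Batch = List[Struct]


def flatten_struct(batch: Batch) -> Struct:
    """Stand-in for a shared transformer: promote the struct's fields to top level."""
    return dict(batch[0])


def process_group(batches: Iterator[Batch]) -> Iterator[Struct]:
    """Inline-generator style, mirroring the shape of the current code."""
    for batch in batches:
        struct = batch[0]
        yield dict(struct)


def counting_source(log: List[int]) -> Iterator[Batch]:
    """Yield three toy batches, recording in `log` when each one is produced."""
    for i in range(3):
        log.append(i)
        yield [{"id": [i], "v": [i * 10]}]


log_old: List[int] = []
log_new: List[int] = []
old_style = process_group(counting_source(log_old))
new_style = map(flatten_struct, counting_source(log_new))

# Both are lazy: nothing has been pulled from the source yet.
assert log_old == [] and log_new == []

old_result = list(old_style)
new_result = list(new_style)
assert old_result == new_result
```

Because {{map}} returns a lazy iterator, the proposed form keeps the streaming behavior of the generator while delegating the per-batch work to a single shared function.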