Yicong Huang created SPARK-55168:
------------------------------------

             Summary: Refactor GroupArrowUDFSerializer to use ArrowBatchTransformer.flatten_struct
                 Key: SPARK-55168
                 URL: https://issues.apache.org/jira/browse/SPARK-55168
             Project: Spark
          Issue Type: Sub-task
          Components: PySpark
    Affects Versions: 4.2.0
            Reporter: Yicong Huang



Currently, {{GroupArrowUDFSerializer.load_stream}} has an inline 
{{process_group}} function that duplicates the flatten_struct logic already 
extracted in {{ArrowBatchTransformer}}:

{code:python}
from typing import Iterator

import pyarrow as pa

# Current code in GroupArrowUDFSerializer.load_stream (line 1021-1024)
def process_group(batches: "Iterator[pa.RecordBatch]"):
    for batch in batches:
        struct = batch.column(0)
        yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
{code}

This is identical to {{ArrowBatchTransformer.flatten_struct}}. We should reuse 
the existing transformer:

{code:python}
# Proposed change
def load_stream(self, stream):
    ...
    if dataframes_in_group == 1:
        batch_iter = map(ArrowBatchTransformer.flatten_struct, 
                         ArrowStreamSerializer.load_stream(self, stream))
        yield batch_iter
    ...
{code}

This continues Phase 1 of SPARK-55159 by eliminating duplicated transformation 
logic and improving code reuse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
