[ 
https://issues.apache.org/jira/browse/SPARK-53342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-53342:
-------------------------------------

    Assignee: Martin Grund

> CreateDataFrame will silently truncate data with multiple Arrow RecordBatches
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-53342
>                 URL: https://issues.apache.org/jira/browse/SPARK-53342
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, SQL
>    Affects Versions: 3.5.0
>            Reporter: Martin Grund
>            Assignee: Martin Grund
>            Priority: Major
>              Labels: pull-request-available
>
> This is created from a discussion here: 
> https://github.com/apache/spark-connect-go/issues/155
> Essentially, the ArrowConverters do not correctly handle Arrow data that is 
> serialized as multiple record batches. As a result, rows are silently 
> dropped when loading from a local DataFrame whose Arrow payload contains 
> more than one record batch.
> Trivial PySpark repro:
> {code:python}
> from pyspark.sql.connect.plan import LocalRelation
> import pyspark.sql.types as t
> from pyspark.sql.connect.dataframe import DataFrame as CDF
> import pyarrow as pa
> import pyarrow.ipc as ipc
> # Create table with 100 integer rows and a single column named "ID"
> table = pa.Table.from_arrays([pa.array(range(100), type=pa.int64())], names=["ID"])
> # Serialize as Arrow IPC into a byte array with exactly 10 record batches
> sink = pa.BufferOutputStream()
> with ipc.new_stream(sink, table.schema) as writer:
>     for b in table.to_batches(max_chunksize=10):
>         print("batch....")
>         writer.write_batch(b)
> ipc_bytes = sink.getvalue().to_pybytes()
> schema = t.StructType([
>     t.StructField("ID", t.LongType(), True)
> ])
> lr = LocalRelation(table, schema.json())
> lr_df = CDF(lr, spark)
> agg_df = lr_df.groupBy().count()
> agg_df.show()
> # Extract the aggregation plan twice and, in the second copy, replace the
> # local relation's payload with the 10-batch IPC stream
> plan = agg_df._plan.to_proto(spark._client)
> new_p = agg_df._plan.to_proto(spark._client)
> new_p.root.aggregate.input.local_relation.data = ipc_bytes
> spark._client.to_table(plan, {})   # expected: count == 100
> spark._client.to_table(new_p, {})  # multi-batch payload is silently truncated
> {code}
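> As a pure-pyarrow illustration of the failure mode (this sketches how a 
> multi-batch IPC stream must be consumed; it is not Spark's actual 
> ArrowConverters code):
> {code:python}
> import pyarrow as pa
> import pyarrow.ipc as ipc
>
> # Same setup as the repro: 100 rows serialized as 10 record batches
> table = pa.Table.from_arrays([pa.array(range(100), type=pa.int64())], names=["ID"])
> sink = pa.BufferOutputStream()
> with ipc.new_stream(sink, table.schema) as writer:
>     for b in table.to_batches(max_chunksize=10):
>         writer.write_batch(b)
> ipc_bytes = sink.getvalue().to_pybytes()
>
> # Correct: iterate the stream reader, which yields every record batch
> batches = list(ipc.open_stream(ipc_bytes))
> assert sum(b.num_rows for b in batches) == 100
>
> # Truncating pattern: consuming only the first batch drops the other 90 rows
> first = ipc.open_stream(ipc_bytes).read_next_batch()
> assert first.num_rows == 10
> {code}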



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
