[ https://issues.apache.org/jira/browse/SPARK-53342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-53342:
----------------------------------
Parent: SPARK-54357
Issue Type: Sub-task (was: Bug)
> CreateDataFrame will silently truncate data with multiple Arrow RecordBatches
> -----------------------------------------------------------------------------
>
> Key: SPARK-53342
> URL: https://issues.apache.org/jira/browse/SPARK-53342
> Project: Spark
> Issue Type: Sub-task
> Components: Connect, SQL
> Affects Versions: 3.5.0
> Reporter: Martin Grund
> Assignee: Martin Grund
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.1.0, 4.0.1
>
>
> This issue was created from the discussion here:
> https://github.com/apache/spark-connect-go/issues/155
> Essentially, the ArrowConverters do not correctly handle Arrow data that is
> serialized as multiple record batches. This in turn leads to silently
> missing rows when loading from a local DataFrame whose Arrow data is
> serialized with more than one record batch.
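> A correct reader has to consume every record batch in the IPC stream rather
> than stopping after the first. A minimal pyarrow sketch of the expected
> semantics (an illustration only, not the ArrowConverters code itself;
> ipc_bytes stands for the serialized stream produced in the repro below):
> {code:python}
> import pyarrow.ipc as ipc
>
> # ipc_bytes: an Arrow IPC stream that may contain many record batches.
> reader = ipc.open_stream(ipc_bytes)
> # Iterating the reader yields every RecordBatch until the stream is
> # exhausted; stopping after the first batch silently drops the rest.
> batches = list(reader)
> {code}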
> Trivial PySpark repro:
> {code:python}
> from pyspark.sql.connect.plan import LocalRelation
> import pyspark.sql.types as t
> from pyspark.sql.connect.dataframe import DataFrame as CDF
> import pyarrow as pa
> import pyarrow.ipc as ipc
> # Create table with 100 integer rows and a single column named "ID"
> table = pa.Table.from_arrays([pa.array(range(100), type=pa.int64())],
>                              names=["ID"])
> # Serialize as Arrow IPC into a byte array with exactly 10 record batches
> sink = pa.BufferOutputStream()
> with ipc.new_stream(sink, table.schema) as writer:
>     for b in table.to_batches(max_chunksize=10):
>         print("batch....")
>         writer.write_batch(b)
> ipc_bytes = sink.getvalue().to_pybytes()
> schema = t.StructType([
>     t.StructField("ID", t.LongType(), True)
> ])
> # Build a DataFrame from the local relation and run a trivial aggregation.
> lr = LocalRelation(table, schema.json())
> lr_df = CDF(lr, spark)
> agg_df = lr_df.groupBy().count()
> agg_df.show()
> # Extract the plan and modify the local relation
> plan = agg_df._plan.to_proto(spark._client)
> new_p = agg_df._plan.to_proto(spark._client)
> new_p.root.aggregate.input.local_relation.data = ipc_bytes
> #print(plan)
> #print(new_p)
> # Executing the unmodified plan returns the full result; executing the plan
> # whose local relation was replaced with the 10-batch IPC stream loses rows.
> spark._client.to_table(plan, {})
> spark._client.to_table(new_p, {})
> {code}
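> As a sanity check (not part of the original repro), deserializing ipc_bytes
> locally shows that the IPC payload itself carries all 100 rows, so any rows
> missing from the second query result are dropped on the server side:
> {code:python}
> import pyarrow.ipc as ipc
>
> # Reading the stream back with pyarrow recovers all ten batches / 100 rows.
> roundtrip = ipc.open_stream(ipc_bytes).read_all()
> assert roundtrip.num_rows == 100
> {code}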