[ https://issues.apache.org/jira/browse/SPARK-53342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun reassigned SPARK-53342:
-------------------------------------

Assignee: Martin Grund

> CreateDataFrame will silently truncate data with multiple Arrow RecordBatches
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-53342
>                 URL: https://issues.apache.org/jira/browse/SPARK-53342
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, SQL
>    Affects Versions: 3.5.0
>            Reporter: Martin Grund
>            Assignee: Martin Grund
>            Priority: Major
>              Labels: pull-request-available
>
> This was created from a discussion here:
> https://github.com/apache/spark-connect-go/issues/155
> Essentially, the ArrowConverters do not properly handle Arrow data that is
> serialized as multiple record batches. This in turn leads to missing data
> when loading from a local DataFrame whose Arrow data is serialized as
> more than one record batch.
> Trivial PySpark repro:
> {code:python}
> from pyspark.sql.connect.plan import LocalRelation
> import pyspark.sql.types as t
> from pyspark.sql.connect.dataframe import DataFrame as CDF
> import pyarrow as pa
> import pyarrow.ipc as ipc
>
> # Create a table with 100 integer rows and a single column named "ID".
> table = pa.Table.from_arrays([pa.array(range(100), type=pa.int64())],
>                              names=["ID"])
>
> # Serialize as Arrow IPC into a byte array with exactly 10 record batches.
> sink = pa.BufferOutputStream()
> with ipc.new_stream(sink, table.schema) as writer:
>     for b in table.to_batches(max_chunksize=10):
>         print("batch....")
>         writer.write_batch(b)
> ipc_bytes = sink.getvalue().to_pybytes()
>
> schema = t.StructType([
>     t.StructField("ID", t.LongType(), True)
> ])
>
> lr = LocalRelation(table, schema.json())
> lr_df = CDF(lr, spark)
> agg_df = lr_df.groupBy().count()
> agg_df.show()
>
> # Extract the plan and swap the multi-batch IPC bytes into the local relation.
> plan = agg_df._plan.to_proto(spark._client)
> new_p = agg_df._plan.to_proto(spark._client)
> new_p.root.aggregate.input.local_relation.data = ipc_bytes
> #print(plan)
> #print(new_p)
>
> spark._client.to_table(plan, {})
> spark._client.to_table(new_p, {})
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org