[ 
https://issues.apache.org/jira/browse/SPARK-53917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-53917:
----------------------------------
    Summary: Supporting large LocalRelations  (was: [CONNECT] Supporting large 
LocalRelations)

> Supporting large LocalRelations
> -------------------------------
>
>                 Key: SPARK-53917
>                 URL: https://issues.apache.org/jira/browse/SPARK-53917
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect, PySpark
>    Affects Versions: 4.1.0
>            Reporter: Alex Khakhlyuk
>            Assignee: Alex Khakhlyuk
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.1.0
>
>         Attachments: image-2025-10-15-13-50-04-179.png, 
> image-2025-10-15-13-50-44-333.png, image-2025-10-15-13-53-40-306.png, 
> image-2025-10-15-13-56-46-840.png, image-2025-10-15-13-59-08-081.png
>
>
> h1. Problem description
> LocalRelation is a Catalyst logical operator that represents a dataset of 
> rows inline as part of the LogicalPlan. LocalRelations back DataFrames 
> created directly from Python and Scala objects, e.g., Python and Scala 
> lists, pandas DataFrames, and CSV files loaded into memory.
> In Spark Connect, local relations are transferred over gRPC as LocalRelation 
> messages (for relations under 64MB) or CachedLocalRelation messages (for 
> relations of 64MB or more).
> CachedLocalRelations currently have a hard size limit of 2GB, which means 
> that Spark users can't execute queries over local client data (pandas 
> DataFrames, CSV files, etc.) larger than 2GB.
> h1. Design
> In Spark Connect, the client needs to serialize the local relation before 
> transferring it to the server. It serializes the data via an Arrow IPC 
> stream as a single record batch and the schema as a JSON string. It then 
> embeds both in a LocalRelation\{schema, data} proto message.
> Small local relations (under 64MB) are sent directly as part of the 
> ExecutePlanRequest.
> !image-2025-10-15-13-50-04-179.png!
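The inline-versus-cached decision described above can be sketched as follows. This is an illustration only: the 64MB threshold comes from the description, while the function names and the dict standing in for the proto message are not the actual Connect API.

```python
import json

# Illustrative threshold from the description above: relations under 64MB are
# sent inline in the ExecutePlanRequest; larger ones go through addArtifact.
INLINE_LIMIT = 64 * 1024 * 1024

def build_local_relation(data: bytes, schema: dict) -> dict:
    # Stand-in for the LocalRelation{schema, data} proto message: `data` plays
    # the role of the Arrow IPC stream (a single record batch) and the schema
    # is embedded as a JSON string.
    return {"schema": json.dumps(schema), "data": data}

def choose_transfer_path(data: bytes) -> str:
    # Small relations ride inside the ExecutePlanRequest; large ones are
    # uploaded first and referenced by hash (CachedLocalRelation).
    return "inline" if len(data) < INLINE_LIMIT else "cached"

payload = b"\x00" * 1024  # pretend Arrow IPC bytes
msg = build_local_relation(payload, {"fields": [{"name": "id", "type": "long"}]})
print(choose_transfer_path(payload))  # -> inline
```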
> Larger local relations are first sent to the server via addArtifact and 
> stored in memory or on disk via the BlockManager. Then an ExecutePlanRequest 
> containing CachedLocalRelation\{hash} is sent, where hash is the artifact 
> hash. The server retrieves the cached LocalRelation from the BlockManager 
> via the hash, deserializes it, adds it to the LogicalPlan, and executes it.
> !image-2025-10-15-13-50-44-333.png!
>  
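A minimal sketch of this hash-keyed caching flow, with a plain dict standing in for the BlockManager; the function names are illustrative, not Spark's API.

```python
import hashlib

# Toy stand-in for the server-side BlockManager, keyed by artifact hash.
block_manager: dict[str, bytes] = {}

def add_artifact(blob: bytes) -> str:
    # Client side: upload the serialized LocalRelation as an artifact keyed
    # by its SHA-256 hash.
    h = hashlib.sha256(blob).hexdigest()
    block_manager[h] = blob
    return h

def resolve_cached_relation(h: str) -> bytes:
    # Server side: fetch the cached bytes using the hash carried in the
    # CachedLocalRelation{hash} message, before deserializing them into
    # the LogicalPlan.
    return block_manager[h]

blob = b"serialized-local-relation"
h = add_artifact(blob)
assert resolve_cached_relation(h) == blob
```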
> The server reads the data from the BlockManager as a stream and tries to 
> create proto.LocalRelation via
> {quote}proto.Relation
> .newBuilder()
> .getLocalRelation
> .getParserForType
> .parseFrom(blockData.toInputStream())
> {quote}
> This fails because the Java protobuf library has a 2GB limit when 
> deserializing a protobuf message from a byte stream.
> {quote}org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException:
>  CodedInputStream encountered an embedded string or message which claimed to 
> have negative size.
> {quote}
> !image-2025-10-15-13-53-40-306.png!
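The "negative size" wording can be reproduced with a little arithmetic. This is a simplified model, not the actual CodedInputStream code: the Java runtime tracks message and string lengths in a signed 32-bit int, so any length of 2GB or more wraps around to a negative value and parsing is rejected.

```python
# Simplified model of why the parser reports a negative size: lengths of
# 2**31 bytes (2GB) or more do not fit in a signed 32-bit int, so the high
# bit flips the interpreted value negative.
def as_signed_int32(n: int) -> int:
    n &= 0xFFFFFFFF
    return n - 2**32 if n >= 2**31 else n

three_gb = 3 * 1024**3
print(as_signed_int32(three_gb))  # -> -1073741824, i.e. "negative size"
```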
> To fix this, I propose bypassing the protobuf layer during serialization on 
> the client and deserialization on the server. Instead of caching the full 
> protobuf LocalRelation message, we cache the data and schema as separate 
> artifacts, send the two hashes \{data_hash, schema_hash} to the server, load 
> both directly from the BlockManager, and create a LocalRelation on the 
> server from the unpacked data and schema.
> !image-2025-10-15-13-56-46-840.png!
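The proposed two-hash scheme can be sketched as follows (stdlib-only; the dict again stands in for the BlockManager, and all names are illustrative):

```python
import hashlib
import json

artifacts: dict[str, bytes] = {}  # stand-in for BlockManager storage

def cache(blob: bytes) -> str:
    h = hashlib.sha256(blob).hexdigest()
    artifacts[h] = blob
    return h

# Client: cache the data and the schema as two separate artifacts instead of
# one protobuf LocalRelation message, so neither blob round-trips through
# protobuf parsing.
data = b"arrow-ipc-stream-bytes"
schema = json.dumps({"fields": [{"name": "id", "type": "long"}]}).encode()
data_hash, schema_hash = cache(data), cache(schema)

# Server: given {data_hash, schema_hash}, load both blobs directly from the
# BlockManager and rebuild the relation without parsing a >2GB proto message.
relation = {"data": artifacts[data_hash], "schema": artifacts[schema_hash].decode()}
assert relation["data"] == data
```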
> After creating a prototype with the new proto message, I discovered 
> additional limits for CachedLocalRelations. Both the Scala client and the 
> server store the data in a single Java {{{}Array[Byte]{}}}, which is capped 
> at 2GB because Java array indices are signed 32-bit integers. To avoid this 
> limit, I propose transferring the data in chunks. The Python and Scala 
> clients will split the data into multiple Arrow batches and upload them 
> separately to the server. Each batch will be uploaded and stored as a 
> separate artifact. The server will then load and process each batch 
> separately. We will keep batch sizes around 16MB (TBD), well below the 2GB 
> limit, thereby avoiding the 2GB caps on both the clients and the server.
> !image-2025-10-15-13-59-08-081.png!
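The chunked upload can be sketched as follows. Real clients would split at Arrow batch boundaries rather than slicing raw bytes, and the helper names here are illustrative:

```python
import hashlib

CHUNK_SIZE = 16 * 1024 * 1024  # ~16MB per batch, per the proposal (TBD)

def split_into_chunks(data: bytes, chunk_size: int = CHUNK_SIZE) -> list[bytes]:
    # Slicing a byte string is enough to illustrate the chunked upload; the
    # real clients would emit separate Arrow batches.
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def upload(chunks: list[bytes], store: dict[str, bytes]) -> list[str]:
    # Each chunk becomes a separate artifact; the resulting list of hashes is
    # what ChunkedCachedLocalRelation.dataHashes would carry.
    hashes = []
    for chunk in chunks:
        h = hashlib.sha256(chunk).hexdigest()
        store[h] = chunk
        hashes.append(h)
    return hashes

store: dict[str, bytes] = {}
data = b"x" * 100
hashes = upload(split_into_chunks(data, chunk_size=32), store)
# Server side: load and process each batch separately, in hash-list order.
reassembled = b"".join(store[h] for h in hashes)
assert reassembled == data
```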
> The final proto message looks like this:
> {quote}message ChunkedCachedLocalRelation {
>   // (Required) A list of SHA-256 hashes of the chunks representing
>   // LocalRelation.data.
>   repeated string dataHashes = 1;
>   // (Optional) A SHA-256 hash of the serialized LocalRelation.schema.
>   optional string schemaHash = 2;
> }
> {quote}
> Implementation details are discussed in the PR 
> [https://github.com/apache/spark/pull/52613].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
