[
https://issues.apache.org/jira/browse/SPARK-53917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun updated SPARK-53917:
----------------------------------
Summary: Supporting large LocalRelations (was: [CONNECT] Supporting large
LocalRelations)
> Supporting large LocalRelations
> -------------------------------
>
> Key: SPARK-53917
> URL: https://issues.apache.org/jira/browse/SPARK-53917
> Project: Spark
> Issue Type: Sub-task
> Components: Connect, PySpark
> Affects Versions: 4.1.0
> Reporter: Alex Khakhlyuk
> Assignee: Alex Khakhlyuk
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.1.0
>
> Attachments: image-2025-10-15-13-50-04-179.png,
> image-2025-10-15-13-50-44-333.png, image-2025-10-15-13-53-40-306.png,
> image-2025-10-15-13-56-46-840.png, image-2025-10-15-13-59-08-081.png
>
>
> h1. Problem description
> LocalRelation is a Catalyst logical operator used to represent a dataset of
> rows inline as part of the LogicalPlan. LocalRelations represent dataframes
> created directly from Python and Scala objects, e.g., Python and Scala lists,
> pandas dataframes, csv files loaded in memory, etc.
> In Spark Connect, local relations are transferred over gRPC using
> LocalRelation messages (for relations under 64MB) and CachedLocalRelation
> messages (for relations over 64MB).
> CachedLocalRelations currently have a hard size limit of 2GB, which means
> that Spark users can’t execute queries with local client data, pandas
> dataframes, or CSV files larger than 2GB.
> h1. Design
> In Spark Connect, the client needs to serialize the local relation before
> transferring it to the server. It serializes the data via an Arrow IPC stream
> as a single record batch and the schema as a JSON string. It then embeds the
> data and schema in a LocalRelation\{schema,data} proto message.
> Small local relations (under 64MB) are sent directly as part of the
> ExecutePlanRequest.
> !image-2025-10-15-13-50-04-179.png!
> Larger local relations are first sent to the server via addArtifact and
> stored in memory or on disk via BlockManager. Then an ExecutePlanRequest is
> sent containing CachedLocalRelation\{hash}, where hash is the artifact hash.
> The server retrieves the cached LocalRelation from the BlockManager via the
> hash, deserializes it, adds it to the LogicalPlan and then executes it.
> !image-2025-10-15-13-50-44-333.png!
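
The two paths described above can be sketched roughly as follows. This is an illustrative model only, not the Spark Connect client or server code: `ArtifactStore`, `plan_local_relation`, and `resolve` are hypothetical names, a plain dict stands in for the BlockManager, SHA-256 is assumed as the artifact hash, and the 64MB threshold is the one quoted in the description.

```python
import hashlib

INLINE_LIMIT_BYTES = 64 * 1024 * 1024  # threshold quoted in the description


class ArtifactStore:
    """Hypothetical stand-in for the server-side BlockManager cache."""

    def __init__(self):
        self._blocks = {}

    def add_artifact(self, payload: bytes) -> str:
        # Assumption: artifacts are keyed by the SHA-256 hex digest of their bytes.
        digest = hashlib.sha256(payload).hexdigest()
        self._blocks[digest] = payload
        return digest

    def get(self, digest: str) -> bytes:
        return self._blocks[digest]


def plan_local_relation(payload: bytes, store: ArtifactStore,
                        limit: int = INLINE_LIMIT_BYTES) -> dict:
    """Return a dict modelling the message the client would embed in the plan."""
    if len(payload) < limit:
        # Small relation: the serialized bytes travel inside the request itself.
        return {"kind": "LocalRelation", "data": payload}
    # Large relation: upload first, then reference the artifact by hash only.
    return {"kind": "CachedLocalRelation", "hash": store.add_artifact(payload)}


def resolve(message: dict, store: ArtifactStore) -> bytes:
    """Server side: recover the serialized relation from either message kind."""
    if message["kind"] == "LocalRelation":
        return message["data"]
    return store.get(message["hash"])
```

In this model the plan for a large relation carries only a short hash, so the request size stays small regardless of how much data was uploaded beforehand.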
>
> The server reads the data from the BlockManager as a stream and tries to
> create proto.LocalRelation via
> {quote}proto.Relation
> .newBuilder()
> .getLocalRelation
> .getParserForType
> .parseFrom(blockData.toInputStream())
> {quote}
> This fails because the Java protobuf library has a 2GB limit on the size of
> protobuf messages it can deserialize.
> {quote}(org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException)
> CodedInputStream encountered an embedded string or message which claimed to
> have negative size.
> {quote}
> !image-2025-10-15-13-53-40-306.png!
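
A plausible reading of the "negative size" wording is that a length of 2GB or more no longer fits in Java's signed 32-bit `int` and wraps around. The sketch below only illustrates that arithmetic in Python; `as_java_int` is a hypothetical helper, and it does not reproduce protobuf's actual parsing code.

```python
import struct

INT32_MAX = 2**31 - 1  # largest value a signed 32-bit integer can hold


def as_java_int(length: int) -> int:
    """Reinterpret the low 32 bits of a length as a signed 32-bit integer."""
    return struct.unpack("<i", struct.pack("<I", length & 0xFFFFFFFF))[0]


two_gb = 2 * 1024**3  # exactly one more than INT32_MAX

print(as_java_int(INT32_MAX))  # still positive
print(as_java_int(two_gb))     # wraps around to a negative value
```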
> To fix this, I propose avoiding the protobuf layer during the serialization
> on the client and deserialization on the server. Instead of caching the full
> protobuf LocalRelation message, we cache the data and schema as separate
> artifacts, send two hashes \{data_hash, schema_hash} to the server, load them
> both from BlockManager directly and create a LocalRelation on the server
> based on the unpacked data and schema.
> !image-2025-10-15-13-56-46-840.png!
> After creating a prototype with the new proto message, I discovered that
> there are additional limits for CachedLocalRelations. Both the Scala client
> and the server store the data in a single {{Array[Byte]}}, which has
> a 2GB size limit on the JVM. To avoid this limit, I propose transferring data
> in chunks. The Python and Scala clients will split data into multiple Arrow
> batches and upload them separately to the server. Each batch will be uploaded
> and stored as a separate artifact. The server will then load and process each
> batch separately. We will keep batch sizes around 16MB (TBD), well below the
> 2GB limit. This way we will avoid 2GB limits on both clients and on the
> server.
> !image-2025-10-15-13-59-08-081.png!
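
The chunked upload could look roughly like the sketch below. It is a simplification under stated assumptions: it slices an opaque byte string rather than building Arrow record batches (which would have to split on row boundaries), `split_into_chunks` and `upload_in_chunks` are hypothetical names, a dict stands in for the artifact store, and the 16MB chunk size is the tentative value mentioned above.

```python
import hashlib
from typing import Dict, Iterator, List

CHUNK_SIZE_BYTES = 16 * 1024 * 1024  # tentative batch size from the proposal


def split_into_chunks(data: bytes,
                      chunk_size: int = CHUNK_SIZE_BYTES) -> Iterator[bytes]:
    """Yield consecutive slices of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]


def upload_in_chunks(data: bytes, store: Dict[str, bytes],
                     chunk_size: int = CHUNK_SIZE_BYTES) -> List[str]:
    """Store each chunk as its own artifact and return the hashes in order."""
    hashes = []
    for chunk in split_into_chunks(data, chunk_size):
        # Assumption: each artifact is keyed by the SHA-256 of its own bytes.
        digest = hashlib.sha256(chunk).hexdigest()
        store[digest] = chunk
        hashes.append(digest)
    return hashes
```

The order of the returned hashes matters, because the receiver needs it to process the batches in the order the rows were written.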
> The final proto message looks like this:
> {quote}message ChunkedCachedLocalRelation {
>   // (Required) A list of SHA-256 hashes of the LocalRelation.data chunks.
>   repeated string dataHashes = 1;
>   // (Optional) A SHA-256 hash of the serialized LocalRelation.schema.
>   optional string schemaHash = 2;
> }
> {quote}
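
For illustration, server-side handling of this message might resemble the following Python sketch. The dataclass merely mirrors the two proto fields; `load_relation`, the dict-based store, and the integrity check are assumptions made for this example, not the code in the PR.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional, Tuple


@dataclass
class ChunkedCachedLocalRelation:
    data_hashes: List[str] = field(default_factory=list)  # mirrors dataHashes
    schema_hash: Optional[str] = None                      # mirrors schemaHash


def _load(store: Dict[str, bytes], digest: str) -> bytes:
    """Fetch one artifact and check that its bytes match the requested hash."""
    block = store[digest]  # raises KeyError if the artifact was never uploaded
    if hashlib.sha256(block).hexdigest() != digest:
        raise ValueError(f"artifact {digest} failed its hash check")
    return block


def load_relation(
    msg: ChunkedCachedLocalRelation, store: Dict[str, bytes]
) -> Tuple[Optional[bytes], Iterator[bytes]]:
    """Return the schema bytes (if any) and a lazy iterator over data batches."""
    schema = _load(store, msg.schema_hash) if msg.schema_hash is not None else None
    # A generator keeps only one batch in memory at a time.
    batches = (_load(store, h) for h in msg.data_hashes)
    return schema, batches
```

Yielding batches lazily reflects the goal of the design: no single buffer ever has to hold the whole relation.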
> Implementation details are discussed in the PR
> [https://github.com/apache/spark/pull/52613].