This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ac13473fff64 [SPARK-55047][CONNECT] Add client-side limit for local relation size
ac13473fff64 is described below

commit ac13473fff64919e8e7756e3a42ce3a68627dd73
Author: Alex Khakhlyuk <[email protected]>
AuthorDate: Fri Jan 16 09:31:55 2026 -0400

    [SPARK-55047][CONNECT] Add client-side limit for local relation size
    
    ### What changes were proposed in this pull request?
    
    Currently, local relation sizes are limited by the `spark.sql.session.localRelationSizeLimit` conf (3GB by default).
    This limit is only checked on the server. The client can upload arbitrarily large relations, e.g. 100 GB, as artifacts; the server will store them, try to materialize the local relation on the driver, and only then throw an error if the relation exceeds the limit.
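    
    For reference, the limit is an ordinary session conf, so it can be inspected or tuned per session. A minimal PySpark sketch (the 50 MB value is purely illustrative and assumes an active Spark Connect session bound to `spark`):
    
    ```python
    # Illustrative only: read and lower the local relation size limit for this session.
    limit_key = "spark.sql.session.localRelationSizeLimit"
    print(spark.conf.get(limit_key))             # 3 GB by default
    spark.conf.set(limit_key, 50 * 1024 * 1024)  # e.g. 50 MB, to make the limit easy to hit
    ```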
    
    This is bad for several reasons:
    - The driver has to store an arbitrary amount of data and can run out of memory or disk space, causing a driver failure.
    - The client wastes a lot of time uploading the data to the server only to fail later. This is bad UX.
    
    This PR adds a check on the client side. If the client detects that the local relation it is serializing exceeds the limit, it throws an AnalysisException with error class `LOCAL_RELATION_SIZE_LIMIT_EXCEEDED` and SQL state 54000 (program limit exceeded).
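    
    As a rough illustration of the new behavior (the column names and sizes below are made up; like the new tests, it lowers the limit so it is easy to exceed), the check now fires on the client while the local relation is being serialized, instead of after the full upload:
    
    ```python
    from pyspark.errors import AnalysisException
    
    # Illustrative only: a 50 MB limit and roughly 200 MB of local data.
    spark.conf.set("spark.sql.session.localRelationSizeLimit", 50 * 1024 * 1024)
    big_rows = [(i, "x" * 1000) for i in range(200_000)]
    
    try:
        spark.createDataFrame(big_rows, ["id", "payload"]).count()
    except AnalysisException as e:
        # The message carries the error class, the observed size and the limit, e.g.:
        # [LOCAL_RELATION_SIZE_LIMIT_EXCEEDED] Local relation size (... bytes) exceeds the limit (52428800 bytes).
        print(e)
    ```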
    
    ### Why are the changes needed?
    
    Improve driver stability when using large local relations in Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New integration tests for Scala and Python clients.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53815 from khakhlyuk/large-local-relations-client-side-limit.
    
    Authored-by: Alex Khakhlyuk <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 +++++
 python/pyspark/errors/error-conditions.json        |  6 +++++
 python/pyspark/sql/connect/session.py              | 20 ++++++++++++++++
 .../sql/tests/connect/arrow/test_parity_arrow.py   | 27 ++++++++++++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 19 +++++++--------
 .../spark/sql/connect/SparkSessionE2ESuite.scala   | 24 +++++++++++++++++++
 .../apache/spark/sql/connect/SparkSession.scala    | 11 ++++++++-
 7 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 519a3cafd3be..22e9d7f8f536 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4357,6 +4357,12 @@
     ],
     "sqlState" : "42601"
   },
+  "LOCAL_RELATION_SIZE_LIMIT_EXCEEDED" : {
+    "message" : [
+      "Local relation size (<actualSize> bytes) exceeds the limit (<sizeLimit> bytes)."
+    ],
+    "sqlState" : "54000"
+  },
   "LOCATION_ALREADY_EXISTS" : {
     "message" : [
       "Cannot name the managed table as <identifier>, as its associated location <location> already exists. Please pick a different table name, or remove the existing location first."
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index c48280e07057..7943c0992603 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -551,6 +551,12 @@
       "<arg1> and <arg2> should be of the same length, got <arg1_length> and <arg2_length>."
     ]
   },
+  "LOCAL_RELATION_SIZE_LIMIT_EXCEEDED": {
+    "message": [
+      "Local relation size (<actualSize> bytes) exceeds the limit (<sizeLimit> bytes)."
+    ],
+    "sqlState": "54000"
+  },
   "MALFORMED_GEOGRAPHY": {
     "message": [
       "Geography binary is malformed. Please check the data source is valid."
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index b323a0eac6f6..b432ea224045 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -98,6 +98,7 @@ from pyspark.sql.types import (
 )
 from pyspark.sql.utils import to_str
 from pyspark.errors import (
+    AnalysisException,
     PySparkAttributeError,
     PySparkNotImplementedError,
     PySparkRuntimeError,
@@ -536,6 +537,7 @@ class SparkSession:
             "spark.sql.timestampType",
             "spark.sql.session.timeZone",
             "spark.sql.session.localRelationCacheThreshold",
+            "spark.sql.session.localRelationSizeLimit",
             "spark.sql.session.localRelationChunkSizeRows",
             "spark.sql.session.localRelationChunkSizeBytes",
             "spark.sql.session.localRelationBatchOfChunksSizeBytes",
@@ -770,6 +772,9 @@ class SparkSession:
         cache_threshold = int(
             configs["spark.sql.session.localRelationCacheThreshold"]  # type: ignore[arg-type]
         )
+        local_relation_size_limit = int(
+            configs["spark.sql.session.localRelationSizeLimit"]  # type: ignore[arg-type]
+        )
         max_chunk_size_rows = int(
             configs["spark.sql.session.localRelationChunkSizeRows"]  # type: ignore[arg-type]
         )
@@ -783,6 +788,7 @@ class SparkSession:
         if cache_threshold <= _table.nbytes:
             plan = self._cache_local_relation(
                 local_relation,
+                local_relation_size_limit,
                 max_chunk_size_rows,
                 max_chunk_size_bytes,
                 max_batch_of_chunks_size_bytes,
@@ -1062,6 +1068,7 @@ class SparkSession:
     def _cache_local_relation(
         self,
         local_relation: LocalRelation,
+        local_relation_size_limit: int,
         max_chunk_size_rows: int,
         max_chunk_size_bytes: int,
         max_batch_of_chunks_size_bytes: int,
@@ -1088,10 +1095,12 @@ class SparkSession:
         hashes = []
         current_batch = []
         current_batch_size = 0
+        total_size = 0
         if has_schema:
             schema_chunk = local_relation._serialize_schema()
             current_batch.append(schema_chunk)
             current_batch_size += len(schema_chunk)
+            total_size += len(schema_chunk)
 
         data_chunks: Iterator[bytes] = local_relation._serialize_table_chunks(
             max_chunk_size_rows, min(max_chunk_size_bytes, max_batch_of_chunks_size_bytes)
@@ -1099,6 +1108,17 @@ class SparkSession:
 
         for chunk in data_chunks:
             chunk_size = len(chunk)
+            total_size += chunk_size
+
+            # Check if total size exceeds the limit
+            if total_size > local_relation_size_limit:
+                raise AnalysisException(
+                    errorClass="LOCAL_RELATION_SIZE_LIMIT_EXCEEDED",
+                    messageParameters={
+                        "actualSize": str(total_size),
+                        "sizeLimit": str(local_relation_size_limit),
+                    },
+                )
 
             # Check if adding this chunk would exceed batch size
             if (
diff --git a/python/pyspark/sql/tests/connect/arrow/test_parity_arrow.py b/python/pyspark/sql/tests/connect/arrow/test_parity_arrow.py
index 34883c147709..20ce29c93eab 100644
--- a/python/pyspark/sql/tests/connect/arrow/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/arrow/test_parity_arrow.py
@@ -17,6 +17,7 @@
 
 import unittest
 
+from pyspark.errors import AnalysisException
 from pyspark.sql.tests.arrow.test_arrow import ArrowTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
 from pyspark.testing.pandasutils import PandasOnSparkTestUtils
@@ -84,6 +85,32 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTest
     def test_large_cached_local_relation_same_values(self):
         self.check_large_cached_local_relation_same_values()
 
+    def test_large_local_relation_size_limit_exceeded(self):
+        import random
+        import string
+
+        conf_key = "spark.sql.session.localRelationSizeLimit"
+        original_limit = self.spark.conf.get(conf_key)
+        try:
+            new_limit = 50 * 1024 * 1024
+            self.spark.conf.set(conf_key, new_limit)
+
+            row_size = 1000
+            row_count = 64 * 1000
+            suffix = "abcdef"
+            str_value = (
+                "".join(random.choices(string.ascii_letters + string.digits, k=row_size)) + suffix
+            )
+            data = [(i, str_value) for i in range(row_count)]
+
+            with self.assertRaisesRegex(
+                AnalysisException, f"LOCAL_RELATION_SIZE_LIMIT_EXCEEDED.*{new_limit}"
+            ):
+                df = self.spark.createDataFrame(data, ["col1", "col2"])
+                df.count()
+        finally:
+            self.spark.conf.set(conf_key, original_limit)
+
     def test_toPandas_with_array_type(self):
         self.check_toPandas_with_array_type(True)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d6f595c653e0..8917c372e53d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -6248,6 +6248,16 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(1024 * 1024)
 
+  val LOCAL_RELATION_SIZE_LIMIT =
+    buildConf("spark.sql.session.localRelationSizeLimit")
+      .internal()
+      .doc("Limit on how large ChunkedCachedLocalRelation.data can be in bytes." +
+        "If the limit is exceeded, an exception is thrown.")
+      .version("4.1.0")
+      .longConf
+      .checkValue(_ > 0, "The local relation size in bytes must be positive")
+      .createWithDefault(3L * 1024 * 1024 * 1024)
+
   val LOCAL_RELATION_CHUNK_SIZE_ROWS =
     buildConf(SqlApiConfHelper.LOCAL_RELATION_CHUNK_SIZE_ROWS_KEY)
       .doc("The chunk size in number of rows when splitting ChunkedCachedLocalRelation.data " +
@@ -6281,15 +6291,6 @@ object SQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("2000MB")
 
-  val LOCAL_RELATION_SIZE_LIMIT =
-    buildConf("spark.sql.session.localRelationSizeLimit")
-      .internal()
-      .doc("Limit on how large ChunkedCachedLocalRelation.data can be in bytes." +
-        "If the limit is exceeded, an exception is thrown.")
-      .version("4.1.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("3GB")
-
   val LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES =
     buildConf(SqlApiConfHelper.LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES_KEY)
       .internal()
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
index 6678a11a80b0..f87c990bee47 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -476,4 +476,28 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
       assert(!df.filter(df("_2").endsWith(suffix)).isEmpty)
     }
   }
+
+  test("large local relation size limit exceeded") {
+    // Set a low limit so we don't need to create a huge dataset
+    val conf_key = "spark.sql.session.localRelationSizeLimit"
+    val originalLimit = spark.conf.get(conf_key)
+    try {
+      val newLimit = (50 * 1024 * 1024).toString
+      spark.conf.set(conf_key, newLimit)
+      val rowSize = 1000
+      val rowCount = 64 * 1000
+      val suffix = "abcdef"
+      val str = scala.util.Random.alphanumeric.take(rowSize).mkString + suffix
+      val data = Seq.tabulate(rowCount)(i => (i, str))
+
+      val e = intercept[Exception] {
+        val df = spark.createDataFrame(data)
+        df.count()
+      }
+      assert(e.getMessage.contains("LOCAL_RELATION_SIZE_LIMIT_EXCEEDED"))
+      assert(e.getMessage.contains(newLimit))
+    } finally {
+      spark.conf.set(conf_key, originalLimit)
+    }
+  }
 }
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
index 42dd1a2b9979..0ca34328c062 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
@@ -45,7 +45,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{CONFIG, PATH}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql
-import org.apache.spark.sql.{Column, Encoder, ExperimentalMethods, Observation, Row, SparkSessionBuilder, SparkSessionCompanion, SparkSessionExtensions}
+import org.apache.spark.sql.{AnalysisException, Column, Encoder, ExperimentalMethods, Observation, Row, SparkSessionBuilder, SparkSessionCompanion, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{agnosticEncoderFor, BoxedLongEncoder, UnboundRowEncoder}
@@ -122,6 +122,7 @@ class SparkSession private[sql] (
        val maxChunkSizeBytes = conf.get(SqlApiConf.LOCAL_RELATION_CHUNK_SIZE_BYTES_KEY).toInt
        val maxBatchOfChunksSize =
          conf.get(SqlApiConf.LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES_KEY).toLong
+        val localRelationSizeLimit = conf.get("spark.sql.session.localRelationSizeLimit").toLong
 
         // Serialize with chunking support
         val it = ArrowSerializer.serialize(
@@ -150,6 +151,14 @@ class SparkSession private[sql] (
             totalChunks += 1
             totalSize += chunkSize
 
+            if (totalSize > localRelationSizeLimit) {
+              throw new AnalysisException(
+                errorClass = "LOCAL_RELATION_SIZE_LIMIT_EXCEEDED",
+                messageParameters = Map(
+                  "actualSize" -> totalSize.toString,
+                  "sizeLimit" -> localRelationSizeLimit.toString))
+            }
+
             // Check if adding this chunk would exceed batch size
             if (currentBatchSize + chunkSize > maxBatchOfChunksSize) {
               // Upload current batch


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
