This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 9f3df27b7806 [SPARK-54253][GEO][CONNECT][FOLLOWUP] Enforce geospatial support config in Spark Connect
9f3df27b7806 is described below

commit 9f3df27b7806c0725c61186ec1af5e706d0a21eb
Author: Uros Bojanic <[email protected]>
AuthorDate: Thu Dec 4 10:21:37 2025 -0800

    [SPARK-54253][GEO][CONNECT][FOLLOWUP] Enforce geospatial support config in Spark Connect
    
    ### What changes were proposed in this pull request?
    A new SQL config (`spark.sql.geospatial.enabled`) was introduced as part of https://github.com/apache/spark/pull/53009.
    
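    However, the original PR did not fully gate geospatial functionality. This PR therefore also makes Spark Connect reject any result whose schema contains a geospatial type (e.g. GEOGRAPHY or GEOMETRY) when `spark.sql.geospatial.enabled` is false, raising `UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED`.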
    
    ### Why are the changes needed?
    Keep the geospatial feature gated behind the config until it is complete.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added unit tests confirming that the config is enforced in Spark Connect:
    
    - `GeographyConnectDataFrameSuite`
    - `GeometryConnectDataFrameSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #53259 from uros-db/geo-config_connect_tests.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 38d601076c1ae856cb56c5888abe3d1e3c4329c2)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/GeographyConnectDataFrameSuite.scala  | 21 +++++++++++++++++++++
 .../spark/sql/GeometryConnectDataFrameSuite.scala   | 21 +++++++++++++++++++++
 .../execution/SparkConnectPlanExecution.scala       |  7 +++++++
 3 files changed, 49 insertions(+)

diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeographyConnectDataFrameSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeographyConnectDataFrameSuite.scala
index 2016a84ac5a3..f3f52366df66 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeographyConnectDataFrameSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeographyConnectDataFrameSuite.scala
@@ -103,4 +103,25 @@ class GeographyConnectDataFrameSuite extends QueryTest with RemoteSparkSession {
     val expectedGeog = Geography.fromWKB(point, 4326)
     checkAnswer(df, Seq(Row(expectedGeog)))
   }
+
+  test("geospatial feature disabled") {
+    withSQLConf("spark.sql.geospatial.enabled" -> "false") {
+      val geography = Geography.fromWKB(point1, 4326)
+      val schema = StructType(Seq(StructField("col1", GeographyType(4326))))
+      // Java List[Row] + schema.
+      val javaList = java.util.Arrays.asList(Row(geography))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.createDataFrame(javaList, schema).collect()
+        },
+        condition = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED")
+      // Implicit encoder path.
+      import testImplicits._
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq(geography).toDF("g").collect()
+        },
+        condition = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED")
+    }
+  }
 }
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeometryConnectDataFrameSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeometryConnectDataFrameSuite.scala
index 1450ac54184b..b66c8a6a3d78 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeometryConnectDataFrameSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/GeometryConnectDataFrameSuite.scala
@@ -109,4 +109,25 @@ class GeometryConnectDataFrameSuite extends QueryTest with RemoteSparkSession {
     val expectedGeom = Geometry.fromWKB(point, 0)
     checkAnswer(df, Seq(Row(expectedGeom)))
   }
+
+  test("geospatial feature disabled") {
+    withSQLConf("spark.sql.geospatial.enabled" -> "false") {
+      val geometry = Geometry.fromWKB(point1, 0)
+      val schema = StructType(Seq(StructField("col1", GeometryType(0))))
+      // Java List[Row] + schema.
+      val javaList = java.util.Arrays.asList(Row(geometry))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.createDataFrame(javaList, schema).collect()
+        },
+        condition = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED")
+      // Implicit encoder path.
+      import testImplicits._
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq(geometry).toDF("g").collect()
+        },
+        condition = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED")
+    }
+  }
 }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index f5cb2696d849..dc20af8e3700 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -29,6 +29,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils
 import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import 
org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
@@ -126,6 +127,12 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
     val sessionId = executePlan.sessionHolder.sessionId
     val spark = dataframe.sparkSession
     val schema = dataframe.schema
+    val geospatialEnabled = spark.sessionState.conf.geospatialEnabled
+    if (!geospatialEnabled && 
schema.existsRecursively(STExpressionUtils.isGeoSpatialType)) {
+      throw new org.apache.spark.sql.AnalysisException(
+        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
+        messageParameters = scala.collection.immutable.Map.empty)
+    }
     val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
     val largeVarTypes = spark.sessionState.conf.arrowUseLargeVarTypes


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to