This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 6327add8b822 [SPARK-54683][SQL] Unify geo and time types blocking
6327add8b822 is described below

commit 6327add8b82285856828759976cfe0a24958fd15
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Dec 15 11:01:18 2025 +0800

    [SPARK-54683][SQL] Unify geo and time types blocking
    
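    ### What changes were proposed in this pull request?
    
    This PR refactors the code that blocks the TIME and geospatial types when they are disabled, moving the checks into a single shared helper, `TypeUtils.failUnsupportedDataType`.
    
    ### Why are the changes needed?
    
    Code unification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.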
    
    Closes #53438 from cloud-fan/block.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 4a18179d6abcd17e07ab4fee8a22b12f3d90ef7f)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/CatalystTypeConverters.scala | 10 +++-------
 .../sql/catalyst/DeserializerBuildHelper.scala      |  2 ++
 .../spark/sql/catalyst/SerializerBuildHelper.scala  |  2 ++
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala |  1 +
 .../spark/sql/catalyst/expressions/Cast.scala       | 21 ++-------------------
 .../plans/logical/v2AlterTableCommands.scala        |  2 ++
 .../apache/spark/sql/catalyst/util/TypeUtils.scala  | 13 +++++++++++++
 .../execution/SparkConnectPlanExecution.scala       | 16 +++-------------
 .../sql/execution/datasources/DataSourceUtils.scala |  6 ++----
 9 files changed, 30 insertions(+), 43 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index a90720ac5108..cdbd2d49e8b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -25,11 +25,12 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
+import scala.language.existentials
+
 import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
@@ -60,6 +61,7 @@ object CatalystTypeConverters {
   }
 
   private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
+    TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
     val converter = dataType match {
       case udt: UserDefinedType[_] => UDTConverter(udt)
       case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
@@ -68,18 +70,12 @@ object CatalystTypeConverters {
       case CharType(length) => new CharConverter(length)
       case VarcharType(length) => new VarcharConverter(length)
       case _: StringType => StringConverter
-      case _ @ (_: GeographyType | _: GeometryType) if !SQLConf.get.geospatialEnabled =>
-        throw new org.apache.spark.sql.AnalysisException(
-          errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-          messageParameters = scala.collection.immutable.Map.empty)
       case g: GeographyType =>
         new GeographyConverter(g)
       case g: GeometryType =>
         new GeometryConverter(g)
       case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
       case DateType => DateConverter
-      case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
-        QueryCompilationErrors.unsupportedTimeTypeError()
       case _: TimeType => TimeConverter
       case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
       case TimestampType => TimestampConverter
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 85a5cf4f6b26..080794643fa0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -345,6 +345,8 @@ object DeserializerBuildHelper {
       createDeserializerForInstant(path)
     case LocalDateTimeEncoder =>
       createDeserializerForLocalDateTime(path)
+    case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
+      throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder =>
       createDeserializerForLocalTime(path)
     case UDTEncoder(udt, udtClass) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 32fb859745d8..b8b2406a5813 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -367,6 +367,8 @@ object SerializerBuildHelper {
     case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
     case InstantEncoder(false) => createSerializerForJavaInstant(input)
     case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
+    case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
+      throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
     case LocalTimeEncoder => createSerializerForLocalTime(input)
     case UDTEncoder(udt, udtClass) => createSerializerForUserDefinedType(input, udt, udtClass)
     case OptionEncoder(valueEnc) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 04451162276f..3b8a363e704a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -712,6 +712,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
             }
 
             create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+            TypeUtils.failUnsupportedDataType(create.tableSchema, SQLConf.get)
             SchemaUtils.checkIndeterminateCollationInSchema(create.tableSchema)
 
           case write: V2WriteCommand if write.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 03b6a452fe7d..00b0a83f6d53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.catalyst.util.IntervalUtils.{dayTimeIntervalToByte, dayTimeIntervalToDecimal, dayTimeIntervalToInt, dayTimeIntervalToLong, dayTimeIntervalToShort, yearMonthIntervalToByte, yearMonthIntervalToInt, yearMonthIntervalToShort}
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase, QueryExecutionErrors}
+import org.apache.spark.sql.errors.{QueryErrorsBase, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{GeographyVal, UTF8String, VariantVal}
@@ -90,12 +90,6 @@ object Cast extends QueryErrorsBase {
    *   - String <=> Binary
    */
   def canAnsiCast(from: DataType, to: DataType): Boolean = (from, to) match {
-    case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
-        (isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
-      throw new org.apache.spark.sql.AnalysisException(
-        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-        messageParameters = scala.collection.immutable.Map.empty)
-
     case (fromType, toType) if fromType == toType => true
 
     case (NullType, _) => true
@@ -224,12 +218,6 @@ object Cast extends QueryErrorsBase {
    * Returns true iff we can cast `from` type to `to` type.
    */
   def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
-    case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
-        (isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
-      throw new org.apache.spark.sql.AnalysisException(
-        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-        messageParameters = scala.collection.immutable.Map.empty)
-
     case (fromType, toType) if fromType == toType => true
 
     case (NullType, _) => true
@@ -602,12 +590,7 @@ case class Cast(
   }
 
   override def checkInputDataTypes(): TypeCheckResult = {
-    dataType match {
-      // If the cast is to a TIME type, first check if TIME type is enabled.
-      case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
-        throw QueryCompilationErrors.unsupportedTimeTypeError()
-      case _ =>
-    }
+    TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
     val canCast = evalMode match {
       case EvalMode.LEGACY => Cast.canCast(child.dataType, dataType)
       case EvalMode.ANSI => Cast.canAnsiCast(child.dataType, dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 4ec8baf351cb..843ce22061d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -112,6 +112,7 @@ case class AddColumns(
     columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
   columnsToAdd.foreach { c =>
     TypeUtils.failWithIntervalType(c.dataType)
+    TypeUtils.failUnsupportedDataType(c.dataType, conf)
   }
 
   override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
@@ -144,6 +145,7 @@ case class ReplaceColumns(
     columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
   columnsToAdd.foreach { c =>
     TypeUtils.failWithIntervalType(c.dataType)
+    TypeUtils.failUnsupportedDataType(c.dataType, conf)
   }
 
   override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 9f89f068b756..9c5df04f9569 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils.isGeoSpatialType
 import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalNumericType}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -136,4 +138,15 @@ object TypeUtils extends QueryErrorsBase {
     }
     if (dataType.existsRecursively(isInterval)) f
   }
+
+  def failUnsupportedDataType(dataType: DataType, conf: SQLConf): Unit = {
+    if (!conf.isTimeTypeEnabled && dataType.existsRecursively(_.isInstanceOf[TimeType])) {
+      throw QueryCompilationErrors.unsupportedTimeTypeError()
+    }
+    if (!conf.geospatialEnabled && dataType.existsRecursively(isGeoSpatialType)) {
+      throw new org.apache.spark.sql.AnalysisException(
+        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
+        messageParameters = scala.collection.immutable.Map.empty)
+    }
+  }
 }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index e3c392cd2a2c..4b12c96e977e 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils
+import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
@@ -37,11 +37,10 @@ import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_ARROW_MAX_BATCH
 import org.apache.spark.sql.connect.planner.{InvalidInputErrors, SparkConnectPlanner}
 import org.apache.spark.sql.connect.service.ExecuteHolder
 import org.apache.spark.sql.connect.utils.MetricGenerator
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{DoNotCleanup, LocalTableScanExec, QueryExecution, RemoveShuffleFiles, SkipMigration, SQLExecution}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType, TimeType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -128,16 +127,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
     val sessionId = executePlan.sessionHolder.sessionId
     val spark = dataframe.sparkSession
     val schema = dataframe.schema
-    val geospatialEnabled = spark.sessionState.conf.geospatialEnabled
-    if (!geospatialEnabled && schema.existsRecursively(STExpressionUtils.isGeoSpatialType)) {
-      throw new org.apache.spark.sql.AnalysisException(
-        errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
-        messageParameters = scala.collection.immutable.Map.empty)
-    }
-    val timeTypeEnabled = spark.sessionState.conf.isTimeTypeEnabled
-    if (!timeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
-      throw QueryCompilationErrors.unsupportedTimeTypeError()
-    }
+    TypeUtils.failUnsupportedDataType(schema, spark.sessionState.conf)
     val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
     val largeVarTypes = spark.sessionState.conf.arrowUseLargeVarTypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index fb5b605bab01..3fd82573f001 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException}
 import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
-import org.apache.spark.sql.catalyst.util.RebaseDateTime
+import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -93,9 +93,7 @@ object DataSourceUtils extends PredicateHelper {
    * in a driver side.
    */
   def verifySchema(format: FileFormat, schema: StructType, readOnly: Boolean = false): Unit = {
-    if (!SQLConf.get.isTimeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
-      throw QueryCompilationErrors.unsupportedTimeTypeError()
-    }
+    TypeUtils.failUnsupportedDataType(schema, SQLConf.get)
     schema.foreach { field =>
       val supported = if (readOnly) {
         format.supportReadDataType(field.dataType)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to