This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 6327add8b822 [SPARK-54683][SQL] Unify geo and time types blocking
6327add8b822 is described below
commit 6327add8b82285856828759976cfe0a24958fd15
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Dec 15 11:01:18 2025 +0800
[SPARK-54683][SQL] Unify geo and time types blocking
### What changes were proposed in this pull request?

This PR refactors the code that blocks the TIME and geospatial (GEOGRAPHY/GEOMETRY) types when they are disabled, unifying the scattered per-call-site checks into a single TypeUtils helper.

### Why are the changes needed?

Code unification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.
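As a reviewer aid, here is a minimal, self-contained Scala sketch of the consolidated gate this commit adds to TypeUtils (the object name TypeBlockingSketch is illustrative only, not part of the change; the helper body mirrors the diff below):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils.isGeoSpatialType
    import org.apache.spark.sql.errors.QueryCompilationErrors
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.{DataType, TimeType}

    // Illustrative stand-in for the helper this commit adds to TypeUtils.
    object TypeBlockingSketch {
      // Fail analysis if the type (or any nested field) uses a disabled type.
      def failUnsupportedDataType(dataType: DataType, conf: SQLConf): Unit = {
        // TIME is blocked unless the time type feature flag is enabled.
        if (!conf.isTimeTypeEnabled && dataType.existsRecursively(_.isInstanceOf[TimeType])) {
          throw QueryCompilationErrors.unsupportedTimeTypeError()
        }
        // GEOGRAPHY/GEOMETRY are blocked unless the geospatial flag is enabled.
        if (!conf.geospatialEnabled && dataType.existsRecursively(isGeoSpatialType)) {
          throw new AnalysisException(
            errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
            messageParameters = Map.empty)
        }
      }
    }

Call sites (CatalystTypeConverters, Cast, CheckAnalysis, the ALTER TABLE commands, Spark Connect execution, and file-source schema verification) then shrink to a single call such as TypeUtils.failUnsupportedDataType(schema, SQLConf.get).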
Closes #53438 from cloud-fan/block.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4a18179d6abcd17e07ab4fee8a22b12f3d90ef7f)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/CatalystTypeConverters.scala | 10 +++-------
.../sql/catalyst/DeserializerBuildHelper.scala | 2 ++
.../spark/sql/catalyst/SerializerBuildHelper.scala | 2 ++
.../spark/sql/catalyst/analysis/CheckAnalysis.scala | 1 +
.../spark/sql/catalyst/expressions/Cast.scala | 21 ++-------------------
.../plans/logical/v2AlterTableCommands.scala | 2 ++
.../apache/spark/sql/catalyst/util/TypeUtils.scala | 13 +++++++++++++
.../execution/SparkConnectPlanExecution.scala | 16 +++-------------
.../sql/execution/datasources/DataSourceUtils.scala | 6 ++----
9 files changed, 30 insertions(+), 43 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index a90720ac5108..cdbd2d49e8b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -25,11 +25,12 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, Period
import java.util.{Map => JavaMap}
import javax.annotation.Nullable
+import scala.language.existentials
+
import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
@@ -60,6 +61,7 @@ object CatalystTypeConverters {
}
private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
+ TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
val converter = dataType match {
case udt: UserDefinedType[_] => UDTConverter(udt)
case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
@@ -68,18 +70,12 @@ object CatalystTypeConverters {
case CharType(length) => new CharConverter(length)
case VarcharType(length) => new VarcharConverter(length)
case _: StringType => StringConverter
- case _ @ (_: GeographyType | _: GeometryType) if !SQLConf.get.geospatialEnabled =>
- throw new org.apache.spark.sql.AnalysisException(
- errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
- messageParameters = scala.collection.immutable.Map.empty)
case g: GeographyType =>
new GeographyConverter(g)
case g: GeometryType =>
new GeometryConverter(g)
case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
case DateType => DateConverter
- case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
- QueryCompilationErrors.unsupportedTimeTypeError()
case _: TimeType => TimeConverter
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
case TimestampType => TimestampConverter
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 85a5cf4f6b26..080794643fa0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -345,6 +345,8 @@ object DeserializerBuildHelper {
createDeserializerForInstant(path)
case LocalDateTimeEncoder =>
createDeserializerForLocalDateTime(path)
+ case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
+ throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
case LocalTimeEncoder =>
createDeserializerForLocalTime(path)
case UDTEncoder(udt, udtClass) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 32fb859745d8..b8b2406a5813 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -367,6 +367,8 @@ object SerializerBuildHelper {
case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
case InstantEncoder(false) => createSerializerForJavaInstant(input)
case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
+ case LocalTimeEncoder if !SQLConf.get.isTimeTypeEnabled =>
+ throw org.apache.spark.sql.errors.QueryCompilationErrors.unsupportedTimeTypeError()
case LocalTimeEncoder => createSerializerForLocalTime(input)
case UDTEncoder(udt, udtClass) =>
createSerializerForUserDefinedType(input, udt, udtClass)
case OptionEncoder(valueEnc) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 04451162276f..3b8a363e704a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -712,6 +712,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
}
create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+ TypeUtils.failUnsupportedDataType(create.tableSchema, SQLConf.get)
SchemaUtils.checkIndeterminateCollationInSchema(create.tableSchema)
case write: V2WriteCommand if write.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 03b6a452fe7d..00b0a83f6d53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.IntervalUtils.{dayTimeIntervalToByte, dayTimeIntervalToDecimal, dayTimeIntervalToInt, dayTimeIntervalToLong, dayTimeIntervalToShort, yearMonthIntervalToByte, yearMonthIntervalToInt, yearMonthIntervalToShort}
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase, QueryExecutionErrors}
+import org.apache.spark.sql.errors.{QueryErrorsBase, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{GeographyVal, UTF8String, VariantVal}
@@ -90,12 +90,6 @@ object Cast extends QueryErrorsBase {
* - String <=> Binary
*/
def canAnsiCast(from: DataType, to: DataType): Boolean = (from, to) match {
- case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
- (isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
- throw new org.apache.spark.sql.AnalysisException(
- errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
- messageParameters = scala.collection.immutable.Map.empty)
-
case (fromType, toType) if fromType == toType => true
case (NullType, _) => true
@@ -224,12 +218,6 @@ object Cast extends QueryErrorsBase {
* Returns true iff we can cast `from` type to `to` type.
*/
def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
- case (fromType, toType) if !SQLConf.get.geospatialEnabled &&
- (isGeoSpatialType(fromType) || isGeoSpatialType(toType)) =>
- throw new org.apache.spark.sql.AnalysisException(
- errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
- messageParameters = scala.collection.immutable.Map.empty)
-
case (fromType, toType) if fromType == toType => true
case (NullType, _) => true
@@ -602,12 +590,7 @@ case class Cast(
}
override def checkInputDataTypes(): TypeCheckResult = {
- dataType match {
- // If the cast is to a TIME type, first check if TIME type is enabled.
- case _: TimeType if !SQLConf.get.isTimeTypeEnabled =>
- throw QueryCompilationErrors.unsupportedTimeTypeError()
- case _ =>
- }
+ TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
val canCast = evalMode match {
case EvalMode.LEGACY => Cast.canCast(child.dataType, dataType)
case EvalMode.ANSI => Cast.canAnsiCast(child.dataType, dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 4ec8baf351cb..843ce22061d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -112,6 +112,7 @@ case class AddColumns(
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
columnsToAdd.foreach { c =>
TypeUtils.failWithIntervalType(c.dataType)
+ TypeUtils.failUnsupportedDataType(c.dataType, conf)
}
override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
@@ -144,6 +145,7 @@ case class ReplaceColumns(
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
columnsToAdd.foreach { c =>
TypeUtils.failWithIntervalType(c.dataType)
+ TypeUtils.failUnsupportedDataType(c.dataType, conf)
}
override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 9f89f068b756..9c5df04f9569 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils.isGeoSpatialType
import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalNumericType}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
@@ -136,4 +138,15 @@ object TypeUtils extends QueryErrorsBase {
}
if (dataType.existsRecursively(isInterval)) f
}
+
+ def failUnsupportedDataType(dataType: DataType, conf: SQLConf): Unit = {
+ if (!conf.isTimeTypeEnabled && dataType.existsRecursively(_.isInstanceOf[TimeType])) {
+ throw QueryCompilationErrors.unsupportedTimeTypeError()
+ }
+ if (!conf.geospatialEnabled && dataType.existsRecursively(isGeoSpatialType)) {
+ throw new org.apache.spark.sql.AnalysisException(
+ errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
+ messageParameters = scala.collection.immutable.Map.empty)
+ }
+ }
}
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index e3c392cd2a2c..4b12c96e977e 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.st.STExpressionUtils
+import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.classic.{DataFrame, Dataset}
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
@@ -37,11 +37,10 @@ import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_ARROW_MAX_BATCH
import org.apache.spark.sql.connect.planner.{InvalidInputErrors, SparkConnectPlanner}
import org.apache.spark.sql.connect.service.ExecuteHolder
import org.apache.spark.sql.connect.utils.MetricGenerator
-import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{DoNotCleanup, LocalTableScanExec, QueryExecution, RemoveShuffleFiles, SkipMigration, SQLExecution}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructType, TimeType}
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.ThreadUtils
/**
@@ -128,16 +127,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
val sessionId = executePlan.sessionHolder.sessionId
val spark = dataframe.sparkSession
val schema = dataframe.schema
- val geospatialEnabled = spark.sessionState.conf.geospatialEnabled
- if (!geospatialEnabled && schema.existsRecursively(STExpressionUtils.isGeoSpatialType)) {
- throw new org.apache.spark.sql.AnalysisException(
- errorClass = "UNSUPPORTED_FEATURE.GEOSPATIAL_DISABLED",
- messageParameters = scala.collection.immutable.Map.empty)
- }
- val timeTypeEnabled = spark.sessionState.conf.isTimeTypeEnabled
- if (!timeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
- throw QueryCompilationErrors.unsupportedTimeTypeError()
- }
+ TypeUtils.failUnsupportedDataType(schema, spark.sessionState.conf)
val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
val largeVarTypes = spark.sessionState.conf.arrowUseLargeVarTypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index fb5b605bab01..3fd82573f001 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
-import org.apache.spark.sql.catalyst.util.RebaseDateTime
+import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -93,9 +93,7 @@ object DataSourceUtils extends PredicateHelper {
* in a driver side.
*/
def verifySchema(format: FileFormat, schema: StructType, readOnly: Boolean = false): Unit = {
- if (!SQLConf.get.isTimeTypeEnabled && schema.existsRecursively(_.isInstanceOf[TimeType])) {
- throw QueryCompilationErrors.unsupportedTimeTypeError()
- }
+ TypeUtils.failUnsupportedDataType(schema, SQLConf.get)
schema.foreach { field =>
val supported = if (readOnly) {
format.supportReadDataType(field.dataType)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]