This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ff9feb31f08 [SPARK-53760][GEO][SQL] Introduce GeometryType and 
GeographyType
8ff9feb31f08 is described below

commit 8ff9feb31f0898f593c57b6617ebbe68fd56fae8
Author: Uros Bojanic <[email protected]>
AuthorDate: Wed Oct 15 11:09:40 2025 +0800

    [SPARK-53760][GEO][SQL] Introduce GeometryType and GeographyType
    
    ### What changes were proposed in this pull request?
    Introduce two new logical types to Spark:
    - `GeographyType`
    - `GeometryType`
    
    This PR also adds appropriate SQL parsing and JSON serialization logic for 
the new types.
    
    ### Why are the changes needed?
    Kicking off https://issues.apache.org/jira/browse/SPARK-51658 by adding 
GEOMETRY and GEOGRAPHY types to Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, two new logical data types are added as `Experimental`.
    
    ### How was this patch tested?
    Added new tests to:
    - `GeographyTypeSuite`
    - `GeometryTypeSuite`
    
    Also, added appropriate test cases to:
    - `DataTypeSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52491 from uros-db/geo-logical-types.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  18 ++
 docs/sql-ref-ansi-compliance.md                    |   2 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   2 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   6 +
 .../sql/catalyst/parser/DataTypeAstBuilder.scala   |  26 +-
 .../types/SpatialReferenceSystemMapper.java        |  75 ++++++
 .../org/apache/spark/sql/types/DataType.scala      |  14 ++
 .../org/apache/spark/sql/types/GeographyType.scala | 266 +++++++++++++++++++++
 .../org/apache/spark/sql/types/GeometryType.scala  | 203 ++++++++++++++++
 .../org/apache/spark/sql/types/SpatialType.scala   |  30 +++
 .../org/apache/spark/sql/types/DataTypeSuite.scala |  20 ++
 .../sql-tests/results/keywords-enforced.sql.out    |   2 +
 .../resources/sql-tests/results/keywords.sql.out   |   2 +
 .../sql-tests/results/nonansi/keywords.sql.out     |   2 +
 .../types/SpatialReferenceSystemMapperSuite.java   |  54 +++++
 .../spark/sql/types/GeographyTypeSuite.scala       | 219 +++++++++++++++++
 .../apache/spark/sql/types/GeometryTypeSuite.scala | 203 ++++++++++++++++
 .../ThriftServerWithSparkContextSuite.scala        |   2 +-
 18 files changed, 1144 insertions(+), 2 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index a53992a85187..5f1d3d16d379 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5541,6 +5541,24 @@
     ],
     "sqlState" : "2201E"
   },
+  "ST_INVALID_ALGORITHM_VALUE" : {
+    "message" : [
+      "Invalid or unsupported edge interpolation algorithm value: '<alg>'."
+    ],
+    "sqlState" : "22023"
+  },
+  "ST_INVALID_CRS_VALUE" : {
+    "message" : [
+      "Invalid or unsupported CRS (coordinate reference system) value: 
'<crs>'."
+    ],
+    "sqlState" : "22023"
+  },
+  "ST_INVALID_SRID_VALUE" : {
+    "message" : [
+      "Invalid or unsupported SRID (spatial reference identifier) value: 
<srid>"
+    ],
+    "sqlState" : "22023"
+  },
   "SUM_OF_LIMIT_AND_OFFSET_EXCEEDS_MAX_INT" : {
     "message" : [
       "The sum of the LIMIT clause and the OFFSET clause must not be greater 
than the maximum 32-bit integer value (2,147,483,647) but found limit = 
<limit>, offset = <offset>."
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 8d2c13beff97..c82691ef4ee2 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -549,6 +549,8 @@ Below is a list of all the keywords in Spark SQL.
 |FUNCTION|non-reserved|non-reserved|reserved|
 |FUNCTIONS|non-reserved|non-reserved|non-reserved|
 |GENERATED|non-reserved|non-reserved|non-reserved|
+|GEOGRAPHY|non-reserved|non-reserved|non-reserved|
+|GEOMETRY|non-reserved|non-reserved|non-reserved|
 |GLOBAL|non-reserved|non-reserved|reserved|
 |GRANT|reserved|non-reserved|reserved|
 |GROUP|reserved|non-reserved|reserved|
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index e402067926f2..461af320097b 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -257,6 +257,8 @@ FULL: 'FULL';
 FUNCTION: 'FUNCTION';
 FUNCTIONS: 'FUNCTIONS';
 GENERATED: 'GENERATED';
+GEOGRAPHY: 'GEOGRAPHY';
+GEOMETRY: 'GEOMETRY';
 GLOBAL: 'GLOBAL';
 GRANT: 'GRANT';
 GROUP: 'GROUP';
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 8efab99d4ec8..ace8c9225a04 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1340,6 +1340,8 @@ nonTrivialPrimitiveType
          fromDayTime=(DAY | HOUR | MINUTE | SECOND) (TO to=(HOUR | MINUTE | 
SECOND))?)?
     | TIMESTAMP (WITHOUT TIME ZONE)?
     | TIME (LEFT_PAREN precision=INTEGER_VALUE RIGHT_PAREN)? (WITHOUT TIME 
ZONE)?
+    | GEOGRAPHY (LEFT_PAREN srid=(INTEGER_VALUE | ANY) RIGHT_PAREN)
+    | GEOMETRY (LEFT_PAREN srid=(INTEGER_VALUE | ANY) RIGHT_PAREN)
     ;
 
 trivialPrimitiveType
@@ -1832,6 +1834,8 @@ ansiNonReserved
     | FUNCTION
     | FUNCTIONS
     | GENERATED
+    | GEOGRAPHY
+    | GEOMETRY
     | GLOBAL
     | GROUPING
     | HANDLER
@@ -2210,6 +2214,8 @@ nonReserved
     | FUNCTION
     | FUNCTIONS
     | GENERATED
+    | GEOGRAPHY
+    | GEOMETRY
     | GLOBAL
     | GRANT
     | GROUP
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
index beb7061a841a..09d2c81eceb5 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.util.SparkParserUtils.{string, withOrigin}
 import org.apache.spark.sql.connector.catalog.IdentityColumnSpec
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SqlApiConf
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteType, CalendarIntervalType, CharType, DataType, DateType, 
DayTimeIntervalType, DecimalType, DoubleType, FloatType, IntegerType, LongType, 
MapType, MetadataBuilder, NullType, ShortType, StringType, StructField, 
StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, 
VariantType, YearMonthIntervalType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteType, CalendarIntervalType, CharType, DataType, DateType, 
DayTimeIntervalType, DecimalType, DoubleType, FloatType, GeographyType, 
GeometryType, IntegerType, LongType, MapType, MetadataBuilder, NullType, 
ShortType, StringType, StructField, StructType, TimestampNTZType, 
TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
 
 class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
   protected def typedVisit[T](ctx: ParseTree): T = {
@@ -118,6 +118,30 @@ class DataTypeAstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] {
             currentCtx.precision.getText.toInt
           }
           TimeType(precision)
+        case GEOGRAPHY =>
+          // Unparameterized geography type isn't supported and will be caught 
by the default branch.
+          // Here, we only handle the parameterized GEOGRAPHY type syntax, 
which comes in two forms:
+          if (currentCtx.srid.getText.toLowerCase(Locale.ROOT) == "any") {
+            // The special parameterized GEOGRAPHY type syntax uses a single 
"ANY" string value.
+            // This implies a mixed GEOGRAPHY type, with potentially different 
SRIDs across rows.
+            GeographyType("ANY")
+          } else {
+            // The explicitly parameterized GEOGRAPHY syntax uses a specified 
integer SRID value.
+            // This implies a fixed GEOGRAPHY type, with a single fixed SRID 
value across all rows.
+            GeographyType(currentCtx.srid.getText.toInt)
+          }
+        case GEOMETRY =>
+          // Unparameterized geometry type isn't supported and will be caught 
by the default branch.
+          // Here, we only handle the parameterized GEOMETRY type syntax, 
which comes in two forms:
+          if (currentCtx.srid.getText.toLowerCase(Locale.ROOT) == "any") {
+            // The special parameterized GEOMETRY type syntax uses a single 
"ANY" string value.
+            // This implies a mixed GEOMETRY type, with potentially different 
SRIDs across rows.
+            GeometryType("ANY")
+          } else {
+            // The explicitly parameterized GEOMETRY type syntax has a single 
integer SRID value.
+            // This implies a fixed GEOMETRY type, with a single fixed SRID 
value across all rows.
+            GeometryType(currentCtx.srid.getText.toInt)
+          }
       }
     } else if (typeCtx.trivialPrimitiveType != null) {
       // This is a primitive type without parameters, e.g. BOOLEAN, TINYINT, 
etc.
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/types/SpatialReferenceSystemMapper.java
 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/types/SpatialReferenceSystemMapper.java
new file mode 100644
index 000000000000..16106b552a6d
--- /dev/null
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/types/SpatialReferenceSystemMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.types;
+
+import java.util.HashMap;
+
+/*
+ * Class for maintaining mappings between supported SRID values and the string 
ID of the
+ * corresponding CRS.
+ */
+public class SpatialReferenceSystemMapper {
+
+  // We implement this class as a singleton (we disallow construction).
+  private SpatialReferenceSystemMapper() {}
+
+  private static final SpatialReferenceSystemMapper Instance = new 
SpatialReferenceSystemMapper();
+
+  // Returns the unique instance of this class.
+  public static SpatialReferenceSystemMapper get() {
+    return Instance;
+  }
+
+  // Hash maps defining the mappings to/from SRID and string ID for a CRS.
+  private static final HashMap<Integer, String> sridToStringId = 
buildSridToStringIdMap();
+  private static final HashMap<String, Integer> stringIdToSrid = 
buildStringIdToSridMap();
+
+  // Returns the string ID corresponding to the input SRID. If the input SRID 
is not supported,
+  // `null` is returned.
+  public String getStringId(int srid) {
+    return sridToStringId.get(srid);
+  }
+
+  // Returns the SRID corresponding to the input string ID. If the input 
string ID is not
+  // supported, `null` is returned.
+  public Integer getSrid(String stringId) {
+    return stringIdToSrid.get(stringId);
+  }
+
+  // Currently, we only support a limited set of SRID / CRS mappings. However, 
we will soon extend
+  // this to support all the SRIDs supported by relevant authorities and 
libraries. The methods
+  // below will be updated accordingly, in order to populate the mappings with 
more complete data.
+
+  // Helper method for building the SRID-to-string-ID mapping.
+  private static HashMap<Integer, String> buildSridToStringIdMap() {
+    HashMap<Integer, String> map = new HashMap<>();
+    map.put(0, "SRID:0"); // Unspecified
+    map.put(3857, "EPSG:3857"); // Web Mercator
+    map.put(4326, "OGC:CRS84"); // WGS84
+    return map;
+  }
+
+  // Helper method for building the string-ID-to-SRID mapping.
+  private static HashMap<String, Integer> buildStringIdToSridMap() {
+    HashMap<String, Integer> map = new HashMap<>();
+    map.put("SRID:0", 0); // Unspecified
+    map.put("EPSG:3857", 3857); // Web Mercator
+    map.put("OGC:CRS84", 4326); // WGS84
+    return map;
+  }
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 3ecc84a1578a..48a6514440dd 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -127,6 +127,10 @@ object DataType {
   private val CHAR_TYPE = """char\(\s*(\d+)\s*\)""".r
   private val VARCHAR_TYPE = """varchar\(\s*(\d+)\s*\)""".r
   private val STRING_WITH_COLLATION = """string\s+collate\s+(\w+)""".r
+  private val GEOMETRY_TYPE = """geometry\(\s*([\w]+:-?[\w]+)\s*\)""".r
+  private val GEOGRAPHY_TYPE_CRS = """geography\(\s*(\w+:-?\w+)\s*\)""".r
+  private val GEOGRAPHY_TYPE_ALG = """geography\(\s*(\w+)\s*\)""".r
+  private val GEOGRAPHY_TYPE_CRS_ALG = 
"""geography\(\s*(\w+:-?\w+)\s*,\s*(\w+)\s*\)""".r
 
   val COLLATIONS_METADATA_KEY = "__COLLATIONS"
 
@@ -217,6 +221,16 @@ object DataType {
       case CHAR_TYPE(length) => CharType(length.toInt)
       case VARCHAR_TYPE(length) => VarcharType(length.toInt)
       case STRING_WITH_COLLATION(collation) => StringType(collation)
+      // If the coordinate reference system (CRS) value is omitted, Parquet 
and other storage
+      // formats (Delta, Iceberg) consider "OGC:CRS84" to be the default value 
of the crs.
+      case "geometry" => GeometryType(GeometryType.GEOMETRY_DEFAULT_CRS)
+      case GEOMETRY_TYPE(crs) => GeometryType(crs)
+      case "geography" => GeographyType(GeographyType.GEOGRAPHY_DEFAULT_CRS)
+      case GEOGRAPHY_TYPE_CRS(crs) =>
+        GeographyType(crs, GeographyType.GEOGRAPHY_DEFAULT_ALGORITHM)
+      case GEOGRAPHY_TYPE_ALG(alg) =>
+        GeographyType(GeographyType.GEOGRAPHY_DEFAULT_CRS, alg)
+      case GEOGRAPHY_TYPE_CRS_ALG(crs, alg) => GeographyType(crs, alg)
       // For backwards compatibility, previously the type name of NullType is 
"null"
       case "null" => NullType
       case "timestamp_ltz" => TimestampType
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala
new file mode 100644
index 000000000000..b5a6517425a0
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.{JString, JValue}
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.annotation.Experimental
+
+/**
+ * The data type representing GEOGRAPHY values which are spatial objects, as 
defined in the Open
+ * Geospatial Consortium (OGC) Simple Feature Access specification
+ * (https://portal.ogc.org/files/?artifact_id=25355), with a geographic 
coordinate system.
+ */
+@Experimental
+class GeographyType private (val crs: String, val algorithm: 
EdgeInterpolationAlgorithm)
+    extends AtomicType
+    with Serializable {
+
+  /**
+   * Spatial Reference Identifier (SRID) value of the geography type.
+   */
+  val srid: Int = GeographyType.toSrid(crs)
+
+  /**
+   * The default size of a value of the GeographyType is 2048 bytes, which can 
store roughly 120
+   * 2D points.
+   */
+  override def defaultSize: Int = 2048
+
+  /**
+   * The GeographyType is a mixed SRID type iff the SRID is MIXED_SRID. 
Semantically, this means
+   * that different SRID values per row are allowed.
+   */
+  def isMixedSrid: Boolean = srid == GeographyType.MIXED_SRID
+
+  /**
+   * Type name that is displayed to users.
+   */
+  override def typeName: String = {
+    if (isMixedSrid) {
+      // The mixed SRID type is displayed with a special specifier value "ANY".
+      "geography(any)"
+    } else {
+      // The fixed SRID type is always displayed with the appropriate SRID 
value.
+      s"geography($srid)"
+    }
+  }
+
+  /**
+   * String representation of the GeographyType, which uses SRID for fixed 
SRID types and "ANY"
+   * for mixed SRID types, providing a clear and concise user-friendly format 
for this type.
+   */
+  override def toString: String = {
+    if (isMixedSrid) {
+      // The mixed SRID type is displayed with a special specifier value "ANY".
+      "GeographyType(ANY)"
+    } else {
+      // The fixed SRID type is always displayed with the appropriate SRID 
value.
+      s"GeographyType($srid)"
+    }
+  }
+
+  /**
+   * JSON representation of the GeographyType, which uses the CRS string and 
edge interpolation
+   * algorithm string, in line with the current storage specifications (e.g. 
Parquet, Delta,
+   * Iceberg). Note that mixed SRID is disallowed, and only fixed SRID types 
can be stored. This
+   * is also in accordance with storage formats.
+   */
+  override def jsonValue: JValue = JString(s"geography($crs, $algorithm)")
+
+  private[spark] override def asNullable: GeographyType = this
+
+  /**
+   * Two types are considered equal iff they are both GeographyTypes and have 
the same type info.
+   * For the GEOGRAPHY type, the SRID value and algorithm uniquely identify 
its type information.
+   */
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case g: GeographyType =>
+        // Iff two GeographyTypes have the same SRID and algorithm, they are 
considered equal.
+        g.srid == srid && g.algorithm == algorithm
+      case _ =>
+        // In all other cases, the two types are considered not equal.
+        false
+    }
+  }
+
+  /**
+   * The hash code of the GeographyType is derived from its SRID value.
+   */
+  override def hashCode(): Int = srid.hashCode
+
+  /**
+   * The GeographyType can only accept another type if the other type is also 
a GeographyType, and
+   * the SRID values are compatible (see `acceptsGeographyType` below for more 
details).
+   */
+  override private[sql] def acceptsType(other: DataType): Boolean = {
+    other match {
+      case gt: GeographyType =>
+        // For GeographyType, we need to check the SRID values.
+        acceptsGeographyType(gt)
+      case _ =>
+        // In all other cases, the two types are considered different.
+        false
+    }
+  }
+
+  /**
+   * The GeographyType with mixed SRID can accept any other GeographyType, 
i.e. either a fixed
+   * SRID GeographyType or another mixed SRID GeographyType. Conversely, a 
GeographyType with
+   * fixed SRID can only accept another GeographyType with the same fixed SRID 
value, and not a
+   * mixed SRID.
+   */
+  def acceptsGeographyType(gt: GeographyType): Boolean = {
+    // If the SRID is mixed, we can accept any other GeographyType.
+    // If the SRID is not mixed, we can only accept the same SRID.
+    isMixedSrid || gt.srid == srid
+  }
+}
+
+@Experimental
+object GeographyType extends SpatialType {
+
+  /**
+   * Default CRS value for GeographyType depends on storage specification. 
Parquet and Iceberg use
+   * OGC:CRS84, which translates to SRID 4326 here.
+   */
+  final val GEOGRAPHY_DEFAULT_SRID = 4326
+  final val GEOGRAPHY_DEFAULT_CRS = "OGC:CRS84"
+
+  // The default edge interpolation algorithm value for GeographyType.
+  final val GEOGRAPHY_DEFAULT_ALGORITHM = EdgeInterpolationAlgorithm.SPHERICAL
+
+  // Another way to represent the default parquet crs value (OGC:CRS84).
+  final val GEOGRAPHY_DEFAULT_EPSG_CRS = s"EPSG:$GEOGRAPHY_DEFAULT_SRID"
+
+  /**
+   * The default concrete GeographyType in SQL.
+   */
+  private final val GEOGRAPHY_MIXED_TYPE: GeographyType =
+    GeographyType(MIXED_CRS, GEOGRAPHY_DEFAULT_ALGORITHM)
+
+  /**
+   * Constructors for GeographyType.
+   */
+  def apply(srid: Int): GeographyType = {
+    if (!isValidSrid(srid)) {
+      // Limited geographic SRID values are allowed.
+      throw new SparkIllegalArgumentException(
+        errorClass = "ST_INVALID_SRID_VALUE",
+        messageParameters = Map("srid" -> srid.toString))
+    }
+    new GeographyType(GEOGRAPHY_DEFAULT_CRS, GEOGRAPHY_DEFAULT_ALGORITHM)
+  }
+
+  def apply(crs: String): GeographyType = {
+    crs match {
+      case "ANY" =>
+        // Special value "ANY" is used for mixed SRID values.
+        // This should be available to users in the Scala API.
+        new GeographyType(MIXED_CRS, GEOGRAPHY_DEFAULT_ALGORITHM)
+      case _ =>
+        // Otherwise, we need to further check the CRS value.
+        // This shouldn't be available to users in the Scala API.
+        GeographyType(crs, GEOGRAPHY_DEFAULT_ALGORITHM.toString)
+    }
+  }
+
+  def apply(crs: String, algorithm: String): GeographyType = {
+    EdgeInterpolationAlgorithm.fromString(algorithm) match {
+      case Some(alg) => GeographyType(crs, alg)
+      case None =>
+        throw new SparkIllegalArgumentException(
+          errorClass = "ST_INVALID_ALGORITHM_VALUE",
+          messageParameters = Map("alg" -> algorithm))
+    }
+  }
+
+  def apply(crs: String, algorithm: EdgeInterpolationAlgorithm): GeographyType 
= {
+    if (!isValidCrs(crs)) {
+      // Limited geographic CRS values are allowed.
+      throw new SparkIllegalArgumentException(
+        errorClass = "ST_INVALID_CRS_VALUE",
+        messageParameters = Map("crs" -> crs))
+    }
+    new GeographyType(crs, algorithm)
+  }
+
+  /**
+   * Helper method to validate the CRS value. Limited geographic CRS values 
are allowed.
+   */
+  private def isValidCrs(crs: String): Boolean = {
+    // Currently, we only support "OGC:CRS84" / "EPSG:4326" / "SRID:ANY".
+    // In the future, we may support others.
+    crs.equalsIgnoreCase(GEOGRAPHY_DEFAULT_CRS) ||
+    crs.equalsIgnoreCase(GEOGRAPHY_DEFAULT_EPSG_CRS) ||
+    crs.equalsIgnoreCase(MIXED_CRS)
+  }
+
+  /**
+   * Helper method to validate the SRID value. Only geographic SRID values are 
allowed.
+   */
+
+  private def isValidSrid(srid: Int): Boolean = {
+    // Currently, we only support 4326. In the future, we may support others.
+    srid == GEOGRAPHY_DEFAULT_SRID
+  }
+
+  override private[sql] def defaultConcreteType: DataType = 
GEOGRAPHY_MIXED_TYPE
+
+  override private[sql] def acceptsType(other: DataType): Boolean =
+    other.isInstanceOf[GeographyType]
+
+  override private[sql] def simpleString: String = "geography"
+
+  /**
+   * Converts a CRS string to its corresponding SRID integer value.
+   */
+  private[types] def toSrid(crs: String): Int = {
+    // The special value "SRID:ANY" is used to represent mixed SRID values.
+    if (crs.equalsIgnoreCase(GeographyType.MIXED_CRS)) {
+      GeographyType.MIXED_SRID
+    }
+    // As for other valid CRS values, we currently offer limited support.
+    else if (crs.equalsIgnoreCase(GeographyType.GEOGRAPHY_DEFAULT_CRS) ||
+      crs.equalsIgnoreCase(GeographyType.GEOGRAPHY_DEFAULT_EPSG_CRS)) {
+      GeographyType.GEOGRAPHY_DEFAULT_SRID
+    } else {
+      throw new SparkIllegalArgumentException(
+        errorClass = "ST_INVALID_CRS_VALUE",
+        messageParameters = Map("crs" -> crs))
+    }
+  }
+}
+
+/**
+ * Edge interpolation algorithm for Geography logical type. Currently, Spark 
only supports
+ * spherical algorithm.
+ */
+sealed abstract class EdgeInterpolationAlgorithm
+
+object EdgeInterpolationAlgorithm {
+  case object SPHERICAL extends EdgeInterpolationAlgorithm
+
+  val values: Seq[EdgeInterpolationAlgorithm] =
+    Seq(SPHERICAL)
+
+  def fromString(s: String): Option[EdgeInterpolationAlgorithm] =
+    values.find(_.toString.equalsIgnoreCase(s))
+}
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/types/GeometryType.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/GeometryType.scala
new file mode 100644
index 000000000000..5685bd19ca89
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/GeometryType.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.{JString, JValue}
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.internal.types.SpatialReferenceSystemMapper
+
+/**
+ * The data type representing GEOMETRY values which are spatial objects, as 
defined in the Open
+ * Geospatial Consortium (OGC) Simple Feature Access specification
+ * (https://portal.ogc.org/files/?artifact_id=25355), with a Cartesian 
coordinate system.
+ */
+@Experimental
+class GeometryType private (val crs: String) extends AtomicType with 
Serializable {
+
+  /**
+   * Spatial Reference Identifier (SRID) value of the geometry type.
+   */
+  val srid: Int = GeometryType.toSrid(crs)
+
+  /**
+   * The default size of a value of the GeometryType is 2048 bytes, which can 
store roughly 120 2D
+   * points.
+   */
+  override def defaultSize: Int = 2048
+
+  /**
+   * The GeometryType is a mixed SRID type iff the SRID is MIXED_SRID. 
Semantically, this means
+   * that different SRID values per row are allowed.
+   */
+  def isMixedSrid: Boolean = srid == GeometryType.MIXED_SRID
+
+  /**
+   * Type name that is displayed to users.
+   */
+  override def typeName: String = {
+    if (isMixedSrid) {
+      // The mixed SRID type is displayed with a special specifier value "ANY".
+      "geometry(any)"
+    } else {
+      // The fixed SRID type is always displayed with the appropriate SRID 
value.
+      s"geometry($srid)"
+    }
+  }
+
+  /**
+   * String representation of the GeometryType, which uses SRID for fixed SRID 
types and "ANY" for
+   * mixed SRID types, providing a clear and concise user-friendly format for 
this type.
+   */
+  override def toString: String = {
+    if (isMixedSrid) {
+      // The mixed SRID type is displayed with a special specifier value "ANY".
+      "GeometryType(ANY)"
+    } else {
+      // The fixed SRID type is always displayed with the appropriate SRID 
value.
+      s"GeometryType($srid)"
+    }
+  }
+
+  /**
+   * JSON representation of the GeometryType, which uses the CRS string, in 
line with the current
+   * storage specifications (e.g. Parquet, Delta, Iceberg). Note that mixed 
SRID is disallowed,
+   * and only fixed SRID types can be stored. This is also in accordance with 
storage formats.
+   */
+  override def jsonValue: JValue = JString(s"geometry($crs)")
+
+  private[spark] override def asNullable: GeometryType = this
+
+  /**
+   * Two types are considered equal iff they are both GeometryTypes and have 
the same SRID value.
+   * For the GEOMETRY type, the SRID value uniquely identifies its type 
information.
+   */
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case g: GeometryType =>
+        // Iff two GeometryTypes have the same SRID, they are considered equal.
+        g.srid == srid
+      case _ =>
+        // In all other cases, the two types are considered not equal.
+        false
+    }
+  }
+
+  /**
+   * The hash code of the GeometryType is derived from its SRID value.
+   */
+  override def hashCode(): Int = srid.hashCode
+
+  /**
+   * The GeometryType can only accept another type if the other type is also a 
GeometryType, and
+   * the SRID values are compatible (see `acceptsGeometryType` below for more 
details).
+   */
+  override private[sql] def acceptsType(other: DataType): Boolean = {
+    other match {
+      case gt: GeometryType =>
+        // For GeometryType, we need to check the SRID values.
+        acceptsGeometryType(gt)
+      case _ =>
+        // In all other cases, the two types are considered different.
+        false
+    }
+  }
+
+  /**
+   * The GeometryType with a mixed SRID can accept any other GeometryType, 
i.e. either a fixed
+   * SRID GeometryType or another mixed SRID GeometryType. Conversely, a 
GeometryType with a fixed
+   * SRID can only accept another GeometryType with the same fixed SRID value, 
and not a mixed
+   * SRID.
+   */
+  def acceptsGeometryType(gt: GeometryType): Boolean = {
+    // If the SRID is mixed, we can accept any other GeometryType.
+    // If the SRID is not mixed, we can only accept the same SRID.
+    isMixedSrid || gt.srid == srid
+  }
+}
+
+@Experimental
+object GeometryType extends SpatialType {
+
+  /**
+   * The default coordinate reference system (CRS) value used for geometries, 
as specified by the
+   * Parquet, Delta, and Iceberg specifications. If crs is omitted, it should 
always default to
+   * this.
+   */
+  final val GEOMETRY_DEFAULT_SRID = 4326
+  final val GEOMETRY_DEFAULT_CRS = "OGC:CRS84"
+
+  /**
+   * The default concrete GeometryType in SQL.
+   */
+  private final val GEOMETRY_MIXED_TYPE: GeometryType =
+    GeometryType(MIXED_CRS)
+
+  /**
+   * Constructors for GeometryType.
+   */
+  def apply(srid: Int): GeometryType = {
+    val crs = SpatialReferenceSystemMapper.get().getStringId(srid)
+    if (crs == null) {
+      throw new SparkIllegalArgumentException(
+        errorClass = "ST_INVALID_SRID_VALUE",
+        messageParameters = Map("srid" -> srid.toString))
+    }
+    new GeometryType(crs)
+  }
+
+  def apply(crs: String): GeometryType = {
+    crs match {
+      case "ANY" =>
+        // Special value "ANY" is used for mixed SRID values.
+        // This should be available to users in the Scala API.
+        new GeometryType(MIXED_CRS)
+      case _ =>
+        // Otherwise, we need to further check the CRS value.
+        // This shouldn't be available to users in the Scala API.
+        new GeometryType(crs)
+    }
+  }
+
+  override private[sql] def defaultConcreteType: DataType = GEOMETRY_MIXED_TYPE
+
+  override private[sql] def acceptsType(other: DataType): Boolean =
+    other.isInstanceOf[GeometryType]
+
+  override private[sql] def simpleString: String = "geometry"
+
+  /**
+   * Converts a CRS string to its corresponding SRID integer value.
+   */
+  private[types] def toSrid(crs: String): Int = {
+    // The special value "SRID:ANY" is used to represent mixed SRID values.
+    if (crs.equalsIgnoreCase(GeometryType.MIXED_CRS)) {
+      return GeometryType.MIXED_SRID
+    }
+    // For all other CRS values, we need to look up the corresponding SRID.
+    val srid = SpatialReferenceSystemMapper.get().getSrid(crs)
+    if (srid == null) {
+      // If the CRS value is not recognized, we throw an exception.
+      throw new SparkIllegalArgumentException(
+        errorClass = "ST_INVALID_CRS_VALUE",
+        messageParameters = Map("crs" -> crs))
+    }
+    srid
+  }
+}
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/types/SpatialType.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/SpatialType.scala
new file mode 100644
index 000000000000..b2818956943c
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/SpatialType.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.sql.types.AbstractDataType
+
+trait SpatialType extends AbstractDataType {
+
+  /**
+   * Mixed SRID value and the corresponding CRS for geospatial types (Geometry and Geography).
+   * These values represent a geospatial type that can hold different SRID values per row.
+   */
+  final val MIXED_SRID: Int = -1
+  final val MIXED_CRS: String = "SRID:ANY"
+}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index c88b0fd99646..c698a03d7f34 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -279,6 +279,26 @@ class DataTypeSuite extends SparkFunSuite {
   checkDataTypeFromJson(VarcharType(10))
   checkDataTypeFromDDL(VarcharType(11))
 
+  // GEOMETRY type with default fixed SRID.
+  checkDataTypeFromJson(GeometryType(GeometryType.GEOMETRY_DEFAULT_SRID))
+  checkDataTypeFromDDL(GeometryType(GeometryType.GEOMETRY_DEFAULT_SRID))
+
+  // GEOMETRY type with non-default fixed SRID.
+  checkDataTypeFromJson(GeometryType(3857))
+  checkDataTypeFromDDL(GeometryType(3857))
+
+  // GEOMETRY type with mixed SRID.
+  checkDataTypeFromJson(GeometryType("ANY"))
+  checkDataTypeFromDDL(GeometryType("ANY"))
+
+  // GEOGRAPHY type with default fixed SRID.
+  checkDataTypeFromJson(GeographyType(GeographyType.GEOGRAPHY_DEFAULT_SRID))
+  checkDataTypeFromDDL(GeographyType(GeographyType.GEOGRAPHY_DEFAULT_SRID))
+
+  // GEOGRAPHY type with mixed SRID.
+  checkDataTypeFromJson(GeographyType("ANY"))
+  checkDataTypeFromDDL(GeographyType("ANY"))
+
   dayTimeIntervalTypes.foreach(checkDataTypeFromJson)
   yearMonthIntervalTypes.foreach(checkDataTypeFromJson)
 
diff --git 
a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out 
b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
index ef17566850e3..a067d2c53d05 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
@@ -144,6 +144,8 @@ FULL        true
 FUNCTION       false
 FUNCTIONS      false
 GENERATED      false
+GEOGRAPHY      false
+GEOMETRY       false
 GLOBAL false
 GRANT  true
 GROUP  true
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index 97309774cc37..93822c6c6b75 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -144,6 +144,8 @@ FULL        false
 FUNCTION       false
 FUNCTIONS      false
 GENERATED      false
+GEOGRAPHY      false
+GEOMETRY       false
 GLOBAL false
 GRANT  false
 GROUP  false
diff --git 
a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
index 97309774cc37..93822c6c6b75 100644
--- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
@@ -144,6 +144,8 @@ FULL        false
 FUNCTION       false
 FUNCTIONS      false
 GENERATED      false
+GEOGRAPHY      false
+GEOMETRY       false
 GLOBAL false
 GRANT  false
 GROUP  false
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/types/SpatialReferenceSystemMapperSuite.java
 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/types/SpatialReferenceSystemMapperSuite.java
new file mode 100644
index 000000000000..69c803097ab5
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/types/SpatialReferenceSystemMapperSuite.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.types;
+ 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SpatialReferenceSystemMapperSuite {
+
+  @Test
+  public void getStringIdReturnsCorrectStringIdForValidSrid() {
+    SpatialReferenceSystemMapper srMapper = SpatialReferenceSystemMapper.get();
+    Assertions.assertEquals("SRID:0", srMapper.getStringId(0));
+    Assertions.assertEquals("EPSG:3857", srMapper.getStringId(3857));
+    Assertions.assertEquals("OGC:CRS84", srMapper.getStringId(4326));
+  }
+
+  @Test
+  public void getStringIdReturnsNullForInvalidSrid() {
+    SpatialReferenceSystemMapper srMapper = SpatialReferenceSystemMapper.get();
+    Assertions.assertNull(srMapper.getStringId(-1));
+    Assertions.assertNull(srMapper.getStringId(9999));
+  }
+
+  @Test
+  public void getSridReturnsCorrectSridForValidStringId() {
+    SpatialReferenceSystemMapper srMapper = SpatialReferenceSystemMapper.get();
+    Assertions.assertEquals(0, srMapper.getSrid("SRID:0"));
+    Assertions.assertEquals(3857, srMapper.getSrid("EPSG:3857"));
+    Assertions.assertEquals(4326, srMapper.getSrid("OGC:CRS84"));
+  }
+
+  @Test
+  public void getSridReturnsNullForInvalidStringId() {
+    SpatialReferenceSystemMapper srMapper = SpatialReferenceSystemMapper.get();
+    Assertions.assertNull(srMapper.getSrid("INVALID:ID"));
+    Assertions.assertNull(srMapper.getSrid("EPSG:9999"));
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala
new file mode 100644
index 000000000000..51de95826f81
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.util.Locale
+
+import org.json4s.JsonAST.JString
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.SparkIllegalArgumentException
+
+class GeographyTypeSuite extends SparkFunSuite {
+
+  // These tests verify the basic behavior of the GeographyType logical type.
+
+  test("GEOGRAPHY type with specified invalid SRID") {
+    val srids: Seq[Int] = Seq(-4612, -4326, -2, -1, 1, 2, 3126, 4612)
+    srids.foreach { srid =>
+      checkError(
+        exception = intercept[SparkIllegalArgumentException] {
+          GeographyType(srid)
+        },
+        condition = "ST_INVALID_SRID_VALUE",
+        sqlState = "22023",
+        parameters = Map("srid" -> srid.toString)
+      )
+    }
+  }
+
+  test("GEOGRAPHY type with specified valid SRID") {
+    val srids: Seq[Int] = Seq(4326)
+    srids.foreach { srid =>
+      val g = GeographyType(srid)
+      assert(g.srid == srid)
+      assert(g == GeographyType(srid))
+      assert(g.hashCode() == srid.hashCode())
+      // This GEOGRAPHY type has a fixed SRID.
+      assert(!g.isMixedSrid)
+      // The type name for concrete geography type does display the SRID.
+      assert(g.typeName == s"geography($srid)")
+      assert(g.simpleString == s"geography($srid)")
+      assert(g.sql == s"GEOGRAPHY($srid)")
+      // GeographyType with a fixed SRID accepts the same SRID, but not the mixed SRID type.
+      assert(g.acceptsGeographyType(GeographyType(4326)))
+      assert(!g.acceptsGeographyType(GeographyType("ANY")))
+    }
+  }
+
+  test("GEOGRAPHY type with specified valid CRS and algorithm") {
+    val typeInformation: Seq[(Int, String, EdgeInterpolationAlgorithm)] = Seq(
+      (4326, "OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)
+    )
+    typeInformation.foreach { case (srid, crs, algorithm) =>
+      val g = GeographyType(crs, algorithm)
+      // Verify that the type is correctly created.
+      assert(g.srid == srid)
+      assert(g.crs == crs)
+      assert(g.algorithm == algorithm)
+      assert(g == GeographyType(srid))
+      assert(g.hashCode() == srid.hashCode())
+      // This GEOGRAPHY type has a fixed SRID.
+      assert(!g.isMixedSrid)
+      // The type name for concrete geography type does display the SRID.
+      assert(g.typeName == s"geography($srid)")
+      assert(g.simpleString == s"geography($srid)")
+      assert(g.sql == s"GEOGRAPHY($srid)")
+      // GeographyType with a fixed SRID accepts the same SRID, but not the mixed SRID type.
+      assert(g.acceptsGeographyType(GeographyType(4326)))
+      assert(!g.acceptsGeographyType(GeographyType("ANY")))
+    }
+  }
+
+  test("GEOGRAPHY type with the special ANY specifier for mixed SRID") {
+    val g = GeographyType("ANY")
+    assert(g.srid == GeographyType.MIXED_SRID)
+    assert(g == GeographyType("ANY"))
+    assert(g.hashCode() == GeographyType.MIXED_SRID.hashCode())
+    // This GEOGRAPHY type has a mixed SRID.
+    assert(g.isMixedSrid)
+    // The type name for mixed SRID geography type displays "any" instead of an SRID.
+    assert(g.typeName == s"geography(any)")
+    assert(g.simpleString == s"geography(any)")
+    assert(g.sql == s"GEOGRAPHY(ANY)")
+    // GeographyType with mixed SRID can accept any other SRID value.
+    assert(g.acceptsGeographyType(GeographyType(4326)))
+    assert(g.acceptsGeographyType(GeographyType("ANY")))
+  }
+
+  // These tests verify the interaction between different GeographyTypes.
+
+  test("GEOGRAPHY types with same SRID values") {
+    val g1 = GeographyType(4326)
+    val g2 = GeographyType(4326)
+    // These two GEOGRAPHY types have equal type info.
+    assert(g1.srid == g2.srid)
+    assert(g1.crs == g2.crs)
+    assert(g1.algorithm == g2.algorithm)
+    // These two GEOGRAPHY types are considered equal.
+    assert(g1 == g2)
+    // These two GEOGRAPHY types can accept each other.
+    assert(g1.acceptsGeographyType(g2))
+    assert(g2.acceptsGeographyType(g1))
+  }
+
+  // This test verifies the SQL and JSON representation of GEOGRAPHY types.
+
+  test("GEOGRAPHY data type representation") {
+    def assertStringRepresentation(
+        geomType: GeographyType,
+        typeName: String,
+        jsonValue: String): Unit = {
+      assert(geomType.typeName === typeName)
+      assert(geomType.sql === typeName.toUpperCase(Locale.ROOT))
+      assert(geomType.jsonValue === JString(jsonValue))
+    }
+    assertStringRepresentation(
+      GeographyType(4326),
+      "geography(4326)",
+      "geography(OGC:CRS84, SPHERICAL)"
+    )
+  }
+
+  // These tests verify the JSON parsing of different GEOGRAPHY types.
+
+  test("GEOGRAPHY data type JSON parsing with valid CRS and algorithm") {
+    val validGeographies = Seq(
+      "\"geography\"",
+      "\"geography(OGC:CRS84)\"",
+      "\"geography(ogc:CRS84)\"",
+      "\"geography( ogc:CRS84  )\"",
+      "\"geography(EPSG:4326)\"",
+      "\"geography(spherical)\"",
+      "\"geography(  spherical)\"",
+      "\"geography(OGC:CRS84,    spherical    )\"",
+      "\"geography( OGC:CRS84   , spherical )\""
+    )
+    validGeographies.foreach { geog =>
+      assert(DataType.fromJson(geog).isInstanceOf[GeographyType])
+    }
+  }
+
+  test("GEOGRAPHY data type JSON parsing with invalid CRS or algorithm") {
+    val invalidGeographies = Seq(
+      "\"geography()\"",
+      "\"geography(())\"",
+      "\"geography(asdf)\"",
+      "\"geography(srid:0)\"",
+      "\"geography(123:123)\"",
+      "\"geography(srid:srid)\"",
+      "\"geography(karney)\"",
+      "\"geography(srid:srid, spherical)\"",
+      "\"geography(OGC:CRS84, karney)\""
+    )
+    invalidGeographies.foreach { geog =>
+      val exception = intercept[SparkIllegalArgumentException] {
+        DataType.fromJson(geog)
+      }
+      assert(
+        Seq(
+          "INVALID_JSON_DATA_TYPE",
+          "ST_INVALID_CRS_VALUE",
+          "ST_INVALID_ALGORITHM_VALUE"
+        ).contains(exception.getCondition)
+      )
+    }
+  }
+
+  // These tests verify the SQL parsing of different GEOGRAPHY types.
+
+  test("GEOGRAPHY data type SQL parsing with valid SRID") {
+    val validGeographies = Seq(
+      "GEOGRAPHY(ANY)",
+      "GEOGRAPHY(4326)"
+    )
+    validGeographies.foreach { geog =>
+      val dt = DataType.fromDDL(geog)
+      assert(dt.isInstanceOf[GeographyType])
+    }
+  }
+
+  test("GEOGRAPHY data type SQL parsing with invalid SRID") {
+    val invalidGeographies = Seq(
+      "GEOGRAPHY(123)",
+      "GEOGRAPHY(-1)",
+      "GEOGRAPHY(-4326)",
+      "GEOGRAPHY(99999)",
+      "GEOGRAPHY(SRID)",
+      "GEOGRAPHY(MIXED)"
+    )
+    invalidGeographies.foreach { geog =>
+      val exception = intercept[Exception] {
+        DataType.fromDDL(geog)
+      }
+      exception match {
+        case e: SparkIllegalArgumentException =>
+          assert(e.getCondition == "ST_INVALID_SRID_VALUE")
+        case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+          assert(e.getMessage.contains("PARSE_SYNTAX_ERROR"))
+        case _ =>
+          fail(s"Unexpected exception type: ${exception.getClass.getName}")
+      }
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala
new file mode 100644
index 000000000000..a6961f0c0343
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.util.Locale
+
+import org.json4s.JsonAST.JString
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.SparkIllegalArgumentException
+
+class GeometryTypeSuite extends SparkFunSuite {
+
+  // These tests verify the basic behavior of the GeometryType logical type.
+
+  test("GEOMETRY type with specified invalid SRID") {
+    val srids: Seq[Int] = Seq(-4612, -4326, -2, -1, 1, 2)
+    srids.foreach { srid =>
+      checkError(
+        exception = intercept[SparkIllegalArgumentException] {
+          GeometryType(srid)
+        },
+        condition = "ST_INVALID_SRID_VALUE",
+        sqlState = "22023",
+        parameters = Map("srid" -> srid.toString)
+      )
+    }
+  }
+
+  test("GEOMETRY type with specified valid SRID") {
+    val srids: Seq[Int] = Seq(0, 3857, 4326)
+    srids.foreach { srid =>
+      val g = GeometryType(srid)
+      assert(g.srid == srid)
+      assert(g == GeometryType(srid))
+      assert(g.hashCode() == srid.hashCode())
+      // This GEOMETRY type has a fixed SRID.
+      assert(!g.isMixedSrid)
+      // The type name for concrete geometry type does display the SRID.
+      assert(g.typeName == s"geometry($srid)")
+      assert(g.simpleString == s"geometry($srid)")
+      assert(g.sql == s"GEOMETRY($srid)")
+      // GeometryType with a specific SRID cannot accept a different SRID value.
+      val otherSrid = if (srid == 3857) 4326 else 3857
+      assert(!g.acceptsGeometryType(GeometryType(otherSrid)))
+    }
+  }
+
+  test("GEOMETRY type with the special ANY specifier for mixed SRID") {
+    val g = GeometryType("ANY")
+    assert(g.srid == GeometryType.MIXED_SRID)
+    assert(g == GeometryType("ANY"))
+    assert(g.hashCode() == GeometryType.MIXED_SRID.hashCode())
+    // This GEOMETRY type has a mixed SRID.
+    assert(g.isMixedSrid)
+    // The type name for mixed SRID geometry type displays "any" instead of an SRID.
+    assert(g.typeName == s"geometry(any)")
+    assert(g.simpleString == s"geometry(any)")
+    assert(g.sql == s"GEOMETRY(ANY)")
+    // GeometryType with mixed SRID can accept any other SRID value.
+    assert(g.acceptsGeometryType(GeometryType(0)))
+    assert(g.acceptsGeometryType(GeometryType(3857)))
+    assert(g.acceptsGeometryType(GeometryType(4326)))
+  }
+
+  // These tests verify the interaction between different GeometryTypes.
+
+  test("GEOMETRY types with same SRID values") {
+    val g1 = GeometryType(4326)
+    val g2 = GeometryType(4326)
+    // These two GEOMETRY types have equal type info.
+    assert(g1.srid == g2.srid)
+    assert(g1.crs == g2.crs)
+    // These two GEOMETRY types are considered equal.
+    assert(g1 == g2)
+    // These two GEOMETRY types can accept each other.
+    assert(g1.acceptsGeometryType(g2))
+    assert(g2.acceptsGeometryType(g1))
+  }
+
+  test("GEOMETRY types with different SRID values") {
+    val g1 = GeometryType(4326)
+    val g2 = GeometryType(3857)
+    // These two GEOMETRY types have different type info.
+    assert(g1.srid != g2.srid)
+    assert(g1.crs != g2.crs)
+    // These two GEOMETRY types are considered different.
+    assert(g1 != g2)
+    // These two GEOMETRY types cannot accept each other.
+    assert(!g1.acceptsGeometryType(g2))
+    assert(!g2.acceptsGeometryType(g1))
+  }
+
+  // This test verifies the SQL and JSON representation of GEOMETRY types.
+
+  test("GEOMETRY data type representation") {
+    def assertStringRepresentation(
+        geomType: GeometryType,
+        typeName: String,
+        jsonValue: String): Unit = {
+      assert(geomType.typeName === typeName)
+      assert(geomType.sql === typeName.toUpperCase(Locale.ROOT))
+      assert(geomType.jsonValue === JString(jsonValue))
+    }
+    assertStringRepresentation(GeometryType(0), "geometry(0)", 
"geometry(SRID:0)")
+    assertStringRepresentation(GeometryType(3857), "geometry(3857)", 
"geometry(EPSG:3857)")
+    assertStringRepresentation(GeometryType(4326), "geometry(4326)", 
"geometry(OGC:CRS84)")
+  }
+
+  // These tests verify the JSON parsing of different GEOMETRY types.
+
+  test("GEOMETRY data type JSON parsing with valid CRS") {
+    val validGeometries = Seq(
+      "\"geometry\"",
+      "\"geometry(OGC:CRS84)\""
+    )
+    validGeometries.foreach { geom =>
+      assert(DataType.fromJson(geom).isInstanceOf[GeometryType])
+    }
+  }
+
+  test("GEOMETRY data type JSON parsing with invalid CRS") {
+    val invalidGeometries = Seq(
+      "\"geometry()\"",
+      "\"geometry(())\"",
+      "\"geometry(asdf)\"",
+      "\"geometry(asdf:fdsa)\"",
+      "\"geometry(123:123)\"",
+      "\"geometry(srid:srid)\"",
+      "\"geometry(SRID:1)\"",
+      "\"geometry(SRID:123)\"",
+      "\"geometry(EPSG:123)\"",
+      "\"geometry(ESRI:123)\"",
+      "\"geometry(OCG:123)\"",
+      "\"geometry(OCG:CRS123)\""
+    )
+    invalidGeometries.foreach { geom =>
+      val exception = intercept[SparkIllegalArgumentException] {
+        DataType.fromJson(geom)
+      }
+      assert(
+        Seq(
+          "INVALID_JSON_DATA_TYPE",
+          "ST_INVALID_CRS_VALUE"
+        ).contains(exception.getCondition)
+      )
+    }
+  }
+
+  // These tests verify the SQL parsing of different GEOMETRY types.
+
+  test("GEOMETRY data type SQL parsing with valid SRID") {
+    val validGeometries = Seq(
+      "GEOMETRY(ANY)",
+      "GEOMETRY(0)",
+      "GEOMETRY(3857)",
+      "GEOMETRY(4326)"
+    )
+    validGeometries.foreach { geom =>
+      val dt = DataType.fromDDL(geom)
+      assert(dt.isInstanceOf[GeometryType])
+    }
+  }
+
+  test("GEOMETRY data type SQL parsing with invalid SRID") {
+    val invalidGeometries = Seq(
+      "GEOMETRY(123)",
+      "GEOMETRY(-1)",
+      "GEOMETRY(-4326)",
+      "GEOMETRY(99999)",
+      "GEOMETRY(SRID)",
+      "GEOMETRY(MIXED)"
+    )
+    invalidGeometries.foreach { geom =>
+      val exception = intercept[Exception] {
+        DataType.fromDDL(geom)
+      }
+      exception match {
+        case e: SparkIllegalArgumentException =>
+          assert(e.getCondition == "ST_INVALID_SRID_VALUE")
+        case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+          assert(e.getMessage.contains("PARSE_SYNTAX_ERROR"))
+        case _ =>
+          fail(s"Unexpected exception type: ${exception.getClass.getName}")
+      }
+    }
+  }
+}
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index a394295360f5..fd015330e8de 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       val sessionHandle = client.openSession(user, "")
       val infoValue = client.getInfo(sessionHandle, 
GetInfoType.CLI_ODBC_KEYWORDS)
       // scalastyle:off line.size.limit
-      assert(infoValue.getStringValue == 
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,C
 [...]
+      assert(infoValue.getStringValue == 
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,C
 [...]
       // scalastyle:on line.size.limit
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to