This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new d1a157d1bc7c [SPARK-57083][SQL] Preserve geography SRID across
encoders, Parquet readers, and Python conversion
d1a157d1bc7c is described below
commit d1a157d1bc7cd4242cb81fd2d6a90ff43be74c48
Author: Uros Bojanic <[email protected]>
AuthorDate: Wed May 27 18:45:11 2026 +0800
[SPARK-57083][SQL] Preserve geography SRID across encoders, Parquet
readers, and Python conversion
### What changes were proposed in this pull request?
Now that `GeographyType` supports per-row SRIDs, this PR fixes several code
paths that silently dropped the SRID when materializing a `GeographyVal` from
WKB and substituted the default value (`4326`) instead.
### Why are the changes needed?
Without these fixes, geographies read from Arrow, Parquet, or Python, or
constructed via `CatalystTypeConverters.convertToCatalyst`, had their SRID
rewritten to the default 4326, regardless of the column's declared SRID or the
value's actual SRID. This produced silent data corruption for any geography
stored with a non-default geographic SRID.
### Does this PR introduce _any_ user-facing change?
Yes — bug fix. Reading a Parquet column declared as
`GeographyType(<non-default SRID>)` now yields values with the column's
declared SRID, and materializing geographies from Arrow/Python or converting
Geography values via `CatalystTypeConverters.convertToCatalyst` now preserves
each value's own SRID. Previously the SRID was always 4326.
### How was this patch tested?
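Updated `CatalystTypeConvertersSuite`, `STUtilsSuite`, `ArrowWriterSuite`, and
`ParquetGeoSuite`, and added a new `EvaluatePythonSuite`. The new cases cover
valid non-default SRIDs, invalid (non-geographic) SRIDs, and SRID mismatches
against fixed-SRID columns.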
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4.7)
Closes #56127 from uros-db/geography-srids.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4a61083f22e605fa0a4c73915499514200902b79)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/util/STUtils.java | 8 ++
.../spark/sql/vectorized/ArrowColumnVector.java | 4 +-
.../sql/catalyst/CatalystTypeConverters.scala | 2 +-
.../sql/catalyst/CatalystTypeConvertersSuite.scala | 25 ++++
.../spark/sql/catalyst/util/StUtilsSuite.java | 34 ++++-
.../datasources/parquet/WKBConverterStrategy.java | 2 +-
.../datasources/parquet/ParquetRowConverter.scala | 20 +--
.../sql/execution/python/EvaluatePython.scala | 3 +-
.../sql/execution/arrow/ArrowWriterSuite.scala | 8 ++
.../datasources/parquet/ParquetGeoSuite.scala | 61 ++++++++-
.../sql/execution/python/EvaluatePythonSuite.scala | 139 +++++++++++++++++++++
11 files changed, 288 insertions(+), 18 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
index c5d57fd08fed..c9035569770c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
@@ -152,6 +152,14 @@ public final class STUtils {
return toPhysVal(Geography.fromWkb(wkb));
}
+ public static GeographyVal stGeogFromWKB(byte[] wkb, int srid) {
+ // We only allow setting the SRID to geographic values.
+ if(!GeographyType.isSridSupported(srid)) {
+ throw QueryExecutionErrors.stInvalidSridValueError(srid);
+ }
+ return toPhysVal(Geography.fromWkb(wkb, srid));
+ }
+
// ST_GeomFromWKB
public static GeometryVal stGeomFromWKB(byte[] wkb) {
return toPhysVal(Geometry.fromWkb(wkb));
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 019bc258579a..8cacfcda0b0e 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -157,9 +157,7 @@ public class ArrowColumnVector extends ColumnVector {
int srid = getChild(0).getInt(rowId);
byte[] bytes = getChild(1).getBinary(rowId);
gt.assertSridAllowedForType(srid);
- // TODO(GEO-602): Geog still does not support different SRIDs, once it
does,
- // we need to update this.
- return (bytes == null) ? null : STUtils.stGeogFromWKB(bytes);
+ return (bytes == null) ? null : STUtils.stGeogFromWKB(bytes, srid);
}
@Override
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index d51007e7d336..be66f851e361 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -666,7 +666,7 @@ object CatalystTypeConverters {
case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
case arr: Array[Byte] => arr
case g: org.apache.spark.sql.types.Geometry =>
STUtils.stGeomFromWKB(g.getBytes, g.getSrid)
- case g: org.apache.spark.sql.types.Geography =>
STUtils.stGeogFromWKB(g.getBytes)
+ case g: org.apache.spark.sql.types.Geography =>
STUtils.stGeogFromWKB(g.getBytes, g.getSrid)
case arr: Array[Char] => StringConverter.toCatalyst(arr)
case arr: Array[_] => new GenericArrayData(arr.map(convertToCatalyst))
case map: Map[_, _] =>
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index b730a6c27a3b..222465d82c02 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -488,6 +488,18 @@ class CatalystTypeConvertersSuite extends SparkFunSuite
with SQLHelper {
assert(STUtils.stSrid(resultVal) === 4326)
}
+ test("converting Geography with non-default SRID via convertToCatalyst") {
+ // Geography supports a variety of geographic SRIDs beyond the default
4326.
+ Seq(4267, 4269, 4612, 37001, 104030).foreach { srid =>
+ val geog = Geography.fromWKB(pointWkb, srid)
+ val result = CatalystTypeConverters.convertToCatalyst(geog)
+ assert(result.isInstanceOf[GeographyVal])
+ val resultVal = result.asInstanceOf[GeographyVal]
+ assert(java.util.Arrays.equals(STUtils.stAsBinary(resultVal, NDR),
pointWkb))
+ assert(STUtils.stSrid(resultVal) === srid)
+ }
+ }
+
test("convertToCatalyst null handling for geospatial types") {
assert(CatalystTypeConverters.convertToCatalyst(null: Geometry) === null)
assert(CatalystTypeConverters.convertToCatalyst(null: Geography) === null)
@@ -503,6 +515,19 @@ class CatalystTypeConvertersSuite extends SparkFunSuite
with SQLHelper {
parameters = Map("srid" -> "1"))
}
+ test("convertToCatalyst with Geography with invalid SRID") {
+ // Geography only accepts geographic SRIDs (e.g. 0 and 3857 are not
geographic).
+ Seq(0, 1, 3857).foreach { invalidSrid =>
+ val geog = Geography.fromWKB(pointWkb, invalidSrid)
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ CatalystTypeConverters.convertToCatalyst(geog)
+ },
+ condition = "ST_INVALID_SRID_VALUE",
+ parameters = Map("srid" -> invalidSrid.toString))
+ }
+ }
+
test("createToCatalystConverter for GeometryType") {
val gt = GeometryType(0)
val converter = CatalystTypeConverters.createToCatalystConverter(gt)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
index c3a806d897dd..7aaa3a8013c1 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
@@ -142,12 +142,44 @@ class STUtilsSuite {
// ST_GeogFromWKB
@Test
- void testStGeogFromWKB() {
+ void testStGeogFromWKBNoSrid() {
GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb);
assertNotNull(geographyVal);
assertArrayEquals(testGeographyBytes, geographyVal.getBytes());
}
+ @Test
+ void testStGeogFromWKBWithDefaultSrid() {
+ GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb,
testGeographySrid);
+ assertNotNull(geographyVal);
+ assertArrayEquals(testGeographyBytes, geographyVal.getBytes());
+ }
+
+ @Test
+ void testStGeogFromWKBWithValidSrid() {
+ // Geography supports a variety of geographic SRIDs (not just the default
4326).
+ for (int validGeographySrid : new int[]{4267, 4269, 4326, 4612, 37001,
104030}) {
+ GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb,
validGeographySrid);
+ assertNotNull(geographyVal);
+ byte[] expectedBytes = new byte[testWkb.length + sridLen];
+ byte[] geogSrid =
ByteBuffer.allocate(sridLen).order(end).putInt(validGeographySrid).array();
+ System.arraycopy(geogSrid, 0, expectedBytes, 0, sridLen);
+ System.arraycopy(testWkb, 0, expectedBytes, sridLen, testWkb.length);
+ assertArrayEquals(expectedBytes, geographyVal.getBytes());
+ }
+ }
+
+ @Test
+ void testStGeogFromWKBWithInvalidSrid() {
+ // SRIDs that are either out of range or correspond to non-geographic
SRSes (e.g. 0, 3857).
+ for (int invalidGeographySrid : new int[]{-9999, -2, -1, 0, 1, 2, 3857,
9999}) {
+ SparkIllegalArgumentException exception =
assertThrows(SparkIllegalArgumentException.class,
+ () -> STUtils.stGeogFromWKB(testWkb, invalidGeographySrid));
+ assertEquals("ST_INVALID_SRID_VALUE", exception.getCondition());
+ assertTrue(exception.getMessage().contains("value: " +
invalidGeographySrid + "."));
+ }
+ }
+
// ST_GeomFromWKB
@Test
void testStGeomFromWKBNoSrid() {
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
index a8be90951289..dc821f6c5710 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
@@ -46,6 +46,6 @@ enum WKBToGeographyConverter implements WKBConverterStrategy {
@Override
public byte[] convert(byte[] wkb, int srid) {
- return STUtils.stGeogFromWKB(wkb).getBytes();
+ return STUtils.stGeogFromWKB(wkb, srid).getBytes();
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index f253cbd0d0d3..50cc104eb8f6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -415,8 +415,8 @@ private[parquet] class ParquetRowConverter(
case geom: GeometryType =>
new ParquetGeometryConverter(geom.srid, updater)
- case _: GeographyType =>
- new ParquetGeographyConverter(updater)
+ case geog: GeographyType =>
+ new ParquetGeographyConverter(geog.srid, updater)
// As long as the parquet type is INT64 timestamp, whether logical
annotation
// `isAdjustedToUTC` is false or true, it will be read as Spark's
TimestampLTZ type
@@ -619,7 +619,7 @@ private[parquet] class ParquetRowConverter(
}
/**
- * Parquet converter for strings. A dictionary is used to minimize string
decoding cost.
+ * Parquet converter for geometries. A dictionary is used to minimize WKB
decoding cost.
*/
private final class ParquetGeometryConverter(srid: Int, updater:
ParentContainerUpdater)
extends ParquetPrimitiveConverter(updater) {
@@ -655,9 +655,9 @@ private[parquet] class ParquetRowConverter(
}
/**
- * Parquet converter for strings. A dictionary is used to minimize string
decoding cost.
+ * Parquet converter for geographies. A dictionary is used to minimize WKB
decoding cost.
*/
- private final class ParquetGeographyConverter(updater:
ParentContainerUpdater)
+ private final class ParquetGeographyConverter(srid: Int, updater:
ParentContainerUpdater)
extends ParquetPrimitiveConverter(updater) {
private var expandedDictionary: Array[GeographyVal] = null
@@ -666,7 +666,7 @@ private[parquet] class ParquetRowConverter(
override def setDictionary(dictionary: Dictionary): Unit = {
this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
- STUtils.stGeogFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe)
+ STUtils.stGeogFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe,
srid)
}
}
@@ -678,15 +678,15 @@ private[parquet] class ParquetRowConverter(
val buffer = value.toByteBuffer
val numBytes = buffer.remaining()
- val geometry = if (buffer.hasArray) {
+ val geography = if (buffer.hasArray) {
val array = buffer.array()
val offset = buffer.arrayOffset() + buffer.position()
- STUtils.stGeogFromWKB(array.slice(offset, offset + numBytes))
+ STUtils.stGeogFromWKB(array.slice(offset, offset + numBytes), srid)
} else {
- STUtils.stGeogFromWKB(value.getBytesUnsafe)
+ STUtils.stGeogFromWKB(value.getBytesUnsafe, srid)
}
- updater.set(geometry)
+ updater.set(geography)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 6a9b4978e27b..973a767144b7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -245,7 +245,8 @@ object EvaluatePython {
val geographySrid = s.get("srid").asInstanceOf[Int]
g.assertSridAllowedForType(geographySrid)
STUtils.stGeogFromWKB(
- s.get("wkb").asInstanceOf[Array[Byte]])
+ s.get("wkb").asInstanceOf[Array[Byte]],
+ geographySrid)
}
case g: GeometryType => (obj: Any) => nullSafeConvert(obj) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
index c3e9af54d431..52001c2a021d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
@@ -105,15 +105,23 @@ class ArrowWriterSuite extends SparkFunSuite {
}
val geographies = wkbs.map(x => InternalGeography.fromWkb(x,
4326).getValue)
+ val geographies4267 = wkbs.map(x => InternalGeography.fromWkb(x,
4267).getValue)
+ val geographies4269 = wkbs.map(x => InternalGeography.fromWkb(x,
4269).getValue)
val geometries = wkbs.map(x => InternalGeometry.fromWkb(x, 0).getValue)
val mixedGeometries = wkbs.zip(Seq(0, 4326)).map {
case (g, srid) => InternalGeometry.fromWkb(g, srid).getValue
}
+ val mixedGeographies = wkbs.zip(Seq(4267, 4269)).map {
+ case (g, srid) => InternalGeography.fromWkb(g, srid).getValue
+ }
check(GeometryType(0), geometries)
check(GeographyType(4326), geographies)
+ check(GeographyType(4267), geographies4267)
+ check(GeographyType(4269), geographies4269)
check(GeometryType("ANY"), mixedGeometries)
check(GeographyType("ANY"), geographies)
+ check(GeographyType("ANY"), mixedGeographies)
check(BooleanType, Seq(true, null, false))
check(ByteType, Seq(1.toByte, 2.toByte, null, 4.toByte))
check(ShortType, Seq(1.toShort, 2.toShort, null, 4.toShort))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
index 107b5b7675b1..057f722f215c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
@@ -22,8 +22,9 @@ import java.io.File
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.functions.{st_asbinary, st_geogfromwkb,
st_geomfromwkb}
+import org.apache.spark.sql.functions.{st_asbinary, st_geogfromwkb,
st_geomfromwkb, st_srid}
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{Geography, GeographyType, StructField,
StructType}
class ParquetGeoSuite
extends ParquetCompatibilityTest
@@ -98,6 +99,32 @@ class ParquetGeoSuite
testReadWrite(Seq(point1Wkb, line1Wkb, makePolygonWkb()))
}
+ test("geography preserves non-default SRID through Parquet round-trip") {
+ // Geography supports a variety of geographic SRIDs beyond the default
4326. Verify that the
+ // SRID is preserved when written to and read back from Parquet, across
the row-based reader
+ // (ParquetGeographyConverter) and the vectorized reader
(WKBToGeographyConverter).
+ Seq(4267, 4269, 4326).foreach { srid =>
+ withTempDir { dir =>
+ val schema = StructType(Seq(
+ StructField("geog", GeographyType(srid), nullable = true)))
+ val wkbValues = Seq(point0Wkb, point1Wkb, line0Wkb)
+ val rdd = sparkContext.parallelize(
+ wkbValues.map(wkb => Row(Geography.fromWKB(wkb, srid))))
+ val df = spark.createDataFrame(rdd, schema)
+ withAllParquetWriters {
+ df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+ withAllParquetReaders {
+ // Verify both the WKB payload and the SRID round-trip correctly.
+ checkAnswer(
+ spark.read.parquet(dir.getAbsolutePath)
+ .select(st_asbinary($"geog"), st_srid($"geog")),
+ wkbValues.map(wkb => Row(wkb, srid)))
+ }
+ }
+ }
+ }
+ }
+
test("dictionary encoding") {
val wkbValues = Seq(
point0Wkb,
@@ -119,4 +146,36 @@ class ParquetGeoSuite
}
}
}
+
+ test("geography preserves non-default SRID with dictionary encoding") {
+ // Force dictionary encoding by repeating a small number of values many
times. This exercises
+ // the setDictionary path of ParquetGeographyConverter and the dictionary
path of
+ // WKBToGeographyConverter, both of which must materialize geographies
with the column's SRID.
+ val srid = 4267
+ val wkbValues = Seq(point0Wkb, point1Wkb, line0Wkb)
+ val repeatedWkbs = List.fill(10000)(wkbValues).flatten
+ val schema = StructType(Seq(
+ StructField("geog", GeographyType(srid), nullable = true)))
+ val rdd = sparkContext.parallelize(
+ repeatedWkbs.map(wkb => Row(Geography.fromWKB(wkb, srid))))
+ val df = spark.createDataFrame(rdd, schema)
+
+ Seq(true, false).foreach { useDictionary =>
+ withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY ->
useDictionary.toString) {
+ withTempDir { dir =>
+ withAllParquetWriters {
+ df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+ withAllParquetReaders {
+ // Aggregate-style assertion to keep the comparison cheap on 30K
rows: every row
+ // should round-trip with the original SRID.
+ val readBack = spark.read.parquet(dir.getAbsolutePath)
+ .select(st_srid($"geog").as("srid"))
+ assert(readBack.count() === repeatedWkbs.length)
+ assert(readBack.where($"srid" =!= srid).count() === 0)
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/EvaluatePythonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/EvaluatePythonSuite.scala
new file mode 100644
index 000000000000..aee19587dcaa
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/EvaluatePythonSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException,
SparkRuntimeException}
+import org.apache.spark.sql.catalyst.util.STUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal}
+
+class EvaluatePythonSuite extends SparkFunSuite {
+
+ // POINT(1 2) in WKB, little-endian.
+ private val pointWkb: Array[Byte] =
"010100000000000000000031400000000000001C40"
+ .grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
+
+ private def pyGeo(srid: Int, wkb: Array[Byte]): java.util.HashMap[String,
Any] = {
+ val m = new java.util.HashMap[String, Any]()
+ m.put("srid", srid)
+ m.put("wkb", wkb)
+ m
+ }
+
+ // ----- GeographyType -----
+
+ test("makeFromJava(GeographyType): preserves per-row SRID for fixed-SRID
columns") {
+ // Geography supports a variety of geographic SRIDs beyond the default
4326. Ensure that the
+ // SRID is preserved on the Python -> Catalyst conversion path.
+ Seq(4267, 4269, 4326, 4612, 37001, 104030).foreach { srid =>
+ val convert = EvaluatePython.makeFromJava(GeographyType(srid))
+ val result = convert(pyGeo(srid, pointWkb))
+ assert(result.isInstanceOf[GeographyVal])
+ assert(STUtils.stSrid(result.asInstanceOf[GeographyVal]) === srid)
+ }
+ }
+
+ test("makeFromJava(GeographyType ANY): preserves per-row SRID for mixed-SRID
columns") {
+ val convert = EvaluatePython.makeFromJava(GeographyType("ANY"))
+ Seq(4267, 4269, 4326).foreach { srid =>
+ val result = convert(pyGeo(srid, pointWkb))
+ assert(result.isInstanceOf[GeographyVal])
+ assert(STUtils.stSrid(result.asInstanceOf[GeographyVal]) === srid)
+ }
+ }
+
+ test("makeFromJava(GeographyType): rejects SRID mismatch on a fixed-SRID
column") {
+ val convert = EvaluatePython.makeFromJava(GeographyType(4326))
+ checkError(
+ exception = intercept[SparkRuntimeException] {
+ convert(pyGeo(4267, pointWkb))
+ },
+ condition = "GEO_ENCODER_SRID_MISMATCH_ERROR",
+ parameters = Map("type" -> "GEOGRAPHY", "valueSrid" -> "4267",
"typeSrid" -> "4326"))
+ }
+
+ test("makeFromJava(GeographyType ANY): rejects non-geographic SRID") {
+ val convert = EvaluatePython.makeFromJava(GeographyType("ANY"))
+ // SRID 0 is not a geographic SRID; even mixed-SRID columns must reject it.
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ convert(pyGeo(0, pointWkb))
+ },
+ condition = "ST_INVALID_SRID_VALUE",
+ parameters = Map("srid" -> "0"))
+ // SRID 3857 is a valid Cartesian SRID but not geographic.
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ convert(pyGeo(3857, pointWkb))
+ },
+ condition = "ST_INVALID_SRID_VALUE",
+ parameters = Map("srid" -> "3857"))
+ }
+
+ test("makeFromJava(GeographyType): null is preserved") {
+ val convert = EvaluatePython.makeFromJava(GeographyType(4326))
+ assert(convert(null) === null)
+ }
+
+ // ----- GeometryType -----
+
+ test("makeFromJava(GeometryType): preserves per-row SRID for fixed-SRID
columns") {
+ // Geometry supports both the default SRID 0 and a variety of
Cartesian/geographic SRIDs.
+ Seq(0, 3857, 4267, 4269, 4326, 32601, 102964).foreach { srid =>
+ val convert = EvaluatePython.makeFromJava(GeometryType(srid))
+ val result = convert(pyGeo(srid, pointWkb))
+ assert(result.isInstanceOf[GeometryVal])
+ assert(STUtils.stSrid(result.asInstanceOf[GeometryVal]) === srid)
+ }
+ }
+
+ test("makeFromJava(GeometryType ANY): preserves per-row SRID for mixed-SRID
columns") {
+ val convert = EvaluatePython.makeFromJava(GeometryType("ANY"))
+ Seq(0, 3857, 4267, 4269, 4326).foreach { srid =>
+ val result = convert(pyGeo(srid, pointWkb))
+ assert(result.isInstanceOf[GeometryVal])
+ assert(STUtils.stSrid(result.asInstanceOf[GeometryVal]) === srid)
+ }
+ }
+
+ test("makeFromJava(GeometryType): rejects SRID mismatch on a fixed-SRID
column") {
+ val convert = EvaluatePython.makeFromJava(GeometryType(0))
+ checkError(
+ exception = intercept[SparkRuntimeException] {
+ convert(pyGeo(4326, pointWkb))
+ },
+ condition = "GEO_ENCODER_SRID_MISMATCH_ERROR",
+ parameters = Map("type" -> "GEOMETRY", "valueSrid" -> "4326", "typeSrid"
-> "0"))
+ }
+
+ test("makeFromJava(GeometryType ANY): rejects unsupported SRID") {
+ val convert = EvaluatePython.makeFromJava(GeometryType("ANY"))
+ // SRID 1 is not a registered SRID, so even mixed-SRID columns must reject
it.
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ convert(pyGeo(1, pointWkb))
+ },
+ condition = "ST_INVALID_SRID_VALUE",
+ parameters = Map("srid" -> "1"))
+ }
+
+ test("makeFromJava(GeometryType): null is preserved") {
+ val convert = EvaluatePython.makeFromJava(GeometryType(0))
+ assert(convert(null) === null)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]