This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e92ef557686d [SPARK-55640][GEO][SQL] Propagate WKB parsing errors for Geometry and Geography
e92ef557686d is described below
commit e92ef557686daf8961a52cfab141b803dde77413
Author: Uros Bojanic <[email protected]>
AuthorDate: Thu Feb 26 11:32:30 2026 +0800
[SPARK-55640][GEO][SQL] Propagate WKB parsing errors for Geometry and Geography
### What changes were proposed in this pull request?
WKB reader was implemented for Geometry and Geography, but only using
internal exception handling. This PR addresses this by introducing proper
user-facing error classes for WKB parsing.
### Why are the changes needed?
Propagate the WKB parsing errors properly to the user.
### Does this PR introduce _any_ user-facing change?
Yes, users now get proper errors for invalid WKB parsing.
### How was this patch tested?
Added new unit tests and end-to-end SQL tests for WKB parsing.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54424 from uros-db/geo-wkb-parse-exceptions.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 ++
python/pyspark/errors/error-conditions.json | 6 ++
python/pyspark/sql/tests/test_functions.py | 26 +++++++
.../apache/spark/sql/catalyst/util/Geography.java | 20 ++++--
.../apache/spark/sql/catalyst/util/Geometry.java | 20 ++++--
.../sql/catalyst/util/geo/WkbParseException.java | 37 ++++------
.../spark/sql/catalyst/util/geo/WkbReader.java | 2 +-
.../spark/sql/errors/QueryExecutionErrors.scala | 9 +++
.../catalyst/util/geo/WkbErrorHandlingTest.java | 58 +++++++---------
.../sql/catalyst/util/geo/WkbGeographyTest.java | 2 +-
.../util/geo/WkbReaderWriterAdvancedTest.java | 8 +--
.../sql/catalyst/util/GeographyExecutionSuite.java | 12 ++++
.../sql/catalyst/util/GeometryExecutionSuite.java | 12 ++++
.../analyzer-results/st-functions.sql.out | 53 ++++++++++++++
.../resources/sql-tests/inputs/st-functions.sql | 15 ++++
.../sql-tests/results/st-functions.sql.out | 80 ++++++++++++++++++++++
.../org/apache/spark/sql/STExpressionsSuite.scala | 35 ++++++++++
.../org/apache/spark/sql/STFunctionsSuite.scala | 39 +++++++++++
18 files changed, 359 insertions(+), 81 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index dc6200095444..94111b8e9ee0 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7754,6 +7754,12 @@
],
"sqlState" : "42601"
},
+ "WKB_PARSE_ERROR" : {
+ "message" : [
+ "Error parsing WKB: <parseError> at position <pos>"
+ ],
+ "sqlState" : "22023"
+ },
"WRITE_STREAM_NOT_ALLOWED" : {
"message" : [
"`writeStream` can be called only on streaming Dataset/DataFrame."
diff --git a/python/pyspark/errors/error-conditions.json
b/python/pyspark/errors/error-conditions.json
index 94ede56da5e1..b0cf4c084e20 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1488,6 +1488,12 @@
"Value for `<arg_name>` must be between <lower_bound> and <upper_bound>
(inclusive), got <actual>"
]
},
+ "WKB_PARSE_ERROR" : {
+ "message" : [
+ "Error parsing WKB: <parseError> at position <pos>"
+ ],
+ "sqlState" : "22023"
+ },
"WRONG_NUM_ARGS_FOR_HIGHER_ORDER_FUNCTION": {
"message": [
"Function `<func_name>` should take between 1 and 3 arguments, but the
provided function takes <num_args>."
diff --git a/python/pyspark/sql/tests/test_functions.py
b/python/pyspark/sql/tests/test_functions.py
index 9313cd34dc06..cdfd8cc24cd0 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -26,6 +26,7 @@ import re
import unittest
from pyspark.errors import PySparkTypeError, PySparkValueError,
SparkRuntimeException
+from pyspark.errors.exceptions.base import IllegalArgumentException
from pyspark.sql import Row, Window, functions as F, types
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.column import Column
@@ -3798,6 +3799,25 @@ class FunctionsTestsMixin:
)
self.assertEqual(results, [expected])
+ def test_st_geogfromwkb(self):
+ df = self.spark.createDataFrame(
+ [(bytes.fromhex("0101000000000000000000F03F0000000000000040"),)],
+ ["wkb"],
+ )
+ results = df.select(
+ F.hex(F.st_asbinary(F.st_geogfromwkb("wkb"))),
+ ).collect()
+ expected = Row(
+ "0101000000000000000000F03F0000000000000040",
+ )
+ self.assertEqual(results, [expected])
+ # ST_GeogFromWKB with invalid WKB.
+ df = self.spark.createDataFrame([(bytearray(b"\x6f"),)], ["wkb"])
+ with self.assertRaises(IllegalArgumentException) as error_context:
+ df.select(F.st_geogfromwkb("wkb")).collect()
+ self.assertIn("[WKB_PARSE_ERROR]", str(error_context.exception))
+ self.assertIn("Unexpected end of WKB buffer",
str(error_context.exception))
+
def test_st_geomfromwkb(self):
df = self.spark.createDataFrame(
[(bytes.fromhex("0101000000000000000000F03F0000000000000040"),
4326)],
@@ -3814,6 +3834,12 @@ class FunctionsTestsMixin:
"0101000000000000000000F03F0000000000000040",
)
self.assertEqual(results, [expected])
+ # ST_GeomFromWKB with invalid WKB.
+ df = self.spark.createDataFrame([(bytearray(b"\x6f"),)], ["wkb"])
+ with self.assertRaises(IllegalArgumentException) as error_context:
+ df.select(F.st_geomfromwkb("wkb")).collect()
+ self.assertIn("[WKB_PARSE_ERROR]", str(error_context.exception))
+ self.assertIn("Unexpected end of WKB buffer",
str(error_context.exception))
def test_st_setsrid(self):
df = self.spark.createDataFrame(
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
index 445e30af15b8..f446d71d5a5b 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
@@ -17,8 +17,10 @@
package org.apache.spark.sql.catalyst.util;
import org.apache.spark.sql.catalyst.util.geo.GeometryModel;
+import org.apache.spark.sql.catalyst.util.geo.WkbParseException;
import org.apache.spark.sql.catalyst.util.geo.WkbReader;
import org.apache.spark.sql.catalyst.util.geo.WkbWriter;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.unsafe.types.GeographyVal;
import java.nio.ByteBuffer;
@@ -81,13 +83,17 @@ public final class Geography implements Geo {
// Returns a Geography object with the specified SRID value by parsing the
input WKB.
public static Geography fromWkb(byte[] wkb, int srid) {
- WkbReader reader = new WkbReader(true);
- reader.read(wkb); // Validate WKB with geography coordinate bounds.
-
- byte[] bytes = new byte[HEADER_SIZE + wkb.length];
- ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
- System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
- return fromBytes(bytes);
+ try {
+ WkbReader reader = new WkbReader(true);
+ reader.read(wkb); // Validate WKB with geography coordinate bounds.
+
+ byte[] bytes = new byte[HEADER_SIZE + wkb.length];
+ ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
+ System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
+ return fromBytes(bytes);
+ } catch (WkbParseException e) {
+ throw QueryExecutionErrors.wkbParseError(e.getParseError(),
e.getPosition());
+ }
}
// Overload for the WKB reader where we use the default SRID for Geography.
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
index 757c63f421e7..58be589b86ff 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
@@ -17,8 +17,10 @@
package org.apache.spark.sql.catalyst.util;
import org.apache.spark.sql.catalyst.util.geo.GeometryModel;
+import org.apache.spark.sql.catalyst.util.geo.WkbParseException;
import org.apache.spark.sql.catalyst.util.geo.WkbReader;
import org.apache.spark.sql.catalyst.util.geo.WkbWriter;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.unsafe.types.GeometryVal;
import java.nio.ByteBuffer;
@@ -81,13 +83,17 @@ public final class Geometry implements Geo {
// Returns a Geometry object with the specified SRID value by parsing the
input WKB.
public static Geometry fromWkb(byte[] wkb, int srid) {
- WkbReader reader = new WkbReader();
- reader.read(wkb); // Validate WKB
-
- byte[] bytes = new byte[HEADER_SIZE + wkb.length];
- ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
- System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
- return fromBytes(bytes);
+ try {
+ WkbReader reader = new WkbReader();
+ reader.read(wkb); // Validate WKB
+
+ byte[] bytes = new byte[HEADER_SIZE + wkb.length];
+ ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
+ System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
+ return fromBytes(bytes);
+ } catch (WkbParseException e) {
+ throw QueryExecutionErrors.wkbParseError(e.getParseError(),
e.getPosition());
+ }
}
// Overload for the WKB reader where we use the default SRID for Geometry.
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
index 3ea8e95e7cc4..b1d1b4dfba1a 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
@@ -19,40 +19,27 @@ package org.apache.spark.sql.catalyst.util.geo;
/**
* Exception thrown when parsing WKB data fails.
*/
-class WkbParseException extends RuntimeException {
+public class WkbParseException extends RuntimeException {
+ private final String parseError;
private final long position;
- private final String wkbString;
+ private final byte[] wkb;
- WkbParseException(String message, long position, byte[] wkb) {
- super(formatMessage(message, position, wkb));
+ WkbParseException(String parseError, long position, byte[] wkb) {
+ super();
+ this.parseError = parseError;
this.position = position;
- this.wkbString = wkb != null ? bytesToHex(wkb) : "";
+ this.wkb = wkb;
}
- private static String formatMessage(String message, long position, byte[]
wkb) {
- String baseMessage = message + " at position " + position;
- if (wkb != null && wkb.length > 0) {
- baseMessage += " in WKB: " + bytesToHex(wkb);
- }
- return baseMessage;
+ public String getParseError() {
+ return parseError;
}
- private static String bytesToHex(byte[] bytes) {
- if (bytes == null || bytes.length == 0) {
- return "";
- }
- StringBuilder sb = new StringBuilder(bytes.length * 2);
- for (byte b : bytes) {
- sb.append(String.format("%02X", b));
- }
- return sb.toString();
- }
-
- long getPosition() {
+ public long getPosition() {
return position;
}
- String getWkbString() {
- return wkbString;
+ public byte[] getWkb() {
+ return wkb;
}
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
index e0b8b543300b..9546ec7cf184 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
@@ -222,7 +222,7 @@ public class WkbReader {
// Check that we have enough bytes for header (endianness byte + 4-byte
type)
if (currentWkb.length < WkbUtil.BYTE_SIZE + WkbUtil.TYPE_SIZE) {
- throw new WkbParseException("WKB data too short", 0, currentWkb);
+ throw new WkbParseException("Unexpected end of WKB buffer", 0,
currentWkb);
}
// Create buffer wrapping the entire byte array
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 4e7cfa653389..8985bdb519d1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -676,6 +676,15 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
stInvalidSridValueError(srid.toString)
}
+ def wkbParseError(msg: String, pos: String): SparkIllegalArgumentException =
{
+ new SparkIllegalArgumentException(errorClass = "WKB_PARSE_ERROR",
+ messageParameters = Map("parseError" -> msg, "pos" -> pos))
+ }
+
+ def wkbParseError(msg: String, pos: Long): SparkIllegalArgumentException = {
+ wkbParseError(msg, pos.toString)
+ }
+
def withSuggestionIntervalArithmeticOverflowError(
suggestedFunc: String,
context: QueryContext): ArithmeticException = {
diff --git
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
index 660775a1adc8..f66bc50daf98 100644
---
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
+++
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
@@ -46,13 +46,12 @@ public class WkbErrorHandlingTest extends WkbTestBase {
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(wkb),
"Should throw WkbParseException for WKB: " + hex);
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex + ", actual: "
+ ex.getMessage());
+ Assertions.assertSame(wkb, ex.getWkb());
if (expectedMessagePart != null && !expectedMessagePart.isEmpty()) {
Assertions.assertTrue(
-
ex.getMessage().toLowerCase().contains(expectedMessagePart.toLowerCase()),
+
ex.getParseError().toLowerCase().contains(expectedMessagePart.toLowerCase()),
"Exception message should contain '" + expectedMessagePart + "',
actual: " +
- ex.getMessage());
+ ex.getParseError());
}
}
@@ -63,69 +62,60 @@ public class WkbErrorHandlingTest extends WkbTestBase {
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(emptyWkb));
// Empty WKB produces empty hex string, so just verify exception was thrown
- Assertions.assertNotNull(ex.getMessage());
+ Assertions.assertNotNull(ex.getParseError());
}
@Test
public void testTooShortWkb() {
// Only endianness byte
- String hex = "01";
- byte[] tooShort = hexToBytes(hex);
+ byte[] tooShort = hexToBytes("01");
WkbReader reader = new WkbReader();
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(tooShort));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(tooShort, ex.getWkb());
}
@Test
public void testInvalidGeometryTypeZero() {
// Type = 0 (invalid, should be 1-7)
- String hex = "0100000000000000000000F03F0000000000000040";
- byte[] invalidType = hexToBytes(hex);
+ byte[] invalidType =
hexToBytes("0100000000000000000000F03F0000000000000040");
WkbReader reader = new WkbReader();
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(invalidType));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(invalidType, ex.getWkb());
}
@Test
public void testTruncatedPointCoordinates() {
// Point WKB with truncated coordinates (missing Y coordinate)
- String hex = "0101000000000000000000F03F";
- byte[] truncated = hexToBytes(hex);
+ byte[] truncated = hexToBytes("0101000000000000000000F03F");
WkbReader reader = new WkbReader();
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(truncated));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(truncated, ex.getWkb());
}
@Test
public void testTruncatedByte() {
// Only one byte (FF) of the 4-byte INT field.
- String hex = "0102000000ff";
- byte[] truncated = hexToBytes(hex);
+ byte[] truncated = hexToBytes("0102000000ff");
WkbReader reader = new WkbReader();
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(truncated));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(truncated, ex.getWkb());
}
@Test
public void testTruncatedLineString() {
// LineString with declared 2 points but only 1 provided
- String hex = "010200000002000000" + // LineString with 2 points
- "0000000000000000" + // X of first point
- "0000000000000000"; // Y of first point (missing second
point)
- byte[] truncated = hexToBytes(hex);
+ byte[] truncated = hexToBytes(
+ "010200000002000000" + // LineString with 2 points
+ "0000000000000000" + // X of first point
+ "0000000000000000"); // Y of first point (missing second point)
WkbReader reader = new WkbReader();
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(truncated));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(truncated, ex.getWkb());
}
@Test
@@ -164,8 +154,7 @@ public class WkbErrorHandlingTest extends WkbTestBase {
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(invalidPolygon));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(invalidPolygon, ex.getWkb());
}
@Test
@@ -174,20 +163,19 @@ public class WkbErrorHandlingTest extends WkbTestBase {
WkbReader reader = new WkbReader(1);
// Polygon with ring where first and last points don't match
- String hex = "01" + // Little endian
+ byte[] nonClosedRing = hexToBytes(
+ "01" + // Little endian
"03000000" + // Polygon type
"01000000" + // 1 ring
"04000000" + // 4 points
"0000000000000000" + "0000000000000000" + // (0, 0)
"000000000000F03F" + "0000000000000000" + // (1, 0)
"000000000000F03F" + "000000000000F03F" + // (1, 1)
- "0000000000000040" + "0000000000000040"; // (2, 2) - doesn't
match first point!
- byte[] nonClosedRing = hexToBytes(hex);
+ "0000000000000040" + "0000000000000040"); // (2, 2) - doesn't
match first point!
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> reader.read(nonClosedRing));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
- "Exception message should contain the WKB hex: " + hex);
+ Assertions.assertSame(nonClosedRing, ex.getWkb());
}
@Test
@@ -197,7 +185,7 @@ public class WkbErrorHandlingTest extends WkbTestBase {
WkbParseException.class, () -> reader.read(null),
"Should throw WKBParseException for null byte array");
// Null WKB cannot produce hex string, just verify exception was thrown
- Assertions.assertNotNull(ex.getMessage());
+ Assertions.assertNotNull(ex.getParseError());
}
// ========== Invalid Byte Order Tests ==========
diff --git
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
index 5c9412519216..04e98ac8da9c 100644
---
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
+++
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
@@ -629,7 +629,7 @@ public class WkbGeographyTest extends WkbTestBase {
byte[] wkb = makePointWkb2D(200.0, 0.0);
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> geographyReader1().read(wkb));
- String msg = ex.getMessage();
+ String msg = ex.getParseError();
Assertions.assertTrue(msg.contains("Invalid coordinate value"));
}
}
diff --git
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
index 98bec81ca011..c02f1f196e38 100644
---
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
+++
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
@@ -55,8 +55,7 @@ public class WkbReaderWriterAdvancedTest extends WkbTestBase {
if (expectedValidateError) {
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> validateReader.read(wkbLittle, 0));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(wkbHexLittle.toUpperCase()),
- "Exception message should contain the WKB hex: " + wkbHexLittle);
+ Assertions.assertSame(wkbLittle, ex.getWkb());
geomLittle = noValidateReader.read(wkbLittle, 0);
} else {
geomLittle = validateReader.read(wkbLittle, 0);
@@ -73,8 +72,7 @@ public class WkbReaderWriterAdvancedTest extends WkbTestBase {
if (expectedValidateError) {
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> validateReader.read(wkbBig, 0));
-
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(wkbHexBig.toUpperCase()),
- "Exception message should contain the WKB hex: " + wkbHexBig);
+ Assertions.assertSame(wkbBig, ex.getWkb());
geomBig = noValidateReader.read(wkbBig, 0);
} else {
geomBig = validateReader.read(wkbBig, 0);
@@ -1198,7 +1196,7 @@ public class WkbReaderWriterAdvancedTest extends
WkbTestBase {
WkbReader geographyReader = new WkbReader(true);
WkbParseException ex = Assertions.assertThrows(
WkbParseException.class, () -> geographyReader.read(wkb, 0));
- Assertions.assertTrue(ex.getMessage().contains("Invalid coordinate
value"));
+ Assertions.assertTrue(ex.getParseError().contains("Invalid coordinate
value"));
// Geography mode without validation should accept non-geographic
coordinate values.
WkbReader noValidateGeographyReader = new WkbReader(0, true);
Assertions.assertDoesNotThrow(() -> noValidateGeographyReader.read(wkb,
0));
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
index d7c40047977a..fece3e36bc59 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.util;
+import org.apache.spark.SparkIllegalArgumentException;
import org.apache.spark.unsafe.types.GeographyVal;
import org.junit.jupiter.api.Test;
@@ -113,6 +114,17 @@ class GeographyExecutionSuite {
assertEquals(4326, geography.srid());
}
+ @Test
+ void testFromWkbInvalidWkb() {
+ byte[] invalidWkb = new byte[]{111};
+ SparkIllegalArgumentException exception = assertThrows(
+ SparkIllegalArgumentException.class,
+ () -> Geometry.fromWkb(invalidWkb)
+ );
+ assertEquals("WKB_PARSE_ERROR", exception.getCondition());
+ assertTrue(exception.getMessage().contains("Unexpected end of WKB
buffer"));
+ }
+
/** Tests for Geography EWKB parsing. */
@Test
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
index 617ef7752a05..5d0b11e969ad 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.util;
+import org.apache.spark.SparkIllegalArgumentException;
import org.apache.spark.unsafe.types.GeometryVal;
import org.junit.jupiter.api.Test;
@@ -127,6 +128,17 @@ class GeometryExecutionSuite {
assertEquals(0, geometry.srid());
}
+ @Test
+ void testFromWkbInvalidWkb() {
+ byte[] invalidWkb = new byte[]{111};
+ SparkIllegalArgumentException exception = assertThrows(
+ SparkIllegalArgumentException.class,
+ () -> Geometry.fromWkb(invalidWkb)
+ );
+ assertEquals("WKB_PARSE_ERROR", exception.getCondition());
+ assertTrue(exception.getMessage().contains("Unexpected end of WKB
buffer"));
+ }
+
/** Tests for Geometry EWKB parsing. */
@Test
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
index 8391c4800d8b..7566e69392fd 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
@@ -466,6 +466,45 @@ Project
[hex(st_asbinary(st_geomfromwkb(0x0101000000000000000000F03F000000000000
+- OneRowRelation
+-- !query
+SELECT ST_AsBinary(ST_GeogFromWKB(NULL))
+-- !query analysis
+Project [st_asbinary(st_geogfromwkb(cast(null as binary))) AS
st_asbinary(st_geogfromwkb(NULL))#x]
++- OneRowRelation
+
+
+-- !query
+SELECT
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')))
+-- !query analysis
+Project
[hex(st_asbinary(st_geogfromwkb(0x0101000000000000000000F03F0000000000000040)))
AS
hex(st_asbinary(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040')))#x]
++- OneRowRelation
+
+
+-- !query
+SELECT ST_GeogFromWKB(X'6F')
+-- !query analysis
+Project [st_geogfromwkb(0x6F) AS st_geogfromwkb(X'6F')#x]
++- OneRowRelation
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_GeogFromWKB(wkb) IS NULL AND wkb IS NOT
NULL
+-- !query analysis
+Aggregate [count(1) AS count(1)#xL]
++- Filter (isnull(st_geogfromwkb(wkb#x)) AND isnotnull(wkb#x))
+ +- SubqueryAlias spark_catalog.default.geodata
+ +- Relation spark_catalog.default.geodata[wkb#x] parquet
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_AsBinary(ST_GeogFromWKB(wkb)) <> wkb
+-- !query analysis
+Aggregate [count(1) AS count(1)#xL]
++- Filter NOT (st_asbinary(st_geogfromwkb(wkb#x)) = wkb#x)
+ +- SubqueryAlias spark_catalog.default.geodata
+ +- Relation spark_catalog.default.geodata[wkb#x] parquet
+
+
-- !query
SELECT ST_AsBinary(ST_GeomFromWKB(NULL))
-- !query analysis
@@ -527,6 +566,20 @@ org.apache.spark.SparkIllegalArgumentException
}
+-- !query
+SELECT ST_GeomFromWKB(X'6F')
+-- !query analysis
+Project [st_geomfromwkb(0x6F, 0) AS st_geomfromwkb(X'6F', 0)#x]
++- OneRowRelation
+
+
+-- !query
+SELECT ST_GeomFromWKB(X'6F', 4326)
+-- !query analysis
+Project [st_geomfromwkb(0x6F, 4326) AS st_geomfromwkb(X'6F', 4326)#x]
++- OneRowRelation
+
+
-- !query
SELECT COUNT(*) FROM geodata WHERE ST_GeomFromWKB(wkb) IS NULL AND wkb IS NOT
NULL
-- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
b/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
index c12b79fb2869..f3543837ddfc 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
@@ -95,6 +95,18 @@ SELECT typeof(IF(wkb IS NOT NULL,
ST_GeomFromWKB(wkb)::GEOMETRY(ANY), ST_GeomFro
SELECT
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000f03f0000000000000040')))
AS result;
SELECT
hex(ST_AsBinary(ST_GeomFromWKB(X'0101000000000000000000f03f0000000000000040')))
AS result;
+---- ST_GeogFromWKB
+
+-- 1. Driver-level queries.
+SELECT ST_AsBinary(ST_GeogFromWKB(NULL));
+SELECT
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')));
+-- Error handling: invalid WKB.
+SELECT ST_GeogFromWKB(X'6F');
+
+-- 2. Table-level queries.
+SELECT COUNT(*) FROM geodata WHERE ST_GeogFromWKB(wkb) IS NULL AND wkb IS NOT
NULL;
+SELECT COUNT(*) FROM geodata WHERE ST_AsBinary(ST_GeogFromWKB(wkb)) <> wkb;
+
---- ST_GeomFromWKB
-- 1. Driver-level queries.
@@ -106,6 +118,9 @@ SELECT
hex(ST_AsBinary(ST_GeomFromWKB(X'0101000000000000000000F03F00000000000000
-- Error handling: invalid SRID.
SELECT ST_GeomFromWKB(X'0101000000000000000000F03F0000000000000040', -1);
SELECT ST_GeomFromWKB(X'0101000000000000000000F03F0000000000000040', 9999);
+-- Error handling: invalid WKB.
+SELECT ST_GeomFromWKB(X'6F');
+SELECT ST_GeomFromWKB(X'6F', 4326);
-- 2. Table-level queries.
SELECT COUNT(*) FROM geodata WHERE ST_GeomFromWKB(wkb) IS NULL AND wkb IS NOT
NULL;
diff --git a/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
b/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
index c4b0fbeec5be..03f5728d7d3a 100644
--- a/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
@@ -523,6 +523,54 @@ struct<result:string>
0101000000000000000000F03F0000000000000040
+-- !query
+SELECT ST_AsBinary(ST_GeogFromWKB(NULL))
+-- !query schema
+struct<st_asbinary(st_geogfromwkb(NULL)):binary>
+-- !query output
+NULL
+
+
+-- !query
+SELECT
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')))
+-- !query schema
+struct<hex(st_asbinary(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040'))):string>
+-- !query output
+0101000000000000000000F03F0000000000000040
+
+
+-- !query
+SELECT ST_GeogFromWKB(X'6F')
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+ "errorClass" : "WKB_PARSE_ERROR",
+ "sqlState" : "22023",
+ "messageParameters" : {
+ "parseError" : "Unexpected end of WKB buffer",
+ "pos" : "0"
+ }
+}
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_GeogFromWKB(wkb) IS NULL AND wkb IS NOT
NULL
+-- !query schema
+struct<count(1):bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_AsBinary(ST_GeogFromWKB(wkb)) <> wkb
+-- !query schema
+struct<count(1):bigint>
+-- !query output
+0
+
+
-- !query
SELECT ST_AsBinary(ST_GeomFromWKB(NULL))
-- !query schema
@@ -593,6 +641,38 @@ org.apache.spark.SparkIllegalArgumentException
}
+-- !query
+SELECT ST_GeomFromWKB(X'6F')
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+ "errorClass" : "WKB_PARSE_ERROR",
+ "sqlState" : "22023",
+ "messageParameters" : {
+ "parseError" : "Unexpected end of WKB buffer",
+ "pos" : "0"
+ }
+}
+
+
+-- !query
+SELECT ST_GeomFromWKB(X'6F', 4326)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+ "errorClass" : "WKB_PARSE_ERROR",
+ "sqlState" : "22023",
+ "messageParameters" : {
+ "parseError" : "Unexpected end of WKB buffer",
+ "pos" : "0"
+ }
+}
+
+
-- !query
SELECT COUNT(*) FROM geodata WHERE ST_GeomFromWKB(wkb) IS NULL AND wkb IS NOT
NULL
-- !query schema
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
index 33274c05b899..4716818e5e36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
@@ -488,6 +488,31 @@ class STExpressionsSuite
checkEvaluation(ST_AsBinary(geometryExpression), wkb)
}
+ test("ST_GeogFromWKB - expressions") {
+ // Test data: WKB representation of POINT(1 2).
+ val wkb =
Hex.unhex("0101000000000000000000F03F0000000000000040".getBytes())
+ val wkbLiteral = Literal.create(wkb, BinaryType)
+ // ST_GeogFromWKB with default SRID.
+ val geographyExpression = ST_GeogFromWKB(wkbLiteral)
+ assert(geographyExpression.dataType.sameType(defaultGeographyType))
+ checkEvaluation(ST_AsBinary(geographyExpression), wkb)
+ checkEvaluation(ST_Srid(geographyExpression), defaultGeographySrid)
+ // ST_GeogFromWKB with NULL input.
+ val nullLiteral = Literal.create(null, BinaryType)
+ val geographyExpressionNull = ST_GeogFromWKB(nullLiteral)
+ checkEvaluation(geographyExpressionNull, null)
+ // ST_GeogFromWKB with invalid WKB.
+ val invalidWkbLiteral = Literal.create(Array[Byte](111), BinaryType)
+ val geographyExpressionInvalidWkb = ST_GeogFromWKB(invalidWkbLiteral)
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ geographyExpressionInvalidWkb.eval()
+ },
+ condition = "WKB_PARSE_ERROR",
+ parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos"
-> "0")
+ )
+ }
+
test("ST_GeomFromWKB - expressions") {
// Test data: WKB representation of POINT(1 2).
val wkb =
Hex.unhex("0101000000000000000000F03F0000000000000040".getBytes())
@@ -513,6 +538,16 @@ class STExpressionsSuite
condition = "ST_INVALID_SRID_VALUE",
parameters = Map("srid" -> s"$invalidSrid")
)
+ // ST_GeomFromWKB with invalid WKB.
+ val invalidWkbLiteral = Literal.create(Array[Byte](111), BinaryType)
+ val geometryExpressionInvalidWkb = new ST_GeomFromWKB(invalidWkbLiteral)
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ geometryExpressionInvalidWkb.eval()
+ },
+ condition = "WKB_PARSE_ERROR",
+ parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos"
-> "0")
+ )
}
test("ST_GeomFromWKB - columns") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
index dbde688de264..1f52f2c0b9f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -43,6 +44,27 @@ class STFunctionsSuite extends QueryTest with
SharedSparkSession {
"0101000000000000000000f03f0000000000000040"))
}
+ test("st_geogfromwkb") {
+ // Test data: Well-Known Binary (WKB) representations.
+ val df = Seq[(String)](
+ (
+ "0101000000000000000000f03f0000000000000040"
+ )).toDF("wkb")
+ // ST_GeogFromWKB.
+ checkAnswer(
+
df.select(lower(hex(st_asbinary(st_geogfromwkb(unhex($"wkb"))))).as("col0")),
+ Row("0101000000000000000000f03f0000000000000040"))
+ // ST_GeogFromWKB with invalid WKB.
+ val df_invalid = Seq(Array[Byte](111)).toDF("wkb")
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ df_invalid.select(st_geogfromwkb($"wkb")).collect()
+ },
+ condition = "WKB_PARSE_ERROR",
+ parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos"
-> "0")
+ )
+ }
+
test("st_geomfromwkb") {
// Test data: Well-Known Binary (WKB) representations.
val df = Seq[(String, Int)](
@@ -59,6 +81,23 @@ class STFunctionsSuite extends QueryTest with
SharedSparkSession {
"0101000000000000000000f03f0000000000000040",
"0101000000000000000000f03f0000000000000040",
"0101000000000000000000f03f0000000000000040"))
+ // ST_GeomFromWKB with invalid SRID.
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ df.select(st_geomfromwkb(unhex($"wkb"), lit(-1))).collect()
+ },
+ condition = "ST_INVALID_SRID_VALUE",
+ parameters = Map("srid" -> "-1")
+ )
+ // ST_GeomFromWKB with invalid WKB.
+ val df_invalid = Seq(Array[Byte](111)).toDF("wkb")
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ df_invalid.select(st_geomfromwkb($"wkb")).collect()
+ },
+ condition = "WKB_PARSE_ERROR",
+ parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos"
-> "0")
+ )
}
/** ST accessor expressions. */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]