This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e92ef557686d [SPARK-55640][GEO][SQL] Propagate WKB parsing errors for 
Geometry and Geography
e92ef557686d is described below

commit e92ef557686daf8961a52cfab141b803dde77413
Author: Uros Bojanic <[email protected]>
AuthorDate: Thu Feb 26 11:32:30 2026 +0800

    [SPARK-55640][GEO][SQL] Propagate WKB parsing errors for Geometry and 
Geography
    
    ### What changes were proposed in this pull request?
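    The WKB reader for Geometry and Geography previously reported parsing
    failures only through the internal `WkbParseException`. This PR introduces
    a user-facing error condition, `WKB_PARSE_ERROR`, and makes
    `Geometry.fromWkb` and `Geography.fromWkb` convert those internal
    exceptions into it.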
    
    ### Why are the changes needed?
    Propagate the WKB parsing errors properly to the user.
    
    ### Does this PR introduce _any_ user-facing change?
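    Yes, invalid WKB input now fails with the `WKB_PARSE_ERROR` error condition
    (SQLSTATE 22023), reporting the parse error and the position at which it occurred.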
    
    ### How was this patch tested?
    Added new unit tests and end-to-end SQL tests for WKB parsing.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #54424 from uros-db/geo-wkb-parse-exceptions.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 ++
 python/pyspark/errors/error-conditions.json        |  6 ++
 python/pyspark/sql/tests/test_functions.py         | 26 +++++++
 .../apache/spark/sql/catalyst/util/Geography.java  | 20 ++++--
 .../apache/spark/sql/catalyst/util/Geometry.java   | 20 ++++--
 .../sql/catalyst/util/geo/WkbParseException.java   | 37 ++++------
 .../spark/sql/catalyst/util/geo/WkbReader.java     |  2 +-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  9 +++
 .../catalyst/util/geo/WkbErrorHandlingTest.java    | 58 +++++++---------
 .../sql/catalyst/util/geo/WkbGeographyTest.java    |  2 +-
 .../util/geo/WkbReaderWriterAdvancedTest.java      |  8 +--
 .../sql/catalyst/util/GeographyExecutionSuite.java | 12 ++++
 .../sql/catalyst/util/GeometryExecutionSuite.java  | 12 ++++
 .../analyzer-results/st-functions.sql.out          | 53 ++++++++++++++
 .../resources/sql-tests/inputs/st-functions.sql    | 15 ++++
 .../sql-tests/results/st-functions.sql.out         | 80 ++++++++++++++++++++++
 .../org/apache/spark/sql/STExpressionsSuite.scala  | 35 ++++++++++
 .../org/apache/spark/sql/STFunctionsSuite.scala    | 39 +++++++++++
 18 files changed, 359 insertions(+), 81 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index dc6200095444..94111b8e9ee0 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7754,6 +7754,12 @@
     ],
     "sqlState" : "42601"
   },
+  "WKB_PARSE_ERROR" : {
+    "message" : [
+      "Error parsing WKB: <parseError> at position <pos>"
+    ],
+    "sqlState" : "22023"
+  },
   "WRITE_STREAM_NOT_ALLOWED" : {
     "message" : [
       "`writeStream` can be called only on streaming Dataset/DataFrame."
diff --git a/python/pyspark/errors/error-conditions.json 
b/python/pyspark/errors/error-conditions.json
index 94ede56da5e1..b0cf4c084e20 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1488,6 +1488,12 @@
       "Value for `<arg_name>` must be between <lower_bound> and <upper_bound> 
(inclusive), got <actual>"
     ]
   },
+  "WKB_PARSE_ERROR" : {
+    "message" : [
+      "Error parsing WKB: <parseError> at position <pos>"
+    ],
+    "sqlState" : "22023"
+  },
   "WRONG_NUM_ARGS_FOR_HIGHER_ORDER_FUNCTION": {
     "message": [
       "Function `<func_name>` should take between 1 and 3 arguments, but the 
provided function takes <num_args>."
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index 9313cd34dc06..cdfd8cc24cd0 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -26,6 +26,7 @@ import re
 import unittest
 
 from pyspark.errors import PySparkTypeError, PySparkValueError, 
SparkRuntimeException
+from pyspark.errors.exceptions.base import IllegalArgumentException
 from pyspark.sql import Row, Window, functions as F, types
 from pyspark.sql.avro.functions import from_avro, to_avro
 from pyspark.sql.column import Column
@@ -3798,6 +3799,25 @@ class FunctionsTestsMixin:
         )
         self.assertEqual(results, [expected])
 
+    def test_st_geogfromwkb(self):
+        df = self.spark.createDataFrame(
+            [(bytes.fromhex("0101000000000000000000F03F0000000000000040"),)],
+            ["wkb"],
+        )
+        results = df.select(
+            F.hex(F.st_asbinary(F.st_geogfromwkb("wkb"))),
+        ).collect()
+        expected = Row(
+            "0101000000000000000000F03F0000000000000040",
+        )
+        self.assertEqual(results, [expected])
+        # ST_GeogFromWKB with invalid WKB.
+        df = self.spark.createDataFrame([(bytearray(b"\x6f"),)], ["wkb"])
+        with self.assertRaises(IllegalArgumentException) as error_context:
+            df.select(F.st_geogfromwkb("wkb")).collect()
+        self.assertIn("[WKB_PARSE_ERROR]", str(error_context.exception))
+        self.assertIn("Unexpected end of WKB buffer", 
str(error_context.exception))
+
     def test_st_geomfromwkb(self):
         df = self.spark.createDataFrame(
             [(bytes.fromhex("0101000000000000000000F03F0000000000000040"), 
4326)],
@@ -3814,6 +3834,12 @@ class FunctionsTestsMixin:
             "0101000000000000000000F03F0000000000000040",
         )
         self.assertEqual(results, [expected])
+        # ST_GeomFromWKB with invalid WKB.
+        df = self.spark.createDataFrame([(bytearray(b"\x6f"),)], ["wkb"])
+        with self.assertRaises(IllegalArgumentException) as error_context:
+            df.select(F.st_geomfromwkb("wkb")).collect()
+        self.assertIn("[WKB_PARSE_ERROR]", str(error_context.exception))
+        self.assertIn("Unexpected end of WKB buffer", 
str(error_context.exception))
 
     def test_st_setsrid(self):
         df = self.spark.createDataFrame(
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
index 445e30af15b8..f446d71d5a5b 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
@@ -17,8 +17,10 @@
 package org.apache.spark.sql.catalyst.util;
 
 import org.apache.spark.sql.catalyst.util.geo.GeometryModel;
+import org.apache.spark.sql.catalyst.util.geo.WkbParseException;
 import org.apache.spark.sql.catalyst.util.geo.WkbReader;
 import org.apache.spark.sql.catalyst.util.geo.WkbWriter;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
 import org.apache.spark.unsafe.types.GeographyVal;
 
 import java.nio.ByteBuffer;
@@ -81,13 +83,17 @@ public final class Geography implements Geo {
 
   // Returns a Geography object with the specified SRID value by parsing the 
input WKB.
   public static Geography fromWkb(byte[] wkb, int srid) {
-    WkbReader reader = new WkbReader(true);
-    reader.read(wkb); // Validate WKB with geography coordinate bounds.
-
-    byte[] bytes = new byte[HEADER_SIZE + wkb.length];
-    ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
-    System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
-    return fromBytes(bytes);
+    try {
+      WkbReader reader = new WkbReader(true);
+      reader.read(wkb); // Validate WKB with geography coordinate bounds.
+
+      byte[] bytes = new byte[HEADER_SIZE + wkb.length];
+      ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
+      System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
+      return fromBytes(bytes);
+    } catch (WkbParseException e) {
+      throw QueryExecutionErrors.wkbParseError(e.getParseError(), 
e.getPosition());
+    }
   }
 
   // Overload for the WKB reader where we use the default SRID for Geography.
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
index 757c63f421e7..58be589b86ff 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
@@ -17,8 +17,10 @@
 package org.apache.spark.sql.catalyst.util;
 
 import org.apache.spark.sql.catalyst.util.geo.GeometryModel;
+import org.apache.spark.sql.catalyst.util.geo.WkbParseException;
 import org.apache.spark.sql.catalyst.util.geo.WkbReader;
 import org.apache.spark.sql.catalyst.util.geo.WkbWriter;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
 import org.apache.spark.unsafe.types.GeometryVal;
 
 import java.nio.ByteBuffer;
@@ -81,13 +83,17 @@ public final class Geometry implements Geo {
 
   // Returns a Geometry object with the specified SRID value by parsing the 
input WKB.
   public static Geometry fromWkb(byte[] wkb, int srid) {
-    WkbReader reader = new WkbReader();
-    reader.read(wkb); // Validate WKB
-
-    byte[] bytes = new byte[HEADER_SIZE + wkb.length];
-    ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
-    System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
-    return fromBytes(bytes);
+    try {
+      WkbReader reader = new WkbReader();
+      reader.read(wkb); // Validate WKB
+
+      byte[] bytes = new byte[HEADER_SIZE + wkb.length];
+      ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
+      System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
+      return fromBytes(bytes);
+    } catch (WkbParseException e) {
+      throw QueryExecutionErrors.wkbParseError(e.getParseError(), 
e.getPosition());
+    }
   }
 
   // Overload for the WKB reader where we use the default SRID for Geometry.
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
index 3ea8e95e7cc4..b1d1b4dfba1a 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbParseException.java
@@ -19,40 +19,27 @@ package org.apache.spark.sql.catalyst.util.geo;
 /**
  * Exception thrown when parsing WKB data fails.
  */
-class WkbParseException extends RuntimeException {
+public class WkbParseException extends RuntimeException {
+  private final String parseError;
   private final long position;
-  private final String wkbString;
+  private final byte[] wkb;
 
-  WkbParseException(String message, long position, byte[] wkb) {
-    super(formatMessage(message, position, wkb));
+  WkbParseException(String parseError, long position, byte[] wkb) {
+    super();
+    this.parseError = parseError;
     this.position = position;
-    this.wkbString = wkb != null ? bytesToHex(wkb) : "";
+    this.wkb = wkb;
   }
 
-  private static String formatMessage(String message, long position, byte[] 
wkb) {
-    String baseMessage = message + " at position " + position;
-    if (wkb != null && wkb.length > 0) {
-      baseMessage += " in WKB: " + bytesToHex(wkb);
-    }
-    return baseMessage;
+  public String getParseError() {
+    return parseError;
   }
 
-  private static String bytesToHex(byte[] bytes) {
-    if (bytes == null || bytes.length == 0) {
-      return "";
-    }
-    StringBuilder sb = new StringBuilder(bytes.length * 2);
-    for (byte b : bytes) {
-      sb.append(String.format("%02X", b));
-    }
-    return sb.toString();
-  }
-
-  long getPosition() {
+  public long getPosition() {
     return position;
   }
 
-  String getWkbString() {
-    return wkbString;
+  public byte[] getWkb() {
+    return wkb;
   }
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
index e0b8b543300b..9546ec7cf184 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java
@@ -222,7 +222,7 @@ public class WkbReader {
 
     // Check that we have enough bytes for header (endianness byte + 4-byte 
type)
     if (currentWkb.length < WkbUtil.BYTE_SIZE + WkbUtil.TYPE_SIZE) {
-      throw new WkbParseException("WKB data too short", 0, currentWkb);
+      throw new WkbParseException("Unexpected end of WKB buffer", 0, 
currentWkb);
     }
 
     // Create buffer wrapping the entire byte array
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 4e7cfa653389..8985bdb519d1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -676,6 +676,15 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
     stInvalidSridValueError(srid.toString)
   }
 
+  def wkbParseError(msg: String, pos: String): SparkIllegalArgumentException = 
{
+    new SparkIllegalArgumentException(errorClass = "WKB_PARSE_ERROR",
+      messageParameters = Map("parseError" -> msg, "pos" -> pos))
+  }
+
+  def wkbParseError(msg: String, pos: Long): SparkIllegalArgumentException = {
+    wkbParseError(msg, pos.toString)
+  }
+
   def withSuggestionIntervalArithmeticOverflowError(
       suggestedFunc: String,
       context: QueryContext): ArithmeticException = {
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
index 660775a1adc8..f66bc50daf98 100644
--- 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java
@@ -46,13 +46,12 @@ public class WkbErrorHandlingTest extends WkbTestBase {
     WkbParseException ex = Assertions.assertThrows(
         WkbParseException.class, () -> reader.read(wkb),
         "Should throw WkbParseException for WKB: " + hex);
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-        "Exception message should contain the WKB hex: " + hex + ", actual: " 
+ ex.getMessage());
+    Assertions.assertSame(wkb, ex.getWkb());
     if (expectedMessagePart != null && !expectedMessagePart.isEmpty()) {
       Assertions.assertTrue(
-          
ex.getMessage().toLowerCase().contains(expectedMessagePart.toLowerCase()),
+          
ex.getParseError().toLowerCase().contains(expectedMessagePart.toLowerCase()),
           "Exception message should contain '" + expectedMessagePart + "', 
actual: " +
-              ex.getMessage());
+              ex.getParseError());
     }
   }
 
@@ -63,69 +62,60 @@ public class WkbErrorHandlingTest extends WkbTestBase {
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(emptyWkb));
     // Empty WKB produces empty hex string, so just verify exception was thrown
-    Assertions.assertNotNull(ex.getMessage());
+    Assertions.assertNotNull(ex.getParseError());
   }
 
   @Test
   public void testTooShortWkb() {
     // Only endianness byte
-    String hex = "01";
-    byte[] tooShort = hexToBytes(hex);
+    byte[] tooShort = hexToBytes("01");
     WkbReader reader = new WkbReader();
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(tooShort));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(tooShort, ex.getWkb());
   }
 
   @Test
   public void testInvalidGeometryTypeZero() {
     // Type = 0 (invalid, should be 1-7)
-    String hex = "0100000000000000000000F03F0000000000000040";
-    byte[] invalidType = hexToBytes(hex);
+    byte[] invalidType = 
hexToBytes("0100000000000000000000F03F0000000000000040");
     WkbReader reader = new WkbReader();
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(invalidType));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(invalidType, ex.getWkb());
   }
 
   @Test
   public void testTruncatedPointCoordinates() {
     // Point WKB with truncated coordinates (missing Y coordinate)
-    String hex = "0101000000000000000000F03F";
-    byte[] truncated = hexToBytes(hex);
+    byte[] truncated = hexToBytes("0101000000000000000000F03F");
     WkbReader reader = new WkbReader();
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(truncated));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(truncated, ex.getWkb());
   }
 
   @Test
   public void testTruncatedByte() {
     // Only one byte (FF) of the 4-byte INT field.
-    String hex = "0102000000ff";
-    byte[] truncated = hexToBytes(hex);
+    byte[] truncated = hexToBytes("0102000000ff");
     WkbReader reader = new WkbReader();
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(truncated));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(truncated, ex.getWkb());
   }
 
   @Test
   public void testTruncatedLineString() {
     // LineString with declared 2 points but only 1 provided
-    String hex = "010200000002000000" +  // LineString with 2 points
-      "0000000000000000" +              // X of first point
-      "0000000000000000";               // Y of first point (missing second 
point)
-    byte[] truncated = hexToBytes(hex);
+    byte[] truncated = hexToBytes(
+      "010200000002000000" +  // LineString with 2 points
+      "0000000000000000" +    // X of first point
+      "0000000000000000");    // Y of first point (missing second point)
     WkbReader reader = new WkbReader();
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(truncated));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(truncated, ex.getWkb());
   }
 
   @Test
@@ -164,8 +154,7 @@ public class WkbErrorHandlingTest extends WkbTestBase {
 
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(invalidPolygon));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(invalidPolygon, ex.getWkb());
   }
 
   @Test
@@ -174,20 +163,19 @@ public class WkbErrorHandlingTest extends WkbTestBase {
     WkbReader reader = new WkbReader(1);
 
     // Polygon with ring where first and last points don't match
-    String hex = "01" +                                      // Little endian
+    byte[] nonClosedRing = hexToBytes(
+      "01" +                                               // Little endian
       "03000000" +                                         // Polygon type
       "01000000" +                                         // 1 ring
       "04000000" +                                         // 4 points
       "0000000000000000" + "0000000000000000" +            // (0, 0)
       "000000000000F03F" + "0000000000000000" +            // (1, 0)
       "000000000000F03F" + "000000000000F03F" +            // (1, 1)
-      "0000000000000040" + "0000000000000040";             // (2, 2) - doesn't 
match first point!
-    byte[] nonClosedRing = hexToBytes(hex);
+      "0000000000000040" + "0000000000000040");            // (2, 2) - doesn't 
match first point!
 
     WkbParseException ex = Assertions.assertThrows(
       WkbParseException.class, () -> reader.read(nonClosedRing));
-    
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()),
-      "Exception message should contain the WKB hex: " + hex);
+    Assertions.assertSame(nonClosedRing, ex.getWkb());
   }
 
   @Test
@@ -197,7 +185,7 @@ public class WkbErrorHandlingTest extends WkbTestBase {
       WkbParseException.class, () -> reader.read(null),
       "Should throw WKBParseException for null byte array");
     // Null WKB cannot produce hex string, just verify exception was thrown
-    Assertions.assertNotNull(ex.getMessage());
+    Assertions.assertNotNull(ex.getParseError());
   }
 
   // ========== Invalid Byte Order Tests ==========
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
index 5c9412519216..04e98ac8da9c 100644
--- 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbGeographyTest.java
@@ -629,7 +629,7 @@ public class WkbGeographyTest extends WkbTestBase {
     byte[] wkb = makePointWkb2D(200.0, 0.0);
     WkbParseException ex = Assertions.assertThrows(
         WkbParseException.class, () -> geographyReader1().read(wkb));
-    String msg = ex.getMessage();
+    String msg = ex.getParseError();
     Assertions.assertTrue(msg.contains("Invalid coordinate value"));
   }
 }
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
index 98bec81ca011..c02f1f196e38 100644
--- 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java
@@ -55,8 +55,7 @@ public class WkbReaderWriterAdvancedTest extends WkbTestBase {
     if (expectedValidateError) {
       WkbParseException ex = Assertions.assertThrows(
         WkbParseException.class, () -> validateReader.read(wkbLittle, 0));
-      
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(wkbHexLittle.toUpperCase()),
-        "Exception message should contain the WKB hex: " + wkbHexLittle);
+      Assertions.assertSame(wkbLittle, ex.getWkb());
       geomLittle = noValidateReader.read(wkbLittle, 0);
     } else {
       geomLittle = validateReader.read(wkbLittle, 0);
@@ -73,8 +72,7 @@ public class WkbReaderWriterAdvancedTest extends WkbTestBase {
     if (expectedValidateError) {
       WkbParseException ex = Assertions.assertThrows(
         WkbParseException.class, () -> validateReader.read(wkbBig, 0));
-      
Assertions.assertTrue(ex.getMessage().toUpperCase().contains(wkbHexBig.toUpperCase()),
-        "Exception message should contain the WKB hex: " + wkbHexBig);
+      Assertions.assertSame(wkbBig, ex.getWkb());
       geomBig = noValidateReader.read(wkbBig, 0);
     } else {
       geomBig = validateReader.read(wkbBig, 0);
@@ -1198,7 +1196,7 @@ public class WkbReaderWriterAdvancedTest extends 
WkbTestBase {
       WkbReader geographyReader = new WkbReader(true);
       WkbParseException ex = Assertions.assertThrows(
           WkbParseException.class, () -> geographyReader.read(wkb, 0));
-      Assertions.assertTrue(ex.getMessage().contains("Invalid coordinate 
value"));
+      Assertions.assertTrue(ex.getParseError().contains("Invalid coordinate 
value"));
       // Geography mode without validation should accept non-geographic 
coordinate values.
       WkbReader noValidateGeographyReader = new WkbReader(0, true);
       Assertions.assertDoesNotThrow(() -> noValidateGeographyReader.read(wkb, 
0));
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
index d7c40047977a..fece3e36bc59 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util;
 
+import org.apache.spark.SparkIllegalArgumentException;
 import org.apache.spark.unsafe.types.GeographyVal;
 import org.junit.jupiter.api.Test;
 
@@ -113,6 +114,17 @@ class GeographyExecutionSuite {
     assertEquals(4326, geography.srid());
   }
 
+  @Test
+  void testFromWkbInvalidWkb() {
+    byte[] invalidWkb = new byte[]{111};
+    SparkIllegalArgumentException exception = assertThrows(
+      SparkIllegalArgumentException.class,
+      () -> Geography.fromWkb(invalidWkb)
+    );
+    assertEquals("WKB_PARSE_ERROR", exception.getCondition());
+    assertTrue(exception.getMessage().contains("Unexpected end of WKB 
buffer"));
+  }
+
   /** Tests for Geography EWKB parsing. */
 
   @Test
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
index 617ef7752a05..5d0b11e969ad 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util;
 
+import org.apache.spark.SparkIllegalArgumentException;
 import org.apache.spark.unsafe.types.GeometryVal;
 import org.junit.jupiter.api.Test;
 
@@ -127,6 +128,17 @@ class GeometryExecutionSuite {
     assertEquals(0, geometry.srid());
   }
 
+  @Test
+  void testFromWkbInvalidWkb() {
+    byte[] invalidWkb = new byte[]{111};
+    SparkIllegalArgumentException exception = assertThrows(
+      SparkIllegalArgumentException.class,
+      () -> Geometry.fromWkb(invalidWkb)
+    );
+    assertEquals("WKB_PARSE_ERROR", exception.getCondition());
+    assertTrue(exception.getMessage().contains("Unexpected end of WKB 
buffer"));
+  }
+
   /** Tests for Geometry EWKB parsing. */
 
   @Test
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
index 8391c4800d8b..7566e69392fd 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out
@@ -466,6 +466,45 @@ Project 
[hex(st_asbinary(st_geomfromwkb(0x0101000000000000000000F03F000000000000
 +- OneRowRelation
 
 
+-- !query
+SELECT ST_AsBinary(ST_GeogFromWKB(NULL))
+-- !query analysis
+Project [st_asbinary(st_geogfromwkb(cast(null as binary))) AS 
st_asbinary(st_geogfromwkb(NULL))#x]
++- OneRowRelation
+
+
+-- !query
+SELECT 
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')))
+-- !query analysis
+Project 
[hex(st_asbinary(st_geogfromwkb(0x0101000000000000000000F03F0000000000000040))) 
AS 
hex(st_asbinary(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040')))#x]
++- OneRowRelation
+
+
+-- !query
+SELECT ST_GeogFromWKB(X'6F')
+-- !query analysis
+Project [st_geogfromwkb(0x6F) AS st_geogfromwkb(X'6F')#x]
++- OneRowRelation
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_GeogFromWKB(wkb) IS NULL AND wkb IS NOT 
NULL
+-- !query analysis
+Aggregate [count(1) AS count(1)#xL]
++- Filter (isnull(st_geogfromwkb(wkb#x)) AND isnotnull(wkb#x))
+   +- SubqueryAlias spark_catalog.default.geodata
+      +- Relation spark_catalog.default.geodata[wkb#x] parquet
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_AsBinary(ST_GeogFromWKB(wkb)) <> wkb
+-- !query analysis
+Aggregate [count(1) AS count(1)#xL]
++- Filter NOT (st_asbinary(st_geogfromwkb(wkb#x)) = wkb#x)
+   +- SubqueryAlias spark_catalog.default.geodata
+      +- Relation spark_catalog.default.geodata[wkb#x] parquet
+
+
 -- !query
 SELECT ST_AsBinary(ST_GeomFromWKB(NULL))
 -- !query analysis
@@ -527,6 +566,20 @@ org.apache.spark.SparkIllegalArgumentException
 }
 
 
+-- !query
+SELECT ST_GeomFromWKB(X'6F')
+-- !query analysis
+Project [st_geomfromwkb(0x6F, 0) AS st_geomfromwkb(X'6F', 0)#x]
++- OneRowRelation
+
+
+-- !query
+SELECT ST_GeomFromWKB(X'6F', 4326)
+-- !query analysis
+Project [st_geomfromwkb(0x6F, 4326) AS st_geomfromwkb(X'6F', 4326)#x]
++- OneRowRelation
+
+
 -- !query
 SELECT COUNT(*) FROM geodata WHERE ST_GeomFromWKB(wkb) IS NULL AND wkb IS NOT 
NULL
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
index c12b79fb2869..f3543837ddfc 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql
@@ -95,6 +95,18 @@ SELECT typeof(IF(wkb IS NOT NULL, 
ST_GeomFromWKB(wkb)::GEOMETRY(ANY), ST_GeomFro
 SELECT 
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000f03f0000000000000040'))) 
AS result;
 SELECT 
hex(ST_AsBinary(ST_GeomFromWKB(X'0101000000000000000000f03f0000000000000040'))) 
AS result;
 
+---- ST_GeogFromWKB
+
+-- 1. Driver-level queries.
+SELECT ST_AsBinary(ST_GeogFromWKB(NULL));
+SELECT 
hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')));
+-- Error handling: invalid WKB.
+SELECT ST_GeogFromWKB(X'6F');
+
+-- 2. Table-level queries.
+SELECT COUNT(*) FROM geodata WHERE ST_GeogFromWKB(wkb) IS NULL AND wkb IS NOT 
NULL;
+SELECT COUNT(*) FROM geodata WHERE ST_AsBinary(ST_GeogFromWKB(wkb)) <> wkb;
+
 ---- ST_GeomFromWKB
 
 -- 1. Driver-level queries.
@@ -106,6 +118,9 @@ SELECT 
hex(ST_AsBinary(ST_GeomFromWKB(X'0101000000000000000000F03F00000000000000
 -- Error handling: invalid SRID.
 SELECT ST_GeomFromWKB(X'0101000000000000000000F03F0000000000000040', -1);
 SELECT ST_GeomFromWKB(X'0101000000000000000000F03F0000000000000040', 9999);
+-- Error handling: invalid WKB.
+SELECT ST_GeomFromWKB(X'6F');
+SELECT ST_GeomFromWKB(X'6F', 4326);
 
 -- 2. Table-level queries.
 SELECT COUNT(*) FROM geodata WHERE ST_GeomFromWKB(wkb) IS NULL AND wkb IS NOT 
NULL;
diff --git a/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
index c4b0fbeec5be..03f5728d7d3a 100644
--- a/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out
@@ -523,6 +523,54 @@ struct<result:string>
 0101000000000000000000F03F0000000000000040
 
 
+-- !query
+SELECT ST_AsBinary(ST_GeogFromWKB(NULL))
+-- !query schema
+struct<st_asbinary(st_geogfromwkb(NULL)):binary>
+-- !query output
+NULL
+
+
+-- !query
+SELECT hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')))
+-- !query schema
+struct<hex(st_asbinary(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040'))):string>
+-- !query output
+0101000000000000000000F03F0000000000000040
+
+
+-- !query
+SELECT ST_GeogFromWKB(X'6F')
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "WKB_PARSE_ERROR",
+  "sqlState" : "22023",
+  "messageParameters" : {
+    "parseError" : "Unexpected end of WKB buffer",
+    "pos" : "0"
+  }
+}
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_GeogFromWKB(wkb) IS NULL AND wkb IS NOT NULL
+-- !query schema
+struct<count(1):bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT COUNT(*) FROM geodata WHERE ST_AsBinary(ST_GeogFromWKB(wkb)) <> wkb
+-- !query schema
+struct<count(1):bigint>
+-- !query output
+0
+
+
 -- !query
 SELECT ST_AsBinary(ST_GeomFromWKB(NULL))
 -- !query schema
@@ -593,6 +641,38 @@ org.apache.spark.SparkIllegalArgumentException
 }
 
 
+-- !query
+SELECT ST_GeomFromWKB(X'6F')
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "WKB_PARSE_ERROR",
+  "sqlState" : "22023",
+  "messageParameters" : {
+    "parseError" : "Unexpected end of WKB buffer",
+    "pos" : "0"
+  }
+}
+
+
+-- !query
+SELECT ST_GeomFromWKB(X'6F', 4326)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "WKB_PARSE_ERROR",
+  "sqlState" : "22023",
+  "messageParameters" : {
+    "parseError" : "Unexpected end of WKB buffer",
+    "pos" : "0"
+  }
+}
+
+
 -- !query
 SELECT COUNT(*) FROM geodata WHERE ST_GeomFromWKB(wkb) IS NULL AND wkb IS NOT NULL
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
index 33274c05b899..4716818e5e36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
@@ -488,6 +488,31 @@ class STExpressionsSuite
     checkEvaluation(ST_AsBinary(geometryExpression), wkb)
   }
 
+  test("ST_GeogFromWKB - expressions") {
+    // Test data: WKB representation of POINT(1 2).
+    val wkb = Hex.unhex("0101000000000000000000F03F0000000000000040".getBytes())
+    val wkbLiteral = Literal.create(wkb, BinaryType)
+    // ST_GeogFromWKB with default SRID.
+    val geographyExpression = ST_GeogFromWKB(wkbLiteral)
+    assert(geographyExpression.dataType.sameType(defaultGeographyType))
+    checkEvaluation(ST_AsBinary(geographyExpression), wkb)
+    checkEvaluation(ST_Srid(geographyExpression), defaultGeographySrid)
+    // ST_GeogFromWKB with NULL input.
+    val nullLiteral = Literal.create(null, BinaryType)
+    val geographyExpressionNull = ST_GeogFromWKB(nullLiteral)
+    checkEvaluation(geographyExpressionNull, null)
+    // ST_GeogFromWKB with invalid WKB.
+    val invalidWkbLiteral = Literal.create(Array[Byte](111), BinaryType)
+    val geographyExpressionInvalidWkb = ST_GeogFromWKB(invalidWkbLiteral)
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        geographyExpressionInvalidWkb.eval()
+      },
+      condition = "WKB_PARSE_ERROR",
+      parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos" -> "0")
+    )
+  }
+
   test("ST_GeomFromWKB - expressions") {
     // Test data: WKB representation of POINT(1 2).
     val wkb = Hex.unhex("0101000000000000000000F03F0000000000000040".getBytes())
@@ -513,6 +538,16 @@ class STExpressionsSuite
       condition = "ST_INVALID_SRID_VALUE",
       parameters = Map("srid" -> s"$invalidSrid")
     )
+    // ST_GeomFromWKB with invalid WKB.
+    val invalidWkbLiteral = Literal.create(Array[Byte](111), BinaryType)
+    val geometryExpressionInvalidWkb = new ST_GeomFromWKB(invalidWkbLiteral)
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        geometryExpressionInvalidWkb.eval()
+      },
+      condition = "WKB_PARSE_ERROR",
+      parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos" -> "0")
+    )
   }
 
   test("ST_GeomFromWKB - columns") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
index dbde688de264..1f52f2c0b9f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/STFunctionsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -43,6 +44,27 @@ class STFunctionsSuite extends QueryTest with SharedSparkSession {
         "0101000000000000000000f03f0000000000000040"))
   }
 
+  test("st_geogfromwkb") {
+    // Test data: Well-Known Binary (WKB) representations.
+    val df = Seq[(String)](
+      (
+        "0101000000000000000000f03f0000000000000040"
+      )).toDF("wkb")
+    // ST_GeogFromWKB.
+    checkAnswer(
+      df.select(lower(hex(st_asbinary(st_geogfromwkb(unhex($"wkb"))))).as("col0")),
+      Row("0101000000000000000000f03f0000000000000040"))
+    // ST_GeogFromWKB with invalid WKB.
+    val df_invalid = Seq(Array[Byte](111)).toDF("wkb")
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        df_invalid.select(st_geogfromwkb($"wkb")).collect()
+      },
+      condition = "WKB_PARSE_ERROR",
+      parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos" -> "0")
+    )
+  }
+
   test("st_geomfromwkb") {
     // Test data: Well-Known Binary (WKB) representations.
     val df = Seq[(String, Int)](
@@ -59,6 +81,23 @@ class STFunctionsSuite extends QueryTest with SharedSparkSession {
         "0101000000000000000000f03f0000000000000040",
         "0101000000000000000000f03f0000000000000040",
         "0101000000000000000000f03f0000000000000040"))
+    // ST_GeomFromWKB with invalid SRID.
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        df.select(st_geomfromwkb(unhex($"wkb"), lit(-1))).collect()
+      },
+      condition = "ST_INVALID_SRID_VALUE",
+      parameters = Map("srid" -> "-1")
+    )
+    // ST_GeomFromWKB with invalid WKB.
+    val df_invalid = Seq(Array[Byte](111)).toDF("wkb")
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        df_invalid.select(st_geomfromwkb($"wkb")).collect()
+      },
+      condition = "WKB_PARSE_ERROR",
+      parameters = Map("parseError" -> "Unexpected end of WKB buffer", "pos" -> "0")
+    )
   }
 
   /** ST accessor expressions. */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to