This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 19408689a2bd [SPARK-51658][GEO][SQL] Introduce Geometry and Geography
in-memory wrapper formats
19408689a2bd is described below
commit 19408689a2bda73aef99896867fa349a0d260cea
Author: Uros Bojanic <[email protected]>
AuthorDate: Wed Oct 29 22:27:56 2025 +0800
[SPARK-51658][GEO][SQL] Introduce Geometry and Geography in-memory wrapper
formats
### What changes were proposed in this pull request?
Introduce the Catalyst-internal geospatial in-memory layout for Geometry
and Geography: a 4-byte `SRID` (integer) header followed by a `WKB` (byte array)
payload, both little-endian. Also add a core conversion API to retrieve these
values from the server-side execution wrappers.
### Why are the changes needed?
Establish a consistent representation/encoding for geospatial types, with a
clear contract for the `Geometry` and `Geography` classes. This also provides
minimal, safe SRID and WKB extraction during execution and lays the groundwork
for standard format support, conversions, etc.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests that validate SRID and WKB extraction from the in-memory
data formats:
- `GeographyExecutionSuite`
- `GeometryExecutionSuite`
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52761 from uros-db/geo-memory-repr.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/catalyst/util/Geo.java | 38 ++++++++++++++
.../apache/spark/sql/catalyst/util/Geography.java | 26 ++++++++--
.../apache/spark/sql/catalyst/util/Geometry.java | 26 ++++++++--
.../sql/catalyst/util/GeographyExecutionSuite.java | 59 ++++++++++++++++++----
.../sql/catalyst/util/GeometryExecutionSuite.java | 59 ++++++++++++++++++----
5 files changed, 178 insertions(+), 30 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geo.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geo.java
index 7c9b9ec29b41..2299f3598863 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geo.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geo.java
@@ -16,16 +16,54 @@
*/
package org.apache.spark.sql.catalyst.util;
+import java.nio.ByteOrder;
+
// Helper interface for the APIs expected from top-level GEOMETRY and
GEOGRAPHY classes.
interface Geo {
+ // The default endianness for in-memory geo objects is Little Endian (NDR).
+ ByteOrder DEFAULT_ENDIANNESS = ByteOrder.LITTLE_ENDIAN;
+
+ /**
+ * In-memory representation of a geospatial object:
+ * +------------------------------+
+ * | Geo Object |
+ * +------------------------------+
+ * | Header (Fixed Size) |
+ * | +--------------------------+ |
+ * | | SRID [4 bytes] | |
+ * | +--------------------------+ |
+ * | |
+ * | Payload (Variable Size) |
+ * | +--------------------------+ |
+ * | | WKB [byte array] | |
+ * | +--------------------------+ |
+ * +------------------------------+
+ * Byte order: Little Endian (NDR).
+ */
+
+ // Constant defining the size of the header in the geo object in-memory
representation.
+ int HEADER_SIZE = 4;
+
+ /** Header offsets. */
+
+ // Constant defining the offset of the Spatial Reference System Identifier
(SRID) value.
+ int SRID_OFFSET = 0;
+
+ /** Payload offsets. */
+
+ // Constant defining the offset of the Well-Known Binary (WKB)
representation value.
+ int WKB_OFFSET = HEADER_SIZE;
+
/** Binary converters. */
// Returns the Well-Known Binary (WKB) representation of the geo object.
byte[] toWkb();
+ byte[] toWkb(ByteOrder endianness);
// Returns the Extended Well-Known Binary (EWKB) representation of the geo
object.
byte[] toEwkb();
+ byte[] toEwkb(ByteOrder endianness);
/** Textual converters. */
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
index 882143c4b3dd..f7b0df8990d3 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java
@@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.util;
import org.apache.spark.unsafe.types.GeographyVal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Arrays;
// Catalyst-internal server-side execution wrapper for GEOGRAPHY.
@@ -113,16 +115,31 @@ public final class Geography implements Geo {
@Override
public byte[] toWkb() {
- // Once WKB conversion is implemented, it should support NDR and XDR
endianness.
- throw new UnsupportedOperationException("Geography WKB conversion is not
yet supported.");
+ // This method returns only the WKB portion of the in-memory Geography
representation.
+ // Note that the header is skipped, and that the WKB is returned as-is
(little-endian).
+ return Arrays.copyOfRange(getBytes(), WKB_OFFSET, getBytes().length);
+ }
+
+ @Override
+ public byte[] toWkb(ByteOrder endianness) {
+ // The default endianness is Little Endian (NDR).
+ if (endianness == DEFAULT_ENDIANNESS) {
+ return toWkb();
+ } else {
+ throw new UnsupportedOperationException("Geography WKB endianness is not
yet supported.");
+ }
}
@Override
public byte[] toEwkb() {
- // Once EWKB conversion is implemented, it should support NDR and XDR
endianness.
throw new UnsupportedOperationException("Geography EWKB conversion is not
yet supported.");
}
+ @Override
+ public byte[] toEwkb(ByteOrder endianness) {
+ throw new UnsupportedOperationException("Geography EWKB endianness is not
yet supported.");
+ }
+
/** Geography textual standard format converters: WKT and EWKT. */
@Override
@@ -141,7 +158,8 @@ public final class Geography implements Geo {
@Override
public int srid() {
- throw new UnsupportedOperationException("Geography SRID is not yet
supported.");
+ // This method gets the SRID value from the in-memory Geography
representation header.
+ return
ByteBuffer.wrap(getBytes()).order(DEFAULT_ENDIANNESS).getInt(SRID_OFFSET);
}
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
index 9d82b27df250..81cdaeb97ce2 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java
@@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.util;
import org.apache.spark.unsafe.types.GeometryVal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Arrays;
// Catalyst-internal server-side execution wrapper for GEOMETRY.
@@ -113,16 +115,31 @@ public final class Geometry implements Geo {
@Override
public byte[] toWkb() {
- // Once WKB conversion is implemented, it should support NDR and XDR
endianness.
- throw new UnsupportedOperationException("Geometry WKB conversion is not
yet supported.");
+ // This method returns only the WKB portion of the in-memory Geometry
representation.
+ // Note that the header is skipped, and that the WKB is returned as-is
(little-endian).
+ return Arrays.copyOfRange(getBytes(), WKB_OFFSET, getBytes().length);
+ }
+
+ @Override
+ public byte[] toWkb(ByteOrder endianness) {
+ // The default endianness is Little Endian (NDR).
+ if (endianness == DEFAULT_ENDIANNESS) {
+ return toWkb();
+ } else {
+ throw new UnsupportedOperationException("Geometry WKB endianness is not
yet supported.");
+ }
}
@Override
public byte[] toEwkb() {
- // Once EWKB conversion is implemented, it should support NDR and XDR
endianness.
throw new UnsupportedOperationException("Geometry EWKB conversion is not
yet supported.");
}
+ @Override
+ public byte[] toEwkb(ByteOrder endianness) {
+ throw new UnsupportedOperationException("Geometry EWKB endianness is not
yet supported.");
+ }
+
/** Geometry textual standard format converters: WKT and EWKT. */
@Override
@@ -141,7 +158,8 @@ public final class Geometry implements Geo {
@Override
public int srid() {
- throw new UnsupportedOperationException("Geometry SRID is not yet
supported.");
+ // This method gets the SRID value from the in-memory Geometry
representation header.
+ return
ByteBuffer.wrap(getBytes()).order(DEFAULT_ENDIANNESS).getInt(SRID_OFFSET);
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
index f53d66edfa40..de1f4d916d2e 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.util;
import org.apache.spark.unsafe.types.GeographyVal;
import org.junit.jupiter.api.Test;
+import java.nio.ByteOrder;
+import java.util.HexFormat;
+
import static org.junit.jupiter.api.Assertions.*;
@@ -28,13 +31,15 @@ import static org.junit.jupiter.api.Assertions.*;
*/
class GeographyExecutionTest {
- // A sample Geography byte array for testing purposes.
+ // A sample Geography byte array for testing purposes, representing a
POINT(1 2) with SRID 4326.
private final byte[] testGeographyVal = new byte[] {
+ (byte)0xE6, 0x10, 0x00, 0x00,
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, (byte)0xF0,
0x3F, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x40
+ 0x00, 0x00, 0x00, 0x00,
+ 0x40
};
/** Tests for Geography factory methods and getters. */
@@ -146,13 +151,29 @@ class GeographyExecutionTest {
/** Tests for Geography WKB and EWKB converters. */
@Test
- void testToWkbUnsupported() {
+ void testToWkb() {
+ Geography geography = Geography.fromBytes(testGeographyVal);
+ // WKB value (endianness: NDR) corresponding to WKT: POINT(1 2).
+ byte[] wkb =
HexFormat.of().parseHex("0101000000000000000000f03f0000000000000040");
+ assertArrayEquals(wkb, geography.toWkb());
+ }
+
+ @Test
+ void testToWkbEndiannessNDR() {
+ Geography geography = Geography.fromBytes(testGeographyVal);
+ // WKB value (endianness: NDR) corresponding to WKT: POINT(1 2).
+ byte[] wkb =
HexFormat.of().parseHex("0101000000000000000000f03f0000000000000040");
+ assertArrayEquals(wkb, geography.toWkb(ByteOrder.LITTLE_ENDIAN));
+ }
+
+ @Test
+ void testToWkbEndiannessXDR() {
Geography geography = Geography.fromBytes(testGeographyVal);
UnsupportedOperationException exception = assertThrows(
UnsupportedOperationException.class,
- geography::toWkb
+ () -> geography.toWkb(ByteOrder.BIG_ENDIAN)
);
- assertEquals("Geography WKB conversion is not yet supported.",
exception.getMessage());
+ assertEquals("Geography WKB endianness is not yet supported.",
exception.getMessage());
}
@Test
@@ -165,6 +186,26 @@ class GeographyExecutionTest {
assertEquals("Geography EWKB conversion is not yet supported.",
exception.getMessage());
}
+ @Test
+ void testToEwkbEndiannessXDRUnsupported() {
+ Geography geography = Geography.fromBytes(testGeographyVal);
+ UnsupportedOperationException exception = assertThrows(
+ UnsupportedOperationException.class,
+ () -> geography.toEwkb(ByteOrder.BIG_ENDIAN)
+ );
+ assertEquals("Geography EWKB endianness is not yet supported.",
exception.getMessage());
+ }
+
+ @Test
+ void testToEwkbEndiannessNDRUnsupported() {
+ Geography geography = Geography.fromBytes(testGeographyVal);
+ UnsupportedOperationException exception = assertThrows(
+ UnsupportedOperationException.class,
+ () -> geography.toEwkb(ByteOrder.LITTLE_ENDIAN)
+ );
+ assertEquals("Geography EWKB endianness is not yet supported.",
exception.getMessage());
+ }
+
/** Tests for Geography WKT and EWKT converters. */
@Test
@@ -190,12 +231,8 @@ class GeographyExecutionTest {
/** Tests for other Geography methods. */
@Test
- void testSridUnsupported() {
+ void testSrid() {
Geography geography = Geography.fromBytes(testGeographyVal);
- UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- geography::srid
- );
- assertEquals("Geography SRID is not yet supported.",
exception.getMessage());
+ assertEquals(4326, geography.srid());
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
index 13e6618f833f..17950f9cad0d 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeometryExecutionSuite.java
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.util;
import org.apache.spark.unsafe.types.GeometryVal;
import org.junit.jupiter.api.Test;
+import java.nio.ByteOrder;
+import java.util.HexFormat;
+
import static org.junit.jupiter.api.Assertions.*;
@@ -28,13 +31,15 @@ import static org.junit.jupiter.api.Assertions.*;
*/
class GeometryExecutionTest {
- // A sample Geometry byte array for testing purposes.
+ // A sample Geometry byte array for testing purposes, representing a POINT(1
2) with SRID 4326.
private final byte[] testGeometryVal = new byte[] {
+ (byte)0xE6, 0x10, 0x00, 0x00,
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, (byte)0xF0,
0x3F, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x40
+ 0x00, 0x00, 0x00, 0x00,
+ 0x40
};
/** Tests for Geometry factory methods and getters. */
@@ -146,13 +151,29 @@ class GeometryExecutionTest {
/** Tests for Geometry WKB and EWKB converters. */
@Test
- void testToWkbUnsupported() {
+ void testToWkb() {
+ Geometry geometry = Geometry.fromBytes(testGeometryVal);
+ // WKB value (endianness: NDR) corresponding to WKT: POINT(1 2).
+ byte[] wkb =
HexFormat.of().parseHex("0101000000000000000000f03f0000000000000040");
+ assertArrayEquals(wkb, geometry.toWkb());
+ }
+
+ @Test
+ void testToWkbEndiannessNDR() {
+ Geometry geometry = Geometry.fromBytes(testGeometryVal);
+ // WKB value (endianness: NDR) corresponding to WKT: POINT(1 2).
+ byte[] wkb =
HexFormat.of().parseHex("0101000000000000000000f03f0000000000000040");
+ assertArrayEquals(wkb, geometry.toWkb(ByteOrder.LITTLE_ENDIAN));
+ }
+
+ @Test
+ void testToWkbEndiannessXDR() {
Geometry geometry = Geometry.fromBytes(testGeometryVal);
UnsupportedOperationException exception = assertThrows(
UnsupportedOperationException.class,
- geometry::toWkb
+ () -> geometry.toWkb(ByteOrder.BIG_ENDIAN)
);
- assertEquals("Geometry WKB conversion is not yet supported.",
exception.getMessage());
+ assertEquals("Geometry WKB endianness is not yet supported.",
exception.getMessage());
}
@Test
@@ -165,6 +186,26 @@ class GeometryExecutionTest {
assertEquals("Geometry EWKB conversion is not yet supported.",
exception.getMessage());
}
+ @Test
+ void testToEwkbEndiannessXDRUnsupported() {
+ Geometry geometry = Geometry.fromBytes(testGeometryVal);
+ UnsupportedOperationException exception = assertThrows(
+ UnsupportedOperationException.class,
+ () -> geometry.toEwkb(ByteOrder.BIG_ENDIAN)
+ );
+ assertEquals("Geometry EWKB endianness is not yet supported.",
exception.getMessage());
+ }
+
+ @Test
+ void testToEwkbEndiannessNDRUnsupported() {
+ Geometry geometry = Geometry.fromBytes(testGeometryVal);
+ UnsupportedOperationException exception = assertThrows(
+ UnsupportedOperationException.class,
+ () -> geometry.toEwkb(ByteOrder.LITTLE_ENDIAN)
+ );
+ assertEquals("Geometry EWKB endianness is not yet supported.",
exception.getMessage());
+ }
+
/** Tests for Geometry WKT and EWKT converters. */
@Test
@@ -190,12 +231,8 @@ class GeometryExecutionTest {
/** Tests for other Geometry methods. */
@Test
- void testSridUnsupported() {
+ void testSrid() {
Geometry geometry = Geometry.fromBytes(testGeometryVal);
- UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- geometry::srid
- );
- assertEquals("Geometry SRID is not yet supported.",
exception.getMessage());
+ assertEquals(4326, geometry.srid());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]