This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch SEDONA-575 in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 7594a3c34bbc1b240bf8ebccf7ffea6cf110024a Author: Pranav Toggi <[email protected]> AuthorDate: Fri Apr 5 16:14:00 2024 -0400 [TASK-137] Add ST_LineFromWKB (#151) * init * init 2 * add flink test * add scala, python, snowflake tests * add docs * Fix scala implementation * Fix test * Add ST_LineFromWKB * fix typo * Fix scala implementation * add scala dataframeapi test * Add flink tests * Add python dataframeapi test * Fix test * Fix doc * Fix test * Add tests * Fix test * Fix typo * Update docs --- .../org/apache/sedona/common/Constructors.java | 13 ++++++ .../org/apache/sedona/common/FunctionsTest.java | 20 +++++++++ docs/api/flink/Constructor.md | 31 ++++++++++++++ docs/api/snowflake/vector-data/Constructor.md | 29 +++++++++++++ docs/api/sql/Constructor.md | 31 ++++++++++++++ .../main/java/org/apache/sedona/flink/Catalog.java | 1 + .../sedona/flink/expressions/Constructors.java | 37 +++++++++++++++-- .../org/apache/sedona/flink/ConstructorTest.java | 47 ++++++++++++++++++++-- python/sedona/sql/st_constructors.py | 14 +++++++ python/tests/sql/test_dataframe_api.py | 1 + .../sedona/snowflake/snowsql/TestConstructors.java | 16 ++++++++ .../org/apache/sedona/snowflake/snowsql/UDFs.java | 14 +++++++ .../scala/org/apache/sedona/sql/UDF/Catalog.scala | 1 + .../sql/sedona_sql/expressions/Constructors.scala | 45 +++++++++++++++++++++ .../sedona_sql/expressions/st_constructors.scala | 6 +++ .../apache/sedona/sql/constructorTestScala.scala | 41 +++++++++++++++++++ .../apache/sedona/sql/dataFrameAPITestScala.scala | 13 ++++++ 17 files changed, 353 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/sedona/common/Constructors.java b/common/src/main/java/org/apache/sedona/common/Constructors.java index 6dd348c01..e33789ab6 100644 --- a/common/src/main/java/org/apache/sedona/common/Constructors.java +++ b/common/src/main/java/org/apache/sedona/common/Constructors.java @@ -79,6 +79,19 @@ public class Constructors { return geom; } + public static 
Geometry lineFromWKB(byte[] wkb) throws ParseException { + return lineFromWKB(wkb, 0); + } + + public static Geometry lineFromWKB(byte[] wkb, int srid) throws ParseException { + GeometryFactory geometryFactory = new GeometryFactory(new PrecisionModel(), srid); + Geometry geom = new WKBReader(geometryFactory).read(wkb); + if (!(geom instanceof LineString)) { + return null; + } + return geom; + } + public static Geometry mLineFromText(String wkt, int srid) throws ParseException { if (wkt == null || !wkt.startsWith("MULTILINESTRING")) { return null; diff --git a/common/src/test/java/org/apache/sedona/common/FunctionsTest.java b/common/src/test/java/org/apache/sedona/common/FunctionsTest.java index 6d15e20ca..4fe7d1578 100644 --- a/common/src/test/java/org/apache/sedona/common/FunctionsTest.java +++ b/common/src/test/java/org/apache/sedona/common/FunctionsTest.java @@ -152,6 +152,26 @@ public class FunctionsTest extends TestBase { assertNull(result); } + @Test + public void lineFromWKB() throws Exception{ + Geometry geometry = GEOMETRY_FACTORY.createPoint(new Coordinate(1.0, 2.0)); + byte[] wkbGeom = Functions.asWKB(geometry); + Geometry result = Constructors.lineFromWKB(wkbGeom); + assertNull(result); + + geometry = GEOMETRY_FACTORY.createLineString(coordArray(0.0, 0.0, 5.0, 5.0, 5.0, 2.0)); + wkbGeom = Functions.asWKB(geometry); + result = Constructors.lineFromWKB(wkbGeom, 4326); + assertEquals(geometry, result); + assertEquals(4326, Objects.requireNonNull(result).getSRID()); + + geometry = GEOMETRY_FACTORY.createLineString(coordArray(0.0, 0.0, 1.5, 1.5, 2.0, 2.0)); + wkbGeom = Functions.asWKB(geometry); + result = Constructors.lineFromWKB(wkbGeom); + assertEquals(geometry, result); + assertEquals(0, Objects.requireNonNull(result).getSRID()); + } + @Test public void splitLineStringByMultipoint() { LineString lineString = GEOMETRY_FACTORY.createLineString(coordArray(0.0, 0.0, 1.5, 1.5, 2.0, 2.0)); diff --git a/docs/api/flink/Constructor.md 
b/docs/api/flink/Constructor.md index 3f26e08b7..2d0e91e5d 100644 --- a/docs/api/flink/Constructor.md +++ b/docs/api/flink/Constructor.md @@ -331,6 +331,37 @@ Output: LINESTRING (-74.0428197 40.6867969, -74.0421975 40.6921336, -74.050802 40.6912794) ``` +## ST_LineFromWKB + +Introduction: Construct a LineString geometry from WKB string or Binary and an optional SRID. This function also supports EWKB format. + +!!!note + Returns null if geometry is not of type LineString. + +Format: + +`ST_LineFromWKB (Wkb: String)` + +`ST_LineFromWKB (Wkb: Binary)` + +`ST_LineFromWKB (Wkb: String, srid: Integer)` + +`ST_LineFromWKB (Wkb: Binary, srid: Integer)` + +Since: `vTBD` + +Example: + +```sql +SELECT ST_LineFromWKB([01 02 00 00 00 02 00 00 00 00 00 00 00 84 D6 00 C0 00 00 00 00 80 B5 D6 BF 00 00 00 60 E1 EF F7 BF 00 00 00 80 07 5D E5 BF]) +``` + +Output: + +``` +LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865) +``` + ## ST_MLineFromText Introduction: Construct a MultiLineString from Text and Optional SRID diff --git a/docs/api/snowflake/vector-data/Constructor.md b/docs/api/snowflake/vector-data/Constructor.md index 796f77994..3ba2433e2 100644 --- a/docs/api/snowflake/vector-data/Constructor.md +++ b/docs/api/snowflake/vector-data/Constructor.md @@ -303,6 +303,35 @@ Output: LINESTRING (-74.0428197 40.6867969, -74.0421975 40.6921336, -74.050802 40.6912794) ``` +## ST_LineFromWKB + +Introduction: Construct a LineString geometry from WKB string or Binary and an optional SRID. This function also supports EWKB format. + +!!!note + Returns null if geometry is not of type LineString. 
+ +Format: + +`ST_LineFromWKB (Wkb: String)` + +`ST_LineFromWKB (Wkb: Binary)` + +`ST_LineFromWKB (Wkb: String, srid: Integer)` + +`ST_LineFromWKB (Wkb: Binary, srid: Integer)` + +Example: + +```sql +SELECT ST_LineFromWKB([01 02 00 00 00 02 00 00 00 00 00 00 00 84 D6 00 C0 00 00 00 00 80 B5 D6 BF 00 00 00 60 E1 EF F7 BF 00 00 00 80 07 5D E5 BF]) +``` + +Output: + +``` +LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865) +``` + ## ST_MLineFromText Introduction: Construct a MultiLineString from Wkt. If srid is not set, it defaults to 0 (unknown). diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md index 0fdd8177d..d1acba8d2 100644 --- a/docs/api/sql/Constructor.md +++ b/docs/api/sql/Constructor.md @@ -382,6 +382,37 @@ Output: LINESTRING (-74.0428197 40.6867969, -74.0421975 40.6921336, -74.050802 40.6912794) ``` +## ST_LineFromWKB + +Introduction: Construct a LineString geometry from WKB string or Binary and an optional SRID. This function also supports EWKB format. + +!!!note + Returns null if geometry is not of type LineString. + +Format: + +`ST_LineFromWKB (Wkb: String)` + +`ST_LineFromWKB (Wkb: Binary)` + +`ST_LineFromWKB (Wkb: String, srid: Integer)` + +`ST_LineFromWKB (Wkb: Binary, srid: Integer)` + +Since: `vTBD` + +Example: + +```sql +SELECT ST_LineFromWKB([01 02 00 00 00 02 00 00 00 00 00 00 00 84 D6 00 C0 00 00 00 00 80 B5 D6 BF 00 00 00 60 E1 EF F7 BF 00 00 00 80 07 5D E5 BF]) +``` + +Output: + +``` +LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865) +``` + ## ST_MLineFromText Introduction: Construct a MultiLineString from Wkt. If srid is not set, it defaults to 0 (unknown). 
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java index 6b5639f97..8411b4ded 100644 --- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java +++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java @@ -28,6 +28,7 @@ public class Catalog { new Constructors.ST_PointZM(), new Constructors.ST_PointFromText(), new Constructors.ST_PointFromWKB(), + new Constructors.ST_LineFromWKB(), new Constructors.ST_MakePoint(), new Constructors.ST_LineStringFromText(), new Constructors.ST_LineFromText(), diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java index 1e8f87ded..5b66e41dd 100644 --- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java @@ -19,10 +19,7 @@ import org.apache.sedona.common.enums.FileDataSplitter; import org.apache.sedona.common.enums.GeometryType; import org.apache.sedona.common.utils.FormatUtils; import org.apache.sedona.common.utils.GeoHashDecoder; -import org.locationtech.jts.geom.Coordinate; -import org.locationtech.jts.geom.Geometry; -import org.locationtech.jts.geom.GeometryFactory; -import org.locationtech.jts.geom.Point; +import org.locationtech.jts.geom.*; import org.locationtech.jts.io.WKBReader; import org.locationtech.jts.io.ParseException; import org.locationtech.jts.io.gml2.GMLReader; @@ -250,6 +247,38 @@ public class Constructors { } } + public static class ST_LineFromWKB extends ScalarFunction { + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) + public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException { + Geometry geometry = getGeometryByFileData(wkbString, FileDataSplitter.WKB); + if (geometry instanceof LineString) { + geometry.setSRID(0); + return geometry; + } + return null; // Return null if geometry is 
not a LineString + } + + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) + public Geometry eval(@DataTypeHint("String") String wkbString, int srid) throws ParseException { + Geometry geometry = getGeometryByFileData(wkbString, FileDataSplitter.WKB); + if (geometry instanceof LineString) { + geometry.setSRID(srid); + return geometry; + } + return null; // Return null if geometry is not a LineString + } + + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) + public Geometry eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException { + return org.apache.sedona.common.Constructors.lineFromWKB(wkb, 0); + } + + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) + public Geometry eval(@DataTypeHint("Bytes") byte[] wkb, int srid) throws ParseException { + return org.apache.sedona.common.Constructors.lineFromWKB(wkb, srid); + } + } + public static class ST_GeomFromGeoJSON extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) public Geometry eval(@DataTypeHint("String") String geoJson) throws ParseException { diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java index 7534dc072..236a0cd61 100644 --- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java @@ -13,6 +13,8 @@ */ package org.apache.sedona.flink; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; import org.apache.flink.table.api.Table; import org.apache.flink.types.Row; import org.apache.flink.streaming.api.datastream.DataStream; @@ -322,11 +324,11 @@ public class ConstructorTest extends TestBase{ @Test public void testPointFromWKB() throws Exception { String hexWkb1 = "010100000000000000000000000000000000000000"; - byte[] wkbPoint1 = org.apache.commons.codec.binary.Hex.decodeHex(hexWkb1); + byte[] wkbPoint1 = 
Hex.decodeHex(hexWkb1); String hexWkb2 = "010100000000000000000024400000000000002e40"; - byte[] wkbPoint2 = org.apache.commons.codec.binary.Hex.decodeHex(hexWkb2); + byte[] wkbPoint2 = Hex.decodeHex(hexWkb2); String hexWkb3 = "01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf"; - byte[] wkbPolygon = org.apache.commons.codec.binary.Hex.decodeHex(hexWkb3); + byte[] wkbPolygon = Hex.decodeHex(hexWkb3); List<Row> data1 = new ArrayList<>(); data1.add(Row.of(wkbPoint1)); @@ -362,6 +364,45 @@ public class ConstructorTest extends TestBase{ assertNull(results2.get(2).getField(0)); } + @Test + public void testLineFromWKB() throws DecoderException { + String hexWkb1 = "010200000003000000000000000000000000000000000000000000000000000040000000000000004000000000000010400000000000001040"; + byte[] wkbLine1 = Hex.decodeHex(hexWkb1); + String hexWkb2 = "010200000003000000000000000000000000000000000000000000000000407f400000000000407f400000000000407f4000000000000059c0"; + byte[] wkbLine2 = Hex.decodeHex(hexWkb2); + String hexWkb3 = "01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf"; + byte[] wkbPolygon = Hex.decodeHex(hexWkb3); + + List<Row> data1 = Arrays.asList(Row.of(wkbLine1), Row.of(wkbLine2), Row.of(wkbPolygon)); + List<Row> data2 = Arrays.asList(Row.of(hexWkb1), Row.of(hexWkb2), Row.of(hexWkb3)); + + TypeInformation<?>[] colTypes1 = {PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}; + TypeInformation<?>[] colTypes2 = {BasicTypeInfo.STRING_TYPE_INFO}; + RowTypeInfo typeInfo1 = new RowTypeInfo(colTypes1, new String[]{"wkb"}); + RowTypeInfo typeInfo2 = new RowTypeInfo(colTypes2, new String[]{"wkb"}); + + DataStream<Row> wkbDS1 = env.fromCollection(data1).returns(typeInfo1); + Table wkbTable1 = 
tableEnv.fromDataStream(wkbDS1, $("wkb")); + + DataStream<Row> wkbDS2 = env.fromCollection(data2).returns(typeInfo2); + Table wkbTable2 = tableEnv.fromDataStream(wkbDS2, $("wkb")); + + Table lineTable1 = wkbTable1.select(call(Constructors.ST_LineFromWKB.class.getSimpleName(), $("wkb")).as("point")); + Table lineTable2 = wkbTable2.select(call(Constructors.ST_LineFromWKB.class.getSimpleName(), $("wkb")).as("point")); + + // Test with byte array + List<Row> results1 = TestBase.take(lineTable1, 3); + assertEquals("LINESTRING (0 0, 2 2, 4 4)", results1.get(0).getField(0).toString()); + assertEquals("LINESTRING (0 0, 500 500, 500 -100)", results1.get(1).getField(0).toString()); + assertNull(results1.get(2).getField(0)); + + // Test with hex string + List<Row> results2 = TestBase.take(lineTable2, 3); + assertEquals("LINESTRING (0 0, 2 2, 4 4)", results2.get(0).getField(0).toString()); + assertEquals("LINESTRING (0 0, 500 500, 500 -100)", results2.get(1).getField(0).toString()); + assertNull(results2.get(2).getField(0)); + } + @Test public void testGeomFromEWKB() { diff --git a/python/sedona/sql/st_constructors.py b/python/sedona/sql/st_constructors.py index fae53f49b..9289dc734 100644 --- a/python/sedona/sql/st_constructors.py +++ b/python/sedona/sql/st_constructors.py @@ -271,6 +271,20 @@ def ST_PointFromWKB(wkb: ColumnOrName, srid: Optional[ColumnOrNameOrNumber] = No args = (wkb) if srid is None else (wkb, srid) return _call_constructor_function("ST_PointFromWKB", args) +@validate_argument_types +def ST_LineFromWKB(wkb: ColumnOrName, srid: Optional[ColumnOrNameOrNumber] = None) -> Column: + """Generate a Line geometry column from a Well-Known Binary (WKB) binary column. + + :param wkb: WKB binary column to generate from. + :type wkb: ColumnOrName + :param srid: SRID to be set for the geometry. + :type srid: ColumnOrNameOrNumber + :return: Geometry column representing the WKB binary. 
+ :rtype: Column + """ + args = (wkb) if srid is None else (wkb, srid) + return _call_constructor_function("ST_LineFromWKB", args) + @validate_argument_types def ST_MakePoint(x: ColumnOrNameOrNumber, y: ColumnOrNameOrNumber, z: Optional[ColumnOrNameOrNumber] = None, m: Optional[ColumnOrNameOrNumber] = None) -> Column: """Generate a 2D, 3D Z or 4D ZM Point geometry. If z is None then a 2D point is generated. diff --git a/python/tests/sql/test_dataframe_api.py b/python/tests/sql/test_dataframe_api.py index 628aceefa..3fb6e0724 100644 --- a/python/tests/sql/test_dataframe_api.py +++ b/python/tests/sql/test_dataframe_api.py @@ -50,6 +50,7 @@ test_configurations = [ (stc.ST_GeomFromWKT, ("wkt",4326), "linestring_wkt", "", "LINESTRING (1 2, 3 4)"), (stc.ST_GeomFromEWKT, ("ewkt",), "linestring_ewkt", "", "LINESTRING (1 2, 3 4)"), (stc.ST_LineFromText, ("wkt",), "linestring_wkt", "", "LINESTRING (1 2, 3 4)"), + (stc.ST_LineFromWKB, ("wkbLine",), "constructor", "ST_ReducePrecision(geom, 2)", "LINESTRING (-2.1 -0.35, -1.5 -0.67)"), (stc.ST_LineStringFromText, ("multiple_point", lambda: f.lit(',')), "constructor", "", "LINESTRING (0 0, 1 0, 1 1, 0 0)"), (stc.ST_Point, ("x", "y"), "constructor", "", "POINT (0 1)"), (stc.ST_PointZ, ("x", "y", "z", 4326), "constructor", "", "POINT Z (0 1 2)"), diff --git a/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java b/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java index dd6e6bc4f..e0664774d 100644 --- a/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java +++ b/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java @@ -104,6 +104,22 @@ public class TestConstructors extends TestBase{ "SRID=4326;POINT (10 15)" ); } + + @Test + public void test_ST_LineFromWKB() { + registerUDF("ST_LineFromWKB", byte[].class); + registerUDF("ST_AsEWKT", byte[].class); + verifySqlSingleRes( + "select 
sedona.ST_AsText(sedona.ST_LineFromWKB(ST_ASWKB(to_geometry('LINESTRING (0 0, 2 2, 4 4)'))))", + "LINESTRING (0 0, 2 2, 4 4)" + ); + registerUDF("ST_LineFromWKB", byte[].class, int.class); + verifySqlSingleRes( + "select sedona.ST_AsEWKT(sedona.ST_LineFromWKB(ST_ASWKB(to_geometry('LINESTRING (0 0, 2 2, 4 4)')), 4326))", + "SRID=4326;LINESTRING (0 0, 2 2, 4 4)" + ); + } + @Test public void test_ST_GeomFromEWKB() { registerUDF("ST_GeomFromEWKB", byte[].class); diff --git a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java index 14c00a64f..4981573cc 100644 --- a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java +++ b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java @@ -964,6 +964,20 @@ public class UDFs { ); } + @UDFAnnotations.ParamMeta(argNames = {"wkb"}) + public static byte[] ST_LineFromWKB(byte[] wkb) throws ParseException { + return GeometrySerde.serialize( + Constructors.lineFromWKB(wkb, 0) + ); + } + + @UDFAnnotations.ParamMeta(argNames = {"wkb", "srid"}) + public static byte[] ST_LineFromWKB(byte[] wkb, int srid) throws ParseException { + return GeometrySerde.serialize( + Constructors.lineFromWKB(wkb, srid) + ); + } + @UDFAnnotations.ParamMeta(argNames = {"geometry", "srid"}) public static byte[] ST_Polygon(byte[] geometry, int srid) { return GeometrySerde.serialize( diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala index 3e7eacf0a..6144735b8 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala @@ -39,6 +39,7 @@ object Catalog { function[GeometryType](), function[ST_PointFromText](), function[ST_PointFromWKB](), + function[ST_LineFromWKB](), function[ST_PolygonFromText](), function[ST_LineStringFromText](), 
function[ST_GeomFromText](0), diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala index 5ca0a56bc..a1c9ba575 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala @@ -189,6 +189,51 @@ case class ST_GeomFromEWKB(inputExpressions: Seq[Expression]) } +case class ST_LineFromWKB(inputExpressions: Seq[Expression]) + extends Expression with FoldableExpression with ImplicitCastInputTypes with CodegenFallback with UserDataGeneratator { + + // Validate the number of input expressions (1 or 2) + assert(inputExpressions.length >= 1 && inputExpressions.length <= 2) + + override def nullable: Boolean = true + + override def eval(inputRow: InternalRow): Any = { + val wkb = inputExpressions.head.eval(inputRow) + val srid = if (inputExpressions.length > 1) inputExpressions(1).eval(inputRow) else 0 + + wkb match { + case geomString: UTF8String => + // Parse UTF-8 encoded WKB string + val geom = Constructors.lineStringFromText(geomString.toString, "wkb") + if (geom.getGeometryType == "LineString") { + geom.setSRID(srid.asInstanceOf[Int]) + geom.toGenericArrayData + } else { + null + } + + case wkbArray: Array[Byte] => + // Convert raw WKB byte array to geometry + Constructors.lineFromWKB(wkbArray, srid.asInstanceOf[Int]).toGenericArrayData + + case _ => null + } + } + + override def dataType: DataType = GeometryUDT + + override def inputTypes: Seq[AbstractDataType] = + if (inputExpressions.length == 1) Seq(TypeCollection(StringType, BinaryType)) + else Seq(TypeCollection(StringType, BinaryType), IntegerType) + + override def children: Seq[Expression] = inputExpressions + + protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): ST_LineFromWKB = { + copy(inputExpressions = 
newChildren) + } +} + + case class ST_PointFromWKB(inputExpressions: Seq[Expression]) extends Expression with FoldableExpression with ImplicitCastInputTypes with CodegenFallback with UserDataGeneratator { diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala index 7c99844f4..15f417a7d 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala @@ -107,6 +107,12 @@ object st_constructors extends DataFrameAPI { def ST_PointFromWKB(wkb: Column, srid: Column): Column = wrapExpression[ST_PointFromWKB](wkb, srid) def ST_PointFromWKB(wkb: String, srid: Int): Column = wrapExpression[ST_PointFromWKB](wkb, srid) + def ST_LineFromWKB(wkb: Column): Column = wrapExpression[ST_LineFromWKB](wkb, 0) + def ST_LineFromWKB(wkb: String): Column = wrapExpression[ST_LineFromWKB](wkb, 0) + + def ST_LineFromWKB(wkb: Column, srid: Column): Column = wrapExpression[ST_LineFromWKB](wkb, srid) + def ST_LineFromWKB(wkb: String, srid: Int): Column = wrapExpression[ST_LineFromWKB](wkb, srid) + def ST_MakePoint(x: Column, y: Column): Column = wrapExpression[ST_MakePoint](x, y, null, null) def ST_MakePoint(x: String, y: String): Column = wrapExpression[ST_MakePoint](x, y, null, null) def ST_MakePoint(x: Double, y: Double): Column = wrapExpression[ST_MakePoint](x, y, null, null) diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala index 024771e05..995640790 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala @@ -247,6 +247,47 @@ class constructorTestScala extends TestBaseScala 
{ assert(wkt.first().getAs[Geometry](0).getGeometryType === "MultiPolygon") } + it("Passed ST_LineFromWKB") { + val geometryDf = Seq( + "010200000003000000000000000000000000000000000000000000000000000840000000000000084000000000000010400000000000001040", + "0101000000000000000000F03F0000000000000040", + "01020000000300000000000000000000c000000000000000c000000000000010400000000000001040000000000000104000000000000000c0", + "0103000000010000000500000000000000000000000000000000000000000000000000f03f000000000000f03f0000000000001440000000000000f03f0000000000001440000000000000000000000000000000000000000000000000" + ).map(Tuple1.apply).toDF("wkb") + + geometryDf.createOrReplaceTempView("wkbtable") + + var validLineDf = sparkSession.sql("SELECT ST_LineFromWKB(wkbtable.wkb) FROM wkbtable") + var rows = validLineDf.collect() + assert(rows.length == 4) + + var expectedPoints = Seq("LINESTRING (0 0, 3 3, 4 4)", null, "LINESTRING (-2 -2, 4 4, 4 -2)", null) + for (i <- rows.indices) { + if (expectedPoints(i) == null) { + assert(rows(i).isNullAt(0)) + } else { + assert(rows(i).getAs[Geometry](0).toString == expectedPoints(i)) + } + } + + validLineDf = sparkSession.sql("SELECT ST_AsEWKT(ST_LineFromWKB(wkbtable.wkb, 4326)) FROM wkbtable") + rows = validLineDf.collect() + assert(rows.length == 4) + + expectedPoints = Seq("SRID=4326;LINESTRING (0 0, 3 3, 4 4)", null, "SRID=4326;LINESTRING (-2 -2, 4 4, 4 -2)", null) + for (i <- rows.indices) { + if (expectedPoints(i) == null) { + assert(rows(i).isNullAt(0)) + } else { + assert(rows(i).get(0).toString == expectedPoints(i)) + } + } + + intercept[Exception] { + sparkSession.sql("SELECT ST_LineFromWKB('invalid')").collect() + } + } + it("Passed ST_GeomFromWKB") { // UTF-8 encoded WKB String val polygonWkbDf = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load(mixedWkbGeometryInputLocation) diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala index 3bf263be4..6df29042e 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala @@ -140,6 +140,19 @@ class dataFrameAPITestScala extends TestBaseScala { assert(actualResult == expectedResult) } + it("passed st_linefromwkb") { + val wkbSeq = Seq[Array[Byte]](Array[Byte](1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, -124, -42, 0, -64, 0, 0, 0, 0, -128, -75, -42, -65, 0, 0, 0, 96, -31, -17, -9, -65, 0, 0, 0, -128, 7, 93, -27, -65)) + val df = wkbSeq.toDF("wkb").select(ST_LineFromWKB("wkb")) + val actualResult = df.take(1)(0).get(0).asInstanceOf[Geometry].toText() + val expectedResult = "LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865)" + assert(actualResult == expectedResult) + + val wkbStringSeq = Seq("0102000000020000000000000084d600c00000000080b5d6bf00000060e1eff7bf00000080075de5bf") + val dfWithString = wkbStringSeq.toDF("wkb").select(ST_LineFromWKB("wkb")) + val actualStringResult = dfWithString.take(1)(0).get(0).asInstanceOf[Geometry].toText() + assert(actualStringResult == expectedResult) + } + it("passed st_geomfromwkt") { val df = sparkSession.sql("SELECT 'POINT(0.0 1.0)' AS wkt").select(ST_GeomFromWKT("wkt")) val actualResult = df.take(1)(0).get(0).asInstanceOf[Geometry].toText()
