This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch SEDONA-575
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 7594a3c34bbc1b240bf8ebccf7ffea6cf110024a
Author: Pranav Toggi <[email protected]>
AuthorDate: Fri Apr 5 16:14:00 2024 -0400

    [TASK-137] Add ST_LineFromWKB (#151)
    
    * init
    
    * init 2
    
    * add flink test
    
    * add scala, python, snowflake tests
    
    * add docs
    
    * Fix scala implementation
    
    * Fix test
    
    * Add ST_LineFromWKB
    
    * fix typo
    
    * Fix scala implementation
    
    * add scala dataframeapi test
    
    * Add flink tests
    
    * Add python dataframeapi test
    
    * Fix test
    
    * Fix doc
    
    * Fix test
    
    * Add tests
    
    * Fix test
    
    * Fix typo
    
    * Update docs
---
 .../org/apache/sedona/common/Constructors.java     | 13 ++++++
 .../org/apache/sedona/common/FunctionsTest.java    | 20 +++++++++
 docs/api/flink/Constructor.md                      | 31 ++++++++++++++
 docs/api/snowflake/vector-data/Constructor.md      | 29 +++++++++++++
 docs/api/sql/Constructor.md                        | 31 ++++++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  1 +
 .../sedona/flink/expressions/Constructors.java     | 37 +++++++++++++++--
 .../org/apache/sedona/flink/ConstructorTest.java   | 47 ++++++++++++++++++++--
 python/sedona/sql/st_constructors.py               | 14 +++++++
 python/tests/sql/test_dataframe_api.py             |  1 +
 .../sedona/snowflake/snowsql/TestConstructors.java | 16 ++++++++
 .../org/apache/sedona/snowflake/snowsql/UDFs.java  | 14 +++++++
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |  1 +
 .../sql/sedona_sql/expressions/Constructors.scala  | 45 +++++++++++++++++++++
 .../sedona_sql/expressions/st_constructors.scala   |  6 +++
 .../apache/sedona/sql/constructorTestScala.scala   | 41 +++++++++++++++++++
 .../apache/sedona/sql/dataFrameAPITestScala.scala  | 13 ++++++
 17 files changed, 353 insertions(+), 7 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/Constructors.java 
b/common/src/main/java/org/apache/sedona/common/Constructors.java
index 6dd348c01..e33789ab6 100644
--- a/common/src/main/java/org/apache/sedona/common/Constructors.java
+++ b/common/src/main/java/org/apache/sedona/common/Constructors.java
@@ -79,6 +79,19 @@ public class Constructors {
         return geom;
     }
 
+    public  static Geometry lineFromWKB(byte[] wkb) throws ParseException {
+        return lineFromWKB(wkb, 0);
+    }
+
+    public static Geometry lineFromWKB(byte[] wkb, int srid) throws 
ParseException {
+        GeometryFactory geometryFactory = new GeometryFactory(new 
PrecisionModel(), srid);
+        Geometry geom =  new WKBReader(geometryFactory).read(wkb);
+        if (!(geom instanceof LineString)) {
+            return null;
+        }
+        return geom;
+    }
+
     public static Geometry mLineFromText(String wkt, int srid) throws 
ParseException {
         if (wkt == null || !wkt.startsWith("MULTILINESTRING")) {
             return null;
diff --git a/common/src/test/java/org/apache/sedona/common/FunctionsTest.java 
b/common/src/test/java/org/apache/sedona/common/FunctionsTest.java
index 6d15e20ca..4fe7d1578 100644
--- a/common/src/test/java/org/apache/sedona/common/FunctionsTest.java
+++ b/common/src/test/java/org/apache/sedona/common/FunctionsTest.java
@@ -152,6 +152,26 @@ public class FunctionsTest extends TestBase {
         assertNull(result);
     }
 
+    @Test
+    public void lineFromWKB() throws Exception{
+        Geometry geometry = GEOMETRY_FACTORY.createPoint(new Coordinate(1.0, 
2.0));
+        byte[] wkbGeom = Functions.asWKB(geometry);
+        Geometry result = Constructors.lineFromWKB(wkbGeom);
+        assertNull(result);
+
+        geometry = GEOMETRY_FACTORY.createLineString(coordArray(0.0, 0.0, 5.0, 
5.0, 5.0, 2.0));
+        wkbGeom = Functions.asWKB(geometry);
+        result = Constructors.lineFromWKB(wkbGeom, 4326);
+        assertEquals(geometry, result);
+        assertEquals(4326, Objects.requireNonNull(result).getSRID());
+
+        geometry = GEOMETRY_FACTORY.createLineString(coordArray(0.0, 0.0, 1.5, 
1.5, 2.0, 2.0));
+        wkbGeom = Functions.asWKB(geometry);
+        result = Constructors.lineFromWKB(wkbGeom);
+        assertEquals(geometry, result);
+        assertEquals(0, Objects.requireNonNull(result).getSRID());
+    }
+
     @Test
     public void splitLineStringByMultipoint() {
         LineString lineString = 
GEOMETRY_FACTORY.createLineString(coordArray(0.0, 0.0, 1.5, 1.5, 2.0, 2.0));
diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index 3f26e08b7..2d0e91e5d 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -331,6 +331,37 @@ Output:
 LINESTRING (-74.0428197 40.6867969, -74.0421975 40.6921336, -74.050802 
40.6912794)
 ```
 
+## ST_LineFromWKB
+
+Introduction: Construct a LineString geometry from a WKB string or binary value, 
with an optional SRID. This function also supports the EWKB format.
+
+!!!note
+    Returns null if geometry is not of type LineString.
+
+Format:
+
+`ST_LineFromWKB (Wkb: String)`
+
+`ST_LineFromWKB (Wkb: Binary)`
+
+`ST_LineFromWKB (Wkb: String, srid: Integer)`
+
+`ST_LineFromWKB (Wkb: Binary, srid: Integer)`
+
+Since: `vTBD`
+
+Example:
+
+```sql
+SELECT ST_LineFromWKB([01 02 00 00 00 02 00 00 00 00 00 00 00 84 D6 00 C0 00 
00 00 00 80 B5 D6 BF 00 00 00 60 E1 EF F7 BF 00 00 00 80 07 5D E5 BF])
+```
+
+Output:
+
+```
+LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 
-0.6676061153411865)
+```
+
 ## ST_MLineFromText
 
 Introduction: Construct a MultiLineString from Text and Optional SRID
diff --git a/docs/api/snowflake/vector-data/Constructor.md 
b/docs/api/snowflake/vector-data/Constructor.md
index 796f77994..3ba2433e2 100644
--- a/docs/api/snowflake/vector-data/Constructor.md
+++ b/docs/api/snowflake/vector-data/Constructor.md
@@ -303,6 +303,35 @@ Output:
 LINESTRING (-74.0428197 40.6867969, -74.0421975 40.6921336, -74.050802 
40.6912794)
 ```
 
+## ST_LineFromWKB
+
+Introduction: Construct a LineString geometry from a WKB string or binary value, 
with an optional SRID. This function also supports the EWKB format.
+
+!!!note
+    Returns null if geometry is not of type LineString.
+
+Format:
+
+`ST_LineFromWKB (Wkb: String)`
+
+`ST_LineFromWKB (Wkb: Binary)`
+
+`ST_LineFromWKB (Wkb: String, srid: Integer)`
+
+`ST_LineFromWKB (Wkb: Binary, srid: Integer)`
+
+Example:
+
+```sql
+SELECT ST_LineFromWKB([01 02 00 00 00 02 00 00 00 00 00 00 00 84 D6 00 C0 00 
00 00 00 80 B5 D6 BF 00 00 00 60 E1 EF F7 BF 00 00 00 80 07 5D E5 BF])
+```
+
+Output:
+
+```
+LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 
-0.6676061153411865)
+```
+
 ## ST_MLineFromText
 
 Introduction: Construct a MultiLineString from Wkt. If srid is not set, it 
defaults to 0 (unknown).
diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index 0fdd8177d..d1acba8d2 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -382,6 +382,37 @@ Output:
 LINESTRING (-74.0428197 40.6867969, -74.0421975 40.6921336, -74.050802 
40.6912794)
 ```
 
+## ST_LineFromWKB
+
+Introduction: Construct a LineString geometry from a WKB string or binary value, 
with an optional SRID. This function also supports the EWKB format.
+
+!!!note
+    Returns null if geometry is not of type LineString.
+
+Format:
+
+`ST_LineFromWKB (Wkb: String)`
+
+`ST_LineFromWKB (Wkb: Binary)`
+
+`ST_LineFromWKB (Wkb: String, srid: Integer)`
+
+`ST_LineFromWKB (Wkb: Binary, srid: Integer)`
+
+Since: `vTBD`
+
+Example:
+
+```sql
+SELECT ST_LineFromWKB([01 02 00 00 00 02 00 00 00 00 00 00 00 84 D6 00 C0 00 
00 00 00 80 B5 D6 BF 00 00 00 60 E1 EF F7 BF 00 00 00 80 07 5D E5 BF])
+```
+
+Output:
+
+```
+LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 
-0.6676061153411865)
+```
+
 ## ST_MLineFromText
 
 Introduction: Construct a MultiLineString from Wkt. If srid is not set, it 
defaults to 0 (unknown).
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java 
b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 6b5639f97..8411b4ded 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -28,6 +28,7 @@ public class Catalog {
                 new Constructors.ST_PointZM(),
                 new Constructors.ST_PointFromText(),
                 new Constructors.ST_PointFromWKB(),
+                new Constructors.ST_LineFromWKB(),
                 new Constructors.ST_MakePoint(),
                 new Constructors.ST_LineStringFromText(),
                 new Constructors.ST_LineFromText(),
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java 
b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 1e8f87ded..5b66e41dd 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -19,10 +19,7 @@ import org.apache.sedona.common.enums.FileDataSplitter;
 import org.apache.sedona.common.enums.GeometryType;
 import org.apache.sedona.common.utils.FormatUtils;
 import org.apache.sedona.common.utils.GeoHashDecoder;
-import org.locationtech.jts.geom.Coordinate;
-import org.locationtech.jts.geom.Geometry;
-import org.locationtech.jts.geom.GeometryFactory;
-import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.*;
 import org.locationtech.jts.io.WKBReader;
 import org.locationtech.jts.io.ParseException;
 import org.locationtech.jts.io.gml2.GMLReader;
@@ -250,6 +247,38 @@ public class Constructors {
         }
     }
 
+    public static class ST_LineFromWKB extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String wkbString) throws 
ParseException {
+            Geometry geometry = getGeometryByFileData(wkbString, 
FileDataSplitter.WKB);
+            if (geometry instanceof LineString) {
+                geometry.setSRID(0);
+                return geometry;
+            }
+            return null;  // Return null if geometry is not a LineString
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String wkbString, int 
srid) throws ParseException {
+            Geometry geometry = getGeometryByFileData(wkbString, 
FileDataSplitter.WKB);
+            if (geometry instanceof LineString) {
+                geometry.setSRID(srid);
+                return geometry;
+            }
+            return null;  // Return null if geometry is not a LineString
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry eval(@DataTypeHint("Bytes") byte[] wkb) throws 
ParseException {
+            return org.apache.sedona.common.Constructors.lineFromWKB(wkb, 0);
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry eval(@DataTypeHint("Bytes") byte[] wkb, int srid) 
throws ParseException {
+            return org.apache.sedona.common.Constructors.lineFromWKB(wkb, 
srid);
+        }
+    }
+
     public static class ST_GeomFromGeoJSON extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = 
org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String geoJson) throws 
ParseException {
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java 
b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index 7534dc072..236a0cd61 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.sedona.flink;
 
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -322,11 +324,11 @@ public class ConstructorTest extends TestBase{
     @Test
     public void testPointFromWKB() throws Exception {
         String hexWkb1 = "010100000000000000000000000000000000000000";
-        byte[] wkbPoint1 = 
org.apache.commons.codec.binary.Hex.decodeHex(hexWkb1);
+        byte[] wkbPoint1 = Hex.decodeHex(hexWkb1);
         String hexWkb2 = "010100000000000000000024400000000000002e40";
-        byte[] wkbPoint2 = 
org.apache.commons.codec.binary.Hex.decodeHex(hexWkb2);
+        byte[] wkbPoint2 = Hex.decodeHex(hexWkb2);
         String hexWkb3 = 
"01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf";
-        byte[] wkbPolygon = 
org.apache.commons.codec.binary.Hex.decodeHex(hexWkb3);
+        byte[] wkbPolygon = Hex.decodeHex(hexWkb3);
 
         List<Row> data1 = new ArrayList<>();
         data1.add(Row.of(wkbPoint1));
@@ -362,6 +364,45 @@ public class ConstructorTest extends TestBase{
         assertNull(results2.get(2).getField(0));
     }
 
+    @Test
+    public void testLineFromWKB() throws DecoderException {
+        String hexWkb1 = 
"010200000003000000000000000000000000000000000000000000000000000040000000000000004000000000000010400000000000001040";
+        byte[] wkbLine1 = Hex.decodeHex(hexWkb1);
+        String hexWkb2 = 
"010200000003000000000000000000000000000000000000000000000000407f400000000000407f400000000000407f4000000000000059c0";
+        byte[] wkbLine2 = Hex.decodeHex(hexWkb2);
+        String hexWkb3 = 
"01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf";
+        byte[] wkbPolygon = Hex.decodeHex(hexWkb3);
+
+        List<Row> data1 = Arrays.asList(Row.of(wkbLine1), Row.of(wkbLine2), 
Row.of(wkbPolygon));
+        List<Row> data2 = Arrays.asList(Row.of(hexWkb1), Row.of(hexWkb2), 
Row.of(hexWkb3));
+
+        TypeInformation<?>[] colTypes1 = 
{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO};
+        TypeInformation<?>[] colTypes2 = {BasicTypeInfo.STRING_TYPE_INFO};
+        RowTypeInfo typeInfo1 = new RowTypeInfo(colTypes1, new 
String[]{"wkb"});
+        RowTypeInfo typeInfo2 = new RowTypeInfo(colTypes2, new 
String[]{"wkb"});
+
+        DataStream<Row> wkbDS1 = env.fromCollection(data1).returns(typeInfo1);
+        Table wkbTable1 = tableEnv.fromDataStream(wkbDS1, $("wkb"));
+
+        DataStream<Row> wkbDS2 = env.fromCollection(data2).returns(typeInfo2);
+        Table wkbTable2 = tableEnv.fromDataStream(wkbDS2, $("wkb"));
+
+        Table lineTable1 = 
wkbTable1.select(call(Constructors.ST_LineFromWKB.class.getSimpleName(), 
$("wkb")).as("point"));
+        Table lineTable2 = 
wkbTable2.select(call(Constructors.ST_LineFromWKB.class.getSimpleName(), 
$("wkb")).as("point"));
+
+        // Test with byte array
+        List<Row> results1 = TestBase.take(lineTable1, 3);
+        assertEquals("LINESTRING (0 0, 2 2, 4 4)", 
results1.get(0).getField(0).toString());
+        assertEquals("LINESTRING (0 0, 500 500, 500 -100)", 
results1.get(1).getField(0).toString());
+        assertNull(results1.get(2).getField(0));
+
+        // Test with hex string
+        List<Row> results2 = TestBase.take(lineTable2, 3);
+        assertEquals("LINESTRING (0 0, 2 2, 4 4)", 
results2.get(0).getField(0).toString());
+        assertEquals("LINESTRING (0 0, 500 500, 500 -100)", 
results2.get(1).getField(0).toString());
+        assertNull(results2.get(2).getField(0));
+    }
+
     @Test
     public void testGeomFromEWKB()
     {
diff --git a/python/sedona/sql/st_constructors.py 
b/python/sedona/sql/st_constructors.py
index fae53f49b..9289dc734 100644
--- a/python/sedona/sql/st_constructors.py
+++ b/python/sedona/sql/st_constructors.py
@@ -271,6 +271,20 @@ def ST_PointFromWKB(wkb: ColumnOrName, srid: 
Optional[ColumnOrNameOrNumber] = No
     args = (wkb) if srid is None else (wkb, srid)
     return _call_constructor_function("ST_PointFromWKB", args)
 
+@validate_argument_types
+def ST_LineFromWKB(wkb: ColumnOrName, srid: Optional[ColumnOrNameOrNumber] = 
None) -> Column:
+    """Generate a LineString geometry column from a Well-Known Binary (WKB) 
binary column.
+
+    :param wkb: WKB binary column to generate from.
+    :type wkb: ColumnOrName
+    :param srid: SRID to be set for the geometry.
+    :type srid: ColumnOrNameOrNumber
+    :return: Geometry column representing the WKB binary.
+    :rtype: Column
+    """
+    args = (wkb) if srid is None else (wkb, srid)
+    return _call_constructor_function("ST_LineFromWKB", args)
+
 @validate_argument_types
 def ST_MakePoint(x: ColumnOrNameOrNumber, y: ColumnOrNameOrNumber, z: 
Optional[ColumnOrNameOrNumber] = None, m: Optional[ColumnOrNameOrNumber] = 
None) -> Column:
     """Generate a 2D, 3D Z or 4D ZM Point geometry. If z is None then a 2D 
point is generated.
diff --git a/python/tests/sql/test_dataframe_api.py 
b/python/tests/sql/test_dataframe_api.py
index 628aceefa..3fb6e0724 100644
--- a/python/tests/sql/test_dataframe_api.py
+++ b/python/tests/sql/test_dataframe_api.py
@@ -50,6 +50,7 @@ test_configurations = [
     (stc.ST_GeomFromWKT, ("wkt",4326), "linestring_wkt", "", "LINESTRING (1 2, 
3 4)"),
     (stc.ST_GeomFromEWKT, ("ewkt",), "linestring_ewkt", "", "LINESTRING (1 2, 
3 4)"),
     (stc.ST_LineFromText, ("wkt",), "linestring_wkt", "", "LINESTRING (1 2, 3 
4)"),
+    (stc.ST_LineFromWKB, ("wkbLine",), "constructor", 
"ST_ReducePrecision(geom, 2)", "LINESTRING (-2.1 -0.35, -1.5 -0.67)"),
     (stc.ST_LineStringFromText, ("multiple_point", lambda: f.lit(',')), 
"constructor", "", "LINESTRING (0 0, 1 0, 1 1, 0 0)"),
     (stc.ST_Point, ("x", "y"), "constructor", "", "POINT (0 1)"),
     (stc.ST_PointZ, ("x", "y", "z", 4326), "constructor", "", "POINT Z (0 1 
2)"),
diff --git 
a/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java
 
b/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java
index dd6e6bc4f..e0664774d 100644
--- 
a/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java
+++ 
b/snowflake-tester/src/test/java/org/apache/sedona/snowflake/snowsql/TestConstructors.java
@@ -104,6 +104,22 @@ public class TestConstructors extends TestBase{
                 "SRID=4326;POINT (10 15)"
         );
     }
+
+    @Test
+    public void test_ST_LineFromWKB() {
+        registerUDF("ST_LineFromWKB", byte[].class);
+        registerUDF("ST_AsEWKT", byte[].class);
+        verifySqlSingleRes(
+                "select 
sedona.ST_AsText(sedona.ST_LineFromWKB(ST_ASWKB(to_geometry('LINESTRING (0 0, 2 
2, 4 4)'))))",
+                "LINESTRING (0 0, 2 2, 4 4)"
+        );
+        registerUDF("ST_LineFromWKB", byte[].class, int.class);
+        verifySqlSingleRes(
+                "select 
sedona.ST_AsEWKT(sedona.ST_LineFromWKB(ST_ASWKB(to_geometry('LINESTRING (0 0, 2 
2, 4 4)')), 4326))",
+                "SRID=4326;LINESTRING (0 0, 2 2, 4 4)"
+        );
+    }
+
     @Test
     public void test_ST_GeomFromEWKB() {
         registerUDF("ST_GeomFromEWKB", byte[].class);
diff --git 
a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java 
b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
index 14c00a64f..4981573cc 100644
--- a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
+++ b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
@@ -964,6 +964,20 @@ public class UDFs {
         );
     }
 
+    @UDFAnnotations.ParamMeta(argNames = {"wkb"})
+    public static byte[] ST_LineFromWKB(byte[] wkb) throws ParseException {
+        return GeometrySerde.serialize(
+                Constructors.lineFromWKB(wkb, 0)
+        );
+    }
+
+    @UDFAnnotations.ParamMeta(argNames = {"wkb", "srid"})
+    public static byte[] ST_LineFromWKB(byte[] wkb, int srid) throws 
ParseException {
+        return GeometrySerde.serialize(
+                Constructors.lineFromWKB(wkb, srid)
+        );
+    }
+
     @UDFAnnotations.ParamMeta(argNames = {"geometry", "srid"})
     public static byte[] ST_Polygon(byte[] geometry, int srid) {
         return GeometrySerde.serialize(
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala 
b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index 3e7eacf0a..6144735b8 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -39,6 +39,7 @@ object Catalog {
     function[GeometryType](),
     function[ST_PointFromText](),
     function[ST_PointFromWKB](),
+    function[ST_LineFromWKB](),
     function[ST_PolygonFromText](),
     function[ST_LineStringFromText](),
     function[ST_GeomFromText](0),
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
index 5ca0a56bc..a1c9ba575 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
@@ -189,6 +189,51 @@ case class ST_GeomFromEWKB(inputExpressions: 
Seq[Expression])
 }
 
 
+case class ST_LineFromWKB(inputExpressions: Seq[Expression])
+  extends Expression with FoldableExpression with ImplicitCastInputTypes with 
CodegenFallback with UserDataGeneratator {
+
+  // Validate the number of input expressions (1 or 2)
+  assert(inputExpressions.length >= 1 && inputExpressions.length <= 2)
+
+  override def nullable: Boolean = true
+
+  override def eval(inputRow: InternalRow): Any = {
+    val wkb = inputExpressions.head.eval(inputRow)
+    val srid = if (inputExpressions.length > 1) 
inputExpressions(1).eval(inputRow) else 0
+
+    wkb match {
+      case geomString: UTF8String =>
+        // Parse UTF-8 encoded WKB string
+        val geom = Constructors.lineStringFromText(geomString.toString, "wkb")
+        if (geom.getGeometryType == "LineString") {
+          geom.setSRID(srid.asInstanceOf[Int])
+          geom.toGenericArrayData
+        } else {
+          null
+        }
+
+      case wkbArray: Array[Byte] =>
+        // Convert raw WKB byte array to geometry
+        Constructors.lineFromWKB(wkbArray, 
srid.asInstanceOf[Int]).toGenericArrayData
+
+      case _ => null
+    }
+  }
+
+  override def dataType: DataType = GeometryUDT
+
+  override def inputTypes: Seq[AbstractDataType] =
+    if (inputExpressions.length == 1) Seq(TypeCollection(StringType, 
BinaryType))
+    else Seq(TypeCollection(StringType, BinaryType), IntegerType)
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): 
ST_LineFromWKB = {
+    copy(inputExpressions = newChildren)
+  }
+}
+
+
 case class ST_PointFromWKB(inputExpressions: Seq[Expression])
   extends Expression with FoldableExpression with ImplicitCastInputTypes with 
CodegenFallback with UserDataGeneratator {
 
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala
index 7c99844f4..15f417a7d 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_constructors.scala
@@ -107,6 +107,12 @@ object st_constructors extends DataFrameAPI {
   def ST_PointFromWKB(wkb: Column, srid: Column): Column = 
wrapExpression[ST_PointFromWKB](wkb, srid)
   def ST_PointFromWKB(wkb: String, srid: Int): Column = 
wrapExpression[ST_PointFromWKB](wkb, srid)
 
+  def ST_LineFromWKB(wkb: Column): Column = 
wrapExpression[ST_LineFromWKB](wkb, 0)
+  def ST_LineFromWKB(wkb: String): Column = 
wrapExpression[ST_LineFromWKB](wkb, 0)
+
+  def ST_LineFromWKB(wkb: Column, srid: Column): Column = 
wrapExpression[ST_LineFromWKB](wkb, srid)
+  def ST_LineFromWKB(wkb: String, srid: Int): Column = 
wrapExpression[ST_LineFromWKB](wkb, srid)
+
   def ST_MakePoint(x: Column, y: Column): Column = 
wrapExpression[ST_MakePoint](x, y, null, null)
   def ST_MakePoint(x: String, y: String): Column = 
wrapExpression[ST_MakePoint](x, y, null, null)
   def ST_MakePoint(x: Double, y: Double): Column = 
wrapExpression[ST_MakePoint](x, y, null, null)
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
index 024771e05..995640790 100644
--- 
a/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
+++ 
b/spark/common/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
@@ -247,6 +247,47 @@ class constructorTestScala extends TestBaseScala {
       assert(wkt.first().getAs[Geometry](0).getGeometryType === "MultiPolygon")
     }
 
+    it("Passed ST_LineFromWKB") {
+      val geometryDf = Seq(
+        
"010200000003000000000000000000000000000000000000000000000000000840000000000000084000000000000010400000000000001040",
+        "0101000000000000000000F03F0000000000000040",
+        
"01020000000300000000000000000000c000000000000000c000000000000010400000000000001040000000000000104000000000000000c0",
+        
"0103000000010000000500000000000000000000000000000000000000000000000000f03f000000000000f03f0000000000001440000000000000f03f0000000000001440000000000000000000000000000000000000000000000000"
+      ).map(Tuple1.apply).toDF("wkb")
+
+      geometryDf.createOrReplaceTempView("wkbtable")
+
+      var validLineDf = sparkSession.sql("SELECT ST_LineFromWKB(wkbtable.wkb) 
FROM wkbtable")
+      var rows = validLineDf.collect()
+      assert(rows.length == 4)
+
+      var expectedPoints = Seq("LINESTRING (0 0, 3 3, 4 4)", null, "LINESTRING 
(-2 -2, 4 4, 4 -2)", null)
+      for (i <- rows.indices) {
+        if (expectedPoints(i) == null) {
+          assert(rows(i).isNullAt(0))
+        } else {
+          assert(rows(i).getAs[Geometry](0).toString == expectedPoints(i))
+        }
+      }
+
+      validLineDf = sparkSession.sql("SELECT 
ST_AsEWKT(ST_LineFromWKB(wkbtable.wkb, 4326)) FROM wkbtable")
+      rows = validLineDf.collect()
+      assert(rows.length == 4)
+
+      expectedPoints = Seq("SRID=4326;LINESTRING (0 0, 3 3, 4 4)", null, 
"SRID=4326;LINESTRING (-2 -2, 4 4, 4 -2)", null)
+      for (i <- rows.indices) {
+        if (expectedPoints(i) == null) {
+          assert(rows(i).isNullAt(0))
+        } else {
+          assert(rows(i).get(0).toString == expectedPoints(i))
+        }
+      }
+
+      intercept[Exception] {
+        sparkSession.sql("SELECT ST_LineFromWKB('invalid')").collect()
+      }
+    }
+
     it("Passed ST_GeomFromWKB") {
       // UTF-8 encoded WKB String
       val polygonWkbDf = sparkSession.read.format("csv").option("delimiter", 
"\t").option("header", "false").load(mixedWkbGeometryInputLocation)
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala
index 3bf263be4..6df29042e 100644
--- 
a/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala
+++ 
b/spark/common/src/test/scala/org/apache/sedona/sql/dataFrameAPITestScala.scala
@@ -140,6 +140,19 @@ class dataFrameAPITestScala extends TestBaseScala {
       assert(actualResult == expectedResult)
     }
 
+    it("passed st_linefromwkb") {
+      val wkbSeq = Seq[Array[Byte]](Array[Byte](1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 
0, 0, 0, -124, -42, 0, -64, 0, 0, 0, 0, -128, -75, -42, -65, 0, 0, 0, 96, -31, 
-17, -9, -65, 0, 0, 0, -128, 7, 93, -27, -65))
+      val df = wkbSeq.toDF("wkb").select(ST_LineFromWKB("wkb"))
+      val actualResult = df.take(1)(0).get(0).asInstanceOf[Geometry].toText()
+      val expectedResult = "LINESTRING (-2.1047439575195312 
-0.354827880859375, -1.49606454372406 -0.6676061153411865)"
+      assert(actualResult == expectedResult)
+
+      val wkbStringSeq = 
Seq("0102000000020000000000000084d600c00000000080b5d6bf00000060e1eff7bf00000080075de5bf")
+      val dfWithString = wkbStringSeq.toDF("wkb").select(ST_LineFromWKB("wkb"))
+      val actualStringResult = 
dfWithString.take(1)(0).get(0).asInstanceOf[Geometry].toText()
+      assert(actualStringResult == expectedResult)
+    }
+
     it("passed st_geomfromwkt") {
       val df = sparkSession.sql("SELECT 'POINT(0.0 1.0)' AS 
wkt").select(ST_GeomFromWKT("wkt"))
       val actualResult = df.take(1)(0).get(0).asInstanceOf[Geometry].toText()

Reply via email to