This is an automated email from the ASF dual-hosted git repository. malka pushed a commit to branch SEDONA-121]-constructors-from-spark2flink in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
commit 81353acd6ddcd43f34d549841e0327961373f7aa Author: Netanel Malka <[email protected]> AuthorDate: Wed Jun 8 23:02:22 2022 +0300 ST_GeomFromText and ST_LineFromText, ST_LineStringFromText --- docs/api/flink/Constructor.md | 40 +++++++++++++ .../main/java/org/apache/sedona/flink/Catalog.java | 3 + .../sedona/flink/expressions/Constructors.java | 70 +++++++++++++++++----- .../org/apache/sedona/flink/ConstructorTest.java | 35 +++++++++++ .../java/org/apache/sedona/flink/TestBase.java | 47 +++++++++++++-- 5 files changed, 176 insertions(+), 19 deletions(-) diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md index e02d00ec..e8568b01 100644 --- a/docs/api/flink/Constructor.md +++ b/docs/api/flink/Constructor.md @@ -12,6 +12,20 @@ SQL example: SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry ``` +## ST_GeomFromText + +Introduction: Construct a Geometry from WKT. Alias of [ST_GeomFromWKT](#st_geomfromwkt) + +Format: +`ST_GeomFromText (Wkt:string)` + +Since: `v1.2.1` + +SQL example: +```SQL +SELECT ST_GeomFromText('POINT(40.7128 -74.0060)') AS geometry +``` + ## ST_GeomFromWKB Introduction: Construct a Geometry from WKB string or Binary @@ -66,6 +80,32 @@ SQL example: SELECT ST_PointFromText('40.7128,-74.0060', ',') AS pointshape ``` +## ST_LineFromText + +Introduction: Construct a LineString from Text, delimited by Delimiter (Optional) + +Format: `ST_LineFromText (Text:string, Delimiter:char)` + +Since: `v1.2.1` + +SQL example: +```SQL +SELECT ST_LineFromText('1,2,3,4') AS line +``` + +## ST_LineStringFromText + +Introduction: Construct a LineString from Text, delimited by Delimiter (Optional). Alias of [ST_LineFromText](#st_linefromtext) + +Format: `ST_LineStringFromText (Text:string, Delimiter:char)` + +Since: `v1.2.1` + +SQL example: +```SQL +SELECT ST_LineStringFromText('1,2,3,4') AS line +``` + ## ST_PolygonFromText Introduction: Construct a Polygon from Text, delimited by Delimiter.
Path must be closed diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java index eac9b4f9..9e4ff44a 100644 --- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java +++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java @@ -21,9 +21,12 @@ public class Catalog { public static UserDefinedFunction[] getFuncs() { return new UserDefinedFunction[]{ new Constructors.ST_PointFromText(), + new Constructors.ST_LineStringFromText(), + new Constructors.ST_LineFromText(), new Constructors.ST_PolygonFromText(), new Constructors.ST_PolygonFromEnvelope(), new Constructors.ST_GeomFromWKT(), + new Constructors.ST_GeomFromText(), new Constructors.ST_GeomFromWKB(), new Constructors.ST_GeomFromGeoJSON(), new Constructors.ST_GeomFromGeoHash(), diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java index 2b2815c0..97900ede 100644 --- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java @@ -26,12 +26,17 @@ import org.locationtech.jts.io.WKBReader; import org.locationtech.jts.io.ParseException; public class Constructors { + + private static Geometry getGeometryByType(String geom, String inputDelimiter, GeometryType geometryType) throws ParseException { + FileDataSplitter delimiter = inputDelimiter == null? 
FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter); + FormatUtils<Geometry> formatUtils = new FormatUtils<>(delimiter, false, geometryType); + return formatUtils.readGeometry(geom); + } + public static class ST_PointFromText extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException { - FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter); - FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POINT); - return formatUtils.readGeometry(s); + return getGeometryByType(s, inputDelimiter, GeometryType.POINT); } @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) @@ -40,18 +45,44 @@ public class Constructors { } } + public static class ST_LineFromText extends ScalarFunction { + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("String") String lineString, + @DataTypeHint("String") String inputDelimiter) throws ParseException { + // The default delimiter is comma. Otherwise, use the delimiter given by the user + return getGeometryByType(lineString, inputDelimiter, GeometryType.LINESTRING); + } + + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("String") String lineString) throws ParseException { + return eval(lineString, null); + } + } + + public static class ST_LineStringFromText extends ScalarFunction { + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("String") String lineString, + @DataTypeHint("String") String inputDelimiter) throws ParseException { + // The default delimiter is comma. 
Otherwise, use the delimiter given by the user + return new ST_LineFromText().eval(lineString, inputDelimiter); + } + + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("String") String lineString) throws ParseException { + return eval(lineString, null); + } + } + public static class ST_PolygonFromText extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) - public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException { + public Geometry eval(@DataTypeHint("String") String polygonString, @DataTypeHint("String") String inputDelimiter) throws ParseException { // The default delimiter is comma. Otherwise, use the delimiter given by the user - FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter); - FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POLYGON); - return formatUtils.readGeometry(s); + return getGeometryByType(polygonString, inputDelimiter, GeometryType.POLYGON); } @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) - public Geometry eval(@DataTypeHint("String") String s) throws ParseException { - return eval(s, null); + public Geometry eval(@DataTypeHint("String") String polygonString) throws ParseException { + return eval(polygonString, null); } } @@ -70,19 +101,29 @@ public class Constructors { } } + private static Geometry getGeometryByFileData(String wktString, FileDataSplitter dataSplitter) throws ParseException { + FormatUtils<Geometry> formatUtils = new FormatUtils<>(dataSplitter, false); + return formatUtils.readGeometry(wktString); + } + public static class ST_GeomFromWKT extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) public Geometry eval(@DataTypeHint("String") String 
wktString) throws ParseException { - FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false); - return formatUtils.readGeometry(wktString); + return getGeometryByFileData(wktString, FileDataSplitter.WKT); + } + } + + public static class ST_GeomFromText extends ScalarFunction { + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException { + return new ST_GeomFromWKT().eval(wktString); } } public static class ST_GeomFromWKB extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException { - FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false); - return formatUtils.readGeometry(wkbString); + return getGeometryByFileData(wkbString, FileDataSplitter.WKB); } @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) @@ -96,8 +137,7 @@ public class Constructors { public static class ST_GeomFromGeoJSON extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) public Geometry eval(@DataTypeHint("String") String geoJson) throws ParseException { - FormatUtils formatUtils = new FormatUtils(FileDataSplitter.GEOJSON, false); - return formatUtils.readGeometry(geoJson); + return getGeometryByFileData(geoJson, FileDataSplitter.GEOJSON); } } diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java index e828a854..dd8925d6 100644 --- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java @@ -50,6 +50,30 @@ public class ConstructorTest extends TestBase{ assertEquals(result.toString(), data.get(data.size() - 1).toString()); } + @Test + public void testLineFromText() { + 
List<Row> data = createLineStringWKT(testDataSize); + + Table lineStringTable = createLineStringTextTable(testDataSize) + .select(call(Constructors.ST_LineFromText.class.getSimpleName(), $(linestringColNames[0])).as(linestringColNames[0]), + $(linestringColNames[1])); + Row result = last(lineStringTable); + + assertEquals(result.toString(), data.get(data.size() - 1).toString()); + } + + @Test + public void testLineStringFromText() { + List<Row> data = createLineStringWKT(testDataSize); + + Table lineStringTable = createLineStringTextTable(testDataSize) + .select(call(Constructors.ST_LineStringFromText.class.getSimpleName(), $(linestringColNames[0])).as(linestringColNames[0]), + $(linestringColNames[1])); + Row result = last(lineStringTable); + + assertEquals(result.toString(), data.get(data.size() - 1).toString()); + } + @Test public void testPolygonFromText() { List<Row> data = createPolygonWKT(testDataSize); @@ -68,6 +92,17 @@ public class ConstructorTest extends TestBase{ assertEquals(result.toString(), data.get(data.size() - 1).toString()); } + @Test + public void testGeomFromText() { + List<Row> data = createPolygonWKT(testDataSize); + Table wktTable = createTextTable(data, polygonColNames); + Table geomTable = wktTable.select(call(Constructors.ST_GeomFromText.class.getSimpleName(), + $(polygonColNames[0])).as(polygonColNames[0]), + $(polygonColNames[1])); + Row result = last(geomTable); + assertEquals(result.toString(), data.get(data.size() - 1).toString()); + } + @Test public void testPolygonFromEnvelope() { Double minX = 1.0; diff --git a/flink/src/test/java/org/apache/sedona/flink/TestBase.java b/flink/src/test/java/org/apache/sedona/flink/TestBase.java index 9623d231..3dd99432 100644 --- a/flink/src/test/java/org/apache/sedona/flink/TestBase.java +++ b/flink/src/test/java/org/apache/sedona/flink/TestBase.java @@ -28,14 +28,10 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.sedona.flink.expressions.Constructors; import 
org.locationtech.jts.geom.*; -import org.locationtech.jts.geom.impl.CoordinateArraySequence; -import org.wololo.geojson.Feature; import org.wololo.jts2geojson.GeoJSONWriter; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; @@ -45,6 +41,7 @@ public class TestBase { protected static StreamTableEnvironment tableEnv; static int testDataSize = 1000; static String[] pointColNames = {"geom_point", "name_point"}; + static String[] linestringColNames = {"geom_linestring", "name_linestring"}; static String[] polygonColNames = {"geom_polygon", "name_polygon"}; static String pointTableName = "point_table"; static String polygonTableName = "polygon_table"; @@ -126,6 +123,44 @@ public class TestBase { return data; } + static List<Row> createLineStringText(int size) { + List<Row> data = new ArrayList<>(); + for (int i = 0; i < size; i++) { + // Create polygons each of which only has 1 match in points + // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5) + String minX = String.valueOf(i - 0.5); + String minY = String.valueOf(i - 0.5); + String maxX = String.valueOf(i + 0.5); + String maxY = String.valueOf(i + 0.5); + List<String> linestring = new ArrayList<>(); + linestring.add(minX); + linestring.add(minY); + linestring.add(maxX); + linestring.add(maxY); + + data.add(Row.of(String.join(",", linestring), "linestring" + i)); + } + return data; + } + + static List<Row> createLineStringWKT(int size) { + List<Row> data = new ArrayList<>(); + for (int i = 0; i < size; i++) { + + String minX = String.valueOf(i - 0.5); + String minY = String.valueOf(i - 0.5); + String maxX = String.valueOf(i + 0.5); + String maxY = String.valueOf(i + 0.5); + + List<String> linestring = new ArrayList<>(); + linestring.add(minX + " " + minY); + linestring.add(maxX + " " + maxY); + + data.add(Row.of("LINESTRING (" + String.join(", ", 
linestring) + ")", "linestring" + i)); + } + return data; + } + static List<Row> createPolygonWKT(int size) { List<Row> data = new ArrayList<>(); for (int i = 0; i < size; i++) { @@ -190,6 +225,10 @@ public class TestBase { return createTextTable(createPointText_real(size), pointColNames); } + static Table createLineStringTextTable(int size) { + return createTextTable(createLineStringText(size), linestringColNames); + } + static Table createPolygonTextTable(int size) { return createTextTable(createPolygonText(size), polygonColNames); }
