This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new d13099f5 [SEDONA-303] Port all Sedona Spark functions to Sedona Flink
-- Step 7 (#896)
d13099f5 is described below
commit d13099f53af7ed8eec058221320119e812078f85
Author: Junhao Liu <[email protected]>
AuthorDate: Thu Jul 13 01:44:50 2023 +0800
[SEDONA-303] Port all Sedona Spark functions to Sedona Flink -- Step 7
(#896)
---
docs/api/flink/Constructor.md | 14 ++++
docs/api/flink/Predicate.md | 62 ++++++++++++++-
.../main/java/org/apache/sedona/flink/Catalog.java | 5 ++
.../sedona/flink/expressions/Constructors.java | 16 +++-
.../sedona/flink/expressions/Predicates.java | 92 ++++++++++++++++++++--
.../org/apache/sedona/flink/ConstructorTest.java | 26 ++++++
.../org/apache/sedona/flink/PredicateTest.java | 35 ++++++++
7 files changed, 239 insertions(+), 11 deletions(-)
diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index f24313e8..226719b6 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -176,6 +176,20 @@ SELECT ST_Point(x, y) AS pointshape
FROM pointtable
```
+## ST_PointZ
+
+Introduction: Construct a Point from X, Y and Z and an optional srid. If srid
is not set, it defaults to 0 (unknown).
+
+Format: `ST_PointZ (X:decimal, Y:decimal, Z:decimal)`
+Format: `ST_PointZ (X:decimal, Y:decimal, Z:decimal, srid:integer)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT ST_PointZ(1.0, 2.0, 3.0) AS pointshape
+```
+
## ST_PointFromText
Introduction: Construct a Point from Text, delimited by Delimiter
diff --git a/docs/api/flink/Predicate.md b/docs/api/flink/Predicate.md
index 8794ddc6..994b7cd1 100644
--- a/docs/api/flink/Predicate.md
+++ b/docs/api/flink/Predicate.md
@@ -13,6 +13,21 @@ FROM pointdf
WHERE ST_Contains(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0),
pointdf.arealandmark)
```
+## ST_Crosses
+
+Introduction: Return true if A crosses B
+
+Format: `ST_Crosses (A:geometry, B:geometry)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT *
+FROM pointdf
+WHERE ST_Crosses(pointdf.arealandmark,
ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0))
+```
+
## ST_Disjoint
Introduction: Return true if A and B are disjoint
@@ -21,13 +36,28 @@ Format: `ST_Disjoint (A:geometry, B:geometry)`
Since: `v1.2.1`
-Spark SQL example:
+SQL example:
```sql
SELECT *
FROM pointdf
WHERE ST_Disjoint(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0),
pointdf.arealandmark)
```
+## ST_Equals
+
+Introduction: Return true if A equals B. Equality is topological: `LINESTRING (0 0, 2 2)` equals `LINESTRING (0 0, 1 1, 2 2)`
+
+Format: `ST_Equals (A:geometry, B:geometry)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT *
+FROM pointdf
+WHERE ST_Equals(pointdf.arealandmark,
ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0))
+```
+
## ST_Intersects
Introduction: Return true if A intersects B
@@ -43,6 +73,36 @@ FROM pointdf
WHERE ST_Intersects(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0),
pointdf.arealandmark)
```
+## ST_Overlaps
+
+Introduction: Return true if A overlaps B
+
+Format: `ST_Overlaps (A:geometry, B:geometry)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT *
+FROM geom
+WHERE ST_Overlaps(geom.geom_a, geom.geom_b)
+```
+
+## ST_Touches
+
+Introduction: Return true if A touches B
+
+Format: `ST_Touches (A:geometry, B:geometry)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT *
+FROM pointdf
+WHERE ST_Touches(pointdf.arealandmark,
ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0))
+```
+
## ST_Within
Introduction: Return true if A is within B
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 5d343227..5cfe8537 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -24,6 +24,7 @@ public class Catalog {
new Aggregators.ST_Envelope_Aggr(),
new Aggregators.ST_Union_Aggr(),
new Constructors.ST_Point(),
+ new Constructors.ST_PointZ(),
new Constructors.ST_PointFromText(),
new Constructors.ST_LineStringFromText(),
new Constructors.ST_LineFromText(),
@@ -140,11 +141,15 @@ public class Catalog {
return new UserDefinedFunction[]{
new Predicates.ST_Intersects(),
new Predicates.ST_Contains(),
+ new Predicates.ST_Crosses(),
new Predicates.ST_Within(),
new Predicates.ST_Covers(),
new Predicates.ST_CoveredBy(),
new Predicates.ST_Disjoint(),
+ new Predicates.ST_Equals(),
new Predicates.ST_OrderingEquals(),
+ new Predicates.ST_Overlaps(),
+ new Predicates.ST_Touches(),
};
}
}
diff --git
a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 36bbeba8..fcc426a0 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -38,9 +38,19 @@ public class Constructors {
public static class ST_Point extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("Double") Double x,
@DataTypeHint("Double") Double y) throws ParseException {
- Coordinate coordinates = new Coordinate(x, y);
- GeometryFactory geometryFactory = new GeometryFactory();
- return geometryFactory.createPoint(coordinates);
+ return org.apache.sedona.common.Constructors.point(x, y);
+ }
+ }
+
+ public static class ST_PointZ extends ScalarFunction {
+ @DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class)
+ public Geometry eval(@DataTypeHint("Double") Double x,
@DataTypeHint("Double") Double y, @DataTypeHint("Double") Double z) throws
ParseException {
+ return eval(x, y, z, 0);
+ }
+
+ @DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class)
+ public Geometry eval(@DataTypeHint("Double") Double x,
@DataTypeHint("Double") Double y, @DataTypeHint("Double") Double z,
@DataTypeHint("Integer") Integer srid) throws ParseException {
+ return org.apache.sedona.common.Constructors.pointZ(x, y, z, srid);
}
}
diff --git
a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
index b29e6ad5..f8416d19 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
@@ -33,7 +33,7 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.intersects(geom2);
+ return org.apache.sedona.common.Predicates.intersects(geom1,
geom2);
}
}
@@ -53,7 +53,7 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.contains(geom2);
+ return org.apache.sedona.common.Predicates.contains(geom1, geom2);
}
}
@@ -72,7 +72,7 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.within(geom2);
+ return org.apache.sedona.common.Predicates.within(geom1, geom2);
}
}
@@ -92,7 +92,7 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.covers(geom2);
+ return org.apache.sedona.common.Predicates.covers(geom1, geom2);
}
}
@@ -112,7 +112,25 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.coveredBy(geom2);
+ return org.apache.sedona.common.Predicates.coveredBy(geom1, geom2);
+ }
+ }
+
+ public static class ST_Crosses extends ScalarFunction
+ {
+ /**
+ * Constructor for relation checking without duplicate removal
+ */
+ public ST_Crosses()
+ {
+ }
+
+ @DataTypeHint("Boolean")
+ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value =
"RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
+ {
+ Geometry geom1 = (Geometry) o1;
+ Geometry geom2 = (Geometry) o2;
+ return org.apache.sedona.common.Predicates.crosses(geom1, geom2);
}
}
@@ -132,7 +150,27 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.disjoint(geom2);
+ return org.apache.sedona.common.Predicates.disjoint(geom1, geom2);
+ }
+ }
+
+ public static class ST_Equals
+ extends ScalarFunction
+ {
+
+ /**
+ * Constructor for relation checking without duplicate removal
+ */
+ public ST_Equals()
+ {
+ }
+
+ @DataTypeHint("Boolean")
+ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value =
"RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
+ {
+ Geometry geom1 = (Geometry) o1;
+ Geometry geom2 = (Geometry) o2;
+ return org.apache.sedona.common.Predicates.equals(geom1, geom2);
}
}
@@ -152,7 +190,47 @@ public class Predicates {
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
- return geom1.equalsExact(geom2);
+ return org.apache.sedona.common.Predicates.orderingEquals(geom1,
geom2);
+ }
+ }
+
+ public static class ST_Overlaps
+ extends ScalarFunction
+ {
+
+ /**
+ * Constructor for relation checking without duplicate removal
+ */
+ public ST_Overlaps()
+ {
+ }
+
+ @DataTypeHint("Boolean")
+ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value =
"RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
+ {
+ Geometry geom1 = (Geometry) o1;
+ Geometry geom2 = (Geometry) o2;
+ return org.apache.sedona.common.Predicates.overlaps(geom1, geom2);
+ }
+ }
+
+ public static class ST_Touches
+ extends ScalarFunction
+ {
+
+ /**
+ * Constructor for relation checking without duplicate removal
+ */
+ public ST_Touches()
+ {
+ }
+
+ @DataTypeHint("Boolean")
+ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo =
org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value =
"RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
+ {
+ Geometry geom1 = (Geometry) o1;
+ Geometry geom2 = (Geometry) o2;
+ return org.apache.sedona.common.Predicates.touches(geom1, geom2);
}
}
}
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index 2b0514e2..a39a8a04 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -21,6 +21,7 @@ import org.junit.Test;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.wololo.jts2geojson.GeoJSONReader;
@@ -73,6 +74,31 @@ public class ConstructorTest extends TestBase{
assertEquals(expected, result);
}
+ @Test
+ public void testPointZ() {
+ List<Row> data = new ArrayList<>();
+ data.add(Row.of(2.0, 2.0, 5.0, "point"));
+ String[] colNames = new String[]{"x", "y", "z", "name_point"};
+
+ TypeInformation<?>[] colTypes = {
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO};
+ RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
+ DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+ Table pointTable = tableEnv.fromDataStream(ds);
+
+ Table geomTable = pointTable
+ .select(call(Constructors.ST_PointZ.class.getSimpleName(),
$(colNames[0]), $(colNames[1]), $(colNames[2]))
+ .as(colNames[3]));
+
+ Point result = first(geomTable)
+ .getFieldAs(colNames[3]);
+
+ assertEquals(5.0, result.getCoordinate().getZ(), 1e-6);
+ }
+
@Test
public void testPointFromText() {
List<Row> data = createPointWKT(testDataSize);
diff --git a/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java
b/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java
index 1b991f93..237c16d5 100644
--- a/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java
@@ -14,10 +14,13 @@
package org.apache.sedona.flink;
import org.apache.flink.table.api.Table;
+import org.apache.sedona.flink.expressions.Predicates;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
public class PredicateTest extends TestBase{
@BeforeClass
@@ -79,6 +82,22 @@ public class PredicateTest extends TestBase{
assertEquals(1, count(result));
}
+ @Test
+ public void testCrosses() {
+ Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('MULTIPOINT((0
0), (2 2))') AS g1, ST_GeomFromWKT('LINESTRING(-1 -1, 1 1)') as g2");
+ table = table.select(call(Predicates.ST_Crosses.class.getSimpleName(),
$("g1"), $("g2")));
+ Boolean actual = (Boolean) first(table).getField(0);
+ assertEquals(true, actual);
+ }
+
+ @Test
+ public void testEquals() {
+ Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0
0, 2 2)') AS g1, ST_GeomFromWKT('LINESTRING (0 0, 1 1, 2 2)') as g2");
+ table = table.select(call(Predicates.ST_Equals.class.getSimpleName(),
$("g1"), $("g2")));
+ Boolean actual = (Boolean) first(table).getField(0);
+ assertEquals(true, actual);
+ }
+
@Test
public void testOrderingEquals() {
Table lineStringTable = createLineStringTable(testDataSize);
@@ -87,4 +106,20 @@ public class PredicateTest extends TestBase{
Table result = lineStringTable.filter(expr);
assertEquals(1, count(result));
}
+
+ @Test
+ public void testOverlaps() {
+ Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0
0, 2 2)') AS g1, ST_GeomFromWKT('LINESTRING (1 1, 3 3)') as g2");
+ table =
table.select(call(Predicates.ST_Overlaps.class.getSimpleName(), $("g1"),
$("g2")));
+ Boolean actual = (Boolean) first(table).getField(0);
+ assertEquals(true, actual);
+ }
+
+ @Test
+ public void testTouches() {
+ Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0
0, 1 0)') AS g1, ST_GeomFromWKT('LINESTRING (0 0, 1 1)') as g2");
+ table = table.select(call(Predicates.ST_Touches.class.getSimpleName(),
$("g1"), $("g2")));
+ Boolean actual = (Boolean) first(table).getField(0);
+ assertEquals(true, actual);
+ }
}