This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 8d7ca370 [SEDONA-303] Port all Sedona Spark function to Sedona Flink
-- ST_Intersection_Aggr (#905)
8d7ca370 is described below
commit 8d7ca3707b104941de33699ac769f80c781a9234
Author: Junhao Liu <[email protected]>
AuthorDate: Mon Jul 17 22:34:54 2023 +0800
[SEDONA-303] Port all Sedona Spark function to Sedona Flink --
ST_Intersection_Aggr (#905)
---
docs/api/flink/Aggregator.md | 14 ++++++
.../main/java/org/apache/sedona/flink/Catalog.java | 1 +
.../sedona/flink/expressions/Aggregators.java | 55 ++++++++++++++++++++++
.../org/apache/sedona/flink/AggregatorTest.java | 13 +++++
4 files changed, 83 insertions(+)
diff --git a/docs/api/flink/Aggregator.md b/docs/api/flink/Aggregator.md
index 8e4e9554..57e0c7e8 100644
--- a/docs/api/flink/Aggregator.md
+++ b/docs/api/flink/Aggregator.md
@@ -12,6 +12,20 @@ SELECT ST_Envelope_Aggr(pointdf.arealandmark)
FROM pointdf
```
+## ST_Intersection_Aggr
+
+Introduction: Return the polygon intersection of all polygons in A
+
+Format: `ST_Intersection_Aggr (A:geometryColumn)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT ST_Intersection_Aggr(polygondf.polygonshape)
+FROM polygondf
+```
+
## ST_Union_Aggr
Introduction: Return the polygon union of all polygons in A. All inputs must
be polygons.
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 3087ec23..d779b1ef 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -22,6 +22,7 @@ public class Catalog {
public static UserDefinedFunction[] getFuncs() {
return new UserDefinedFunction[]{
new Aggregators.ST_Envelope_Aggr(),
+ new Aggregators.ST_Intersection_Aggr(),
new Aggregators.ST_Union_Aggr(),
new Constructors.ST_Point(),
new Constructors.ST_PointZ(),
diff --git
a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
index 3b019907..c69a98b3 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
@@ -82,6 +82,61 @@ public class Aggregators {
}
}
+
+ // Compute the intersection of a number of geometries
+ //
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public static class ST_Intersection_Aggr extends
AggregateFunction<Geometry, Accumulators.AccGeometry> {
+ // Hands out a new AccGeometry for each aggregation.
+ @Override
+ public Accumulators.AccGeometry createAccumulator() {
+ return new Accumulators.AccGeometry();
+ }
+ // The aggregate result is whatever geometry the accumulator currently holds.
+ @Override
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public Geometry getValue(Accumulators.AccGeometry acc) {
+ return acc.geom;
+ }
+ // Folds one input into the running intersection kept in acc.geom.
+ public void accumulate(Accumulators.AccGeometry acc,
+ @DataTypeHint(value = "RAW", bridgedTo =
Geometry.class) Object o) {
+ if (acc.geom == null){ // nothing stored yet: keep the first input as-is
+ acc.geom = (Geometry) o;
+ } else {
+ acc.geom = acc.geom.intersection((Geometry) o);
+ }
+ }
+
+ /**
+ * Retraction is not implemented: the body casts the input and then hits {@code assert (false)}.
+ * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
+ *
+ * @param acc accumulator holding the running intersection; not modified by this method
+ * @param o geometry to retract; cast to Geometry but otherwise unused
+ */
+ public void retract(Accumulators.AccGeometry acc,
+ @DataTypeHint(value = "RAW", bridgedTo =
Geometry.class) Object o) {
+ Geometry geometry = (Geometry) o;
+ assert (false); // NOTE(review): only fails when JVM assertions are enabled; otherwise this is a silent no-op
+ }
+ // Intersects the geometries of the given partial accumulators into acc.
+ public void merge (Accumulators.AccGeometry acc, Iterable <
Accumulators.AccGeometry > it){
+ for (Accumulators.AccGeometry a : it) {
+ if (acc.geom == null){
+ // nothing stored in acc yet: adopt this partial result as the running intersection
+ acc.geom = a.geom;
+ } else {
+ acc.geom = acc.geom.intersection(a.geom); // NOTE(review): a.geom is not null-checked — confirm partial accumulators are never empty
+ }
+ }
+ }
+ // Clears the stored geometry so the next accumulate/merge starts over.
+ public void resetAccumulator (Accumulators.AccGeometry acc){
+ acc.geom = null;
+ }
+ }
+
+
// Compute the Union boundary of numbers of geometries
//
@DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
diff --git a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
index 1e8e3fe3..9958a33e 100644
--- a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
@@ -55,6 +55,19 @@ public class AggregatorTest extends TestBase{
assertEquals(5.656854249492381, last(resultTable).getField(0));
}
+ @Test
+ public void testIntersection_Aggr(){
+ Table polygonTable = createPolygonOverlappingTable(testDataSize);
+ Table result = polygonTable.select(call("ST_Intersection_Aggr",
$(polygonColNames[0])));
+ Row last = last(result);
+ assertEquals("LINESTRING EMPTY", last.getField(0).toString()); // table built with testDataSize: the aggregate is expected to be empty
+
+ polygonTable = createPolygonOverlappingTable(3);
+ result = polygonTable.select(call("ST_Intersection_Aggr",
$(polygonColNames[0])));
+ last = last(result);
+ assertEquals("LINESTRING (1 1, 1 0)", last.getField(0).toString()); // table built with 3: the aggregate is expected to be one line segment
+ }
+
@Test
public void testUnion_Aggr(){
Table polygonTable = createPolygonOverlappingTable(testDataSize);