This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d7ca370 [SEDONA-303] Port all Sedona Spark function to Sedona Flink 
-- ST_Intersection_Aggr (#905)
8d7ca370 is described below

commit 8d7ca3707b104941de33699ac769f80c781a9234
Author: Junhao Liu <[email protected]>
AuthorDate: Mon Jul 17 22:34:54 2023 +0800

    [SEDONA-303] Port all Sedona Spark function to Sedona Flink -- 
ST_Intersection_Aggr (#905)
---
 docs/api/flink/Aggregator.md                       | 14 ++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  1 +
 .../sedona/flink/expressions/Aggregators.java      | 55 ++++++++++++++++++++++
 .../org/apache/sedona/flink/AggregatorTest.java    | 13 +++++
 4 files changed, 83 insertions(+)

diff --git a/docs/api/flink/Aggregator.md b/docs/api/flink/Aggregator.md
index 8e4e9554..57e0c7e8 100644
--- a/docs/api/flink/Aggregator.md
+++ b/docs/api/flink/Aggregator.md
@@ -12,6 +12,20 @@ SELECT ST_Envelope_Aggr(pointdf.arealandmark)
 FROM pointdf
 ```
 
+## ST_Intersection_Aggr
+
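+Introduction: Return the intersection of all polygons in A. The result is not necessarily a polygon: it can be a lower-dimensional geometry (e.g. a line string) or an empty geometry when the inputs do not share any area.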
+
+Format: `ST_Intersection_Aggr (A:geometryColumn)`
+
+Since: `v1.5.0`
+
+SQL example:
+```sql
+SELECT ST_Intersection_Aggr(polygondf.polygonshape)
+FROM polygondf
+```
+
 ## ST_Union_Aggr
 
 Introduction: Return the polygon union of all polygons in A. All inputs must 
be polygons.
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java 
b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 3087ec23..d779b1ef 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -22,6 +22,7 @@ public class Catalog {
     public static UserDefinedFunction[] getFuncs() {
         return new UserDefinedFunction[]{
                 new Aggregators.ST_Envelope_Aggr(),
+                new Aggregators.ST_Intersection_Aggr(),
                 new Aggregators.ST_Union_Aggr(),
                 new Constructors.ST_Point(),
                 new Constructors.ST_PointZ(),
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java 
b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
index 3b019907..c69a98b3 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
@@ -82,6 +82,61 @@ public class Aggregators {
         }
     }
 
+
+    /**
+     * Aggregate function that computes the intersection of all geometries in a group.
+     *
+     * <p>The accumulator stores the running intersection in {@code acc.geom}. It stays
+     * {@code null} until the first non-null geometry is seen, so an aggregate over no
+     * (non-null) input yields {@code null}.
+     */
+    @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+    public static class ST_Intersection_Aggr extends AggregateFunction<Geometry, Accumulators.AccGeometry> {
+
+        /**
+         * Creates a fresh accumulator for a new aggregation group.
+         *
+         * @return a new {@link Accumulators.AccGeometry}
+         */
+        @Override
+        public Accumulators.AccGeometry createAccumulator() {
+            return new Accumulators.AccGeometry();
+        }
+
+        /**
+         * Returns the aggregation result.
+         *
+         * @param acc the accumulator holding the running intersection
+         * @return the geometry currently stored in the accumulator (may be {@code null})
+         */
+        @Override
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry getValue(Accumulators.AccGeometry acc) {
+            return acc.geom;
+        }
+
+        /**
+         * Folds one input geometry into the running intersection.
+         *
+         * @param acc the accumulator to update
+         * @param o   the input value; expected to be a {@link Geometry}, {@code null} values are ignored
+         */
+        public void accumulate(Accumulators.AccGeometry acc,
+                               @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+            // A null input would otherwise be passed to Geometry.intersection and fail;
+            // skip it so it does not affect the aggregate.
+            if (o == null) {
+                return;
+            }
+            Geometry geometry = (Geometry) o;
+            // The first geometry seeds the accumulator; later ones narrow it down.
+            acc.geom = (acc.geom == null) ? geometry : acc.geom.intersection(geometry);
+        }
+
+        /**
+         * Retraction is not supported.
+         *
+         * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
+         *
+         * @param acc the accumulator
+         * @param o   the value to retract
+         * @throws UnsupportedOperationException always; failing loudly avoids silently returning a
+         *         wrong result when JVM assertions are disabled
+         */
+        public void retract(Accumulators.AccGeometry acc,
+                            @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+            throw new UnsupportedOperationException("ST_Intersection_Aggr does not support retraction");
+        }
+
+        /**
+         * Merges partial accumulators into {@code acc} by intersecting their geometries.
+         *
+         * @param acc the accumulator that receives the merged result
+         * @param it  the partial accumulators to merge in
+         */
+        public void merge(Accumulators.AccGeometry acc, Iterable<Accumulators.AccGeometry> it) {
+            for (Accumulators.AccGeometry other : it) {
+                // A partial accumulator that never saw any input carries no constraint.
+                if (other.geom == null) {
+                    continue;
+                }
+                // Adopt the first non-empty partial result, then intersect with the rest.
+                acc.geom = (acc.geom == null) ? other.geom : acc.geom.intersection(other.geom);
+            }
+        }
+
+        /**
+         * Clears the accumulator so it can be reused for a new aggregation.
+         *
+         * @param acc the accumulator to reset
+         */
+        public void resetAccumulator(Accumulators.AccGeometry acc) {
+            acc.geom = null;
+        }
+    }
+
+
     // Compute the Union boundary of numbers of geometries
     //
     @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
diff --git a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java 
b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
index 1e8e3fe3..9958a33e 100644
--- a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
@@ -55,6 +55,19 @@ public class AggregatorTest extends TestBase{
         assertEquals(5.656854249492381, last(resultTable).getField(0));
     }
 
+    /**
+     * Checks ST_Intersection_Aggr on the overlapping-polygon fixture, once with
+     * {@code testDataSize} rows and once with three rows.
+     */
+    @Test
+    public void testIntersection_Aggr(){
+        // Scenario 1: aggregate over the full-size fixture.
+        Table fullTable = createPolygonOverlappingTable(testDataSize);
+        Row fullResult = last(fullTable.select(call("ST_Intersection_Aggr", $(polygonColNames[0]))));
+        assertEquals("LINESTRING EMPTY", fullResult.getField(0).toString());
+
+        // Scenario 2: aggregate over a three-row fixture.
+        Table smallTable = createPolygonOverlappingTable(3);
+        Row smallResult = last(smallTable.select(call("ST_Intersection_Aggr", $(polygonColNames[0]))));
+        assertEquals("LINESTRING (1 1, 1 0)", smallResult.getField(0).toString());
+    }
+
     @Test
     public void testUnion_Aggr(){
         Table polygonTable = createPolygonOverlappingTable(testDataSize);

Reply via email to