This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch release-1.4.0
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/release-1.4.0 by this push:
     new 69dae960 Add example for Sedona Flink
69dae960 is described below

commit 69dae9600c89092c090cd37e513168df64a0a0ea
Author: Jia Yu <[email protected]>
AuthorDate: Sun Mar 19 18:51:46 2023 -0700

    Add example for Sedona Flink
---
 examples/{sql => flink-sql}/.gitignore             |   0
 examples/flink-sql/pom.xml                         | 208 +++++++++++++++++++++
 examples/flink-sql/src/main/java/FlinkExample.java |  80 ++++++++
 examples/flink-sql/src/main/java/Utils.java        | 117 ++++++++++++
 .../src/test/resources/.gitignore                  |   0
 .../src/test/resources/raster/test1.tiff           | Bin
 .../src/test/resources/raster/test2.tiff           | Bin
 .../src/test/resources/scalastyle_config.xml       |   0
 .../src/test/resources/shapefiles/dbf/map.dbf      | Bin
 .../src/test/resources/shapefiles/dbf/map.shp      | Bin
 .../src/test/resources/shapefiles/dbf/map.shx      | Bin
 .../src/test/resources/testenvelope.csv            |   0
 .../src/test/resources/testpoint.csv               |   0
 13 files changed, 405 insertions(+)

diff --git a/examples/sql/.gitignore b/examples/flink-sql/.gitignore
similarity index 100%
rename from examples/sql/.gitignore
rename to examples/flink-sql/.gitignore
diff --git a/examples/flink-sql/pom.xml b/examples/flink-sql/pom.xml
new file mode 100644
index 00000000..a177a447
--- /dev/null
+++ b/examples/flink-sql/pom.xml
@@ -0,0 +1,208 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.sedona</groupId>
+    <artifactId>sedona-flink-example</artifactId>
+    <version>1.0.0</version>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <geotools.version>1.4.0-28.2</geotools.version>
+        <geotools.scope>compile</geotools.scope>
+        <scala.compat.version>2.12</scala.compat.version>
+        <sedona.version>1.4.0</sedona.version>
+        <!-- Used as the scope of the sedona-flink dependency below; it must be defined here,
+             otherwise Maven is left with an unresolved placeholder instead of a valid scope. -->
+        <sedona.scope>compile</sedona.scope>
+        <flink.version>1.14.3</flink.version>
+        <flink.scope>compile</flink.scope>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.sedona</groupId>
+            
<artifactId>sedona-flink-shaded_${scala.compat.version}</artifactId>
+            <version>${sedona.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.datasyslab</groupId>
+            <artifactId>geotools-wrapper</artifactId>
+            <version>${geotools.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sedona</groupId>
+            <artifactId>sedona-flink_${scala.compat.version}</artifactId>
+            <version>${sedona.version}</version>
+            <scope>${sedona.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <!--        For Flink DataStream API-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <!--        Flink Kafka connector-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-connector-kafka_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <!--        For playing flink in IDE-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <!--        For Flink flink api, planner, udf/udt, csv-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-java-bridge_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <!--        Starting Flink 14, Blink planner has been renamed to the 
official Flink planner-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+        <!--        For Flink Web Ui in test-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <repositories>
+        <repository>
+            <id>maven2-repository.dev.java.net</id>
+            <name>Java.net repository</name>
+            <url>https://download.java.net/maven/2</url>
+        </repository>
+        <repository>
+            <id>osgeo</id>
+            <name>OSGeo Release Repository</name>
+            <url>https://repo.osgeo.org/repository/release/</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+        </repository>
+    </repositories>
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <!--  use transformer to handle merge of 
META-INF/services - see 
http://java.net/jira/browse/JERSEY-440?focusedCommentId=14822&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_14822
 -->
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <manifestEntries>
+                                        <Specification-Title>Java Advanced 
Imaging Image I/O Tools</Specification-Title>
+                                        
<Specification-Version>1.1</Specification-Version>
+                                        <Specification-Vendor>Sun 
Microsystems, Inc.</Specification-Vendor>
+                                        
<Implementation-Title>com.sun.media.imageio</Implementation-Title>
+                                        
<Implementation-Version>1.1</Implementation-Version>
+                                        <Implementation-Vendor>Sun 
Microsystems, Inc.</Implementation-Vendor>
+                                        
<Extension-Name>com.sun.media.imageio</Extension-Name>
+                                    </manifestEntries>
+                                </transformer>
+                            </transformers>
+                            <filters>
+                                <!--  filter to address "Invalid signature 
file" issue - see http://stackoverflow.com/a/6743609/589215-->
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.8.7</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>report</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/test/resources/</directory>
+            </resource>
+        </resources>
+    </build>
+</project>
+  
diff --git a/examples/flink-sql/src/main/java/FlinkExample.java 
b/examples/flink-sql/src/main/java/FlinkExample.java
new file mode 100644
index 00000000..d39c34b7
--- /dev/null
+++ b/examples/flink-sql/src/main/java/FlinkExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.sedona.flink.SedonaFlinkRegistrator;
+import org.apache.sedona.flink.expressions.Constructors;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/**
+ * Small end-to-end Sedona Flink example.
+ *
+ * <p>It builds a point table and a polygon table from generated WKT strings, attaches an array of
+ * S2 cell ids to every geometry via {@code ST_S2CellIDs}, explodes that array into one row per
+ * cell id, joins the two tables on equal cell ids and finally filters the join result with
+ * {@code ST_Contains}. Each intermediate table and the final result are printed.
+ */
+public class FlinkExample
+{
+    // Column names handed to Utils.createTextTable: geometry text, name, rowtime and proctime columns.
+    static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
+
+    static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
+
+    /**
+     * Runs the example pipeline.
+     *
+     * @param args command line arguments (not used)
+     */
+    public static void main(String[] args) {
+        int testDataSize = 10;
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        // Sedona types and functions must be registered before any ST_* call is resolved
+        SedonaFlinkRegistrator.registerType(env);
+        SedonaFlinkRegistrator.registerFunc(tableEnv);
+
+        // Create a fake WKT string table source
+        Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);
+        // Create a geometry column
+        Table pointTable = pointWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+                        $(pointColNames[0])).as(pointColNames[0]),
+                $(pointColNames[1]));
+        // Create S2CellID
+        pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]),
+                call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
+        // Explode s2id array
+        tableEnv.createTemporaryView("pointTable", pointTable);
+        pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
+        pointTable.execute().print();
+
+
+        // Create a fake WKT string table source
+        Table polygonWktTable = Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
+        // Create a geometry column
+        Table polygonTable = polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+                        $(polygonColNames[0])).as(polygonColNames[0]),
+                $(polygonColNames[1]));
+        // Create S2CellID
+        polygonTable = polygonTable.select($(polygonColNames[0]), $(polygonColNames[1]),
+                call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
+        // Explode s2id array
+        tableEnv.createTemporaryView("polygonTable", polygonTable);
+        polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
+        polygonTable.execute().print();
+
+        // Join two tables by their S2 ids
+        Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
+        // Optional: remove false positives.
+        // Uses the Expression DSL (like the calls above) instead of the deprecated string-expression overload.
+        joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
+        joinResult.execute().print();
+    }
+
+}
diff --git a/examples/flink-sql/src/main/java/Utils.java 
b/examples/flink-sql/src/main/java/Utils.java
new file mode 100644
index 00000000..0a95ab1b
--- /dev/null
+++ b/examples/flink-sql/src/main/java/Utils.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+public class Utils
+{
+    static Long timestamp_base = new 
Timestamp(System.currentTimeMillis()).getTime();
+    static Long time_interval = 1L; // Generate a record per this interval. 
Unit is second
+
+    static List<Row> createPointText(int size){
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create a number of points (1, 1) (2, 2) ...
+            data.add(Row.of(i + "," + i, "point" + i, timestamp_base + 
time_interval * 1000 * i));
+        }
+        return data;
+    }
+
+    static List<Row> createPolygonText(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create polygons each of which only has 1 match in points
+            // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+            List<String> polygon = new ArrayList<>();
+            polygon.add(minX);polygon.add(minY);
+            polygon.add(minX);polygon.add(maxY);
+            polygon.add(maxX);polygon.add(maxY);
+            polygon.add(maxX);polygon.add(minY);
+            polygon.add(minX);polygon.add(minY);
+            data.add(Row.of(String.join(",", polygon), "polygon" + i, 
timestamp_base + time_interval * 1000 * i));
+        }
+        return data;
+    }
+
+    static List<Row> createPointWKT(int size){
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create a number of points (1, 1) (2, 2) ...
+            data.add(Row.of("POINT (" + i + " " + i +")", "point" + i, 
timestamp_base + time_interval * 1000 * i));
+        }
+        return data;
+    }
+
+    static List<Row> createPolygonWKT(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create polygons each of which only has 1 match in points
+            // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+            List<String> polygon = new ArrayList<>();
+            polygon.add(minX + " " + minY);
+            polygon.add(minX + " " + maxY);
+            polygon.add(maxX + " " + maxY);
+            polygon.add(maxX + " " + minY);
+            polygon.add(minX + " " + minY);
+            data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", 
"polygon" + i, timestamp_base + time_interval * 1000 * i));
+        }
+        return data;
+    }
+
+    static Table createTextTable(StreamExecutionEnvironment env, 
StreamTableEnvironment tableEnv, List<Row> data, String[] colNames){
+        TypeInformation<?>[] colTypes = {
+                BasicTypeInfo.STRING_TYPE_INFO,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                BasicTypeInfo.LONG_TYPE_INFO
+        };
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, 
Arrays.copyOfRange(colNames, 0, 3));
+        DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+        // Generate Time Attribute
+        WatermarkStrategy<Row> wmStrategy =
+                WatermarkStrategy
+                        .<Row>forMonotonousTimestamps()
+                        .withTimestampAssigner((event, timestamp) -> 
event.getFieldAs(2));
+        return 
tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy), 
$(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(), 
$(colNames[3]).proctime());
+    }
+
+
+}
diff --git a/examples/sql/src/test/resources/.gitignore 
b/examples/flink-sql/src/test/resources/.gitignore
similarity index 100%
rename from examples/sql/src/test/resources/.gitignore
rename to examples/flink-sql/src/test/resources/.gitignore
diff --git a/examples/sql/src/test/resources/raster/test1.tiff 
b/examples/flink-sql/src/test/resources/raster/test1.tiff
similarity index 100%
rename from examples/sql/src/test/resources/raster/test1.tiff
rename to examples/flink-sql/src/test/resources/raster/test1.tiff
diff --git a/examples/sql/src/test/resources/raster/test2.tiff 
b/examples/flink-sql/src/test/resources/raster/test2.tiff
similarity index 100%
rename from examples/sql/src/test/resources/raster/test2.tiff
rename to examples/flink-sql/src/test/resources/raster/test2.tiff
diff --git a/examples/sql/src/test/resources/scalastyle_config.xml 
b/examples/flink-sql/src/test/resources/scalastyle_config.xml
similarity index 100%
rename from examples/sql/src/test/resources/scalastyle_config.xml
rename to examples/flink-sql/src/test/resources/scalastyle_config.xml
diff --git a/examples/sql/src/test/resources/shapefiles/dbf/map.dbf 
b/examples/flink-sql/src/test/resources/shapefiles/dbf/map.dbf
similarity index 100%
rename from examples/sql/src/test/resources/shapefiles/dbf/map.dbf
rename to examples/flink-sql/src/test/resources/shapefiles/dbf/map.dbf
diff --git a/examples/sql/src/test/resources/shapefiles/dbf/map.shp 
b/examples/flink-sql/src/test/resources/shapefiles/dbf/map.shp
similarity index 100%
rename from examples/sql/src/test/resources/shapefiles/dbf/map.shp
rename to examples/flink-sql/src/test/resources/shapefiles/dbf/map.shp
diff --git a/examples/sql/src/test/resources/shapefiles/dbf/map.shx 
b/examples/flink-sql/src/test/resources/shapefiles/dbf/map.shx
similarity index 100%
rename from examples/sql/src/test/resources/shapefiles/dbf/map.shx
rename to examples/flink-sql/src/test/resources/shapefiles/dbf/map.shx
diff --git a/examples/sql/src/test/resources/testenvelope.csv 
b/examples/flink-sql/src/test/resources/testenvelope.csv
similarity index 100%
rename from examples/sql/src/test/resources/testenvelope.csv
rename to examples/flink-sql/src/test/resources/testenvelope.csv
diff --git a/examples/sql/src/test/resources/testpoint.csv 
b/examples/flink-sql/src/test/resources/testpoint.csv
similarity index 100%
rename from examples/sql/src/test/resources/testpoint.csv
rename to examples/flink-sql/src/test/resources/testpoint.csv

Reply via email to