This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch release-1.4.0
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/release-1.4.0 by this push:
new 69dae960 Add example for Sedona Flink
69dae960 is described below
commit 69dae9600c89092c090cd37e513168df64a0a0ea
Author: Jia Yu <[email protected]>
AuthorDate: Sun Mar 19 18:51:46 2023 -0700
Add example for Sedona Flink
---
examples/{sql => flink-sql}/.gitignore | 0
examples/flink-sql/pom.xml | 208 +++++++++++++++++++++
examples/flink-sql/src/main/java/FlinkExample.java | 80 ++++++++
examples/flink-sql/src/main/java/Utils.java | 117 ++++++++++++
.../src/test/resources/.gitignore | 0
.../src/test/resources/raster/test1.tiff | Bin
.../src/test/resources/raster/test2.tiff | Bin
.../src/test/resources/scalastyle_config.xml | 0
.../src/test/resources/shapefiles/dbf/map.dbf | Bin
.../src/test/resources/shapefiles/dbf/map.shp | Bin
.../src/test/resources/shapefiles/dbf/map.shx | Bin
.../src/test/resources/testenvelope.csv | 0
.../src/test/resources/testpoint.csv | 0
13 files changed, 405 insertions(+)
diff --git a/examples/sql/.gitignore b/examples/flink-sql/.gitignore
similarity index 100%
rename from examples/sql/.gitignore
rename to examples/flink-sql/.gitignore
diff --git a/examples/flink-sql/pom.xml b/examples/flink-sql/pom.xml
new file mode 100644
index 00000000..a177a447
--- /dev/null
+++ b/examples/flink-sql/pom.xml
@@ -0,0 +1,208 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.sedona</groupId>
+ <artifactId>sedona-flink-example</artifactId>
+ <version>1.0.0</version>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <geotools.version>1.4.0-28.2</geotools.version>
+ <geotools.scope>compile</geotools.scope>
+ <scala.compat.version>2.12</scala.compat.version>
+ <sedona.version>1.4.0</sedona.version>
+ <flink.version>1.14.3</flink.version>
+ <flink.scope>compile</flink.scope>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.sedona</groupId>
+
<artifactId>sedona-flink-shaded_${scala.compat.version}</artifactId>
+ <version>${sedona.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.datasyslab</groupId>
+ <artifactId>geotools-wrapper</artifactId>
+ <version>${geotools.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sedona</groupId>
+ <artifactId>sedona-flink_${scala.compat.version}</artifactId>
+ <version>${sedona.version}</version>
+            <scope>compile</scope> <!-- was ${sedona.scope}, an undefined property; Maven would emit the literal string as an invalid scope -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <!-- For Flink DataStream API-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <!-- Flink Kafka connector-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-kafka_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <!-- For playing flink in IDE-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+        <!-- For Flink table api, planner, udf/udt, csv-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-java-bridge_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <!-- Starting Flink 14, Blink planner has been renamed to the
official Flink planner-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <!-- For Flink Web Ui in test-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web_${scala.compat.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <repositories>
+ <repository>
+ <id>maven2-repository.dev.java.net</id>
+ <name>Java.net repository</name>
+ <url>https://download.java.net/maven/2</url>
+ </repository>
+ <repository>
+ <id>osgeo</id>
+ <name>OSGeo Release Repository</name>
+ <url>https://repo.osgeo.org/repository/release/</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ </repository>
+ </repositories>
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <!-- use transformer to handle merge of
META-INF/services - see
http://java.net/jira/browse/JERSEY-440?focusedCommentId=14822&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_14822
-->
+ <transformer
+
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer
+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <manifestEntries>
+ <Specification-Title>Java Advanced
Imaging Image I/O Tools</Specification-Title>
+
<Specification-Version>1.1</Specification-Version>
+ <Specification-Vendor>Sun
Microsystems, Inc.</Specification-Vendor>
+
<Implementation-Title>com.sun.media.imageio</Implementation-Title>
+
<Implementation-Version>1.1</Implementation-Version>
+ <Implementation-Vendor>Sun
Microsystems, Inc.</Implementation-Vendor>
+
<Extension-Name>com.sun.media.imageio</Extension-Name>
+ </manifestEntries>
+ </transformer>
+ </transformers>
+ <filters>
+ <!-- filter to address "Invalid signature
file" issue - see http://stackoverflow.com/a/6743609/589215-->
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.8.7</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>report</id>
+ <phase>test</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/test/resources/</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
+
diff --git a/examples/flink-sql/src/main/java/FlinkExample.java
b/examples/flink-sql/src/main/java/FlinkExample.java
new file mode 100644
index 00000000..d39c34b7
--- /dev/null
+++ b/examples/flink-sql/src/main/java/FlinkExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.sedona.flink.SedonaFlinkRegistrator;
+import org.apache.sedona.flink.expressions.Constructors;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+public class FlinkExample
+{
+ static String[] pointColNames = {"geom_point", "name_point", "event_time",
"proc_time"};
+
+ static String[] polygonColNames = {"geom_polygon", "name_polygon",
"event_time", "proc_time"};
+
+ public static void main(String[] args) {
+ int testDataSize = 10;
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ SedonaFlinkRegistrator.registerType(env);
+ SedonaFlinkRegistrator.registerFunc(tableEnv);
+
+ // Create a fake WKT string table source
+ Table pointWktTable = Utils.createTextTable(env, tableEnv,
Utils.createPointWKT(testDataSize), pointColNames);
+ // Create a geometry column
+ Table pointTable =
pointWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+ $(pointColNames[0])).as(pointColNames[0]),
+ $(pointColNames[1]));
+ // Create S2CellID
+ pointTable = pointTable.select($(pointColNames[0]),
$(pointColNames[1]),
+ call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
+ // Explode s2id array
+ tableEnv.createTemporaryView("pointTable", pointTable);
+ pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point,
s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS
tmpTbl1(s2id_point)");
+ pointTable.execute().print();
+
+
+ // Create a fake WKT string table source
+ Table polygonWktTable = Utils.createTextTable(env, tableEnv,
Utils.createPolygonWKT(testDataSize), polygonColNames);
+ // Create a geometry column
+ Table polygonTable =
polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+ $(polygonColNames[0])).as(polygonColNames[0]),
+ $(polygonColNames[1]));
+ // Create S2CellID
+ polygonTable = polygonTable.select($(polygonColNames[0]),
$(polygonColNames[1]),
+ call("ST_S2CellIDs", $(polygonColNames[0]),
6).as("s2id_array"));
+ // Explode s2id array
+ tableEnv.createTemporaryView("polygonTable", polygonTable);
+ polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon,
s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS
tmpTbl2(s2id_polygon)");
+ polygonTable.execute().print();
+
+ // Join two tables by their S2 ids
+ Table joinResult =
pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
+ // Optional: remove false positives
+ joinResult = joinResult.where("ST_Contains(geom_polygon, geom_point)");
+ joinResult.execute().print();
+ }
+
+}
diff --git a/examples/flink-sql/src/main/java/Utils.java
b/examples/flink-sql/src/main/java/Utils.java
new file mode 100644
index 00000000..0a95ab1b
--- /dev/null
+++ b/examples/flink-sql/src/main/java/Utils.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+public class Utils
+{
+ static Long timestamp_base = new
Timestamp(System.currentTimeMillis()).getTime();
+ static Long time_interval = 1L; // Generate a record per this interval.
Unit is second
+
+ static List<Row> createPointText(int size){
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create a number of points (1, 1) (2, 2) ...
+ data.add(Row.of(i + "," + i, "point" + i, timestamp_base +
time_interval * 1000 * i));
+ }
+ return data;
+ }
+
+ static List<Row> createPolygonText(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+ String minX = String.valueOf(i - 0.5);
+ String minY = String.valueOf(i - 0.5);
+ String maxX = String.valueOf(i + 0.5);
+ String maxY = String.valueOf(i + 0.5);
+ List<String> polygon = new ArrayList<>();
+ polygon.add(minX);polygon.add(minY);
+ polygon.add(minX);polygon.add(maxY);
+ polygon.add(maxX);polygon.add(maxY);
+ polygon.add(maxX);polygon.add(minY);
+ polygon.add(minX);polygon.add(minY);
+ data.add(Row.of(String.join(",", polygon), "polygon" + i,
timestamp_base + time_interval * 1000 * i));
+ }
+ return data;
+ }
+
+ static List<Row> createPointWKT(int size){
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create a number of points (1, 1) (2, 2) ...
+ data.add(Row.of("POINT (" + i + " " + i +")", "point" + i,
timestamp_base + time_interval * 1000 * i));
+ }
+ return data;
+ }
+
+ static List<Row> createPolygonWKT(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+ String minX = String.valueOf(i - 0.5);
+ String minY = String.valueOf(i - 0.5);
+ String maxX = String.valueOf(i + 0.5);
+ String maxY = String.valueOf(i + 0.5);
+ List<String> polygon = new ArrayList<>();
+ polygon.add(minX + " " + minY);
+ polygon.add(minX + " " + maxY);
+ polygon.add(maxX + " " + maxY);
+ polygon.add(maxX + " " + minY);
+ polygon.add(minX + " " + minY);
+ data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))",
"polygon" + i, timestamp_base + time_interval * 1000 * i));
+ }
+ return data;
+ }
+
+ static Table createTextTable(StreamExecutionEnvironment env,
StreamTableEnvironment tableEnv, List<Row> data, String[] colNames){
+ TypeInformation<?>[] colTypes = {
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO
+ };
+ RowTypeInfo typeInfo = new RowTypeInfo(colTypes,
Arrays.copyOfRange(colNames, 0, 3));
+ DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+ // Generate Time Attribute
+ WatermarkStrategy<Row> wmStrategy =
+ WatermarkStrategy
+ .<Row>forMonotonousTimestamps()
+ .withTimestampAssigner((event, timestamp) ->
event.getFieldAs(2));
+ return
tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy),
$(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(),
$(colNames[3]).proctime());
+ }
+
+
+}
diff --git a/examples/sql/src/test/resources/.gitignore
b/examples/flink-sql/src/test/resources/.gitignore
similarity index 100%
rename from examples/sql/src/test/resources/.gitignore
rename to examples/flink-sql/src/test/resources/.gitignore
diff --git a/examples/sql/src/test/resources/raster/test1.tiff
b/examples/flink-sql/src/test/resources/raster/test1.tiff
similarity index 100%
rename from examples/sql/src/test/resources/raster/test1.tiff
rename to examples/flink-sql/src/test/resources/raster/test1.tiff
diff --git a/examples/sql/src/test/resources/raster/test2.tiff
b/examples/flink-sql/src/test/resources/raster/test2.tiff
similarity index 100%
rename from examples/sql/src/test/resources/raster/test2.tiff
rename to examples/flink-sql/src/test/resources/raster/test2.tiff
diff --git a/examples/sql/src/test/resources/scalastyle_config.xml
b/examples/flink-sql/src/test/resources/scalastyle_config.xml
similarity index 100%
rename from examples/sql/src/test/resources/scalastyle_config.xml
rename to examples/flink-sql/src/test/resources/scalastyle_config.xml
diff --git a/examples/sql/src/test/resources/shapefiles/dbf/map.dbf
b/examples/flink-sql/src/test/resources/shapefiles/dbf/map.dbf
similarity index 100%
rename from examples/sql/src/test/resources/shapefiles/dbf/map.dbf
rename to examples/flink-sql/src/test/resources/shapefiles/dbf/map.dbf
diff --git a/examples/sql/src/test/resources/shapefiles/dbf/map.shp
b/examples/flink-sql/src/test/resources/shapefiles/dbf/map.shp
similarity index 100%
rename from examples/sql/src/test/resources/shapefiles/dbf/map.shp
rename to examples/flink-sql/src/test/resources/shapefiles/dbf/map.shp
diff --git a/examples/sql/src/test/resources/shapefiles/dbf/map.shx
b/examples/flink-sql/src/test/resources/shapefiles/dbf/map.shx
similarity index 100%
rename from examples/sql/src/test/resources/shapefiles/dbf/map.shx
rename to examples/flink-sql/src/test/resources/shapefiles/dbf/map.shx
diff --git a/examples/sql/src/test/resources/testenvelope.csv
b/examples/flink-sql/src/test/resources/testenvelope.csv
similarity index 100%
rename from examples/sql/src/test/resources/testenvelope.csv
rename to examples/flink-sql/src/test/resources/testenvelope.csv
diff --git a/examples/sql/src/test/resources/testpoint.csv
b/examples/flink-sql/src/test/resources/testpoint.csv
similarity index 100%
rename from examples/sql/src/test/resources/testpoint.csv
rename to examples/flink-sql/src/test/resources/testpoint.csv