This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch flink-docs
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/flink-docs by this push:
new c222a36 Add Flink tutorial
c222a36 is described below
commit c222a3671dd023065c2ceeaf2aa61351d6735a94
Author: Jia Yu <[email protected]>
AuthorDate: Sun Mar 6 01:25:38 2022 -0800
Add Flink tutorial
---
docs/api/flink/Overview.md | 2 +-
docs/setup/flink/install-scala.md | 2 +-
docs/tutorial/flink/sql.md | 442 +++++++++++++++++++++++---------------
3 files changed, 273 insertions(+), 173 deletions(-)
diff --git a/docs/api/flink/Overview.md b/docs/api/flink/Overview.md
index 56fb41d..07b5fcc 100644
--- a/docs/api/flink/Overview.md
+++ b/docs/api/flink/Overview.md
@@ -1,6 +1,6 @@
# Introduction
-SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. Please read the
programming guide: [Sedona with Flink application](../../tutorial/flink/sql.md).
+SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. Please read the
programming guide: [Sedona with Flink SQL app](../../tutorial/flink/sql.md).
Sedona includes SQL operators as follows.
diff --git a/docs/setup/flink/install-scala.md
b/docs/setup/flink/install-scala.md
index 8e7af1c..cc4f4a9 100644
--- a/docs/setup/flink/install-scala.md
+++ b/docs/setup/flink/install-scala.md
@@ -5,7 +5,7 @@ Then you can create a self-contained Scala / Java project. A
self-contained proj
To use Sedona in your self-contained Flink project, you just need to add
Sedona as a dependency in your POM.xml or build.sbt.
1. To add Sedona as dependencies, please read [Sedona Maven Central
coordinates](maven-coordinates.md)
-2. Use Sedona Template project to start: [Sedona Template
Project](/tutorial/demo/)
+2. Read [Sedona Flink guide](/tutorial/flink/sql) and use Sedona Template
project to start: [Sedona Template Project](/tutorial/demo/)
3. Compile your project using Maven. Make sure you obtain the fat jar which
packages all dependencies.
4. Submit your compiled fat jar to Flink cluster. Make sure you are in the
root folder of Flink distribution. Then run the following command:
```
diff --git a/docs/tutorial/flink/sql.md b/docs/tutorial/flink/sql.md
index 2765b9f..bef96c8 100644
--- a/docs/tutorial/flink/sql.md
+++ b/docs/tutorial/flink/sql.md
@@ -1,141 +1,111 @@
-The page outlines the steps to manage spatial data using SedonaSQL. ==The
example code is written in Scala but also works for Java==.
+This page outlines the steps to manage spatial data using SedonaSQL. ==The
example code is written in Java but also works for Scala==.
SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. It includes four kinds
of SQL operators as follows. All these operators can be directly called through:
-```Scala
-var myDataFrame = sparkSession.sql("YOUR_SQL")
+```Java
+Table myTable = tableEnv.sqlQuery("YOUR_SQL");
```
-Detailed SedonaSQL APIs are available here: [SedonaSQL
API](../api/sql/Overview.md)
+Detailed SedonaSQL APIs are available here: [SedonaSQL
API](/api/flink/Overview)
## Set up dependencies
-1. Read [Sedona Maven Central coordinates](../setup/maven-coordinates.md)
-2. Select ==the minimum dependencies==: Add [Apache Spark
core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11),
[Apache
SparkSQL](https://mvnrepository.com/artifact/org.apache.spark/spark-sql),
Sedona-core and Sedona-SQL
-3. Add the dependencies in build.sbt or pom.xml.
+1. Read [Sedona Maven Central coordinates](/setup/maven-coordinates)
+2. Add Sedona dependencies in build.sbt or pom.xml.
+3. Add [Flink
dependencies](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/)
in build.sbt or pom.xml.
-!!!note
- To enjoy the full functions of Sedona, we suggest you include ==the
full dependencies==: [Apache Spark
core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11),
[Apache
SparkSQL](https://mvnrepository.com/artifact/org.apache.spark/spark-sql),
Sedona-core, Sedona-SQL, Sedona-Viz. Please see [SQL example
project](/tutorial/demo/)
-
-
-## Initiate SparkSession
-Use the following code to initiate your SparkSession at the beginning:
-```Scala
-var sparkSession = SparkSession.builder()
-.master("local[*]") // Delete this if run in cluster mode
-.appName("readTestScala") // Change this to a proper name
-// Enable Sedona custom Kryo serializer
-.config("spark.serializer", classOf[KryoSerializer].getName) //
org.apache.spark.serializer.KryoSerializer
-.config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName)
-.getOrCreate() // org.apache.sedona.core.serde.SedonaKryoRegistrator
-```
-
-!!!warning
- Sedona has a suite of well-written geometry and index serializers.
Forgetting to enable these serializers will lead to high memory consumption.
-
-If you add ==the Sedona full dependencies== as suggested above, please use the
following two lines to enable Sedona Kryo serializer instead:
-```Scala
-.config("spark.serializer", classOf[KryoSerializer].getName) //
org.apache.spark.serializer.KryoSerializer
-.config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
// org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+## Initiate Stream Environment
+Use the following code to initiate your `StreamExecutionEnvironment` at the
beginning:
+```Java
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
```
## Register SedonaSQL
-Add the following line after your SparkSession declaration
+Add the following lines after your `StreamExecutionEnvironment` and
`StreamTableEnvironment` declarations:
-```Scala
-SedonaSQLRegistrator.registerAll(sparkSession)
+```Java
+SedonaFlinkRegistrator.registerType(env);
+SedonaFlinkRegistrator.registerFunc(tableEnv);
```
-This function will register Sedona User Defined Type, User Defined Function
and optimized join query strategy.
-
-You can also register everything by passing `--conf
spark.sql.extensions=org.apache.sedona.sql.SedonaSqlExtensions` to
`spark-submit` or `spark-shell`.
-
-## Load data from files
-
-Assume we have a WKT file, namely `usa-county.tsv`, at Path
`/Download/usa-county.tsv` as follows:
+!!!warning
+ Sedona has a suite of well-written geometry and index serializers.
Forgetting to enable these serializers will lead to high memory consumption.
-```
-POLYGON (..., ...) Cuming County
-POLYGON (..., ...) Wahkiakum County
-POLYGON (..., ...) De Baca County
-POLYGON (..., ...) Lancaster County
-```
-The file may have many other columns.
+These two calls register the Sedona User Defined Types and User Defined Functions.
-Use the following code to load the data and create a raw DataFrame:
+## Create a Geometry type column
-```Scala
-var rawDf = sparkSession.read.format("csv").option("delimiter",
"\t").option("header", "false").load("/Download/usa-county.tsv")
-rawDf.createOrReplaceTempView("rawdf")
-rawDf.show()
-```
+All geometrical operations in SedonaSQL are on Geometry type objects.
Therefore, before running any query, you need to create a Geometry type
column on a Table.
-The output will be like this:
+Assume you have a Flink Table `tbl` like this:
```
-| _c0|_c1|_c2| _c3| _c4| _c5|
_c6|_c7|_c8| _c9|_c10| _c11|_c12|_c13| _c14| _c15| _c16|
_c17|
-+--------------------+---+---+--------+-----+-----------+--------------------+---+---+-----+----+-----+----+----+----------+--------+-----------+------------+
-|POLYGON ((-97.019...| 31|039|00835841|31039| Cuming| Cuming County|
06| H1|G4020|null| null|null| A|1477895811|10447360|+41.9158651|-096.7885168|
-|POLYGON ((-123.43...| 53|069|01513275|53069| Wahkiakum| Wahkiakum County|
06| H1|G4020|null| null|null| A| 682138871|61658258|+46.2946377|-123.4244583|
-|POLYGON ((-104.56...| 35|011|00933054|35011| De Baca| De Baca County|
06| H1|G4020|null| null|null| A|6015539696|29159492|+34.3592729|-104.3686961|
-|POLYGON ((-96.910...| 31|109|00835876|31109| Lancaster| Lancaster County|
06| H1|G4020| 339|30700|null| A|2169240202|22877180|+40.7835474|-096.6886584|
++----+--------------------------------+--------------------------------+
+| op | geom_polygon | name_polygon |
++----+--------------------------------+--------------------------------+
+| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 |
+| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 |
+| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 |
+| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 |
+| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 |
+| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 |
+| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 |
+| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 |
+| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 |
+| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 |
++----+--------------------------------+--------------------------------+
+10 rows in set
```
-## Create a Geometry type column
-
-All geometrical operations in SedonaSQL are on Geometry type objects.
Therefore, before any kind of queries, you need to create a Geometry type
column on a DataFrame.
-
+You can create a Table with a Geometry type column as follows:
-```Scala
-var spatialDf = sparkSession.sql(
- """
- |SELECT ST_GeomFromWKT(_c0) AS countyshape, _c1, _c2
- |FROM rawdf
- """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+tableEnv.createTemporaryView("myTable", tbl);
+Table geomTbl = tableEnv.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) AS geom_polygon, name_polygon FROM myTable");
+geomTbl.execute().print();
```
-You can select many other attributes to compose this `spatialdDf`. The output
will be something like this:
+The output will be:
```
-| countyshape|_c1|_c2| _c3| _c4| _c5|
_c6|_c7|_c8| _c9|_c10| _c11|_c12|_c13| _c14| _c15| _c16|
_c17|
-+--------------------+---+---+--------+-----+-----------+--------------------+---+---+-----+----+-----+----+----+----------+--------+-----------+------------+
-|POLYGON ((-97.019...| 31|039|00835841|31039| Cuming| Cuming County|
06| H1|G4020|null| null|null| A|1477895811|10447360|+41.9158651|-096.7885168|
-|POLYGON ((-123.43...| 53|069|01513275|53069| Wahkiakum| Wahkiakum County|
06| H1|G4020|null| null|null| A| 682138871|61658258|+46.2946377|-123.4244583|
-|POLYGON ((-104.56...| 35|011|00933054|35011| De Baca| De Baca County|
06| H1|G4020|null| null|null| A|6015539696|29159492|+34.3592729|-104.3686961|
-|POLYGON ((-96.910...| 31|109|00835876|31109| Lancaster| Lancaster County|
06| H1|G4020| 339|30700|null| A|2169240202|22877180|+40.7835474|-096.6886584|
++----+--------------------------------+--------------------------------+
+| op | geom_polygon | name_polygon |
++----+--------------------------------+--------------------------------+
+| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 |
+| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 |
+| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 |
+| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 |
+| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 |
+| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 |
+| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 |
+| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 |
+| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 |
+| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 |
++----+--------------------------------+--------------------------------+
+10 rows in set
```
-Although it looks same with the input, but actually the type of column
countyshape has been changed to ==Geometry== type.
+Although it looks the same as the input, the type of column
`geom_polygon` has been changed to ==Geometry==.
To verify this, use the following code to print the schema of the DataFrame:
-```Scala
-spatialDf.printSchema()
+```Java
+geomTbl.printSchema();
```
The output will be like this:
```
-root
- |-- countyshape: geometry (nullable = false)
- |-- _c1: string (nullable = true)
- |-- _c2: string (nullable = true)
- |-- _c3: string (nullable = true)
- |-- _c4: string (nullable = true)
- |-- _c5: string (nullable = true)
- |-- _c6: string (nullable = true)
- |-- _c7: string (nullable = true)
+(
+ `geom_polygon` RAW('org.locationtech.jts.geom.Geometry', '...'),
+ `name_polygon` STRING
+)
```
!!!note
- SedonaSQL provides lots of functions to create a Geometry column,
please read [SedonaSQL constructor API](../api/sql/Constructor.md).
-
-## Load Shapefile and GeoJSON
-
-Shapefile and GeoJSON must be loaded by SpatialRDD and converted to DataFrame
using Adapter. Please read [Load
SpatialRDD](../rdd/#create-a-generic-spatialrdd) and [DataFrame <->
RDD](#convert-between-dataframe-and-spatialrdd).
-
+ SedonaSQL provides many functions to create a Geometry column.
Please read [SedonaSQL constructor API](/api/flink/Constructor).
## Transform the Coordinate Reference System
@@ -143,14 +113,9 @@ Sedona doesn't control the coordinate unit (degree-based
or meter-based) of all
To convert Coordinate Reference System of the Geometry column created before,
use the following code:
-```Scala
-spatialDf = sparkSession.sql(
- """
- |SELECT ST_Transform(countyshape, "epsg:4326", "epsg:3857") AS
newcountyshape, _c1, _c2, _c3, _c4, _c5, _c6, _c7
- |FROM spatialdf
- """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+Table geomTbl3857 = tableEnv.sqlQuery("SELECT ST_Transform(ST_GeomFromWKT(geom_polygon), 'epsg:4326', 'epsg:3857') AS geom_polygon, name_polygon FROM myTable");
+geomTbl3857.execute().print();
```
The first EPSG code EPSG:4326 in `ST_Transform` is the source CRS of the
geometries. It is WGS84, the most common degree-based CRS.
@@ -159,17 +124,46 @@ The second EPSG code EPSG:3857 in `ST_Transform` is the
target CRS of the geomet
This `ST_Transform` transforms the CRS of these geometries from EPSG:4326 to
EPSG:3857. Detailed CRS information can be found on
[EPSG.io](https://epsg.io/).
-The coordinates of polygons have been changed. The output will be like this:
+!!!note
+ Read [SedonaSQL ST_Transform API](/api/flink/Function/#st_transform) to
learn more about this function.
+
+For example, a Table with coordinates in the US changes as follows.
+Before the transformation:
+```
++----+--------------------------------+--------------------------------+
+| op | geom_point | name_point |
++----+--------------------------------+--------------------------------+
+| +I | POINT (32 -118) | point |
+| +I | POINT (33 -117) | point |
+| +I | POINT (34 -116) | point |
+| +I | POINT (35 -115) | point |
+| +I | POINT (36 -114) | point |
+| +I | POINT (37 -113) | point |
+| +I | POINT (38 -112) | point |
+| +I | POINT (39 -111) | point |
+| +I | POINT (40 -110) | point |
+| +I | POINT (41 -109) | point |
++----+--------------------------------+--------------------------------+
```
-+--------------------+---+---+--------+-----+-----------+--------------------+---+
-| newcountyshape|_c1|_c2| _c3| _c4| _c5|
_c6|_c7|
-+--------------------+---+---+--------+-----+-----------+--------------------+---+
-|POLYGON ((-108001...| 31|039|00835841|31039| Cuming| Cuming County|
06|
-|POLYGON ((-137408...| 53|069|01513275|53069| Wahkiakum| Wahkiakum County|
06|
-|POLYGON ((-116403...| 35|011|00933054|35011| De Baca| De Baca County|
06|
-|POLYGON ((-107880...| 31|109|00835876|31109| Lancaster| Lancaster County|
06|
+After the transformation:
+
+```
++----+--------------------------------+--------------------------------+
+| op | _c0 | name_point |
++----+--------------------------------+--------------------------------+
+| +I | POINT (-13135699.91360628 3... | point |
+| +I | POINT (-13024380.422813008 ... | point |
+| +I | POINT (-12913060.932019735 ... | point |
+| +I | POINT (-12801741.44122646 4... | point |
+| +I | POINT (-12690421.950433187 ... | point |
+| +I | POINT (-12579102.459639912 ... | point |
+| +I | POINT (-12467782.96884664 4... | point |
+| +I | POINT (-12356463.478053367 ... | point |
+| +I | POINT (-12245143.987260092 ... | point |
+| +I | POINT (-12133824.496466817 ... | point |
++----+--------------------------------+--------------------------------+
```
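To see where the numbers in the transformed table come from, the forward spherical Web Mercator projection used by EPSG:3857 can be sketched in plain Java. This is only an illustration of the math, not Sedona's implementation: the class and method names are hypothetical, and it assumes that the first ordinate of each sample point is the latitude, as the sample output suggests.

```java
// Hypothetical helper, not part of Sedona: forward spherical Web Mercator.
final class WebMercatorSketch {
    // Radius of the sphere used by EPSG:3857, in meters.
    static final double EARTH_RADIUS = 6378137.0;

    // Easting in meters for a longitude given in degrees.
    static double toX(double lonDegrees) {
        return EARTH_RADIUS * Math.toRadians(lonDegrees);
    }

    // Northing in meters for a latitude given in degrees.
    static double toY(double latDegrees) {
        double lat = Math.toRadians(latDegrees);
        return EARTH_RADIUS * Math.log(Math.tan(Math.PI / 4 + lat / 2));
    }

    public static void main(String[] args) {
        // First sample row, read as latitude 32 and longitude -118.
        System.out.println("POINT (" + toX(-118) + " " + toY(32) + ")");
    }
}
```

In a real job `ST_Transform` should do this work; the sketch is only useful for sanity-checking a few rows by hand.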
@@ -179,105 +173,211 @@ After creating a Geometry type column, you are able to
run spatial queries.
### Range query
-Use ==ST_Contains==, ==ST_Intersects==, ==ST_Within== to run a range query
over a single column.
+Use ==ST_Contains==, ==ST_Intersects== and so on to run a range query over a
single column.
The following example finds all counties that are within the given polygon:
-```Scala
-spatialDf = sparkSession.sql(
- """
- |SELECT *
- |FROM spatialdf
- |WHERE ST_Contains (ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0),
newcountyshape)
- """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+// Assumes a registered view `spatialdf` with a Geometry column `newcountyshape`
+Table geomTable = tableEnv.sqlQuery(
+    "SELECT * " +
+    "FROM spatialdf " +
+    "WHERE ST_Contains(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape)");
+geomTable.execute().print();
```
!!!note
- Read [SedonaSQL constructor API](../api/sql/Constructor.md) to learn
how to create a Geometry type query window
+ Read [SedonaSQL Predicate API](/api/flink/Predicate) to learn different
spatial query predicates.
+
### KNN query
Use ==ST_Distance== to calculate the distance and rank the distance.
The following code returns the 5 nearest neighbors of the given polygon.
-```Scala
-spatialDf = sparkSession.sql(
- """
- |SELECT countyname,
ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS
distance
- |FROM spatialdf
- |ORDER BY distance DESC
- |LIMIT 5
- """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+// Assumes a registered view `geomTable` with columns `countyname` and `newcountyshape`
+Table knnTable = tableEnv.sqlQuery(
+    "SELECT countyname, " +
+    "ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS distance " +
+    "FROM geomTable " +
+    "ORDER BY distance ASC " +
+    "LIMIT 5");
+knnTable.execute().print();
```
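The ranking idea behind the KNN query (compute a distance per row, sort ascending, keep the first k) can be sketched without Flink or Sedona. The class and method names below are hypothetical, and the sketch uses plain Euclidean distance between points rather than `ST_Distance` on polygons.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Sedona: k nearest points to a query point.
final class KnnSketch {
    // Returns the k points closest to (qx, qy), nearest first.
    static List<double[]> nearest(List<double[]> points, double qx, double qy, int k) {
        return points.stream()
                .sorted(Comparator.comparingDouble(
                        (double[] p) -> Math.hypot(p[0] - qx, p[1] - qy)))
                .limit(k)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<double[]> points = List.of(
                new double[] {0, 0}, new double[] {5, 5},
                new double[] {1, 1}, new double[] {3, 3});
        for (double[] p : nearest(points, 0, 0, 2)) {
            System.out.println(p[0] + " " + p[1]);
        }
    }
}
```

Sorting in ascending order is what makes the result the nearest rather than the farthest neighbors.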
-### Join query
+## Convert Spatial Table to Spatial DataStream
-The details of a join query is available here [Join
query](../api/sql/Optimizer.md).
+### Get DataStream
-### Other queries
+Use TableEnv's toDataStream function
-There are lots of other functions can be combined with these queries. Please
read [SedonaSQL functions](../api/sql/Function.md) and [SedonaSQL aggregate
functions](../api/sql/AggregateFunction.md).
+```Java
+DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);
+```
-## Save to permanent storage
+### Retrieve Geometries
-To save a Spatial DataFrame to some permanent storage such as Hive tables and
HDFS, you can simply convert each geometry in the Geometry type column back to
a plain String and save the plain DataFrame to wherever you want.
+Then get the Geometry from each Row object using Map
+```Java
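+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.locationtech.jts.geom.Geometry;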
-Use the following code to convert the Geometry column in a DataFrame back to a
WKT string column:
-```Scala
-var stringDf = sparkSession.sql(
- """
- |SELECT ST_AsText(countyshape)
- |FROM polygondf
- """.stripMargin)
+DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row,
Geometry>() {
+ @Override
+ public Geometry map(Row value) throws Exception {
+ return (Geometry) value.getField(0);
+ }
+ });
+geometries.print();
```
-!!!note
- ST_AsGeoJSON is also available. We would like to invite you to
contribute more functions
+The output will be
+```
+14> POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5))
+2> POLYGON ((5.5 5.5, 5.5 6.5, 6.5 6.5, 6.5 5.5, 5.5 5.5))
+5> POLYGON ((8.5 8.5, 8.5 9.5, 9.5 9.5, 9.5 8.5, 8.5 8.5))
+16> POLYGON ((3.5 3.5, 3.5 4.5, 4.5 4.5, 4.5 3.5, 3.5 3.5))
+12> POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))
+13> POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))
+15> POLYGON ((2.5 2.5, 2.5 3.5, 3.5 3.5, 3.5 2.5, 2.5 2.5))
+3> POLYGON ((6.5 6.5, 6.5 7.5, 7.5 7.5, 7.5 6.5, 6.5 6.5))
+1> POLYGON ((4.5 4.5, 4.5 5.5, 5.5 5.5, 5.5 4.5, 4.5 4.5))
+4> POLYGON ((7.5 7.5, 7.5 8.5, 8.5 8.5, 8.5 7.5, 7.5 7.5))
+```
-## Convert between DataFrame and SpatialRDD
+### Store non-spatial attributes in Geometries
-### DataFrame to SpatialRDD
+You can concatenate other non-spatial attributes and store them in Geometry's
`userData` field so you can recover them later on. `userData` field can be any
object type.
-Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD.
Please read [Adapter
Scaladoc](/api/javadoc/sql/org/apache/sedona/sql/utils/index.html)
+```Java
+import org.locationtech.jts.geom.Geometry;
-```Scala
-var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row,
Geometry>() {
+ @Override
+ public Geometry map(Row value) throws Exception {
+ Geometry geom = (Geometry) value.getField(0);
+ geom.setUserData(value.getField(1));
+ return geom;
+ }
+ });
+geometries.print();
```
-"usacounty" is the name of the geometry column
+The `print` command will not print out `userData` field. But you can get it
this way:
-!!!warning
- Only one Geometry type column is allowed per DataFrame.
+```Java
+import org.locationtech.jts.geom.Geometry;
+
+geometries.map(new MapFunction<Geometry, String>() {
+ @Override
+ public String map(Geometry value) throws Exception
+ {
+ return (String) value.getUserData();
+ }
+ }).print();
+```
+
+The output will be
+
+```
+13> polygon9
+6> polygon2
+10> polygon6
+11> polygon7
+5> polygon1
+12> polygon8
+8> polygon4
+4> polygon0
+7> polygon3
+9> polygon5
+```
-### SpatialRDD to DataFrame
+## Convert Spatial DataStream to Spatial Table
+
+### Create Geometries using Sedona FormatUtils
-Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD.
Please read [Adapter
Scaladoc](/api/javadoc/sql/org/apache/sedona/sql/utils/index.html)
+* Create a Geometry from a WKT string
-```Scala
-var spatialDf = Adapter.toDf(spatialRDD, sparkSession)
+```Java
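+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.sedona.core.enums.FileDataSplitter;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+import org.locationtech.jts.geom.Geometry;
+
+// `text` is assumed to be a DataStream<String> of WKT strings
+DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
+    @Override
+    public Geometry map(String value) throws Exception
+    {
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(FileDataSplitter.WKT, false);
+        return formatUtils.readGeometry(value);
+    }
+});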
```
-All other attributes such as price and age will be also brought to the
DataFrame as long as you specify ==carryOtherAttributes== (see [Read other
attributes in an SpatialRDD](../rdd#read-other-attributes-in-an-spatialrdd)).
+* Create a Point from a String `1.1, 2.2`. Use `,` as the delimiter.
-### SpatialPairRDD to DataFrame
+```Java
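+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+import org.locationtech.jts.geom.Geometry;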
-PairRDD is the result of a spatial join query or distance join query.
SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you
need to provide the name of other attributes.
+DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
+    @Override
+    public Geometry map(String value) throws Exception
+    {
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(",", false, GeometryType.POINT);
+        return formatUtils.readGeometry(value);
+    }
+});
+```
-```Scala
-var joinResultDf = Adapter.toDf(joinResultPairRDD, Seq("left_attribute1",
"left_attribute2"), Seq("right_attribute1", "right_attribute2"), sparkSession)
+* Create a Polygon from a String `1.1, 1.1, 10.1, 10.1`. This is a rectangle
with (1.1, 1.1) and (10.1, 10.1) as its min/max corners.
+
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Geometry;
+
+DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>()
{
+ @Override
+ public Geometry map(String value) throws Exception
+ {
+ // Write some code to get four double type values: minX,
minY, maxX, maxY
+ ...
+ Coordinate[] coordinates = new Coordinate[5];
+ coordinates[0] = new Coordinate(minX, minY);
+ coordinates[1] = new Coordinate(minX, maxY);
+ coordinates[2] = new Coordinate(maxX, maxY);
+ coordinates[3] = new Coordinate(maxX, minY);
+ coordinates[4] = coordinates[0];
+ GeometryFactory geometryFactory = new GeometryFactory();
+ return geometryFactory.createPolygon(coordinates);
+ }
+ });
```
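The step left open in the example above (getting four double values out of the input string) could look like the following plain-Java sketch. It assumes the input is four comma-separated numbers in `minX, minY, maxX, maxY` order. The class and method names are hypothetical, and it emits a WKT string instead of a JTS `Polygon` so that it runs without any dependency.

```java
// Hypothetical helper, not part of Sedona: parse an envelope string and emit WKT.
final class EnvelopeSketch {
    // Parses "minX, minY, maxX, maxY" into an array of four doubles.
    static double[] parseEnvelope(String value) {
        String[] parts = value.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 values, got " + parts.length);
        }
        double[] envelope = new double[4];
        for (int i = 0; i < 4; i++) {
            envelope[i] = Double.parseDouble(parts[i].trim());
        }
        return envelope;
    }

    // Builds the closed rectangle ring in the same corner order as the example above.
    static String toWkt(double[] e) {
        double minX = e[0], minY = e[1], maxX = e[2], maxY = e[3];
        return "POLYGON ((" + minX + " " + minY + ", " + minX + " " + maxY + ", "
                + maxX + " " + maxY + ", " + maxX + " " + minY + ", "
                + minX + " " + minY + "))";
    }

    public static void main(String[] args) {
        System.out.println(toWkt(parseEnvelope("1.1, 1.1, 10.1, 10.1")));
    }
}
```

Inside the `map` function, the four parsed values would feed the `Coordinate` constructors directly.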
-or you can use the attribute names directly from the input RDD
+### Create Row objects
+
+Wrap each geometry in a Flink Row to build a `geomStream`. Note that you can put other
attributes in the Row as well. This example uses a constant value `myName` for all
geometries.
-```Scala
-import scala.collection.JavaConversions._
-var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames,
rightRdd.fieldNames, sparkSession)
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.types.Row;
+import org.apache.sedona.core.enums.FileDataSplitter;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+import org.locationtech.jts.geom.Geometry;
+
+// `text` is assumed to be a DataStream<String> of WKT strings
+DataStream<Row> geomStream = text.map(new MapFunction<String, Row>() {
+    @Override
+    public Row map(String value) throws Exception
+    {
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(FileDataSplitter.WKT, false);
+        return Row.of(formatUtils.readGeometry(value), "myName");
+    }
+});
```
-All other attributes such as price and age will be also brought to the
DataFrame as long as you specify ==carryOtherAttributes== (see [Read other
attributes in an SpatialRDD](../rdd#read-other-attributes-in-an-spatialrdd)).
+### Get Spatial Table
+
+Use TableEnv's fromDataStream function, with two column names `geom` and
`geom_name`.
+```Java
+import static org.apache.flink.table.api.Expressions.$;
+
+Table geomTable = tableEnv.fromDataStream(geomStream, $("geom"), $("geom_name"));
+```