This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 9cfb2243ed [SEDONA-718] Auto Detect geometry column in GeoJSON writer
(#1841)
9cfb2243ed is described below
commit 9cfb2243ed855f8894401834e16fd07b1efddd39
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 5 13:23:36 2025 +0800
[SEDONA-718] Auto Detect geometry column in GeoJSON writer (#1841)
---
docs/tutorial/files/geojson-sedona-spark.md | 11 +-
.../sedona_sql/io/geojson/GeoJSONFileFormat.scala | 44 ++++++--
.../org/apache/sedona/sql/geojsonIOTests.scala | 117 +++++++++++++++++++++
3 files changed, 161 insertions(+), 11 deletions(-)
diff --git a/docs/tutorial/files/geojson-sedona-spark.md
b/docs/tutorial/files/geojson-sedona-spark.md
index 6081981b8f..56d4816efc 100644
--- a/docs/tutorial/files/geojson-sedona-spark.md
+++ b/docs/tutorial/files/geojson-sedona-spark.md
@@ -190,7 +190,16 @@ a_thing/
Sedona writes multiple GeoJSON files in parallel, which is faster than writing
a single file.
-Note that the DataFrame must contain a column named geometry for the write
operation to work.
+Note that the DataFrame must contain at least one column with geometry type
for the write operation to work. Sedona will use the following rules to
determine which column to use as the geometry:
+
+1. If there's a column named "geometry" (matched case-insensitively) with
geometry type, Sedona will use this column
+2. Otherwise, Sedona will use the first geometry column found in the root
schema
+
+You can also manually specify which geometry column to use with the
"geometry.column" option:
+
+```python
+df.write.format("geojson").option("geometry.column",
"geometry").save("/tmp/a_thing")
+```
Now let’s read these GeoJSON files into a DataFrame:
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
index 6a843d4754..ca588c33be 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
@@ -88,17 +88,25 @@ class GeoJSONFileFormat extends TextBasedFileFormat with
DataSourceRegister {
parsedOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
- val geometryColumnName = options.getOrElse("geometry.column", "geometry")
- SparkCompatUtil.findNestedField(
- dataSchema,
- geometryColumnName.split('.'),
- resolver = SQLConf.get.resolver) match {
- case Some(StructField(_, dataType, _, _)) =>
- if (!dataType.acceptsType(GeometryUDT)) {
- throw new IllegalArgumentException(s"$geometryColumnName is not a
geometry column")
- }
+
+ val geometryColumnName = options.get("geometry.column") match {
+ case Some(columnName) =>
+ validateGeometryColumnName(dataSchema, columnName)
+ columnName
case None =>
- throw new IllegalArgumentException(s"Column $geometryColumnName not
found in the schema")
+ // Pick up a geometry column to be written as the geometry field of
GeoJSON.
+ // 1. If there's a column named "geometry" with geometry type, Sedona
will use this column
+ // 2. Otherwise, Sedona will use the first geometry column found in
the root schema
+ dataSchema.fields.filter(_.dataType == GeometryUDT) match {
+ case Array() =>
+ throw new IllegalArgumentException("No geometry column found in
the schema")
+ case geometryFields =>
+ // Look for a field named "geometry" first
+ geometryFields
+ .find(_.name.equalsIgnoreCase("geometry"))
+ .getOrElse(geometryFields.head) // Otherwise take the first
geometry field
+ .name
+ }
}
new OutputWriterFactory {
@@ -115,6 +123,22 @@ class GeoJSONFileFormat extends TextBasedFileFormat with
DataSourceRegister {
}
}
+ private def validateGeometryColumnName(
+ dataSchema: StructType,
+ geometryColumnName: String): Unit = {
+ SparkCompatUtil.findNestedField(
+ dataSchema,
+ geometryColumnName.split('.'),
+ resolver = SQLConf.get.resolver) match {
+ case Some(StructField(_, dataType, _, _)) =>
+ if (!dataType.acceptsType(GeometryUDT)) {
+ throw new IllegalArgumentException(s"$geometryColumnName is not a
geometry column")
+ }
+ case None =>
+ throw new IllegalArgumentException(s"Column $geometryColumnName not
found in the schema")
+ }
+ }
+
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
diff --git
a/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
b/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
index ce3f6b7cf0..61d79165d9 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
@@ -71,6 +71,96 @@ class geojsonIOTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
+ it("GeoJSON Test - Automatically select the top-level geometry column
named geometry") {
+ // We have a dataframe containing two geometry columns, named "g" and
"geometry".
+ // We want to write this dataframe to a GeoJSON file, and automatically
select the "geometry"
+ // column as the geometry column.
+ val df = sparkSession
+ .range(0, 10)
+ .toDF("id")
+ .withColumn("g", expr("ST_MakeLine(ST_Point(id, id), ST_Point(id, id +
1))"))
+ .withColumn("geometry", expr("ST_Point(id, id)"))
+ .withColumn("text", expr("concat('test', id)"))
+ df.write
+ .format("geojson")
+ .mode(SaveMode.Overwrite)
+ .save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema =
+ "type string, geometry string, properties struct<id:int, g: string,
text:string>"
+ val dfJson = sparkSession.read
+ .schema(schema)
+ .format("json")
+ .load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+
assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ assert(properties.getAs("g").toString.startsWith("LINESTRING"))
+ }
+ }
+
+ it("GeoJSON Test - Automatically select the first top-level geometry
column") {
+ // We have a dataframe containing two geometry columns, named "g0" and
"g1".
+ // We want to write this dataframe to a GeoJSON file, and automatically
select the "g0"
+ // column as the geometry column.
+ val df = sparkSession
+ .range(0, 10)
+ .toDF("id")
+ .withColumn("g0", expr("ST_MakeLine(ST_Point(id, id), ST_Point(id, id
+ 1))"))
+ .withColumn("g1", expr("ST_Point(id, id)"))
+ .withColumn("text", expr("concat('test', id)"))
+ df.write
+ .format("geojson")
+ .mode(SaveMode.Overwrite)
+ .save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema =
+ "type string, geometry string, properties struct<id:int, g1: string,
text:string>"
+ val dfJson = sparkSession.read
+ .schema(schema)
+ .format("json")
+ .load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+
assert(row.getAs("geometry").toString.startsWith("{\"type\":\"LineString\""))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ assert(properties.getAs("g1").toString.startsWith("POINT"))
+ }
+ }
+
+ it("GeoJSON Test - Automatically select the top-level geometry column
other than geometry") {
+ // We have a dataframe containing one geometry column named "g" and a
string column named "geometry".
+ // We want to write this dataframe to a GeoJSON file, and automatically
select the "g"
+ // column as the geometry column.
+ val df = sparkSession
+ .range(0, 10)
+ .toDF("id")
+ .withColumn("g", expr("ST_MakeLine(ST_Point(id, id), ST_Point(id, id +
1))"))
+ .withColumn("geometry", expr("concat('g_', id)"))
+ .withColumn("text", expr("concat('test', id)"))
+ df.write
+ .format("geojson")
+ .mode(SaveMode.Overwrite)
+ .save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema =
+ "type string, geometry string, properties struct<id:int, geometry:
string, text:string>"
+ val dfJson = sparkSession.read
+ .schema(schema)
+ .format("json")
+ .load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+
assert(row.getAs("geometry").toString.startsWith("{\"type\":\"LineString\""))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ assert(properties.getAs("geometry").toString.startsWith("g_"))
+ }
+ }
+
it("GeoJSON Test - Specifying geometry column other than geometry") {
val df = sparkSession
.range(0, 10)
@@ -110,6 +200,33 @@ class geojsonIOTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
+ it("GeoJSON Test - Write a dataframe without geometry columns should throw
an exception") {
+ // Even though we have a column named "geometry", it is not a
geometry column.
+ val df = sparkSession
+ .range(0, 10)
+ .toDF("id")
+ .withColumn("geometry", expr("concat('g_', id)"))
+ .withColumn("text", expr("concat('test', id)"))
+ assertThrows[IllegalArgumentException] {
+ df.write
+ .format("geojson")
+ .mode(SaveMode.Overwrite)
+ .save(geojsonoutputlocation + "/geojson_write.json")
+ }
+
+ // We have no geometry column, not even a column named "geometry".
+ val df2 = sparkSession
+ .range(0, 10)
+ .toDF("id")
+ .withColumn("text", expr("concat('test', id)"))
+ assertThrows[IllegalArgumentException] {
+ df2.write
+ .format("geojson")
+ .mode(SaveMode.Overwrite)
+ .save(geojsonoutputlocation + "/geojson_write.json")
+ }
+ }
+
it("GeoJSON Test - Specifying geometry column in a nested struct column") {
val df = sparkSession
.range(0, 10)