This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 1260245207 [SEDONA-705] Add unique partitioner wrapper to enable
partitioned writes with Sedona (#1778)
1260245207 is described below
commit 1260245207d434e73807d5a0379d232e526381ea
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Feb 6 00:27:15 2025 -0600
[SEDONA-705] Add unique partitioner wrapper to enable partitioned writes
with Sedona (#1778)
* add generic unique partitioner
* first pass
* format, python
* remove parent class constructor constraint
* add java test
* make it so I can run pytest
* ignore files generated during Python test
* add python tests
* check something
* align default for the indexed grid partitioner
* maybe fix adapter error
* add comment
* update python API
* add docs
* spotless
* port documentation
* port test over
* port scala test
* actually use without dups
* maybe only warn sometimes
* Update docs/tutorial/sql.md
* Update docs/tutorial/sql.md
* Update docs/tutorial/sql.md
* Update docs/tutorial/sql.md
* lint
---------
Co-authored-by: Jia Yu <[email protected]>
---
docs/tutorial/sql.md | 39 +++++++++++
python/sedona/core/SpatialRDD/spatial_rdd.py | 59 ++++++++++++++--
python/tests/spatial_rdd/test_spatial_rdd.py | 26 +++++++
python/tests/sql/test_structured_adapter.py | 22 ++++++
python/tests/test_base.py | 10 +++
.../GenericUniquePartitioner.java | 80 ++++++++++++++++++++++
.../IndexedGridPartitioner.java | 2 +-
.../spatialPartitioning/SpatialPartitioner.java | 5 ++
.../apache/sedona/core/spatialRDD/SpatialRDD.java | 64 ++++++++++++++++-
.../sedona_sql/adapters/StructuredAdapter.scala | 8 ++-
.../GenericUniquePartitionerTest.java | 61 +++++++++++++++++
spark/common/src/test/resources/.gitignore | 1 +
.../sedona/sql/structuredAdapterTestScala.scala | 22 +++++-
13 files changed, 387 insertions(+), 12 deletions(-)
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index ffea04132a..009a21c87e 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -1638,6 +1638,45 @@ Use SedonaSQL DataFrame-RDD Adapter to convert a
DataFrame to an SpatialRDD. Ple
spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
```
+### SpatialRDD to DataFrame with spatial partitioning
+
+By default, `StructuredAdapter.toDf()` does not preserve spatial partitions
because doing so
+may introduce duplicate features for most types of spatial data. These
duplicates
+are introduced on purpose to ensure correctness when performing a spatial join;
+however, when using Sedona to prepare a dataset for distribution this is not
typically
+desired.
+
+You can use `StructuredAdapter` and the
`spatialRDD.spatialPartitioningWithoutDuplicates` function to obtain a Sedona
DataFrame that is spatially partitioned without duplicates. This is especially
useful for generating balanced GeoParquet files while preserving spatial
proximity within files, which is crucial for optimizing filter pushdown
performance in GeoParquet files.
+
+=== "Scala"
+
+ ```scala
+ spatialRDD.spatialParitioningWithoutDuplicates(GridType.KDBTREE)
+ // Specify the desired number of partitions as 10, though the actual
number may vary
+ // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
+ var spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD,
sedona)
+ ```
+
+=== "Java"
+
+ ```java
+ spatialRDD.spatialParitioningWithoutDuplicates(GridType.KDBTREE)
+ // Specify the desired number of partitions as 10, though the actual
number may vary
+ // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
+ Dataset<Row> spatialDf =
StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+ ```
+
+=== "Python"
+
+ ```python
+ from sedona.utils.structured_adapter import StructuredAdapter
+
+ spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
+ # Specify the desired number of partitions as 10, though the actual
number may vary
+ # spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
+ spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+ ```
+
### SpatialPairRDD to DataFrame
PairRDD is the result of a spatial join query or distance join query.
SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you
need to provide the schema of the left and right RDDs.
diff --git a/python/sedona/core/SpatialRDD/spatial_rdd.py
b/python/sedona/core/SpatialRDD/spatial_rdd.py
index a373309d8d..266676c0d5 100644
--- a/python/sedona/core/SpatialRDD/spatial_rdd.py
+++ b/python/sedona/core/SpatialRDD/spatial_rdd.py
@@ -19,7 +19,7 @@ import pickle
from typing import List, Optional, Union
import attr
-from py4j.java_gateway import get_field
+from py4j.java_gateway import get_field, get_method
from pyspark import RDD, SparkContext, StorageLevel
from pyspark.sql import SparkSession
@@ -51,6 +51,15 @@ class SpatialPartitioner:
return cls(partitioner, jvm_partitioner)
+ def getGrids(self) -> List[Envelope]:
+ jvm_grids = get_method(self.jvm_partitioner, "getGrids")()
+ number_of_grids = jvm_grids.size()
+ envelopes = [
+ Envelope.from_jvm_instance(jvm_grids[index])
+ for index in range(number_of_grids)
+ ]
+ return envelopes
+
@attr.s
class JvmSpatialRDD:
@@ -422,11 +431,49 @@ class SpatialRDD:
num_partitions: Optional[int] = None,
) -> bool:
"""
+ Calculate partitions and assign items in this RDD to a partition.
- :param partitioning: partitioning type
- :param num_partitions: number of partitions
- :return:
+ :param partitioning: Partitioning type or existing SpatialPartitioner
+ (e.g., one obtained from another SpatialRDD to align partitions among
+ input data)
+ :param num_partitions: If partitioning is a GridType, the target
+ number of partitions into which the RDD should be split.
+ :return: True on success
+ """
+ return self._spatial_partitioning_impl(
+ partitioning, num_partitions, self._srdd.spatialPartitioning
+ )
+
+ def spatialPartitioningWithoutDuplicates(
+ self,
+ partitioning: Union[str, GridType, SpatialPartitioner, List[Envelope]],
+ num_partitions: Optional[int] = None,
+ ) -> bool:
"""
+ Calculate partitions and assign items in this RDD to a partition
without
+ introducing duplicates. This is not the desired behaviour for
+ executing joins but is the correct option when partitioning in
+ preparation for a distributed write.
+
+ :param partitioning: Partitioning type or existing SpatialPartitioner
+ (e.g., one obtained from another SpatialRDD to align partitions among
+ input data)
+ :param num_partitions: If partitioning is a GridType, the target
+ number of partitions into which the RDD should be split.
+ :return: True on success
+ """
+ return self._spatial_partitioning_impl(
+ partitioning,
+ num_partitions,
+ self._srdd.spatialPartitioningWithoutDuplicates,
+ )
+
+ def _spatial_partitioning_impl(
+ self,
+ partitioning: Union[str, GridType, SpatialPartitioner, List[Envelope]],
+ num_partitions: Optional[int],
+ java_method,
+ ) -> bool:
if type(partitioning) == str:
grid = GridTypeJvm(self._jvm,
GridType.from_str(partitioning)).jvm_instance
elif type(partitioning) == GridType:
@@ -446,9 +493,9 @@ class SpatialRDD:
self._spatial_partitioned = True
if num_partitions:
- return self._srdd.spatialPartitioning(grid, num_partitions)
+ return java_method(grid, num_partitions)
else:
- return self._srdd.spatialPartitioning(grid)
+ return java_method(grid)
def set_srdd(self, srdd):
self._srdd = srdd
diff --git a/python/tests/spatial_rdd/test_spatial_rdd.py
b/python/tests/spatial_rdd/test_spatial_rdd.py
index ae1dd2f628..4230b87c53 100644
--- a/python/tests/spatial_rdd/test_spatial_rdd.py
+++ b/python/tests/spatial_rdd/test_spatial_rdd.py
@@ -26,6 +26,7 @@ from tests.tools import tests_resource
from sedona.core.enums import FileDataSplitter, GridType, IndexType
from sedona.core.formatMapper.geo_json_reader import GeoJsonReader
+from sedona.utils.adapter import Adapter
from sedona.core.geom.envelope import Envelope
from sedona.core.SpatialRDD import PointRDD
@@ -126,6 +127,10 @@ class TestSpatialRDD(TestBase):
else:
assert spatial_rdd.getPartitioner().name ==
"FlatGridPartitioner"
+ grids = spatial_rdd.getPartitioner().getGrids()
+ assert len(grids) > 0
+ assert all(isinstance(grid, Envelope) for grid in grids)
+
def test_get_raw_spatial_rdd(self):
spatial_rdd = self.create_spatial_rdd()
assert isinstance(spatial_rdd.getRawSpatialRDD(), RDD)
@@ -154,3 +159,24 @@ class TestSpatialRDD(TestBase):
spatial_rdd.spatialPartitioning(GridType.QUADTREE)
print(spatial_rdd.getPartitioner())
+
+ def test_partition_unique(self):
+ grids = [
+ Envelope(0.0, 10.0, 0.0, 10.0),
+ Envelope(10.0, 20.0, 0.0, 10.0),
+ Envelope(0.0, 10.0, 10.0, 20.0),
+ Envelope(10.0, 20.0, 10.0, 20.0),
+ ]
+
+ df = self.spark.createDataFrame(
+ [("POLYGON ((5 5, 15 5, 15 15, 5 15, 5 5))",)], ["wkt"]
+ ).selectExpr("ST_GeomFromText(wkt) as geometry")
+ spatial_rdd = Adapter.toSpatialRdd(df, "geometry")
+
+ spatial_rdd.spatialPartitioning(grids)
+ assert spatial_rdd.spatialPartitionedRDD.count() == 5
+ assert spatial_rdd.getPartitioner().getGrids() == grids
+
+ spatial_rdd.spatialPartitioningWithoutDuplicates(grids)
+ assert spatial_rdd.spatialPartitionedRDD.count() == 1
+ assert spatial_rdd.getPartitioner().getGrids() == grids
diff --git a/python/tests/sql/test_structured_adapter.py
b/python/tests/sql/test_structured_adapter.py
index 54fcbc44ef..f14a48f33e 100644
--- a/python/tests/sql/test_structured_adapter.py
+++ b/python/tests/sql/test_structured_adapter.py
@@ -14,6 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import glob
+import tempfile
+
from pyspark.sql import DataFrame
from sedona.core.SpatialRDD import CircleRDD
@@ -58,3 +61,22 @@ class TestStructuredAdapter(TestBase):
join_result_pair_rdd, schema, schema, self.spark
)
assert join_result_df.count() == 1
+
+ def test_spatial_partitioned_write(self):
+ xys = [(i, i // 100, i % 100) for i in range(1_000)]
+ df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr(
+ "id", "ST_Point(x, y) AS geom"
+ )
+
+ rdd = StructuredAdapter.toSpatialRdd(df, "geom")
+ rdd.analyze()
+ rdd.spatialPartitioningWithoutDuplicates(GridType.KDBTREE,
num_partitions=16)
+ n_spatial_partitions = rdd.spatialPartitionedRDD.getNumPartitions()
+ assert n_spatial_partitions >= 16
+
+ partitioned_df = StructuredAdapter.toSpatialPartitionedDf(rdd,
self.spark)
+
+ with tempfile.TemporaryDirectory() as td:
+ out = td + "/out"
+ partitioned_df.write.format("geoparquet").save(out)
+ assert len(glob.glob(out + "/*.parquet")) == n_spatial_partitions
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index 84f27356f6..2769a93cdd 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -26,6 +26,7 @@ from sedona.spark import *
from sedona.utils.decorators import classproperty
SPARK_REMOTE = os.getenv("SPARK_REMOTE")
+EXTRA_JARS = os.getenv("SEDONA_PYTHON_EXTRA_JARS")
from shapely import wkt
from shapely.geometry.base import BaseGeometry
@@ -36,6 +37,10 @@ class TestBase:
@classproperty
def spark(self):
if not hasattr(self, "__spark"):
+ # This lets a caller override the value of SPARK_HOME to just use
whatever
+ # is provided by pyspark. Otherwise, export SPARK_HOME="" has no
effect.
+ if "SPARK_HOME" in os.environ and not os.environ["SPARK_HOME"]:
+ del os.environ["SPARK_HOME"]
builder = SedonaContext.builder()
if SPARK_REMOTE:
@@ -53,6 +58,11 @@ class TestBase:
else:
builder = builder.master("local[*]")
+ # Allows the Sedona .jar to be explicitly set by the caller (e.g.,
to run
+ # pytest against a freshly-built development version of Sedona)
+ if EXTRA_JARS:
+ builder.config("spark.jars", EXTRA_JARS)
+
spark = SedonaContext.create(builder.getOrCreate())
if not SPARK_REMOTE:
diff --git
a/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/GenericUniquePartitioner.java
b/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/GenericUniquePartitioner.java
new file mode 100644
index 0000000000..214446d6dd
--- /dev/null
+++
b/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/GenericUniquePartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.core.spatialPartitioning;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.sedona.core.enums.GridType;
+import org.apache.sedona.core.joinJudgement.DedupParams;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import scala.Tuple2;
+
+public class GenericUniquePartitioner extends SpatialPartitioner {
+ private SpatialPartitioner parent;
+
+ public GenericUniquePartitioner(SpatialPartitioner parent) {
+ this.parent = parent;
+ }
+
+ public GridType getGridType() {
+ return parent.gridType;
+ }
+
+ public List<Envelope> getGrids() {
+ return parent.grids;
+ }
+
+ @Override
+ public Iterator<Tuple2<Integer, Geometry>> placeObject(Geometry
spatialObject) throws Exception {
+ // Rather than take the first result from the parent, consume the entire
iterator
+ // and return the partition with the minimum ID. This ensures that given
the same
+ // (parent) partitioner, the output partitions from this method will be
consistent.
+ Iterator<Tuple2<Integer, Geometry>> it = parent.placeObject(spatialObject);
+ int minPartitionId = Integer.MAX_VALUE;
+ Geometry minGeometry = null;
+ while (it.hasNext()) {
+ Tuple2<Integer, Geometry> value = it.next();
+ if (value._1() < minPartitionId) {
+ minPartitionId = value._1();
+ minGeometry = value._2();
+ }
+ }
+
+ HashSet<Tuple2<Integer, Geometry>> out = new HashSet<Tuple2<Integer,
Geometry>>();
+ if (minGeometry != null) {
+ out.add(new Tuple2<Integer, Geometry>(minPartitionId, minGeometry));
+ }
+
+ return out.iterator();
+ }
+
+ @Override
+ @Nullable
+ public DedupParams getDedupParams() {
+ throw new UnsupportedOperationException("Unique partitioner cannot
deduplicate join results");
+ }
+
+ @Override
+ public int numPartitions() {
+ return parent.numPartitions();
+ }
+}
diff --git
a/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/IndexedGridPartitioner.java
b/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/IndexedGridPartitioner.java
index 79073f2320..ab8d5cde11 100644
---
a/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/IndexedGridPartitioner.java
+++
b/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/IndexedGridPartitioner.java
@@ -48,7 +48,7 @@ public class IndexedGridPartitioner extends
FlatGridPartitioner {
}
public IndexedGridPartitioner(GridType gridType, List<Envelope> grids) {
- this(gridType, grids, false);
+ this(gridType, grids, true);
}
public IndexedGridPartitioner(List<Envelope> grids, Boolean
preserveUncontainedGeometries) {
diff --git
a/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/SpatialPartitioner.java
b/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/SpatialPartitioner.java
index c7deb3b704..96594c5c10 100644
---
a/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/SpatialPartitioner.java
+++
b/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/SpatialPartitioner.java
@@ -35,6 +35,11 @@ public abstract class SpatialPartitioner extends Partitioner
implements Serializ
protected final GridType gridType;
protected final List<Envelope> grids;
+ protected SpatialPartitioner() {
+ gridType = null;
+ grids = null;
+ }
+
protected SpatialPartitioner(GridType gridType, List<Envelope> grids) {
this.gridType = gridType;
this.grids = Objects.requireNonNull(grids, "grids");
diff --git
a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index f286dd1588..b8b46ae35e 100644
---
a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++
b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -162,11 +162,71 @@ public class SpatialRDD<T extends Geometry> implements
Serializable {
return true;
}
+ public boolean spatialParitioningWithoutDuplicates(GridType gridType) throws
Exception {
+ int numPartitions = this.rawSpatialRDD.rdd().partitions().length;
+ spatialPartitioningWithoutDuplicates(gridType, numPartitions);
+ return true;
+ }
+
+ /**
+ * Calculate non-duplicate inducing partitioning
+ *
+ * <p>Note that non-duplicating partitioners are intended for use by
distributed partitioned
+ * writers and not able to be used for spatial joins.
+ *
+ * @param gridType The target GridType
+ * @param numPartitions The target number of partitions
+ * @throws Exception
+ */
+ public void spatialPartitioningWithoutDuplicates(GridType gridType, int
numPartitions)
+ throws Exception {
+ calc_partitioner(gridType, numPartitions);
+ partitioner = new GenericUniquePartitioner(partitioner);
+ this.spatialPartitionedRDD = partition(partitioner);
+ }
+
+ /**
+ * Calculate non-duplicate inducing partitioning from an existing
SpatialPartitioner
+ *
+ * <p>Note that non-duplicating partitioners are intended for use by
distributed partitioned
+ * writers and not able to be used for spatial joins.
+ *
+ * @param partitioner An existing partitioner obtained from the partitioning
of another
+ * SpatialRDD.
+ * @throws Exception
+ */
+ public void spatialPartitioningWithoutDuplicates(SpatialPartitioner
partitioner) {
+ partitioner = new GenericUniquePartitioner(partitioner);
+ this.spatialPartitionedRDD = partition(partitioner);
+ }
+
+ /**
+ * Calculate non-duplicate inducing partitioning based on a list of existing
envelopes
+ *
+ * <p>This is shorthand for spatialPartitioningWithoutDuplicates(new
IndexedGridPartitioner()).
+ * Using spatialPartitioningWithoutDuplicates(gridType, numPartitions) is
typically more
+ * appropriate because it is able to adapt to the content of the partition
and is able to produce
+ * more consistently balanced partitions.
+ *
+ * <p>Note that non-duplicating partitioners are intended for use by
distributed partitioned
+ * writers and not able to be used for spatial joins.
+ *
+ * @param otherGrids A list of existing envelopes
+ * @return true on success
+ * @throws Exception
+ */
+ public boolean spatialPartitioningWithoutDuplicates(final List<Envelope>
otherGrids)
+ throws Exception {
+ this.partitioner = new GenericUniquePartitioner(new
IndexedGridPartitioner(otherGrids));
+ this.spatialPartitionedRDD = partition(partitioner);
+ return true;
+ }
+
/**
* Spatial partitioning.
*
* @param gridType the grid type
- * @return true, if successful
+ * @param numPartitions the target number of partitions
* @throws Exception the exception
*/
public void calc_partitioner(GridType gridType, int numPartitions) throws
Exception {
@@ -281,7 +341,7 @@ public class SpatialRDD<T extends Geometry> implements
Serializable {
/** @deprecated Use spatialPartitioning(SpatialPartitioner partitioner) */
public boolean spatialPartitioning(final List<Envelope> otherGrids) throws
Exception {
- this.partitioner = new FlatGridPartitioner(otherGrids);
+ this.partitioner = new IndexedGridPartitioner(otherGrids);
this.spatialPartitionedRDD = partition(partitioner);
return true;
}
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
index d2ca5f8f09..70cef8d783 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.locationtech.jts.geom.Geometry
import org.slf4j.{Logger, LoggerFactory}
+import org.apache.sedona.core.spatialPartitioning.GenericUniquePartitioner
/**
* Adapter for converting between DataFrame and SpatialRDD. It provides
methods to convert
@@ -143,8 +144,11 @@ object StructuredAdapter {
if (spatialRDD.spatialPartitionedRDD == null)
throw new RuntimeException(
"SpatialRDD is not spatially partitioned. Please call
spatialPartitioning method before calling this method.")
- logger.warn(
- "SpatialPartitionedRDD might have duplicate geometries. Please make sure
you are aware of it.")
+
+ if (!spatialRDD.getPartitioner().isInstanceOf[GenericUniquePartitioner]) {
+ logger.warn(
+ "SpatialPartitionedRDD might have duplicate geometries. Please make
sure you are aware of it.")
+ }
val rowRdd = spatialRDD.spatialPartitionedRDD.map(geometry => {
val row = geometry.getUserData.asInstanceOf[InternalRow]
row
diff --git
a/spark/common/src/test/java/org/apache/sedona/core/spatialPartitioning/GenericUniquePartitionerTest.java
b/spark/common/src/test/java/org/apache/sedona/core/spatialPartitioning/GenericUniquePartitionerTest.java
new file mode 100644
index 0000000000..1df270c0a0
--- /dev/null
+++
b/spark/common/src/test/java/org/apache/sedona/core/spatialPartitioning/GenericUniquePartitionerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.core.spatialPartitioning;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Test;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import scala.Tuple2;
+
+public class GenericUniquePartitionerTest {
+ private final GeometryFactory factory = new GeometryFactory();
+
+ @Test
+ public void testUniquePartition() throws Exception {
+ ArrayList<Envelope> grids = new ArrayList<Envelope>();
+ grids.add(new Envelope(0, 10, 0, 10));
+ grids.add(new Envelope(10, 20, 0, 10));
+ grids.add(new Envelope(0, 10, 10, 20));
+ grids.add(new Envelope(10, 20, 10, 20));
+
+ FlatGridPartitioner partitioner = new FlatGridPartitioner(grids);
+ GenericUniquePartitioner uniquePartitioner = new
GenericUniquePartitioner(partitioner);
+
+ assertEquals(partitioner.getGridType(), uniquePartitioner.getGridType());
+ assertEquals(partitioner.getGrids(), uniquePartitioner.getGrids());
+
+ Envelope definitelyHasMultiplePartitions = new Envelope(5, 15, 5, 15);
+
+ Iterator<Tuple2<Integer, Geometry>> placedWithDuplicates =
+
partitioner.placeObject(factory.toGeometry(definitelyHasMultiplePartitions));
+ // Because the geometry is not completely contained by any of the
partitions,
+ // it also gets placed in the overflow partition (hence 5, not 4)
+ assertEquals(5, IteratorUtils.toList(placedWithDuplicates).size());
+
+ Iterator<Tuple2<Integer, Geometry>> placedWithoutDuplicates =
+
uniquePartitioner.placeObject(factory.toGeometry(definitelyHasMultiplePartitions));
+ assertEquals(1, IteratorUtils.toList(placedWithoutDuplicates).size());
+ }
+}
diff --git a/spark/common/src/test/resources/.gitignore
b/spark/common/src/test/resources/.gitignore
index 764e830895..958c6de423 100644
--- a/spark/common/src/test/resources/.gitignore
+++ b/spark/common/src/test/resources/.gitignore
@@ -1,2 +1,3 @@
*.DS_Store
real-*
+wkb/testSaveAs*
diff --git
a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
index 6938916a6d..d258ce3b40 100644
---
a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
+++
b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -21,6 +21,7 @@ package org.apache.sedona.sql
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
import org.apache.sedona.core.spatialRDD.CircleRDD
+import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.Row
import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
import org.junit.Assert.assertEquals
@@ -105,6 +106,25 @@ class structuredAdapterTestScala extends TestBaseScala
with GivenWhenThen {
assertEquals(0, spatialRdd.rawSpatialRDD.count())
assertEquals(0, spatialRdd.schema.size)
}
- }
+ it("can convert spatial RDD to Dataframe preserving spatial partitioning")
{
+ var pointCsvDF = sparkSession.read
+ .format("csv")
+ .option("delimiter", ",")
+ .option("header", "false")
+ .load(csvPointInputLocation)
+ pointCsvDF.createOrReplaceTempView("pointtable")
+ var pointDf = sparkSession.sql(
+ "select ST_Point(cast(pointtable._c0 as Decimal(24,20)),
cast(pointtable._c1 as Decimal(24,20))) as arealandmark from pointtable")
+ var srcRdd = StructuredAdapter.toSpatialRdd(pointDf, "arealandmark")
+ srcRdd.analyze()
+ srcRdd.spatialPartitioning(GridType.KDBTREE, 16)
+ var numSpatialPartitions = srcRdd.spatialPartitionedRDD.getNumPartitions
+ assert(numSpatialPartitions >= 16)
+
+ var partitionedDF = StructuredAdapter.toSpatialPartitionedDf(srcRdd,
sparkSession)
+ val dfPartitions: Long =
partitionedDF.select(spark_partition_id).distinct().count()
+ assert(dfPartitions == numSpatialPartitions)
+ }
+ }
}