This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch feature/2639-simplify-kdbtree-geoparquet
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 2c44df17c383a8ddc7c747fd3534fd2197169046
Author: Jia Yu <[email protected]>
AuthorDate: Wed Feb 11 02:12:22 2026 -0800

    feat: add StructuredAdapter.repartitionBySpatialKey for simplified spatial 
partitioning
    
    Add a convenience method that wraps the multi-step process of:
    1. Converting DataFrame to SpatialRDD
    2. Calling analyze()
    3. Applying spatialPartitioningWithoutDuplicates
    4. Converting back to DataFrame
    
    into a single call:
    
      StructuredAdapter.repartitionBySpatialKey(df, GridType.KDBTREE, 10)
    
    This simplifies the workflow for generating spatially partitioned
    GeoParquet files. Added to both Scala and Python APIs, with tests
    and updated documentation.
    
    Closes #2639
---
 docs/tutorial/files/geoparquet-sedona-spark.md     | 61 +++++++++++++++++
 docs/tutorial/sql.md                               | 50 ++++++++++----
 python/sedona/spark/utils/structured_adapter.py    | 48 ++++++++++++++
 python/tests/sql/test_structured_adapter.py        | 19 ++++++
 .../sedona_sql/adapters/StructuredAdapter.scala    | 77 ++++++++++++++++++++++
 .../sedona/sql/structuredAdapterTestScala.scala    | 17 +++++
 6 files changed, 259 insertions(+), 13 deletions(-)

diff --git a/docs/tutorial/files/geoparquet-sedona-spark.md 
b/docs/tutorial/files/geoparquet-sedona-spark.md
index bbe8273b3e..af04dfc701 100644
--- a/docs/tutorial/files/geoparquet-sedona-spark.md
+++ b/docs/tutorial/files/geoparquet-sedona-spark.md
@@ -244,6 +244,67 @@ ORDER BY geohash
 
 Let's look closer at how Sedona uses the GeoParquet bbox metadata to optimize 
queries.
 
+## Spatial Partitioning for GeoParquet
+
+When building a GeoParquet data lake, spatial partitioning can dramatically 
improve query performance by co-locating spatially nearby records into the same 
partitions. This makes GeoParquet bbox-based file skipping much more effective 
because each file's bounding box covers a compact spatial region instead of 
spanning the entire dataset.
+
+Sedona provides a one-step API β€” `StructuredAdapter.repartitionBySpatialKey` β€” 
that handles spatial partitioning directly on DataFrames. Under the hood it 
converts to a SpatialRDD, applies a partitioning scheme such as KDB-Tree, and 
converts back to a DataFrame β€” all in a single call.
+
+=== "Python"
+
+    ```python
+    from sedona.core.enums import GridType
+    from sedona.spark.adapters.structured_adapter import StructuredAdapter
+
+    df = sedona.read.format("geoparquet").load("/path/to/input")
+
+    # Repartition with explicit geometry column and partition count
+    partitioned_df = StructuredAdapter.repartitionBySpatialKey(
+        df, GridType.KDBTREE, "geometry", 16
+    )
+
+    # Or auto-detect geometry column
+    partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE)
+
+    partitioned_df.write.format("geoparquet").save("/path/to/output")
+    ```
+
+=== "Scala"
+
+    ```scala
+    import org.apache.sedona.core.enums.GridType
+    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+
+    val df = sedona.read.format("geoparquet").load("/path/to/input")
+
+    // Repartition with explicit geometry column and partition count
+    val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, 
"geometry", GridType.KDBTREE, 16)
+
+    // Or auto-detect geometry column
+    val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE)
+
+    partitionedDf.write.format("geoparquet").save("/path/to/output")
+    ```
+
+=== "Java"
+
+    ```java
+    import org.apache.sedona.core.enums.GridType;
+    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter;
+
+    Dataset<Row> df = 
sedona.read().format("geoparquet").load("/path/to/input");
+
+    // Repartition with explicit geometry column and partition count
+    Dataset<Row> partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, 
"geometry", GridType.KDBTREE, 16);
+
+    // Or auto-detect geometry column
+    Dataset<Row> partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE);
+
+    partitionedDf.write().format("geoparquet").save("/path/to/output");
+    ```
+
+This approach is more effective than sorting by GeoHash because it uses a 
KDB-Tree to create balanced spatial partitions that respect the actual data 
distribution. Each output file will cover a compact spatial region, maximizing 
the effectiveness of GeoParquet's bbox-based file skipping.
+
 ## How Sedona uses GeoParquet bounding box (bbox) metadata with Spark
 
 The bounding box metadata specifies the area covered by geometric shapes in a 
given file.  Suppose you query points in a region not covered by the bounding 
box for a given file.  The engine can skip that entire file when executing the 
query because it’s known that it does not cover any relevant data.
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 5031af398d..94fa501418 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -1383,37 +1383,61 @@ are introduced on purpose to ensure correctness when 
performing a spatial join;
 however, when using Sedona to prepare a dataset for distribution this is not 
typically
 desired.
 
-You can use `StructuredAdapter` and the 
`spatialRDD.spatialPartitioningWithoutDuplicates` function to obtain a Sedona 
DataFrame that is spatially partitioned without duplicates. This is especially 
useful for generating balanced GeoParquet files while preserving spatial 
proximity within files, which is crucial for optimizing filter pushdown 
performance in GeoParquet files.
+You can use `StructuredAdapter.repartitionBySpatialKey` to spatially partition 
a DataFrame in one step, without duplicates. This is especially useful for 
generating balanced GeoParquet files while preserving spatial proximity within 
files, which is crucial for optimizing filter pushdown performance in 
GeoParquet files.
 
 === "Scala"
 
        ```scala
-       spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
-       // Specify the desired number of partitions as 10, though the actual 
number may vary
-       // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
-       var spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, 
sedona)
+       import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+       import org.apache.sedona.core.enums.GridType
+
+       // Repartition using KDB-Tree with auto-detected geometry column
+       var spatialDf = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE)
+
+       // Specify a geometry column and the desired number of partitions
+       var spatialDf = StructuredAdapter.repartitionBySpatialKey(df, 
"geometry", GridType.KDBTREE, 10)
+
+       // Write to GeoParquet
+       spatialDf.write.format("geoparquet").save("/path/to/output")
        ```
 
 === "Java"
 
        ```java
-       spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
-       // Specify the desired number of partitions as 10, though the actual 
number may vary
-       // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
-       Dataset<Row> spatialDf = 
StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+       import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter;
+       import org.apache.sedona.core.enums.GridType;
+
+       // Repartition using KDB-Tree with auto-detected geometry column
+       Dataset<Row> spatialDf = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE);
+
+       // Specify a geometry column and the desired number of partitions
+       Dataset<Row> spatialDf = StructuredAdapter.repartitionBySpatialKey(df, 
"geometry", GridType.KDBTREE, 10);
+
+       // Write to GeoParquet
+       spatialDf.write().format("geoparquet").save("/path/to/output");
        ```
 
 === "Python"
 
        ```python
        from sedona.spark import StructuredAdapter
+       from sedona.spark.core.enums.grid_type import GridType
+
+       # Repartition using KDB-Tree with auto-detected geometry column
+       spatial_df = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE)
+
+       # Specify a geometry column and the desired number of partitions
+       spatial_df = StructuredAdapter.repartitionBySpatialKey(
+           df, GridType.KDBTREE, geometryFieldName="geometry", numPartitions=10
+       )
 
-       spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
-       # Specify the desired number of partitions as 10, though the actual 
number may vary
-       # spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
-       spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
+       # Write to GeoParquet
+       spatial_df.write.format("geoparquet").save("/path/to/output")
        ```
 
+!!!note
+       You can also achieve spatial partitioning manually using the 
lower-level API. See the `spatialPartitioningWithoutDuplicates` method on 
`SpatialRDD` and `StructuredAdapter.toSpatialPartitionedDf` for the 
step-by-step approach.
+
 ### SpatialPairRDD to DataFrame
 
 PairRDD is the result of a spatial join query or distance join query. 
SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you 
need to provide the schema of the left and right RDDs.
diff --git a/python/sedona/spark/utils/structured_adapter.py 
b/python/sedona/spark/utils/structured_adapter.py
index d1d5e9bd29..633ef76398 100644
--- a/python/sedona/spark/utils/structured_adapter.py
+++ b/python/sedona/spark/utils/structured_adapter.py
@@ -18,6 +18,7 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.types import StructType
 
+from sedona.spark.core.enums.grid_type import GridType
 from sedona.spark.core.SpatialRDD.spatial_rdd import SpatialRDD
 from sedona.spark.core.spatialOperator.rdd import SedonaPairRDD
 
@@ -124,3 +125,50 @@ class StructuredAdapter:
         )
         df = StructuredAdapter._create_dataframe(jdf, sparkSession)
         return df
+
+    @classmethod
+    def repartitionBySpatialKey(
+        cls,
+        dataFrame: DataFrame,
+        gridType: GridType = GridType.KDBTREE,
+        geometryFieldName: str = None,
+        numPartitions: int = 0,
+    ) -> DataFrame:
+        """
+        Repartition a DataFrame using a spatial partitioning scheme (e.g., 
KDB-Tree).
+        This is a convenience method that wraps the multi-step process of 
converting a
+        DataFrame to a SpatialRDD, applying spatial partitioning without 
duplicates,
+        and converting back to a DataFrame.
+
+        Example usage::
+
+            partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE, "geometry", 16)
+            partitioned_df.write.format("geoparquet").save("/path/to/output")
+
+        Args:
+            dataFrame: The input DataFrame containing a geometry column.
+            gridType: The spatial partitioning grid type (default: 
GridType.KDBTREE).
+            geometryFieldName: The name of the geometry column. If None, 
auto-detects.
+            numPartitions: The target number of partitions. If 0, uses the 
current number.
+
+        Returns:
+            A spatially partitioned DataFrame.
+        """
+        sc = dataFrame._sc
+        jvm = sc._jvm
+        sparkSession = dataFrame.sparkSession
+
+        jgrid_type = jvm.org.apache.sedona.core.enums.GridType.getGridType(
+            gridType.value
+        )
+
+        if geometryFieldName is not None:
+            jdf = jvm.StructuredAdapter.repartitionBySpatialKey(
+                dataFrame._jdf, geometryFieldName, jgrid_type, numPartitions
+            )
+        else:
+            jdf = jvm.StructuredAdapter.repartitionBySpatialKey(
+                dataFrame._jdf, jgrid_type, numPartitions
+            )
+
+        return StructuredAdapter._create_dataframe(jdf, sparkSession)
diff --git a/python/tests/sql/test_structured_adapter.py 
b/python/tests/sql/test_structured_adapter.py
index 640540ca34..76a922dfc6 100644
--- a/python/tests/sql/test_structured_adapter.py
+++ b/python/tests/sql/test_structured_adapter.py
@@ -210,6 +210,25 @@ class TestStructuredAdapter(TestBase):
         result_count = query_result.count()
         assert result_count > 0, f"Expected at least one result, got 
{result_count}"
 
+    def test_repartition_by_spatial_key(self):
+        xys = [(i, i // 100, i % 100) for i in range(1_000)]
+        df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr(
+            "id", "ST_Point(x, y) AS geom"
+        )
+        partitioned_df = StructuredAdapter.repartitionBySpatialKey(
+            df, GridType.KDBTREE, "geom", 4
+        )
+        assert partitioned_df.count() == 1_000
+        assert partitioned_df.rdd.getNumPartitions() >= 4
+
+    def test_repartition_by_spatial_key_auto_detect(self):
+        xys = [(i, i // 100, i % 100) for i in range(1_000)]
+        df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr(
+            "id", "ST_Point(x, y) AS geom"
+        )
+        partitioned_df = StructuredAdapter.repartitionBySpatialKey(df, 
GridType.KDBTREE)
+        assert partitioned_df.count() == 1_000
+
     def test_toDf_preserves_columns_with_proper_types(self):
         # Create a spatial DataFrame with various columns and types
         data = [
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
index 4046f38fe9..cebca2e076 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.spark.sql.sedona_sql.adapters
 
+import org.apache.sedona.core.enums.GridType
 import org.apache.sedona.core.spatialPartitioning.GenericUniquePartitioner
 import org.apache.sedona.core.spatialRDD.SpatialRDD
 import org.apache.sedona.sql.utils.GeometrySerializer
@@ -245,4 +246,80 @@ object StructuredAdapter {
       originalRightSpatialRdd.schema,
       sparkSession)
   }
+
+  /**
+   * Repartition a DataFrame using a spatial partitioning scheme (e.g., 
KDB-Tree). This is a
+   * convenience method that wraps the multi-step process of converting a 
DataFrame to a
+   * SpatialRDD, applying spatial partitioning without duplicates, and 
converting back to a
+   * DataFrame.
+   *
+   * Example usage:
+   * {{{
+   * val partitionedDf = StructuredAdapter.repartitionBySpatialKey(df, 
"geometry", GridType.KDBTREE, 16)
+   * partitionedDf.write.format("geoparquet").save("/path/to/output")
+   * }}}
+   *
+   * @param dataFrame
+   *   The input DataFrame containing a geometry column.
+   * @param geometryFieldName
+   *   The name of the geometry column.
+   * @param gridType
+   *   The spatial partitioning grid type (e.g., GridType.KDBTREE).
+   * @param numPartitions
+   *   The target number of partitions. If 0, defaults to the current number 
of partitions.
+   * @return
+   *   A spatially partitioned DataFrame.
+   */
+  def repartitionBySpatialKey(
+      dataFrame: DataFrame,
+      geometryFieldName: String,
+      gridType: GridType,
+      numPartitions: Int = 0): DataFrame = {
+    val spatialRDD = toSpatialRdd(dataFrame, geometryFieldName)
+    spatialRDD.analyze()
+    val partCount =
+      if (numPartitions > 0) numPartitions
+      else dataFrame.rdd.getNumPartitions
+    spatialRDD.spatialPartitioningWithoutDuplicates(gridType, partCount)
+    toSpatialPartitionedDf(spatialRDD, dataFrame.sparkSession)
+  }
+
+  /**
+   * Repartition a DataFrame using a spatial partitioning scheme (e.g., 
KDB-Tree). Auto-detects
+   * the geometry column.
+   *
+   * @param dataFrame
+   *   The input DataFrame containing a geometry column.
+   * @param gridType
+   *   The spatial partitioning grid type (e.g., GridType.KDBTREE).
+   * @param numPartitions
+   *   The target number of partitions. If 0, defaults to the current number 
of partitions.
+   * @return
+   *   A spatially partitioned DataFrame.
+   */
+  def repartitionBySpatialKey(
+      dataFrame: DataFrame,
+      gridType: GridType,
+      numPartitions: Int): DataFrame = {
+    repartitionBySpatialKey(
+      dataFrame,
+      DfUtils.getGeometryColumnName(dataFrame.schema),
+      gridType,
+      numPartitions)
+  }
+
+  /**
+   * Repartition a DataFrame using a spatial partitioning scheme (e.g., 
KDB-Tree). Auto-detects
+   * the geometry column and uses the current number of partitions.
+   *
+   * @param dataFrame
+   *   The input DataFrame containing a geometry column.
+   * @param gridType
+   *   The spatial partitioning grid type (e.g., GridType.KDBTREE).
+   * @return
+   *   A spatially partitioned DataFrame.
+   */
+  def repartitionBySpatialKey(dataFrame: DataFrame, gridType: GridType): 
DataFrame = {
+    repartitionBySpatialKey(dataFrame, gridType, 0)
+  }
 }
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
 
b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
index d258ce3b40..54c76e7d44 100644
--- 
a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
+++ 
b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -126,5 +126,22 @@ class structuredAdapterTestScala extends TestBaseScala 
with GivenWhenThen {
       val dfPartitions: Long = 
partitionedDF.select(spark_partition_id).distinct().count()
       assert(dfPartitions == numSpatialPartitions)
     }
+
+    it("Should repartition by spatial key in one step") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val partitionedDf =
+        StructuredAdapter.repartitionBySpatialKey(dfOrigin, "_3", 
GridType.KDBTREE, 4)
+      assertEquals(seq.size, partitionedDf.count())
+      assert(partitionedDf.rdd.getNumPartitions >= 4)
+    }
+
+    it("Should repartition by spatial key with auto-detected geometry column") 
{
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val partitionedDf =
+        StructuredAdapter.repartitionBySpatialKey(dfOrigin, GridType.KDBTREE)
+      assertEquals(seq.size, partitionedDf.count())
+    }
   }
 }

Reply via email to