This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new a0178147c7 [GH-2208] Geopandas: Fix sjoin implementation + proper
naming and index behavior (#2209)
a0178147c7 is described below
commit a0178147c7b45e9d210824df1859118ff3e6348e
Author: Peter Nguyen <[email protected]>
AuthorDate: Mon Aug 4 12:17:16 2025 -0700
[GH-2208] Geopandas: Fix sjoin implementation + proper naming and index
behavior (#2209)
* Remove conversion to EWKB
* Remove support for sjoin-ing GeoSeries
* Use the proper active_geometry_column instead of first geom column
* Remove sjoin from geoseries.py and base.py
* Implement proper index behavior for sjoin
* Remove gpd match check because results differ across versions
* Fix eager call to .crs in geodataframe __init__
* Remove allow_override from set_geometry call
* Check pd.isna instead of is not None for crs check
* Use first() instead of first_value() for all PySpark versions in crs
* Remove redundant check (PR feedback)
* Fix suffixes and add tests
* Remove orderBy and add comment about using sort_index()
---
python/sedona/geopandas/base.py | 3 -
python/sedona/geopandas/geodataframe.py | 9 +--
python/sedona/geopandas/geoseries.py | 76 ++-----------------
python/sedona/geopandas/tools/sjoin.py | 129 +++++++++++++++++---------------
python/tests/geopandas/test_sjoin.py | 91 +++++++++++-----------
5 files changed, 126 insertions(+), 182 deletions(-)
diff --git a/python/sedona/geopandas/base.py b/python/sedona/geopandas/base.py
index 6098c7f245..36d444391d 100644
--- a/python/sedona/geopandas/base.py
+++ b/python/sedona/geopandas/base.py
@@ -2322,9 +2322,6 @@ class GeoFrame(metaclass=ABCMeta):
@abstractmethod
def plot(self, *args, **kwargs): ...
- @abstractmethod
- def sjoin(self, other, predicate="intersects", **kwargs): ...
-
def _delegate_to_geometry_column(op, this, *args, **kwargs):
geom_column = this.geometry
diff --git a/python/sedona/geopandas/geodataframe.py
b/python/sedona/geopandas/geodataframe.py
index 2d35b78dde..328634d115 100644
--- a/python/sedona/geopandas/geodataframe.py
+++ b/python/sedona/geopandas/geodataframe.py
@@ -417,13 +417,8 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
geometry: pspd.Series = self["geometry"]
if isinstance(geometry, sgpd.GeoSeries):
- geom_crs = geometry.crs
- if geom_crs is None:
- if crs is not None:
- self.set_geometry(geometry, inplace=True, crs=crs)
- else:
- if crs is not None and geom_crs != crs:
- raise ValueError(crs_mismatch_error)
+ if crs is not None:
+ self.set_geometry(geometry, inplace=True, crs=crs)
# No need to call set_geometry() here since it's already part of
the df, just set the name
self._geometry_column_name = "geometry"
diff --git a/python/sedona/geopandas/geoseries.py
b/python/sedona/geopandas/geoseries.py
index a3eabe5a09..f92c9a6090 100644
--- a/python/sedona/geopandas/geoseries.py
+++ b/python/sedona/geopandas/geoseries.py
@@ -466,30 +466,21 @@ class GeoSeries(GeoFrame, pspd.Series):
if len(self) == 0:
return None
- if parse_version(pyspark.__version__) >= parse_version("3.5.0"):
- spark_col = stf.ST_SRID(F.first_value(self.spark.column,
ignoreNulls=True))
- # Set this to avoid error complaining that we don't have a groupby
column
- is_aggr = True
- else:
- spark_col = stf.ST_SRID(self.spark.column)
- is_aggr = False
+ # F.first is non-deterministic, but it doesn't matter because all
non-null values should be the same
+ spark_col = stf.ST_SRID(F.first(self.spark.column, ignorenulls=True))
+ # Set this to avoid error complaining that we don't have a groupby
column
tmp_series = self._query_geometry_column(
spark_col,
returns_geom=False,
- is_aggr=is_aggr,
+ is_aggr=True,
)
# All geometries should have the same srid
# so we just take the srid of the first non-null element
-
- if parse_version(pyspark.__version__) >= parse_version("3.5.0"):
- srid = tmp_series.item()
- # Turn np.nan to 0 to avoid error
- srid = 0 if np.isnan(srid) else srid
- else:
- first_idx = tmp_series.first_valid_index()
- srid = tmp_series[first_idx] if first_idx is not None else 0
+ srid = tmp_series.item()
+ # Turn np.nan to 0 to avoid error
+ srid = 0 if np.isnan(srid) else srid
# Sedona returns 0 if doesn't exist
return CRS.from_user_input(srid) if srid != 0 else None
@@ -1557,59 +1548,6 @@ class GeoSeries(GeoFrame, pspd.Series):
"""
return self.to_geopandas().plot(*args, **kwargs)
- def sjoin(
- self,
- other,
- how="inner",
- predicate="intersects",
- lsuffix="left",
- rsuffix="right",
- distance=None,
- on_attribute=None,
- **kwargs,
- ):
- """Perform a spatial join between two GeoSeries.
-
- Parameters
- ----------
- other : GeoSeries
- The GeoSeries to join with.
- how : str, default 'inner'
- The type of join to perform.
- predicate : str, default 'intersects'
- The spatial predicate to use for the join.
- lsuffix : str, default 'left'
- Suffix to apply to the left GeoSeries' column names.
- rsuffix : str, default 'right'
- Suffix to apply to the right GeoSeries' column names.
- distance : float, optional
- The distance threshold for the join.
- on_attribute : str, optional
- The attribute to join on.
- **kwargs
- Additional arguments to pass to the join function.
-
- Returns
- -------
- GeoSeries
- A new GeoSeries containing the result of the spatial join.
-
- """
- from sedona.geopandas import sjoin
-
- # Implementation of the abstract method
- return sjoin(
- self,
- other,
- how,
- predicate,
- lsuffix,
- rsuffix,
- distance,
- on_attribute,
- **kwargs,
- )
-
@property
def geometry(self) -> "GeoSeries":
return self
diff --git a/python/sedona/geopandas/tools/sjoin.py
b/python/sedona/geopandas/tools/sjoin.py
index 2b9ce1923c..5e4a6184a6 100644
--- a/python/sedona/geopandas/tools/sjoin.py
+++ b/python/sedona/geopandas/tools/sjoin.py
@@ -16,11 +16,9 @@
# under the License.
import re
import pyspark.pandas as ps
-from pyspark.pandas.internal import InternalFrame
-from pyspark.pandas.series import first_series
+from pyspark.pandas.internal import SPARK_DEFAULT_INDEX_NAME, InternalFrame
from pyspark.pandas.utils import scol_for
-from pyspark.sql.functions import expr, col, lit
-from pyspark.sql.types import StructType, StructField, StringType, IntegerType
+from pyspark.sql.functions import expr
from sedona.geopandas import GeoDataFrame, GeoSeries
@@ -29,8 +27,8 @@ SUFFIX_PATTERN = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
def _frame_join(
- left_df,
- right_df,
+ left_df: GeoDataFrame,
+ right_df: GeoDataFrame,
how="inner",
predicate="intersects",
lsuffix="left",
@@ -42,9 +40,9 @@ def _frame_join(
Parameters
----------
- left_df : GeoDataFrame or GeoSeries
+ left_df : GeoDataFrame
Left dataset to join
- right_df : GeoDataFrame or GeoSeries
+ right_df : GeoDataFrame
Right dataset to join
how : str, default 'inner'
Join type: 'inner', 'left', 'right'
@@ -59,6 +57,8 @@ def _frame_join(
on_attribute : list, optional
Additional columns to join on
+ Note: Unlike GeoPandas, Sedona does not preserve key order for performance
reasons. Consider using .sort_index() after the join, if you need to preserve
the order.
+
Returns
-------
GeoDataFrame or GeoSeries
@@ -91,30 +91,18 @@ def _frame_join(
right_geom_col = None
# Find geometry columns in left dataframe
- for field in left_sdf.schema.fields:
- if field.dataType.typeName() in ("geometrytype", "binary"):
- left_geom_col = field.name
- break
+ left_geom_col = left_df.active_geometry_name
# Find geometry columns in right dataframe
- for field in right_sdf.schema.fields:
- if field.dataType.typeName() in ("geometrytype", "binary"):
- right_geom_col = field.name
- break
+ right_geom_col = right_df.active_geometry_name
- if left_geom_col is None or right_geom_col is None:
- raise ValueError("Both datasets must have geometry columns")
+ if not left_geom_col:
+ raise ValueError("Left dataframe geometry column not set")
+ if not right_geom_col:
+ raise ValueError("Right dataframe geometry column not set")
- # Prepare geometry expressions for spatial join
- if left_sdf.schema[left_geom_col].dataType.typeName() == "binary":
- left_geom_expr = f"ST_GeomFromWKB(`{left_geom_col}`) as l_geometry"
- else:
- left_geom_expr = f"`{left_geom_col}` as l_geometry"
-
- if right_sdf.schema[right_geom_col].dataType.typeName() == "binary":
- right_geom_expr = f"ST_GeomFromWKB(`{right_geom_col}`) as r_geometry"
- else:
- right_geom_expr = f"`{right_geom_col}` as r_geometry"
+ left_geom_expr = f"`{left_geom_col}` as l_geometry"
+ right_geom_expr = f"`{right_geom_col}` as r_geometry"
# Select all columns with geometry
left_cols = [left_geom_expr] + [
@@ -128,8 +116,12 @@ def _frame_join(
if field.name != right_geom_col and not field.name.startswith("__")
]
- left_geo_df = left_sdf.selectExpr(*left_cols)
- right_geo_df = right_sdf.selectExpr(*right_cols)
+ left_geo_df = left_sdf.selectExpr(
+ *left_cols, f"`{SPARK_DEFAULT_INDEX_NAME}` as index_{lsuffix}"
+ )
+ right_geo_df = right_sdf.selectExpr(
+ *right_cols, f"`{SPARK_DEFAULT_INDEX_NAME}` as index_{rsuffix}"
+ )
# Build spatial join condition
if predicate == "dwithin":
@@ -161,16 +153,31 @@ def _frame_join(
else:
raise ValueError(f"Join type '{how}' not supported")
+ # Pick which index to use for the resulting df's index based on 'how'
+ index_col = f"index_{lsuffix}" if how in ("inner", "left") else
f"index_{rsuffix}"
+
# Handle column naming with suffixes
final_columns = []
# Add geometry column (always from left for geopandas compatibility)
- # Currently, Sedona stores geometries in EWKB format
- final_columns.append("ST_AsEWKB(l_geometry) as geometry")
+ final_columns.append("l_geometry as geometry")
# Add other columns with suffix handling
- left_data_cols = [col for col in left_geo_df.columns if col !=
"l_geometry"]
- right_data_cols = [col for col in right_geo_df.columns if col !=
"r_geometry"]
+ left_data_cols = [
+ col
+ for col in left_geo_df.columns
+ if col not in ["l_geometry", f"index_{lsuffix}"]
+ ]
+ right_data_cols = [
+ col
+ for col in right_geo_df.columns
+ if col not in ["r_geometry", f"index_{rsuffix}"]
+ ]
+
+ final_columns.append(f"{index_col} as {SPARK_DEFAULT_INDEX_NAME}")
+
+ if index_col != f"index_{lsuffix}":
+ final_columns.append(f"index_{lsuffix}")
for col_name in left_data_cols:
base_name = col_name[2:] # Remove "l_" prefix
@@ -183,6 +190,9 @@ def _frame_join(
# Column only in left
final_columns.append(f"{col_name} as {base_name}")
+ if index_col != f"index_{rsuffix}":
+ final_columns.append(f"index_{rsuffix}")
+
for col_name in right_data_cols:
base_name = col_name[2:] # Remove "r_" prefix
left_col = f"l_{base_name}"
@@ -196,27 +206,25 @@ def _frame_join(
# Select final columns
result_df = spatial_join_df.selectExpr(*final_columns)
+ # Note, we do not .orderBy(SPARK_DEFAULT_INDEX_NAME) to avoid a
performance hit
- # Return appropriate type based on input
- if isinstance(left_df, GeoSeries) and isinstance(right_df, GeoSeries):
- # Return GeoSeries for GeoSeries inputs
- internal = InternalFrame(
- spark_frame=result_df,
- index_spark_columns=None,
- column_labels=[left_df._col_label],
- data_spark_columns=[scol_for(result_df, "geometry")],
- data_fields=[left_df._internal.data_fields[0]],
- column_label_names=left_df._internal.column_label_names,
- )
- return _to_geo_series(first_series(ps.DataFrame(internal)))
- else:
- # Return GeoDataFrame for GeoDataFrame inputs
- return GeoDataFrame(result_df)
+ data_spark_columns = [
+ scol_for(result_df, col)
+ for col in result_df.columns
+ if col != SPARK_DEFAULT_INDEX_NAME
+ ]
+
+ internal = InternalFrame(
+ spark_frame=result_df,
+ index_spark_columns=[scol_for(result_df, SPARK_DEFAULT_INDEX_NAME)],
+ data_spark_columns=data_spark_columns,
+ )
+ return GeoDataFrame(ps.DataFrame(internal))
def sjoin(
- left_df,
- right_df,
+ left_df: GeoDataFrame,
+ right_df: GeoDataFrame,
how="inner",
predicate="intersects",
lsuffix="left",
@@ -224,7 +232,7 @@ def sjoin(
distance=None,
on_attribute=None,
**kwargs,
-):
+) -> GeoDataFrame:
"""Spatial join of two GeoDataFrames.
Parameters
@@ -258,6 +266,11 @@ def sjoin(
If set, observations are joined only if the predicate applies
and values in specified columns match.
+ Returns
+ -------
+ GeoDataFrame
+ The joined GeoDataFrame.
+
Examples
--------
>>> groceries_w_communities = geopandas.sjoin(groceries, chicago)
@@ -325,15 +338,11 @@ def _basic_checks(left_df, right_df, how, lsuffix,
rsuffix, on_attribute=None):
on_attribute : list, default None
list of column names to merge on along with geometry
"""
- if not isinstance(left_df, (GeoSeries, GeoDataFrame)):
- raise ValueError(
- f"'left_df' should be GeoSeries or GeoDataFrame, got
{type(left_df)}"
- )
+ if not isinstance(left_df, GeoDataFrame):
+ raise ValueError(f"'left_df' should be GeoDataFrame, got
{type(left_df)}")
- if not isinstance(right_df, (GeoSeries, GeoDataFrame)):
- raise ValueError(
- f"'right_df' should be GeoSeries or GeoDataFrame, got
{type(right_df)}"
- )
+ if not isinstance(right_df, GeoDataFrame):
+ raise ValueError(f"'right_df' should be GeoDataFrame, got
{type(right_df)}")
allowed_hows = ["inner", "left", "right"]
if how not in allowed_hows:
diff --git a/python/tests/geopandas/test_sjoin.py
b/python/tests/geopandas/test_sjoin.py
index 9dda951f18..669869326b 100644
--- a/python/tests/geopandas/test_sjoin.py
+++ b/python/tests/geopandas/test_sjoin.py
@@ -18,10 +18,12 @@ import shutil
import tempfile
import pytest
import shapely
+import pandas as pd
+import geopandas as gpd
from shapely.geometry import Polygon, Point, LineString
-from sedona.geopandas import GeoSeries, GeoDataFrame, sjoin
-from tests.test_base import TestBase
+from sedona.geopandas import GeoDataFrame, sjoin
+from tests.geopandas.test_geopandas_base import TestGeopandasBase
from packaging.version import parse as parse_version
@@ -29,7 +31,7 @@ from packaging.version import parse as parse_version
parse_version(shapely.__version__) < parse_version("2.0.0"),
reason=f"Tests require shapely>=2.0.0, but found v{shapely.__version__}",
)
-class TestSpatialJoin(TestBase):
+class TestSpatialJoin(TestGeopandasBase):
def setup_method(self):
self.tempdir = tempfile.mkdtemp()
@@ -41,12 +43,6 @@ class TestSpatialJoin(TestBase):
self.point2 = Point(1.5, 1.5)
self.line1 = LineString([(0, 0), (1, 1)])
- # GeoSeries for testing
- self.g1 = GeoSeries([self.t1, self.t2])
- self.g2 = GeoSeries([self.sq, self.t1])
- self.g3 = GeoSeries([self.t1, self.t2], crs="epsg:4326")
- self.g4 = GeoSeries([self.t2, self.t1])
-
# GeoDataFrames for testing
self.gdf1 = GeoDataFrame(
{"geometry": [self.t1, self.t2], "id": [1, 2], "name": ["poly1",
"poly2"]}
@@ -78,24 +74,6 @@ class TestSpatialJoin(TestBase):
def teardown_method(self):
shutil.rmtree(self.tempdir)
- def test_sjoin_method1(self):
- """Test basic sjoin functionality with GeoSeries"""
- left = self.g1
- right = self.g2
- joined = sjoin(left, right)
- assert joined is not None
- assert type(joined) is GeoSeries
- assert joined.count() == 4
-
- def test_sjoin_method2(self):
- """Test GeoSeries.sjoin method"""
- left = self.g1
- right = self.g2
- joined = left.sjoin(right)
- assert joined is not None
- assert type(joined) is GeoSeries
- assert joined.count() == 4
-
def test_sjoin_geodataframe_basic(self):
"""Test basic sjoin with GeoDataFrame"""
joined = sjoin(self.gdf1, self.gdf2)
@@ -110,9 +88,25 @@ class TestSpatialJoin(TestBase):
def test_sjoin_geodataframe_method(self):
"""Test GeoDataFrame.sjoin method"""
joined = self.gdf1.sjoin(self.gdf2)
- assert joined is not None
- assert type(joined) is GeoDataFrame
- assert "geometry" in joined.columns
+ expected = gpd.GeoDataFrame(
+ {
+ "geometry": [
+ Polygon([(0, 0), (1, 0), (1, 1), (0, 0)]),
+ Polygon([(0, 0), (1, 0), (1, 1), (0, 0)]),
+ Polygon([(0, 0), (1, 1), (0, 1), (0, 0)]),
+ Polygon([(0, 0), (1, 1), (0, 1), (0, 0)]),
+ ],
+ "id_left": [1, 1, 2, 2],
+ "name": ["poly1", "poly1", "poly2", "poly2"],
+ "index_right": [0, 1, 0, 1],
+ "id_right": [3, 4, 3, 4],
+ "category": ["square", "triangle", "square", "triangle"],
+ },
+ index=pd.Index([0, 0, 1, 1]),
+ )
+ # Sedona's join does not preserve key order, so we sort by index for
testing exact results
+ joined.sort_index(inplace=True)
+ self.check_sgpd_df_equals_gpd_df(joined, expected)
def test_sjoin_predicates(self):
"""Test different spatial predicates"""
@@ -128,8 +122,10 @@ class TestSpatialJoin(TestBase):
for predicate in predicates:
try:
joined = sjoin(self.gdf1, self.gdf2, predicate=predicate)
- assert joined is not None
- assert type(joined) is GeoDataFrame
+ gpd_joined = self.gdf1.to_geopandas().sjoin(
+ self.gdf2.to_geopandas(), predicate=predicate
+ )
+ self.check_sgpd_df_equals_gpd_df(joined, gpd_joined)
except Exception as e:
# Some predicates might not return results for our test data
# but the function should not raise errors for valid predicates
@@ -148,17 +144,26 @@ class TestSpatialJoin(TestBase):
def test_sjoin_column_suffixes(self):
"""Test column suffix handling"""
- joined = sjoin(self.gdf1, self.gdf2, lsuffix="_left", rsuffix="_right")
- assert joined is not None
- assert type(joined) is GeoDataFrame
-
- # Check that suffixes are applied to overlapping columns
- columns = joined.columns
- if "id_left" in columns and "id_right" in columns:
- # Both datasets have 'id' column, so suffixes should be applied
- assert "id_left" in columns
- assert "id_right" in columns
- assert "id" not in columns # Original column should not exist
+ joined = sjoin(self.gdf1, self.gdf2, lsuffix="L", rsuffix="R")
+ expected = ["geometry", "id_L", "name", "index_R", "id_R", "category"]
+ assert list(joined.columns) == expected
+
+ # Specify only one side
+ joined = sjoin(self.gdf1, self.gdf2, lsuffix="L")
+ expected = ["geometry", "id_L", "name", "index_right", "id_right",
"category"]
+ assert list(joined.columns) == expected
+
+ # Use mixed suffixes
+ joined = sjoin(self.gdf1, self.gdf2, lsuffix="LEFT", rsuffix="random")
+ expected = [
+ "geometry",
+ "id_LEFT",
+ "name",
+ "index_random",
+ "id_random",
+ "category",
+ ]
+ assert list(joined.columns) == expected
def test_sjoin_dwithin_distance(self):
"""Test dwithin predicate with distance parameter"""