This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push: new f8fe6f38d4 [GH-2037] Implement _row_wise_operation + intersection, intersect (#2038) f8fe6f38d4 is described below commit f8fe6f38d4e93de82f1ce7dddad93f910f1f0312 Author: Peter Nguyen <petern0...@gmail.com> AuthorDate: Wed Jul 2 14:56:29 2025 -0700 [GH-2037] Implement _row_wise_operation + intersection, intersect (#2038) * Refactor process_geometry_column to create a more flexible query_geometry_column() * Implement length() * Implement intersection * Implement intersects * Add rename field to _row_wise_operation and provide select instead of operation * Replace PS_INDEX_COL w/ imported SPARK_DEFAULT_INDEX_NAME --- python/sedona/geopandas/geoseries.py | 250 ++++++++++++++++++++- python/tests/geopandas/test_geoseries.py | 79 +++++++ .../tests/geopandas/test_match_geopandas_series.py | 25 +++ 3 files changed, 344 insertions(+), 10 deletions(-) diff --git a/python/sedona/geopandas/geoseries.py b/python/sedona/geopandas/geoseries.py index 5844b3b21d..cf17eb31a2 100644 --- a/python/sedona/geopandas/geoseries.py +++ b/python/sedona/geopandas/geoseries.py @@ -17,11 +17,12 @@ import os import typing -from typing import Any, Union, Literal +from typing import Any, Union, Literal, List import geopandas as gpd import pandas as pd import pyspark.pandas as pspd +import pyspark from pyspark.pandas import Series as PandasOnSparkSeries from pyspark.pandas._typing import Dtype from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame @@ -31,12 +32,15 @@ from pyspark.pandas.utils import scol_for, log_advice from pyspark.sql.types import BinaryType import shapely +from shapely.geometry.base import BaseGeometry from sedona.geopandas._typing import Label from sedona.geopandas.base import GeoFrame from sedona.geopandas.geodataframe import GeoDataFrame from sedona.geopandas.geoindex import GeoIndex +from pyspark.pandas.internal import SPARK_DEFAULT_INDEX_NAME # __index_level_0__ + class 
GeoSeries(GeoFrame, pspd.Series): """ @@ -380,7 +384,11 @@ class GeoSeries(GeoFrame, pspd.Series): return self._query_geometry_column(sql_expr, first_col, rename) def _query_geometry_column( - self, query: str, col: Union[str, None], rename: str + self, + query: str, + cols: Union[List[str], str], + rename: str, + df: pyspark.sql.DataFrame = None, ) -> "GeoSeries": """ Helper method to query a single geometry column with a specified operation. @@ -389,28 +397,38 @@ class GeoSeries(GeoFrame, pspd.Series): ---------- query : str The query to apply to the geometry column. - col : str - The name of the column to query. + cols : List[str] or str + The names of the columns to query. rename : str The name of the resulting column. + df : pyspark.sql.DataFrame + The dataframe to query. If not provided, the internal dataframe will be used. Returns ------- GeoSeries A GeoSeries with the operation applied to the geometry column. """ - if not col: + if not cols: raise ValueError("No valid geometry column found.") - data_type = self._internal.spark_frame.schema[col].dataType + if isinstance(cols, str): + cols = [cols] + + df = self._internal.spark_frame if df is None else df - if isinstance(data_type, BinaryType): - # the backticks here are important so we don't match strings that happen to be the same as the column name - query = query.replace(f"`{col}`", f"ST_GeomFromWKB(`{col}`)") + for col in cols: + data_type = df.schema[col].dataType + + rename = col if not rename else rename + + if isinstance(data_type, BinaryType): + # the backticks here are important so we don't match strings that happen to be the same as the column name + query = query.replace(f"`{col}`", f"ST_GeomFromWKB(`{col}`)") sql_expr = f"{query} as `{rename}`" - sdf = self._internal.spark_frame.selectExpr(sql_expr) + sdf = df.selectExpr(sql_expr) internal = InternalFrame( spark_frame=sdf, index_spark_columns=None, @@ -778,6 +796,218 @@ class GeoSeries(GeoFrame, pspd.Series): # Implementation of the abstract 
method raise NotImplementedError("This method is not implemented yet.") + def intersects( + self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, None] = None + ) -> pspd.Series: + """Returns a ``Series`` of ``dtype('bool')`` with value ``True`` for + each aligned geometry that intersects `other`. + + An object is said to intersect `other` if its `boundary` and `interior` + intersect in any way with those of the other. + + The operation works in a 1-to-1 row-wise manner. + + Parameters + ---------- + other : GeoSeries or geometric object + The GeoSeries (elementwise) or geometric object to test for + intersection. + align : bool | None (default None) + If True, automatically aligns GeoSeries based on their indices. None defaults to True. + If False, the order of elements is preserved. (not supported in Sedona Geopandas) + + Returns + ------- + Series (bool) + + Examples + -------- + >>> from shapely.geometry import Polygon, LineString, Point + >>> s = geopandas.GeoSeries( + ... [ + ... Polygon([(0, 0), (2, 2), (0, 2)]), + ... LineString([(0, 0), (2, 2)]), + ... LineString([(2, 0), (0, 2)]), + ... Point(0, 1), + ... ], + ... ) + >>> s2 = geopandas.GeoSeries( + ... [ + ... LineString([(1, 0), (1, 3)]), + ... LineString([(2, 0), (0, 2)]), + ... Point(1, 1), + ... Point(-100, -100), + ... ], + ... index=range(1, 5), + ... ) + + We can check two GeoSeries against each other, row by row. + The GeoSeries above have different indices. We align both GeoSeries + based on index values and compare elements with the same index: + + >>> s.intersects(s2) + 0 False + 1 True + 2 True + 3 False + 4 False + dtype: bool + + We can also check if each geometry of GeoSeries intersects a single + geometry: + + >>> line = LineString([(-1, 1), (3, 1)]) + >>> s.intersects(line) + 0 True + 1 True + 2 True + 3 True + dtype: bool + + Notes + ----- + This method works in a row-wise manner. It does not check if an element + of one GeoSeries ``intersects`` *any* element of the other one.
+ + See also + -------- + GeoSeries.disjoint + GeoSeries.crosses + GeoSeries.touches + GeoSeries.intersection + """ + return ( + self._row_wise_operation( + "ST_Intersects(`L`, `R`)", other, align, rename="intersects" + ) + .to_spark_pandas() + .astype("bool") + ) + + def intersection( + self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, None] = None + ) -> "GeoSeries": + """Returns a ``GeoSeries`` of the intersection of points in each + aligned geometry with `other`. + + The operation works in a 1-to-1 row-wise manner. + + Parameters + ---------- + other : GeoSeries or geometric object + The GeoSeries (elementwise) or geometric object to find the + intersection with. + align : bool | None (default None) + If True, automatically aligns GeoSeries based on their indices. None defaults to True. + If False, the order of elements is preserved. (not supported in Sedona Geopandas) + + Returns + ------- + GeoSeries + + Examples + -------- + >>> from shapely.geometry import Polygon, LineString, Point + >>> s = geopandas.GeoSeries( + ... [ + ... Polygon([(0, 0), (2, 2), (0, 2)]), + ... Polygon([(0, 0), (2, 2), (0, 2)]), + ... LineString([(0, 0), (2, 2)]), + ... LineString([(2, 0), (0, 2)]), + ... Point(0, 1), + ... ], + ... ) + >>> s2 = geopandas.GeoSeries( + ... [ + ... Polygon([(0, 0), (1, 1), (0, 1)]), + ... LineString([(1, 0), (1, 3)]), + ... LineString([(2, 0), (0, 2)]), + ... Point(1, 1), + ... Point(-100, -100), + ... ], + ...
) + + We can do an intersection of each geometry and a single + shapely geometry: + + >>> geom = Polygon([(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, -0.5), (-0.5, -0.5)]) + >>> s.intersection(geom) + 0 POLYGON ((0 0, 2 2, 0 2, 0 0)) + 1 POLYGON ((0 0, 2 2, 0 2, 0 0)) + 2 LINESTRING (0 0, 2 2) + 3 LINESTRING (2 0, 0 2) + 4 POINT (0 1) + dtype: geometry + + >>> geom = Polygon([(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, -0.5), (-0.5, -0.5)]) + >>> s.intersection(Polygon([(0, 0), (1, 1), (0, 1)])) + 0 POLYGON ((0 0, 0 1, 1 1, 0 0)) + 1 POLYGON ((0 0, 0 1, 1 1, 0 0)) + 2 LINESTRING (0 0, 1 1) + 3 POINT (1 1) + 4 POINT (0 1) + dtype: geometry + + We can also check two GeoSeries against each other, row by row. + Both GeoSeries above share the same index, so each geometry is + compared with the geometry at the same index in the other GeoSeries. + + >>> s.intersection(s2) + 0 POLYGON ((0 0, 1 1, 0 1, 0 0)) + 1 LINESTRING (1 1, 1 2) + 2 POINT (1 1) + 3 POINT (1 1) + 4 POLYGON EMPTY + dtype: geometry + + See Also + -------- + GeoSeries.difference + GeoSeries.symmetric_difference + GeoSeries.union + """ + return self._row_wise_operation( + "ST_Intersection(`L`, `R`)", other, align, rename="intersection" + ) + + def _row_wise_operation( + self, + select: str, + other: Union["GeoSeries", BaseGeometry], + align: Union[bool, None], + rename: str, + ): + """ + Helper function to perform a row-wise operation on two GeoSeries. + The self column and other column are aliased to `L` and `R`, respectively. + """ + from pyspark.sql.functions import col + + # Note: this is specifically False.
None is valid since it defaults to True similar to geopandas + if align is False: + raise NotImplementedError("Sedona Geopandas does not support align=False") + + if isinstance(other, BaseGeometry): + other = GeoSeries([other] * len(self)) + + assert isinstance(other, GeoSeries), f"Invalid type for other: {type(other)}" + + # TODO: this does not yet support multi-index + df = self._internal.spark_frame.select( + col(self.get_first_geometry_column()).alias("L"), + col(SPARK_DEFAULT_INDEX_NAME), + ) + other_df = other._internal.spark_frame.select( + col(other.get_first_geometry_column()).alias("R"), + col(SPARK_DEFAULT_INDEX_NAME), + ) + joined_df = df.join(other_df, on=SPARK_DEFAULT_INDEX_NAME, how="outer") + return self._query_geometry_column( + select, + cols=["L", "R"], + rename=rename, + df=joined_df, + ) + def intersection_all(self): # Implementation of the abstract method raise NotImplementedError("This method is not implemented yet.") diff --git a/python/tests/geopandas/test_geoseries.py b/python/tests/geopandas/test_geoseries.py index cc1b3338b3..8c0805b5f8 100644 --- a/python/tests/geopandas/test_geoseries.py +++ b/python/tests/geopandas/test_geoseries.py @@ -50,6 +50,8 @@ class TestGeoSeries(TestBase): assert len(actual) == len(expected) sgpd_result = actual.to_geopandas() for a, e in zip(sgpd_result, expected): + if a.is_empty and e.is_empty: + continue self.assert_geometry_almost_equal(a, e) def test_area(self): @@ -322,6 +324,83 @@ class TestGeoSeries(TestBase): def test_union_all(self): pass + def test_intersects(self): + s = sgpd.GeoSeries( + [ + Polygon([(0, 0), (2, 2), (0, 2)]), + LineString([(0, 0), (2, 2)]), + LineString([(2, 0), (0, 2)]), + Point(0, 1), + ], + ) + s2 = sgpd.GeoSeries( + [ + LineString([(1, 0), (1, 3)]), + LineString([(2, 0), (0, 2)]), + Point(1, 1), + Point(-100, -100), + ], + ) + + result = s.intersects(s2) + expected = pd.Series([True, True, True, False]) + assert_series_equal(result.to_pandas(), expected) + + line = 
LineString([(-1, 1), (3, 1)]) + result = s.intersects(line) + expected = pd.Series([True, True, True, True]) + assert_series_equal(result.to_pandas(), expected) + + def test_intersection(self): + s = sgpd.GeoSeries( + [ + Polygon([(0, 0), (2, 2), (0, 2)]), + Polygon([(0, 0), (2, 2), (0, 2)]), + LineString([(0, 0), (2, 2)]), + LineString([(2, 0), (0, 2)]), + Point(0, 1), + ], + ) + + geom = Polygon( + [(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, -0.5), (-0.5, -0.5)] + ) + result = s.intersection(geom) + expected = gpd.GeoSeries( + [ + Polygon([(0, 0), (2, 2), (0, 2)]), + Polygon([(0, 0), (2, 2), (0, 2)]), + LineString([(0, 0), (2, 2)]), + LineString([(2, 0), (0, 2)]), + Point(0, 1), + ] + ) + self.check_sgpd_equals_gpd(result, expected) + + s2 = sgpd.GeoSeries( + [ + Polygon([(0, 0), (1, 1), (0, 1)]), + LineString([(1, 0), (1, 3)]), + LineString([(2, 0), (0, 2)]), + Point(1, 1), + Point(-100, -100), + ], + ) + result = s.intersection(s2) + expected = gpd.GeoSeries( + [ + Polygon([(0, 0), (0, 1), (1, 1), (0, 0)]), + LineString([(1, 1), (1, 2)]), + Point(1, 1), + Point(1, 1), + Point(), + ] + ) + self.check_sgpd_equals_gpd(result, expected) + + with pytest.raises(NotImplementedError): + s.intersection(s2, align=False) + def test_intersection_all(self): pass diff --git a/python/tests/geopandas/test_match_geopandas_series.py b/python/tests/geopandas/test_match_geopandas_series.py index c55db097db..ec89ba23bd 100644 --- a/python/tests/geopandas/test_match_geopandas_series.py +++ b/python/tests/geopandas/test_match_geopandas_series.py @@ -435,6 +435,28 @@ class TestMatchGeopandasSeries(TestBase): def test_union_all(self): pass + def test_intersects(self): + for _, geom in self.geoms: + for _, geom2 in self.geoms: + sgpd_result = GeoSeries(geom).intersects(GeoSeries(geom2)) + gpd_result = gpd.GeoSeries(geom).intersects(gpd.GeoSeries(geom2)) + self.check_pd_series_equal(sgpd_result, gpd_result) + + def test_intersection(self): + geometries = [ + Polygon([(0, 0), (1, 0), 
(1, 1)]), + Polygon([(2, 0), (3, 0), (3, 1)]), + Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]), + Polygon([(0, 0), (3, 0), (3, 3), (0, 2)]), + Polygon([(2, 0), (3, 0), (3, 3), (2, 3)]), + Point(0, 0), + ] + for g1 in geometries: + for g2 in geometries: + sgpd_result = GeoSeries(g1).intersection(GeoSeries(g2)) + gpd_result = gpd.GeoSeries(g1).intersection(gpd.GeoSeries(g2)) + self.check_sgpd_equals_gpd(sgpd_result, gpd_result) + def test_intersection_all(self): pass @@ -478,6 +500,9 @@ class TestMatchGeopandasSeries(TestBase): assert isinstance(expected, gpd.GeoSeries) sgpd_result = actual.to_geopandas() for a, e in zip(sgpd_result, expected): + # Sometimes sedona and geopandas both return empty geometries but of different types (e.g Point and Polygon) + if a.is_empty and e.is_empty: + continue self.assert_geometry_almost_equal( a, e, tolerance=1e-2 ) # increased tolerance from 1e-6