This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new f8fe6f38d4 [GH-2037] Implement _row_wise_operation + intersection, 
intersect (#2038)
f8fe6f38d4 is described below

commit f8fe6f38d4e93de82f1ce7dddad93f910f1f0312
Author: Peter Nguyen <petern0...@gmail.com>
AuthorDate: Wed Jul 2 14:56:29 2025 -0700

    [GH-2037] Implement _row_wise_operation + intersection, intersect (#2038)
    
    * Refactor process_geometry_column to create a more flexible 
query_geometry_column()
    
    * Implement length()
    
    * Implement intersection
    
    * Implement intersects
    
    * Add rename field to _row_wise_operation and provide select instead of 
operation
    
    * Replace PS_INDEX_COL w/ imported SPARK_DEFAULT_INDEX_NAME
---
 python/sedona/geopandas/geoseries.py               | 250 ++++++++++++++++++++-
 python/tests/geopandas/test_geoseries.py           |  79 +++++++
 .../tests/geopandas/test_match_geopandas_series.py |  25 +++
 3 files changed, 344 insertions(+), 10 deletions(-)

diff --git a/python/sedona/geopandas/geoseries.py 
b/python/sedona/geopandas/geoseries.py
index 5844b3b21d..cf17eb31a2 100644
--- a/python/sedona/geopandas/geoseries.py
+++ b/python/sedona/geopandas/geoseries.py
@@ -17,11 +17,12 @@
 
 import os
 import typing
-from typing import Any, Union, Literal
+from typing import Any, Union, Literal, List
 
 import geopandas as gpd
 import pandas as pd
 import pyspark.pandas as pspd
+import pyspark
 from pyspark.pandas import Series as PandasOnSparkSeries
 from pyspark.pandas._typing import Dtype
 from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
@@ -31,12 +32,15 @@ from pyspark.pandas.utils import scol_for, log_advice
 from pyspark.sql.types import BinaryType
 
 import shapely
+from shapely.geometry.base import BaseGeometry
 
 from sedona.geopandas._typing import Label
 from sedona.geopandas.base import GeoFrame
 from sedona.geopandas.geodataframe import GeoDataFrame
 from sedona.geopandas.geoindex import GeoIndex
 
+from pyspark.pandas.internal import SPARK_DEFAULT_INDEX_NAME  # 
__index_level_0__
+
 
 class GeoSeries(GeoFrame, pspd.Series):
     """
@@ -380,7 +384,11 @@ class GeoSeries(GeoFrame, pspd.Series):
         return self._query_geometry_column(sql_expr, first_col, rename)
 
     def _query_geometry_column(
-        self, query: str, col: Union[str, None], rename: str
+        self,
+        query: str,
+        cols: Union[List[str], str],
+        rename: str,
+        df: pyspark.sql.DataFrame = None,
     ) -> "GeoSeries":
         """
         Helper method to query a single geometry column with a specified 
operation.
@@ -389,28 +397,38 @@ class GeoSeries(GeoFrame, pspd.Series):
         ----------
         query : str
             The query to apply to the geometry column.
-        col : str
-            The name of the column to query.
+        cols : List[str] or str
+            The names of the columns to query.
         rename : str
             The name of the resulting column.
+        df : pyspark.sql.DataFrame
+            The dataframe to query. If not provided, the internal dataframe 
will be used.
 
         Returns
         -------
         GeoSeries
             A GeoSeries with the operation applied to the geometry column.
         """
-        if not col:
+        if not cols:
             raise ValueError("No valid geometry column found.")
 
-        data_type = self._internal.spark_frame.schema[col].dataType
+        if isinstance(cols, str):
+            cols = [cols]
+
+        df = self._internal.spark_frame if df is None else df
 
-        if isinstance(data_type, BinaryType):
-            # the backticks here are important so we don't match strings that 
happen to be the same as the column name
-            query = query.replace(f"`{col}`", f"ST_GeomFromWKB(`{col}`)")
+        for col in cols:
+            data_type = df.schema[col].dataType
+
+            rename = col if not rename else rename
+
+            if isinstance(data_type, BinaryType):
+                # the backticks here are important so we don't match strings 
that happen to be the same as the column name
+                query = query.replace(f"`{col}`", f"ST_GeomFromWKB(`{col}`)")
 
         sql_expr = f"{query} as `{rename}`"
 
-        sdf = self._internal.spark_frame.selectExpr(sql_expr)
+        sdf = df.selectExpr(sql_expr)
         internal = InternalFrame(
             spark_frame=sdf,
             index_spark_columns=None,
@@ -778,6 +796,218 @@ class GeoSeries(GeoFrame, pspd.Series):
         # Implementation of the abstract method
         raise NotImplementedError("This method is not implemented yet.")
 
+    def intersects(
+        self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, 
None] = None
+    ) -> pspd.Series:
+        """Returns a ``Series`` of ``dtype('bool')`` with value ``True`` for
+        each aligned geometry that intersects `other`.
+
+        An object is said to intersect `other` if its `boundary` and `interior`
+        intersect in any way with those of the other.
+
+        The operation works on a 1-to-1 row-wise manner:
+
+        Parameters
+        ----------
+        other : GeoSeries or geometric object
+            The GeoSeries (elementwise) or geometric object to test if is
+            intersected.
+        align : bool | None (default None)
+            If True, automatically aligns GeoSeries based on their indices. 
None defaults to True.
+            If False, the order of elements is preserved. (not supported in 
Sedona Geopandas)
+
+        Returns
+        -------
+        Series (bool)
+
+        Examples
+        --------
+        >>> from shapely.geometry import Polygon, LineString, Point
+        >>> s = geopandas.GeoSeries(
+        ...     [
+        ...         Polygon([(0, 0), (2, 2), (0, 2)]),
+        ...         LineString([(0, 0), (2, 2)]),
+        ...         LineString([(2, 0), (0, 2)]),
+        ...         Point(0, 1),
+        ...     ],
+        ... )
+        >>> s2 = geopandas.GeoSeries(
+        ...     [
+        ...         LineString([(1, 0), (1, 3)]),
+        ...         LineString([(2, 0), (0, 2)]),
+        ...         Point(1, 1),
+        ...         Point(-100, -100),
+        ...     ],
+        ...     index=range(1, 5),
+        ... )
+
+        We can check two GeoSeries against each other, row by row.
+        The GeoSeries above have different indices. We align both GeoSeries
+        based on index values and compare elements with the same index:
+
+        >>> s.intersects(s2)
+        0     True
+        1     True
+        2     True
+        3    False
+        dtype: bool
+
+        We can also check if each geometry of GeoSeries intersects a single
+        geometry:
+
+        >>> line = LineString([(-1, 1), (3, 1)])
+        >>> s.intersects(line)
+        0    True
+        1    True
+        2    True
+        3    True
+        dtype: bool
+
+        Notes
+        -----
+        This method works in a row-wise manner. It does not check if an element
+        of one GeoSeries ``intersects`` *any* element of the other one.
+
+        See also
+        --------
+        GeoSeries.disjoint
+        GeoSeries.crosses
+        GeoSeries.touches
+        GeoSeries.intersection
+        """
+        return (
+            self._row_wise_operation(
+                "ST_Intersects(`L`, `R`)", other, align, rename="intersects"
+            )
+            .to_spark_pandas()
+            .astype("bool")
+        )
+
+    def intersection(
+        self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, 
None] = None
+    ) -> "GeoSeries":
+        """Returns a ``GeoSeries`` of the intersection of points in each
+        aligned geometry with `other`.
+
+        The operation works on a 1-to-1 row-wise manner:
+
+        Parameters
+        ----------
+        other : GeoSeries or geometric object
+            The GeoSeries (elementwise) or geometric object to find the
+            intersection with.
+        align : bool | None (default None)
+            If True, automatically aligns GeoSeries based on their indices. 
None defaults to True.
+            If False, the order of elements is preserved. (not supported in 
Sedona Geopandas)
+
+        Returns
+        -------
+        GeoSeries
+
+        Examples
+        --------
+        >>> from shapely.geometry import Polygon, LineString, Point
+        >>> s = geopandas.GeoSeries(
+        ...     [
+        ...         Polygon([(0, 0), (2, 2), (0, 2)]),
+        ...         Polygon([(0, 0), (2, 2), (0, 2)]),
+        ...         LineString([(0, 0), (2, 2)]),
+        ...         LineString([(2, 0), (0, 2)]),
+        ...         Point(0, 1),
+        ...     ],
+        ... )
+        >>> s2 = geopandas.GeoSeries(
+        ...     [
+        ...         Polygon([(0, 0), (1, 1), (0, 1)]),
+        ...         LineString([(1, 0), (1, 3)]),
+        ...         LineString([(2, 0), (0, 2)]),
+        ...         Point(1, 1),
+        ...         Point(-100, -100),
+        ...     ],
+        ... )
+
+        We can do an intersection of each geometry and a single
+        shapely geometry:
+
+        >>> geom = Polygon([(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, 
-0.5), (-0.5, -0.5)])
+        >>> s.intersection(geom)
+            Polygon([(0, 0), (2, 2), (0, 2)]),
+            Polygon([(0, 0), (2, 2), (0, 2)]),
+            LineString([(0, 0), (2, 2)]),
+            LineString([(2, 0), (0, 2)]),
+            Point(0, 1),
+        dtype: geometry
+
+        >>> geom = Polygon([(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, 
-0.5), (-0.5, -0.5)])
+        >>> s.intersection(geom)
+        0         POLYGON ((0 0, 2 2, 0 2))
+        1         POLYGON ((0 0, 2 2, 0 2))
+        2             LINESTRING (0 0, 2 2)
+        3             LINESTRING (2 0, 0 2)
+        4                       POINT (0 1)
+        dtype: geometry
+
+        We can also check two GeoSeries against each other, row by row.
+        Both GeoSeries are aligned based on index values, and elements
+        with the same index are compared.
+
+        >>> s.intersection(s2)
+        0    POLYGON ((0 0, 1 1, 0 1, 0 0))
+        1             LINESTRING (1 1, 1 2)
+        2                       POINT (1 1)
+        3                       POINT (1 1)
+        4                     POLYGON EMPTY
+        dtype: geometry
+
+        See Also
+        --------
+        GeoSeries.difference
+        GeoSeries.symmetric_difference
+        GeoSeries.union
+        """
+        return self._row_wise_operation(
+            "ST_Intersection(`L`, `R`)", other, align, rename="intersection"
+        )
+
+    def _row_wise_operation(
+        self,
+        select: str,
+        other: Union["GeoSeries", BaseGeometry],
+        align: Union[bool, None],
+        rename: str,
+    ):
+        """
+        Helper function to perform a row-wise operation on two GeoSeries.
+        The self column and other column are aliased to `L` and `R`, 
respectively.
+        """
+        from pyspark.sql.functions import col
+
+        # Note: this is specifically False. None is valid since it defaults to 
True similar to geopandas
+        if align is False:
+            raise NotImplementedError("Sedona Geopandas does not support 
align=False")
+
+        if isinstance(other, BaseGeometry):
+            other = GeoSeries([other] * len(self))
+
+        assert isinstance(other, GeoSeries), f"Invalid type for other: 
{type(other)}"
+
+        # TODO: this does not yet support multi-index
+        df = self._internal.spark_frame.select(
+            col(self.get_first_geometry_column()).alias("L"),
+            col(SPARK_DEFAULT_INDEX_NAME),
+        )
+        other_df = other._internal.spark_frame.select(
+            col(other.get_first_geometry_column()).alias("R"),
+            col(SPARK_DEFAULT_INDEX_NAME),
+        )
+        joined_df = df.join(other_df, on=SPARK_DEFAULT_INDEX_NAME, how="outer")
+        return self._query_geometry_column(
+            select,
+            cols=["L", "R"],
+            rename=rename,
+            df=joined_df,
+        )
+
     def intersection_all(self):
         # Implementation of the abstract method
         raise NotImplementedError("This method is not implemented yet.")
diff --git a/python/tests/geopandas/test_geoseries.py 
b/python/tests/geopandas/test_geoseries.py
index cc1b3338b3..8c0805b5f8 100644
--- a/python/tests/geopandas/test_geoseries.py
+++ b/python/tests/geopandas/test_geoseries.py
@@ -50,6 +50,8 @@ class TestGeoSeries(TestBase):
         assert len(actual) == len(expected)
         sgpd_result = actual.to_geopandas()
         for a, e in zip(sgpd_result, expected):
+            if a.is_empty and e.is_empty:
+                continue
             self.assert_geometry_almost_equal(a, e)
 
     def test_area(self):
@@ -322,6 +324,83 @@ class TestGeoSeries(TestBase):
     def test_union_all(self):
         pass
 
+    def test_intersects(self):
+        s = sgpd.GeoSeries(
+            [
+                Polygon([(0, 0), (2, 2), (0, 2)]),
+                LineString([(0, 0), (2, 2)]),
+                LineString([(2, 0), (0, 2)]),
+                Point(0, 1),
+            ],
+        )
+        s2 = sgpd.GeoSeries(
+            [
+                LineString([(1, 0), (1, 3)]),
+                LineString([(2, 0), (0, 2)]),
+                Point(1, 1),
+                Point(-100, -100),
+            ],
+        )
+
+        result = s.intersects(s2)
+        expected = pd.Series([True, True, True, False])
+        assert_series_equal(result.to_pandas(), expected)
+
+        line = LineString([(-1, 1), (3, 1)])
+        result = s.intersects(line)
+        expected = pd.Series([True, True, True, True])
+        assert_series_equal(result.to_pandas(), expected)
+
+    def test_intersection(self):
+        s = sgpd.GeoSeries(
+            [
+                Polygon([(0, 0), (2, 2), (0, 2)]),
+                Polygon([(0, 0), (2, 2), (0, 2)]),
+                LineString([(0, 0), (2, 2)]),
+                LineString([(2, 0), (0, 2)]),
+                Point(0, 1),
+            ],
+        )
+
+        geom = Polygon(
+            [(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, -0.5), (-0.5, -0.5)]
+        )
+        result = s.intersection(geom)
+        expected = gpd.GeoSeries(
+            [
+                Polygon([(0, 0), (2, 2), (0, 2)]),
+                Polygon([(0, 0), (2, 2), (0, 2)]),
+                LineString([(0, 0), (2, 2)]),
+                LineString([(2, 0), (0, 2)]),
+                Point(0, 1),
+            ]
+        )
+        self.check_sgpd_equals_gpd(result, expected)
+
+        s2 = sgpd.GeoSeries(
+            [
+                Polygon([(0, 0), (1, 1), (0, 1)]),
+                LineString([(1, 0), (1, 3)]),
+                LineString([(2, 0), (0, 2)]),
+                Point(1, 1),
+                Point(-100, -100),
+            ],
+        )
+        result = s.intersection(s2)
+        expected = gpd.GeoSeries(
+            [
+                Polygon([(0, 0), (0, 1), (1, 1), (0, 0)]),
+                LineString([(1, 1), (1, 2)]),
+                Point(1, 1),
+                Point(1, 1),
+                Point(),
+            ]
+        )
+        self.check_sgpd_equals_gpd(result, expected)
+
+        with pytest.raises(NotImplementedError):
+            s.intersection(s2, align=False)
+
     def test_intersection_all(self):
         pass
 
diff --git a/python/tests/geopandas/test_match_geopandas_series.py 
b/python/tests/geopandas/test_match_geopandas_series.py
index c55db097db..ec89ba23bd 100644
--- a/python/tests/geopandas/test_match_geopandas_series.py
+++ b/python/tests/geopandas/test_match_geopandas_series.py
@@ -435,6 +435,28 @@ class TestMatchGeopandasSeries(TestBase):
     def test_union_all(self):
         pass
 
+    def test_intersects(self):
+        for _, geom in self.geoms:
+            for _, geom2 in self.geoms:
+                sgpd_result = GeoSeries(geom).intersects(GeoSeries(geom2))
+                gpd_result = 
gpd.GeoSeries(geom).intersects(gpd.GeoSeries(geom2))
+                self.check_pd_series_equal(sgpd_result, gpd_result)
+
+    def test_intersection(self):
+        geometries = [
+            Polygon([(0, 0), (1, 0), (1, 1)]),
+            Polygon([(2, 0), (3, 0), (3, 1)]),
+            Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]),
+            Polygon([(0, 0), (3, 0), (3, 3), (0, 2)]),
+            Polygon([(2, 0), (3, 0), (3, 3), (2, 3)]),
+            Point(0, 0),
+        ]
+        for g1 in geometries:
+            for g2 in geometries:
+                sgpd_result = GeoSeries(g1).intersection(GeoSeries(g2))
+                gpd_result = gpd.GeoSeries(g1).intersection(gpd.GeoSeries(g2))
+                self.check_sgpd_equals_gpd(sgpd_result, gpd_result)
+
     def test_intersection_all(self):
         pass
 
@@ -478,6 +500,9 @@ class TestMatchGeopandasSeries(TestBase):
         assert isinstance(expected, gpd.GeoSeries)
         sgpd_result = actual.to_geopandas()
         for a, e in zip(sgpd_result, expected):
+            # Sometimes sedona and geopandas both return empty geometries but 
of different types (e.g. Point and Polygon)
+            if a.is_empty and e.is_empty:
+                continue
             self.assert_geometry_almost_equal(
                 a, e, tolerance=1e-2
             )  # increased tolerance from 1e-6

Reply via email to