zhangfengcdt commented on code in PR #2067: URL: https://github.com/apache/sedona/pull/2067#discussion_r2192656883
########## python/sedona/geopandas/sindex.py: ########## @@ -0,0 +1,225 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import numpy as np +from pyspark.pandas.utils import log_advice +from pyspark.sql import DataFrame as PySparkDataFrame + +from sedona.spark.utils.adapter import Adapter +from sedona.spark.core.enums import IndexType + + +class SpatialIndex: + """ + A wrapper around Sedona's spatial index functionality. + """ + + def __init__(self, geometry, index_type="strtree", column_name=None): + """ + Initialize the SpatialIndex with geometry data. + + Parameters + ---------- + geometry : np.array of Shapely geometries, PySparkDataFrame column, or PySparkDataFrame + index_type : str, default "strtree" + The type of spatial index to use. + column_name : str, optional + The column name to extract geometry from if `geometry` is a PySparkDataFrame. 
+ """ + + if isinstance(geometry, np.ndarray): + self.geometry = geometry + self.index_type = index_type + self._dataframe = None + self._is_spark = False + # Build local index for numpy array + self._build_local_index() + elif isinstance(geometry, PySparkDataFrame): + if column_name is None: + raise ValueError( + "column_name must be specified when geometry is a PySparkDataFrame" + ) + self.geometry = geometry[column_name] + self.index_type = index_type + self._dataframe = geometry + self._is_spark = True + # Build distributed spatial index + self._build_spark_index(column_name) + else: + raise TypeError( + "Invalid type for `geometry`. Expected np.array or PySparkDataFrame." + ) + + def query(self, geometry, predicate=None, sort=False): + """ + Query the spatial index for geometries that intersect the given geometry. + + Parameters + ---------- + geometry : Shapely geometry + The geometry to query against the spatial index. + predicate : str, optional + Spatial predicate to filter results (e.g., 'intersects', 'contains'). + sort : bool, optional, default False + Whether to sort the results. + + Returns + ------- + list + List of indices of matching geometries. + """ + log_advice( + "`query` returns local list of indices of matching geometries onto driver's memory. " + "It should only be used if the resulting collection is expected to be small." 
+ ) + + if self.is_empty: + return [] + + if self._is_spark: + # For Spark-based spatial index + from sedona.spark.core.spatialOperator import RangeQuery + + # Execute the spatial range query + if predicate == "contains": + result_rdd = RangeQuery.SpatialRangeQuery( + self._indexed_rdd, geometry, True, True + ) + else: # Default to intersects + result_rdd = RangeQuery.SpatialRangeQuery( + self._indexed_rdd, geometry, False, True + ) + + results = result_rdd.collect() + return results + else: + # For local spatial index based on Shapely STRtree + if predicate == "contains": + # STRtree doesn't directly support contains predicate + # We need to filter results after querying + candidate_indices = self._index.query(geometry) + results = [ + i for i in candidate_indices if geometry.contains(self.geometry[i]) + ] + else: + # Default is intersects + results = self._index.query(geometry) + + if sort and results: + # Sort by distance to the query geometry if requested + results = sorted( + results, key=lambda i: self.geometry[i].distance(geometry) + ) + + return results + + def nearest(self, geometry, k=1, return_distance=False): + """ + Find the nearest geometry in the spatial index. + + Parameters + ---------- + geometry : Shapely geometry + The geometry to find the nearest neighbor for. + k : int, optional, default 1 + Number of nearest neighbors to find. + return_distance : bool, optional, default False + Whether to return distances along with indices. + + Returns + ------- + list or tuple + List of indices of nearest geometries, optionally with distances. + """ + # Placeholder for KNN query using Sedona + raise NotImplementedError("This method is not implemented yet.") + + def intersection(self, bounds): + """ + Find geometries that intersect the given bounding box. + + Parameters + ---------- + bounds : tuple + Bounding box as (min_x, min_y, max_x, max_y). + + Returns + ------- + list + List of indices of matching geometries. 
+ """ + raise NotImplementedError("This method is not implemented yet.") + + @property + def size(self): + """ + Get the size of the spatial index. + + Returns + ------- + int + Number of geometries in the index. + """ + if self._is_spark: + return self._dataframe.count() + return len(self.geometry) + + @property + def is_empty(self): + """ + Check if the spatial index is empty. + + Returns + ------- + bool + True if the index is empty, False otherwise. + """ + return self.size == 0 + + def _build_spark_index(self, column_name): + """ + Build a distributed spatial index on the geometry column of the DataFrame. + + This uses Sedona's built-in indexing functionality. + """ + + # Convert index_type string to Sedona IndexType enum + index_type_map = {"strtree": IndexType.RTREE, "quadtree": IndexType.QUADTREE} + sedona_index_type = index_type_map.get(self.index_type.lower(), IndexType.RTREE) + + # Create a SpatialRDD from the DataFrame + spatial_rdd = Adapter.toSpatialRdd(self._dataframe, column_name) Review Comment: Yes, i have tried using: ``` StructuredAdapter.toSpatialRdd(self._dataframe, column_name) ``` And it looks like it only works with Point data in the RangeQuery on the Rdd converted this way. For other types, it throws errors. 
Here is the unit test that was used to exercise StructuredAdapter with a range query: ``` def test_build_index_and_range_query(self): from sedona.spark.core.SpatialRDD import PointRDD from sedona.spark.core.enums import IndexType from sedona.spark.core.spatialOperator import RangeQuery from shapely.geometry import Polygon # # Create a dataframe with points # xys = [(i, i // 100, i % 100) for i in range(1000)] # spatial_df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr( # "id", "ST_Point(x, y) AS geometry" # ) # Create a spatial DataFrame with polygons polygons_data = [ (1, "POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))"), (2, "POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))"), (3, "POLYGON((2 2, 3 2, 3 3, 2 3, 2 2))"), (4, "POLYGON((3 3, 4 3, 4 4, 3 4, 3 3))"), (5, "POLYGON((4 4, 5 4, 5 5, 4 5, 4 4))"), ] df = self.spark.createDataFrame(polygons_data, ["id", "wkt"]) spatial_df = df.withColumn("geometry", expr("ST_GeomFromWKT(wkt)")) # Convert to SpatialRDD spatial_rdd = StructuredAdapter.toSpatialRdd(spatial_df, "geometry") # Analyze and build index on the spatial RDD spatial_rdd.analyze() spatial_rdd.buildIndex(IndexType.RTREE, False) # Create a range query window (a bounding box) boundary = spatial_rdd.boundaryEnvelope # Create a query window within the boundary query_window = Polygon([ (25, 25), (75, 25), (75, 75), (25, 75), (25, 25) ]) query_point = Point(2.2, 2.2) # Perform range query query_result = RangeQuery.SpatialRangeQuery( spatial_rdd, query_point, True, True ) # Assertions result_count = query_result.count() assert result_count >= 0, f"Expected at least one result, got {result_count}" ``` It works with Point (commented out) but not with Polygon. 
--> ``` scala.MatchError: [0,3,2000000022,4800000060,284e4f47594c4f50,2033202c32203228,202c332033202c32,322032202c332032,2929,500000032,4000000000000000,4000000000000000,4008000000000000,4000000000000000,4008000000000000,4008000000000000,4000000000000000,4008000000000000,4000000000000000,4000000000000000,500000001] (of class org.apache.spark.sql.catalyst.expressions.UnsafeRow) at org.apache.sedona.python.wrapper.utils.implicits$GeometryEnhancer.userDataToUtf8ByteArray(implicits.scala:52) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
