This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push: new 3863962444 [GH-2149] Geopandas: Implement `to_file`, `from_file`, `read_file` (#2150) 3863962444 is described below commit 3863962444a6dcf1baaaa9d20af2c4494ae105c0 Author: Peter Nguyen <petern0...@gmail.com> AuthorDate: Fri Jul 25 18:27:53 2025 -0700 [GH-2149] Geopandas: Implement `to_file`, `from_file`, `read_file` (#2150) * Implement to_file, from_file, read_file * Fix ci * Delete __repr__()'s and _process_geometry_columnsi and PR feedback * Make format and extension case insensitive in read_file * Remove sort by GeoHash logic in to_file parquet --- python/sedona/geopandas/__init__.py | 2 + python/sedona/geopandas/geodataframe.py | 209 ++++++++++-------- python/sedona/geopandas/geoseries.py | 153 +++++++++---- python/sedona/geopandas/io.py | 238 ++++++++++++++++++++ python/tests/geopandas/test_geodataframe.py | 17 +- python/tests/geopandas/test_geopandas_base.py | 10 + python/tests/geopandas/test_io.py | 245 +++++++++++++++++++++ .../tests/geopandas/test_match_geopandas_series.py | 7 - 8 files changed, 736 insertions(+), 145 deletions(-) diff --git a/python/sedona/geopandas/__init__.py b/python/sedona/geopandas/__init__.py index aa93af1197..855f27d591 100644 --- a/python/sedona/geopandas/__init__.py +++ b/python/sedona/geopandas/__init__.py @@ -24,3 +24,5 @@ from sedona.geopandas.geoseries import GeoSeries from sedona.geopandas.geodataframe import GeoDataFrame from sedona.geopandas.tools import sjoin + +from sedona.geopandas.io import read_file diff --git a/python/sedona/geopandas/geodataframe.py b/python/sedona/geopandas/geodataframe.py index 6edf0bb741..49722e6085 100644 --- a/python/sedona/geopandas/geodataframe.py +++ b/python/sedona/geopandas/geodataframe.py @@ -32,6 +32,7 @@ from pyspark.pandas import Series as PandasOnSparkSeries from pyspark.pandas._typing import Dtype from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame from pyspark.pandas.internal import InternalFrame +from pyspark.pandas.utils import log_advice from sedona.geopandas._typing import Label from sedona.geopandas.base import GeoFrame @@ -351,10 +352,10 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): try: result = sgpd.GeoSeries(ps_series) - first_idx = ps_series.first_valid_index() - if first_idx is not None: - geom = ps_series.iloc[int(first_idx)] - srid = shapely.get_srid(geom) + not_null = ps_series[ps_series.notnull()] + if len(not_null) > 0: + first_geom = not_null.iloc[0] + srid = shapely.get_srid(first_geom) # Shapely objects stored in the ps.Series retain their srid # but the GeoSeries does not, so we manually re-set it here @@ -425,7 +426,7 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): # instead of calling e.g assert not dtype ourselves. 
# This way, if Spark adds support later, than we inherit those changes naturally super().__init__(data, index=index, columns=columns, dtype=dtype, copy=copy) - elif isinstance(data, PandasOnSparkDataFrame): + elif isinstance(data, (PandasOnSparkDataFrame, SparkDataFrame)): super().__init__(data, index=index, columns=columns, dtype=dtype, copy=copy) elif isinstance(data, PandasOnSparkSeries): @@ -436,14 +437,6 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): pass super().__init__(data, index=index, columns=columns, dtype=dtype, copy=copy) - elif isinstance(data, SparkDataFrame): - assert columns is None - assert dtype is None - assert not copy - - if index is None: - internal = InternalFrame(spark_frame=data, index_spark_columns=None) - object.__setattr__(self, "_internal_frame", internal) else: # below are not distributed dataframe types if isinstance(data, pd.DataFrame): @@ -480,6 +473,9 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): if crs is not None and data.crs != crs: raise ValueError(crs_mismatch_error) + if geometry: + self.set_geometry(geometry, inplace=True) + if geometry is None and "geometry" in self.columns: if (self.columns == "geometry").sum() > 1: @@ -828,55 +824,6 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): """ return self._geometry_column_name - def _process_geometry_columns( - self, operation: str, rename_suffix: str = "", *args, **kwargs - ) -> GeoDataFrame: - """ - Helper method to process geometry columns with a specified operation. - - Parameters - ---------- - operation : str - The spatial operation to apply (e.g., 'ST_Area', 'ST_Buffer'). - rename_suffix : str, default "" - Suffix to append to the resulting column name. - args : tuple - Positional arguments for the operation. - kwargs : dict - Keyword arguments for the operation. - - Returns - ------- - GeoDataFrame - A new GeoDataFrame with the operation applied to geometry columns. - """ - select_expressions = [] - - for field in self._internal.spark_frame.schema.fields: - col_name = field.name - - # Skip index and order columns - if col_name in ("__index_level_0__", "__natural_order__"): - continue - - if field.dataType.typeName() in ("geometrytype", "binary"): - # Prepare arguments for the operation - positional_params = ", ".join([repr(v) for v in args]) - keyword_params = ", ".join([repr(v) for v in kwargs.values()]) - params = ", ".join(filter(None, [positional_params, keyword_params])) - - if field.dataType.typeName() == "binary": - expr = f"{operation}(ST_GeomFromWKB(`{col_name}`){', ' + params if params else ''}) as {col_name}{rename_suffix}" - else: - expr = f"{operation}(`{col_name}`{', ' + params if params else ''}) as {col_name}{rename_suffix}" - select_expressions.append(expr) - else: - # Keep non-geometry columns as they are - select_expressions.append(f"`{col_name}`") - - sdf = self._internal.spark_frame.selectExpr(*select_expressions) - return GeoDataFrame(sdf) - def to_geopandas(self) -> gpd.GeoDataFrame: """ Note: Unlike in pandas and geopandas, Sedona will always return a general Index. @@ -884,7 +831,6 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): e.g pd.Index([0, 1, 2]) instead of pd.RangeIndex(start=0, stop=3, step=1) """ - from pyspark.pandas.utils import log_advice log_advice( "`to_geopandas` loads all data into the driver's memory. 
" @@ -1007,10 +953,6 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame): ) -> GeoDataFrame: raise NotImplementedError("from_dict() is not implemented yet.") - @classmethod - def from_file(cls, filename: os.PathLike | typing.IO, **kwargs) -> GeoDataFrame: - raise NotImplementedError("from_file() is not implemented yet.") - @classmethod def from_features( cls, features, crs: Any | None = None, columns: Iterable[str] | None = None @@ -1290,16 +1232,6 @@ es": {"name": "urn:ogc:def:crs:EPSG::3857"}}}' ): raise NotImplementedError("to_feather() is not implemented yet.") - def to_file( - self, - filename: str, - driver: str | None = None, - schema: dict | None = None, - index: bool | None = None, - **kwargs, - ): - raise NotImplementedError("to_file() is not implemented yet.") - @property def geom_type(self) -> str: # Implementation of the abstract method @@ -1552,9 +1484,9 @@ es": {"name": "urn:ogc:def:crs:EPSG::3857"}}}' mitre_limit=5.0, single_sided=False, **kwargs, - ) -> GeoDataFrame: + ) -> sgpd.GeoSeries: """ - Returns a GeoDataFrame with all geometries buffered by the specified distance. + Returns a GeoSeries with all geometries buffered by the specified distance. Parameters ---------- @@ -1573,8 +1505,8 @@ es": {"name": "urn:ogc:def:crs:EPSG::3857"}}}' Returns ------- - GeoDataFrame - A new GeoDataFrame with buffered geometries. + GeoSeries + A new GeoSeries with buffered geometries. Examples -------- @@ -1588,8 +1520,14 @@ es": {"name": "urn:ogc:def:crs:EPSG::3857"}}}' >>> gdf = GeoDataFrame(data) >>> buffered = gdf.buffer(0.5) """ - return self._process_geometry_columns( - "ST_Buffer", rename_suffix="_buffered", distance=distance + return self.geometry.buffer( + distance, + resolution=16, + cap_style="round", + join_style="round", + mitre_limit=5.0, + single_sided=False, + **kwargs, ) def sjoin( @@ -1666,18 +1604,117 @@ es": {"name": "urn:ogc:def:crs:EPSG::3857"}}}' # I/O OPERATIONS # ============================================================================ + @classmethod + def from_file( + cls, filename: str, format: str | None = None, **kwargs + ) -> GeoDataFrame: + """ + Alternate constructor to create a ``GeoDataFrame`` from a file. + + Parameters + ---------- + filename : str + File path or file handle to read from. If the path is a directory, + Sedona will read all files in the directory into a dataframe. + format : str, default None + The format of the file to read. If None, Sedona will infer the format + from the file extension. Note, inferring the format from the file extension + is not supported for directories. + Options: + - "shapefile" + - "geojson" + - "geopackage" + - "geoparquet" + + table_name : str, default None + The name of the table to read from a geopackage file. Required if format is geopackage. + + See also + -------- + GeoDataFrame.to_file : write GeoDataFrame to file + """ + return sgpd.io.read_file(filename, format, **kwargs) + + def to_file( + self, + path: str, + driver: str | None = None, + schema: dict | None = None, + index: bool | None = None, + **kwargs, + ): + """ + Write the ``GeoDataFrame`` to a file. + + Parameters + ---------- + path : string + File path or file handle to write to. + driver : string, default None + The format driver used to write the file. + If not specified, it attempts to infer it from the file extension. + If no extension is specified, Sedona will error. 
+ Options: + - "geojson" + - "geopackage" + - "geoparquet" + schema : dict, default None + Not applicable to Sedona's implementation + index : bool, default None + If True, write index into one or more columns (for MultiIndex). + Default None writes the index into one or more columns only if + the index is named, is a MultiIndex, or has a non-integer data + type. If False, no index is written. + mode : string, default 'w' + The write mode, 'w' to overwrite the existing file and 'a' to append. + 'overwrite' and 'append' are equivalent to 'w' and 'a' respectively. + crs : pyproj.CRS, default None + If specified, the CRS is passed to Fiona to + better control how the file is written. If None, GeoPandas + will determine the crs based on the dataframe's crs attribute. + The value can be anything accepted + by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`, + such as an authority string (e.g. "EPSG:4326") or a WKT string. + engine : str + Not applicable to Sedona's implementation + metadata : dict[str, str], default None + Optional metadata to be stored in the file. Keys and values must be + strings. Supported only for the "GPKG" driver. Not supported by Sedona. + **kwargs : + Keyword args to be passed to the engine, and can be used to write + to multi-layer data, store data within archives (zip files), etc. + In case of the "pyogrio" engine, the keyword arguments are passed to + `pyogrio.write_dataframe`. In case of the "fiona" engine, the keyword + arguments are passed to `fiona.open`. For more information on possible + keywords, type: ``import pyogrio; help(pyogrio.write_dataframe)``. + + Examples + -------- + + >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]}) + >>> gdf.to_file(filepath, driver="geoparquet") + + With selected drivers you can also append to a file with `mode="a"`: + + >>> gdf.to_file(filepath, driver="geojson", mode="a") + + When the index is of non-integer dtype, index=None (default) is treated as True, writing the index to the file. + + >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), Point(1, 1)]}, index=["a", "b"]) + >>> gdf.to_file(filepath, driver="geoparquet") + """ + sgpd.io._to_file(self, path, driver, index, **kwargs) + def to_parquet(self, path, **kwargs): """ Write the GeoDataFrame to a GeoParquet file. - Parameters: - path: str The file path where the GeoParquet file will be written. - kwargs: Any Additional arguments to pass to the Sedona DataFrame output function. - """ - # Use the Spark DataFrame's write method to write to GeoParquet format - self._internal.spark_frame.write.format("geoparquet").save(path, **kwargs) + self.to_file(path, driver="geoparquet", **kwargs) # ----------------------------------------------------------------------------- diff --git a/python/sedona/geopandas/geoseries.py b/python/sedona/geopandas/geoseries.py index 7bfaef582a..ac114a9be1 100644 --- a/python/sedona/geopandas/geoseries.py +++ b/python/sedona/geopandas/geoseries.py @@ -20,6 +20,7 @@ import typing from typing import Any, Union, Literal, List import geopandas as gpd +import sedona.geopandas as sgpd import pandas as pd import pyspark.pandas as pspd import pyspark @@ -325,13 +326,6 @@ class GeoSeries(GeoFrame, pspd.Series): def __getitem__(self, key: Any) -> Any: return pspd.Series.__getitem__(self, key) - def __repr__(self) -> str: - """ - Return a string representation of the GeoSeries in WKT format.
- """ - gpd_series = self.to_geopandas() - return gpd_series.__repr__() - def __init__( self, data=None, @@ -3372,22 +3366,6 @@ class GeoSeries(GeoFrame, pspd.Series): return self._query_geometry_column(spark_expr) - def to_parquet(self, path, **kwargs): - """ - Write the GeoSeries to a GeoParquet file. - - Parameters: - - path: str - The file path where the GeoParquet file will be written. - - kwargs: Any - Additional arguments to pass to the Sedona DataFrame output function. - """ - - result = self._query_geometry_column(self.spark.column) - - # Use the Spark DataFrame's write method to write to GeoParquet format - result._internal.spark_frame.write.format("geoparquet").save(path, **kwargs) - def sjoin( self, other, @@ -3550,14 +3528,35 @@ class GeoSeries(GeoFrame, pspd.Series): @classmethod def from_file( - cls, filename: Union[os.PathLike, typing.IO], **kwargs + cls, filename: str, format: Union[str, None] = None, **kwargs ) -> "GeoSeries": - raise NotImplementedError( - _not_implemented_error( - "from_file", - "Creates GeoSeries from geometry files (shapefile, GeoJSON, etc.).", - ) - ) + """ + Alternate constructor to create a ``GeoDataFrame`` from a file. + + Parameters + ---------- + filename : str + File path or file handle to read from. If the path is a directory, + Sedona will read all files in the directory into a dataframe. + format : str, default None + The format of the file to read. If None, Sedona will infer the format + from the file extension. Note, inferring the format from the file extension + is not supported for directories. + Options: + - "shapefile" + - "geojson" + - "geopackage" + - "geoparquet" + + table_name : str, default None + The name of the table to read from a geopackage file. Required if format is geopackage. + + See also + -------- + GeoDataFrame.to_file : write GeoDataFrame to file + """ + df = sgpd.io.read_file(filename, format, **kwargs) + return GeoSeries(df.geometry, crs=df.crs) @classmethod def from_wkb( @@ -3908,15 +3907,6 @@ class GeoSeries(GeoFrame, pspd.Series): name=kwargs.get("name", None), ) - def to_file( - self, - filename: Union[os.PathLike, typing.IO], - driver: Union[str, None] = None, - index: Union[bool, None] = None, - **kwargs, - ): - raise NotImplementedError("GeoSeries.to_file() is not implemented yet.") - # ============================================================================ # DATA ACCESS AND MANIPULATION # ============================================================================ @@ -4689,6 +4679,87 @@ e": "Feature", "properties": {}, "geometry": {"type": "Point", "coordinates": [3 ) ) + def to_file( + self, + path: str, + driver: Union[str, None] = None, + schema: Union[dict, None] = None, + index: Union[bool, None] = None, + **kwargs, + ): + """ + Write the ``GeoSeries`` to a file. + + Parameters + ---------- + path : string + File path or file handle to write to. + driver : string, default None + The format driver used to write the file. + If not specified, it attempts to infer it from the file extension. + If no extension is specified, Sedona will error. + Options: + - "geojson" + - "geopackage" + - "geoparquet" + schema : dict, default None + Not applicable to Sedona's implementation + index : bool, default None + If True, write index into one or more columns (for MultiIndex). + Default None writes the index into one or more columns only if + the index is named, is a MultiIndex, or has a non-integer data + type. If False, no index is written. 
+ mode : string, default 'w' + The write mode, 'w' to overwrite the existing file and 'a' to append. + 'overwrite' and 'append' are equivalent to 'w' and 'a' respectively. + crs : pyproj.CRS, default None + If specified, the CRS is passed to Fiona to + better control how the file is written. If None, GeoPandas + will determine the crs based on the dataframe's crs attribute. + The value can be anything accepted + by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`, + such as an authority string (e.g. "EPSG:4326") or a WKT string. + engine : str + Not applicable to Sedona's implementation + metadata : dict[str, str], default None + Optional metadata to be stored in the file. Keys and values must be + strings. Supported only for the "GPKG" driver. Not supported by Sedona. + **kwargs : + Keyword args to be passed to the engine, and can be used to write + to multi-layer data, store data within archives (zip files), etc. + In case of the "pyogrio" engine, the keyword arguments are passed to + `pyogrio.write_dataframe`. In case of the "fiona" engine, the keyword + arguments are passed to `fiona.open`. For more information on possible + keywords, type: ``import pyogrio; help(pyogrio.write_dataframe)``. + + Examples + -------- + + >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]}) + >>> gdf.to_file(filepath, driver="geoparquet") + + With selected drivers you can also append to a file with `mode="a"`: + + >>> gdf.to_file(filepath, driver="geojson", mode="a") + + When the index is of non-integer dtype, index=None (default) is treated as True, writing the index to the file. + + >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), Point(1, 1)]}, index=["a", "b"]) + >>> gdf.to_file(filepath, driver="geoparquet") + """ + self._to_geoframe().to_file(path, driver, index=index, **kwargs) + + def to_parquet(self, path, **kwargs): + """ + Write the GeoSeries to a GeoParquet file. + Parameters: + - path: str + The file path where the GeoParquet file will be written. + - kwargs: Any + Additional arguments to pass to the Sedona DataFrame output function. + """ + self._to_geoframe().to_file(path, driver="geoparquet", **kwargs) # ----------------------------------------------------------------------------- # # Utils # ----------------------------------------------------------------------------- @@ -4723,7 +4794,9 @@ e": "Feature", "properties": {}, "geometry": {"type": "Point", "coordinates": [3 renamed = self.rename("geometry") else: renamed = self - return GeoDataFrame(pspd.DataFrame(renamed._internal)) + + # to_spark() is important here to ensure that the spark column names are set to the pandas column ones + return GeoDataFrame(pspd.DataFrame(renamed._internal).to_spark()) # ----------------------------------------------------------------------------- diff --git a/python/sedona/geopandas/io.py b/python/sedona/geopandas/io.py new file mode 100644 index 0000000000..3b4d8bdf02 --- /dev/null +++ b/python/sedona/geopandas/io.py @@ -0,0 +1,238 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from typing import Union +import warnings +import pyspark.pandas as ps +from sedona.geopandas import GeoDataFrame +from pyspark.pandas.utils import default_session, scol_for +from pyspark.pandas.internal import SPARK_DEFAULT_INDEX_NAME, NATURAL_ORDER_COLUMN_NAME +from pyspark.pandas.frame import InternalFrame +from pyspark.pandas.utils import validate_mode, log_advice +from pandas.api.types import is_integer_dtype + + +def _to_file( + df: GeoDataFrame, + path: str, + driver: Union[str, None] = None, + index: Union[bool, None] = True, + **kwargs, +): + """ + Write the ``GeoDataFrame`` to a file. + + Parameters + ---------- + path : string + File path or file handle to write to. + driver : string, default None + The format driver used to write the file. + If not specified, Sedona attempts to infer the driver from the file extension. + If the path has no extension, an error is raised. + Options: + - "geojson" + - "geopackage" + - "geoparquet" + schema : dict, default None + Not applicable to Sedona's implementation + index : bool, default None + If True, write index into one or more columns (for MultiIndex). + Default None writes the index into one or more columns only if + the index is named, is a MultiIndex, or has a non-integer data + type. If False, no index is written. + mode : string, default 'w' + The write mode, 'w' to overwrite the existing file and 'a' to append. + 'overwrite' and 'append' are equivalent to 'w' and 'a' respectively. + crs : pyproj.CRS, default None + If specified, the CRS is passed to Fiona to + better control how the file is written. If None, GeoPandas + will determine the crs based on the dataframe's crs attribute. + The value can be anything accepted + by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`, + such as an authority string (e.g. "EPSG:4326") or a WKT string. + engine : str + Not applicable to Sedona's implementation + metadata : dict[str, str], default None + Optional metadata to be stored in the file. Keys and values must be + strings. Supported only for the "GPKG" driver. Not supported by Sedona. + **kwargs : + Keyword args to be passed to the engine, and can be used to write + to multi-layer data, store data within archives (zip files), etc. + In case of the "pyogrio" engine, the keyword arguments are passed to + `pyogrio.write_dataframe`. In case of the "fiona" engine, the keyword + arguments are passed to `fiona.open`. For more information on possible + keywords, type: ``import pyogrio; help(pyogrio.write_dataframe)``. + + Examples + -------- + + >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]}) + >>> gdf.to_file(filepath, driver="geoparquet") + + With selected drivers you can also append to a file with `mode="a"`: + + >>> gdf.to_file(filepath, driver="geojson", mode="a") + + When the index is of non-integer dtype, index=None (default) is treated as True, writing the index to the file.
+ + >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), Point(1, 1)]}, index=["a", "b"]) + >>> gdf.to_file(filepath, driver="geoparquet") + """ + + ext_to_driver = { + ".parquet": "GeoParquet", + ".json": "GeoJSON", + ".geojson": "GeoJSON", + } + + # Auto-detect the driver from the file extension if not provided + if driver is None: + _, extension = os.path.splitext(path) + if extension not in ext_to_driver: + raise ValueError(f"Unsupported file extension: {extension}") + driver = ext_to_driver[extension] + + spark_fmt = driver.lower() + + crs = kwargs.pop("crs", None) + if crs: + from pyproj import CRS + + crs = CRS.from_user_input(crs) + + spark_df = df._internal.spark_frame.drop(NATURAL_ORDER_COLUMN_NAME) + + if index is None: + # Determine if index attribute(s) should be saved to file + # (only if they are named or are non-integer) + index = list(df.index.names) != [None] or not is_integer_dtype(df.index.dtype) + + if not index: + log_advice( + "If index is not set to True for `to_file`, " + "the existing index is lost when writing to a file." + ) + spark_df = spark_df.drop(SPARK_DEFAULT_INDEX_NAME) + + if spark_fmt == "geoparquet": + writer = spark_df.write.format("geoparquet") + + elif spark_fmt == "geojson": + writer = spark_df.write.format("geojson") + + else: + raise ValueError(f"Unsupported spark format: {spark_fmt}") + + default_mode = "overwrite" + mode = validate_mode(kwargs.pop("mode", default_mode)) + + writer.mode(mode).save(path, **kwargs) + + +def read_file(filename: str, format: Union[str, None] = None, **kwargs): + """ + Alternate constructor to create a ``GeoDataFrame`` from a file. + + Parameters + ---------- + filename : str + File path or file handle to read from. If the path is a directory, + Sedona will read all files in the directory into a dataframe. + format : str, default None + The format of the file to read. If None, Sedona will infer the format + from the file extension. Note: inferring the format from the file extension + is not supported for directories. + Options: + - "shapefile" + - "geojson" + - "geopackage" + - "geoparquet" + + table_name : str, default None + The name of the table to read from a geopackage file. Required if format is geopackage.
+ + See also + -------- + GeoDataFrame.to_file : write GeoDataFrame to file + """ + + # We warn the user if they try to use arguments that geopandas supports but not Sedona + if kwargs: + warnings.warn(f"The given arguments are not supported in Sedona: {kwargs}") + + spark = default_session() + + # If format is not specified, infer it from the file extension + if format is None: + if os.path.isdir(filename): + raise ValueError( + f"Inferring the format from the file extension is not supported for directories: {filename}" + ) + if filename.lower().endswith(".shp"): + format = "shapefile" + elif filename.lower().endswith(".json"): + format = "geojson" + elif filename.lower().endswith(".parquet"): + format = "geoparquet" + elif filename.lower().endswith(".gpkg"): + format = "geopackage" + else: + raise ValueError(f"Unsupported file type: {filename}") + else: + format = format.lower() + + if format == "shapefile": + sdf = spark.read.format("shapefile").load(filename) + return GeoDataFrame(sdf) + elif format == "geojson": + sdf = ( + spark.read.format("geojson") + .option("multiLine", "true") + .load(filename) + .select( + "geometry", f"properties.*" + ) # select all non-geometry columns (which are under properties) + ) + # geojson also has a 'type' field, but we ignore it + + elif format == "geopackage": + table_name = kwargs.get("table_name", None) + if not table_name: + raise ValueError("table_name is required for geopackage") + sdf = ( + spark.read.format("geopackage") + .option("tableName", table_name) + .load(filename) + ) + + elif format == "geoparquet": + sdf = spark.read.format("geoparquet").load(filename) + + else: + raise NotImplementedError(f"Unsupported file type: {filename}") + + index_spark_columns = [] + + # If index was retained, we sort by it so the dataframe has the same order as the original one + if SPARK_DEFAULT_INDEX_NAME in sdf.columns: + sdf = sdf.orderBy(SPARK_DEFAULT_INDEX_NAME) + index_spark_columns = [scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)] + + internal = InternalFrame(spark_frame=sdf, index_spark_columns=index_spark_columns) + return GeoDataFrame(ps.DataFrame(internal)) diff --git a/python/tests/geopandas/test_geodataframe.py b/python/tests/geopandas/test_geodataframe.py index cceaa25af9..c962c316f2 100644 --- a/python/tests/geopandas/test_geodataframe.py +++ b/python/tests/geopandas/test_geodataframe.py @@ -334,28 +334,21 @@ class TestDataframe(TestGeopandasBase): square = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]) data = {"geometry1": [point, square], "id": [1, 2], "value": ["a", "b"]} - df = GeoDataFrame(data) + df = GeoDataFrame(data, geometry="geometry1") # Apply buffer with distance 0.5 - buffer_df = df.buffer(0.5) + result = df.buffer(0.5) # Verify result is a GeoDataFrame - assert type(buffer_df) is GeoDataFrame - - # Verify the original columns are preserved - assert "geometry1_buffered" in buffer_df.columns - assert "id" in buffer_df.columns - assert "value" in buffer_df.columns + assert type(result) is GeoSeries # Convert to pandas to extract individual geometries - pandas_df = buffer_df._internal.spark_frame.select( - "geometry1_buffered" - ).toPandas() + pd_series = result.to_pandas() # Calculate areas to verify buffer was applied correctly # Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785 # Square buffer with radius 0.5 should expand the 1x1 square to 2x2 square with rounded corners - areas = [geom.area for geom in pandas_df["geometry1_buffered"]] + areas = [geom.area for geom in pd_series] # Check that square buffer area 
is greater than original (1.0) assert areas[1] > 1.0 diff --git a/python/tests/geopandas/test_geopandas_base.py b/python/tests/geopandas/test_geopandas_base.py index 2d2fe22c50..1496a9a24e 100644 --- a/python/tests/geopandas/test_geopandas_base.py +++ b/python/tests/geopandas/test_geopandas_base.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +from typing import Union from tests.test_base import TestBase from sedona.geopandas import GeoDataFrame, GeoSeries import pyspark.sql @@ -25,6 +26,7 @@ from pandas.testing import assert_series_equal from contextlib import contextmanager from shapely.geometry import GeometryCollection from shapely.geometry.base import BaseGeometry +from pandas.testing import assert_index_equal class TestGeopandasBase(TestBase): @@ -66,6 +68,8 @@ class TestGeopandasBase(TestBase): a, e, tolerance=1e-2 ) # increased tolerance from 1e-6 + assert_index_equal(actual.index.to_pandas(), expected.index) + @classmethod def check_sgpd_df_equals_gpd_df( cls, actual: GeoDataFrame, expected: gpd.GeoDataFrame @@ -99,6 +103,12 @@ class TestGeopandasBase(TestBase): assert isinstance(expected, pd.Series), "expected series is not a pd.Series" assert_series_equal(actual.to_pandas(), expected) + @classmethod + def check_index_equal( + cls, actual: Union[ps.DataFrame, ps.Series], expected: ps.Index + ): + assert_index_equal(actual.index, expected) + @classmethod def contains_any_geom_collection(cls, geoms) -> bool: return any(isinstance(g, GeometryCollection) for g in geoms) diff --git a/python/tests/geopandas/test_io.py b/python/tests/geopandas/test_io.py new file mode 100644 index 0000000000..5d431e4d12 --- /dev/null +++ b/python/tests/geopandas/test_io.py @@ -0,0 +1,245 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +import tempfile +import pytest +import shapely +import pandas as pd +import geopandas as gpd +import pyspark.pandas as ps +from functools import partial +from sedona.geopandas import GeoDataFrame, GeoSeries, read_file +from tests import tests_resource +from tests.geopandas.test_geopandas_base import TestGeopandasBase +from shapely.geometry import ( + Point, + Polygon, + MultiPoint, + MultiLineString, + LineString, + MultiPolygon, + GeometryCollection, + LinearRing, +) +from packaging.version import parse as parse_version + +TEST_DATA_DIR = os.path.join("..", "spark", "common", "src", "test", "resources") + + +@pytest.mark.skipif( + parse_version(shapely.__version__) < parse_version("2.0.0"), + reason=f"Tests require shapely>=2.0.0, but found v{shapely.__version__}", +) +class TestIO(TestGeopandasBase): + def setup_method(self): + self.tempdir = tempfile.mkdtemp() + + ######################################################### + # File reading tests + ######################################################### + + # Modified version of Sedona's test_shapefile.py test_read_simple + @pytest.mark.parametrize( + "read_func", + [ + partial(GeoDataFrame.from_file, format="shapefile"), + partial(read_file, format="Shapefile"), + ], + ) + def test_read_shapefile(self, read_func): + data_dir = os.path.join(tests_resource, "shapefiles/polygon") + + df = read_func(data_dir) + + assert df.count().item() == 10000 + + subset_df = GeoDataFrame(df.head(100)) + # assert only one column + assert subset_df.shape[1] == 1 + + # assert all geometries are polygons or multipolygons + assert subset_df["geometry"].geom_type.isin(["Polygon", "MultiPolygon"]).all() + + # Check inference and single file works + data_file = os.path.join(data_dir, "map.shp") + df = read_func(data_file) + + assert df.count().item() == 10000 + + @pytest.mark.parametrize( + "read_func", + [ + partial(GeoDataFrame.from_file, format="geojson"), + partial(read_file, format="GeoJSON"), + ], + ) + def test_read_geojson(self, read_func): + datafile = os.path.join(TEST_DATA_DIR, "geojson/test1.json") + df = read_func(datafile) + assert (df.count() == 1).all() + + # Check that inference works + df = read_func(datafile) + assert (df.count() == 1).all() + + @pytest.mark.parametrize( + "read_func", + [ + partial(GeoDataFrame.from_file, format="geoparquet"), + partial(read_file, format="GeoParquet"), + ], + ) + def test_read_geoparquet(self, read_func): + input_location = os.path.join(TEST_DATA_DIR, "geoparquet/example1.parquet") + df = read_func(input_location) + # check that all column counts are 5 + assert (df.count() == 5).all() + + # Check that inference works + df = read_func(input_location) + assert (df.count() == 5).all() + + # From Sedona's GeoPackageReaderTest.scala + @pytest.mark.parametrize( + "read_func", + [ + partial(GeoDataFrame.from_file, format="geopackage"), + partial(read_file, format="GeoPackage"), + ], + ) + def test_read_geopackage(self, read_func): + datafile = os.path.join(TEST_DATA_DIR, "geopackage/features.gpkg") + + table_name = "GB_Hex_5km_GS_CompressibleGround_v8" + expected_cnt = 4233 + df = read_func(datafile, table_name=table_name) + assert df["geom"].count() == expected_cnt + + # Ensure inference works + table_name = "GB_Hex_5km_GS_Landslides_v8" + expected_cnt = 4228 + df = read_func(datafile, table_name=table_name) + assert df["geom"].count() == expected_cnt + + ######################################################### + # File writing tests + ######################################################### + + 
def _get_next_temp_file_path(self, ext: str): + temp_file_path = os.path.join( + self.tempdir, next(tempfile._get_candidate_names()) + "." + ext + ) + return temp_file_path + + @pytest.mark.parametrize( + "write_func", + [ + partial(GeoDataFrame.to_file, driver="GeoParquet"), + partial(GeoDataFrame.to_file, driver="geoparquet"), + GeoDataFrame.to_parquet, + ], + ) + def test_to_geoparquet(self, write_func): + sgpd_df = GeoDataFrame( + {"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]} + ) + + temp_file_path = self._get_next_temp_file_path("parquet") + + self._apply_func(sgpd_df, write_func, temp_file_path) + + # Ensure reading from geopandas creates the same resulting GeoDataFrame + gpd_df = gpd.read_parquet(temp_file_path) + self.check_sgpd_df_equals_gpd_df(sgpd_df, gpd_df) + + @pytest.mark.parametrize( + "write_func", + [ + partial(GeoDataFrame.to_file, driver="geojson"), # index=None here is False + partial(GeoDataFrame.to_file, driver="GeoJSON", index=True), + partial(GeoDataFrame.to_file, driver="geojson", index=True), + ], + ) + def test_to_geojson(self, write_func): + sgpd_df = GeoDataFrame( + {"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]}, + index=[1, 2], + ) + temp_file_path = self._get_next_temp_file_path("json") + self._apply_func(sgpd_df, write_func, temp_file_path) + + read_result = GeoDataFrame.from_file( + temp_file_path, format="geojson" + ).to_geopandas() + + # if index was true, the contents should be in the same order as the original GeoDataFrame + if write_func.keywords.get("index", None) == True: + self.check_sgpd_df_equals_gpd_df(sgpd_df, read_result) + else: + # if index was not kept, just check we have all rows and we have default index + self.check_index_equal(read_result, pd.Index([0, 1])) + + @pytest.mark.parametrize( + "write_func", + [ + partial(GeoDataFrame.to_file, driver="geojson"), + ], + ) + def test_to_file_non_int_index(self, write_func): + sgpd_df = GeoDataFrame( + {"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]}, + index=["a", "b"], + ) + temp_file_path = self._get_next_temp_file_path("json") + self._apply_func(sgpd_df, write_func, temp_file_path) + + read_result = GeoDataFrame.from_file( + temp_file_path, format="geojson" + ).to_geopandas() + + # Since index was of non-int dtype, index=None here is True + self.check_sgpd_df_equals_gpd_df(sgpd_df, read_result) + + @pytest.mark.parametrize( + "format", + [ + "geojson", + "geoparquet", + ], + ) + def test_to_file_and_from_file_series(self, format): + sgpd_ser = GeoSeries([Point(0, 0), LineString([(0, 0), (1, 1)])]) + ext = format.replace("geo", "") + temp_file_path = self._get_next_temp_file_path(ext) + + sgpd_ser.to_file(temp_file_path, driver=format, index=True) + + read_result = GeoSeries.from_file(temp_file_path, format=format) + read_result = read_result.to_geopandas() + + # Since index=True, the contents should be in the same order as the original GeoSeries + self.check_sgpd_equals_gpd(sgpd_ser, read_result) + + def _apply_func(self, obj, func, *args): + """ + Helper function to conditionally apply functions or methods to an object correctly. 
+ """ + if type(func) == str: + return getattr(obj, func)(*args) + else: + return func(obj, *args) diff --git a/python/tests/geopandas/test_match_geopandas_series.py b/python/tests/geopandas/test_match_geopandas_series.py index 85e38a9468..703753cf3f 100644 --- a/python/tests/geopandas/test_match_geopandas_series.py +++ b/python/tests/geopandas/test_match_geopandas_series.py @@ -207,13 +207,6 @@ class TestMatchGeopandasSeries(TestGeopandasBase): assert type(area) is ps.Series assert area.count() == 2 - def test_buffer_then_geoparquet(self): - temp_file_path = os.path.join( - self.tempdir, next(tempfile._get_candidate_names()) + ".parquet" - ) - self.g1.buffer(0.2).to_parquet(temp_file_path) - assert os.path.exists(temp_file_path) - def test_simplify(self): for geom in self.geoms: if isinstance(geom[0], LinearRing):