This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new e97f3a20c6 [SEDONA-720] Add GeoPandas Compatible API on Sedona - 
framework (#1843)
e97f3a20c6 is described below

commit e97f3a20c638019aa13cfe45bb70b23eb31e0537
Author: Feng Zhang <[email protected]>
AuthorDate: Fri Mar 14 09:29:43 2025 -0700

    [SEDONA-720] Add GeoPandas Compatible API on Sedona - framework (#1843)
    
    * [SEDONA-720] Add GeoPandas Compatible API on Sedona - framework
    
    * temporarily disable lower python versions on ci
    
    * fix python tests and revert ci python pipeline
    
    * fix lint issue
    
    * remove numpy dtypes import
    
    * fix TestDataframe
    
    * add more functions to geodataframe implementation
    
    * remove schema print and show
    
    * Update version to 1.8.0
    
    * Remove show and printSchema
    
    ---------
    
    Co-authored-by: Jia Yu <[email protected]>
---
 python/sedona/geopandas/__init__.py         |  26 ++
 python/sedona/geopandas/_typing.py          |  51 +++
 python/sedona/geopandas/base.py             | 325 ++++++++++++++
 python/sedona/geopandas/geodataframe.py     | 672 ++++++++++++++++++++++++++++
 python/sedona/geopandas/geoindex.py         |  28 ++
 python/sedona/geopandas/geoseries.py        | 615 +++++++++++++++++++++++++
 python/sedona/geopandas/internal.py         |  33 ++
 python/sedona/geopandas/tools/__init__.py   |  22 +
 python/sedona/geopandas/tools/sjoin.py      | 183 ++++++++
 python/tests/geopandas/__init__.py          |  16 +
 python/tests/geopandas/test_geodataframe.py | 187 ++++++++
 python/tests/geopandas/test_geoseries.py    | 114 +++++
 python/tests/geopandas/test_sjoin.py        |  53 +++
 13 files changed, 2325 insertions(+)

diff --git a/python/sedona/geopandas/__init__.py 
b/python/sedona/geopandas/__init__.py
new file mode 100644
index 0000000000..f56b1699c9
--- /dev/null
+++ b/python/sedona/geopandas/__init__.py
@@ -0,0 +1,26 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+.. versionadded:: 1.8.0
+    geopandas API on Sedona
+"""
+
+from sedona.geopandas.geoseries import GeoSeries
+from sedona.geopandas.geodataframe import GeoDataFrame
+
+from sedona.geopandas.tools import sjoin
diff --git a/python/sedona/geopandas/_typing.py 
b/python/sedona/geopandas/_typing.py
new file mode 100644
index 0000000000..2a83e127ce
--- /dev/null
+++ b/python/sedona/geopandas/_typing.py
@@ -0,0 +1,51 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import datetime
+import decimal
+from typing import Any, Tuple, TypeVar, Union
+
+import numpy as np
+from pandas.api.extensions import ExtensionDtype
+
+# TypeVars
+T = TypeVar("T")
+
+GeoFrameLike = TypeVar("GeoFrameLike", bound="GeoFrame")
+GeoIndexOpsLike = TypeVar("GeoIndexOpsLike", bound="GeoIndexOpsMixin")
+
+# Type aliases
+Scalar = Union[
+    int,
+    float,
+    bool,
+    str,
+    bytes,
+    decimal.Decimal,
+    datetime.date,
+    datetime.datetime,
+    None,
+]
+
+Label = Tuple[Any, ...]
+Name = Union[Any, Label]
+
+Axis = Union[int, str]
+Dtype = Union[np.dtype, ExtensionDtype]
+
+DataFrameOrSeries = Union["GeoDataFrame", "GeoSeries"]
+SeriesOrIndex = Union["GeoSeries", "GeoIndex"]
diff --git a/python/sedona/geopandas/base.py b/python/sedona/geopandas/base.py
new file mode 100644
index 0000000000..c7cbc39ca3
--- /dev/null
+++ b/python/sedona/geopandas/base.py
@@ -0,0 +1,325 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+A base class of Sedona/Spark DataFrame/Column to behave like geopandas 
GeoDataFrame/GeoSeries.
+"""
+from abc import ABCMeta, abstractmethod
+from typing import (
+    Any,
+    Callable,
+    Optional,
+    Union,
+)
+
+import geopandas as gpd
+import pandas as pd
+from pyspark.pandas._typing import (
+    Axis,
+    Dtype,
+    Scalar,
+)
+from pyspark.sql import Column
+
+from sedona.geopandas._typing import GeoFrameLike
+
+bool_type = bool
+
+
+class GeoFrame(object, metaclass=ABCMeta):
+    """
+    A base class for both GeoDataFrame and GeoSeries.
+    """
+
+    @abstractmethod
+    def __getitem__(self, key: Any) -> Any:
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def _reduce_for_geostat_function(
+        self,
+        sfun: Callable[["GeoSeries"], Column],
+        name: str,
+        axis: Optional[Axis] = None,
+        numeric_only: bool = True,
+        skipna: bool = True,
+        **kwargs: Any,
+    ) -> Union["GeoSeries", Scalar]:
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def dtypes(self) -> Union[gpd.GeoSeries, pd.Series, Dtype]:
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def to_geopandas(self) -> Union[gpd.GeoDataFrame, pd.Series]:
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def _to_geopandas(self) -> Union[gpd.GeoDataFrame, pd.Series]:
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def geoindex(self) -> "GeoIndex":
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def copy(self: GeoFrameLike) -> GeoFrameLike:
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def area(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def crs(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @crs.setter
+    @abstractmethod
+    def crs(self, value):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def geom_type(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def type(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def length(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def is_valid(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def is_valid_reason(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def is_empty(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def count_coordinates(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def count_geometries(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def count_interior_rings(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def is_simple(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def is_ring(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def is_ccw(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def is_closed(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def has_z(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def get_precision(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def get_geometry(self, index):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def boundary(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def centroid(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def concave_hull(self, ratio=0.0, allow_holes=False):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def convex_hull(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def delaunay_triangles(self, tolerance=0.0, only_edges=False):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def voronoi_polygons(self, tolerance=0.0, extend_to=None, 
only_edges=False):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def envelope(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def minimum_rotated_rectangle(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def exterior(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def extract_unique_points(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def offset_curve(self, distance, quad_segs=8, join_style="round", 
mitre_limit=5.0):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def interiors(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def remove_repeated_points(self, tolerance=0.0):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def set_precision(self, grid_size, mode="valid_output"):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def representative_point(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def minimum_bounding_circle(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def minimum_bounding_radius(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def minimum_clearance(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def normalize(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def make_valid(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def reverse(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def segmentize(self, max_segment_length):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def transform(self, transformation, include_z=False):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def force_2d(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def force_3d(self, z=0):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def line_merge(self, directed=False):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    @abstractmethod
+    def unary_union(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def union_all(self, method="unary", grid_size=None):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def intersection_all(self):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def contains(self, other, align=None):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def contains_properly(self, other, align=None):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def to_parquet(self, path, **kwargs):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def buffer(
+        self,
+        distance,
+        resolution=16,
+        cap_style="round",
+        join_style="round",
+        mitre_limit=5.0,
+        single_sided=False,
+        **kwargs,
+    ):
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @abstractmethod
+    def sjoin(self, other, predicate="intersects", **kwargs):
+        raise NotImplementedError("This method is not implemented yet.")
diff --git a/python/sedona/geopandas/geodataframe.py 
b/python/sedona/geopandas/geodataframe.py
new file mode 100644
index 0000000000..bdef237c1f
--- /dev/null
+++ b/python/sedona/geopandas/geodataframe.py
@@ -0,0 +1,672 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from __future__ import annotations
+
+from typing import Any, Callable, Optional, Union
+
+from pyspark.sql import Column
+
+import pandas as pd
+import geopandas as gpd
+import pyspark.pandas as pspd
+
+from sedona.geopandas.base import GeoFrame
+from sedona.geopandas._typing import GeoFrameLike, Label
+from pyspark.pandas._typing import Axis, Dtype, Scalar
+from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
+from pyspark.pandas import Series as PandasOnSparkSeries
+
+
+class GeoDataFrame(GeoFrame, pspd.DataFrame):
+    """
+    A class representing a GeoDataFrame, inheriting from GeoFrame and 
pyspark.pandas.DataFrame.
+    """
+
+    def __getitem__(self, key: Any) -> Any:
+        """
+        Get item from GeoDataFrame by key.
+
+        Parameters
+        ----------
+        key : str, list, slice, ndarray or Series
+            - If key is a string, returns a Series for that column
+            - If key is a list of strings, returns a new GeoDataFrame with 
selected columns
+            - If key is a slice or array, returns rows in the GeoDataFrame
+
+        Returns
+        -------
+        Any
+            Series, GeoDataFrame, or other objects depending on the key type.
+
+        Examples
+        --------
+        >>> from shapely.geometry import Point
+        >>> from sedona.geopandas import GeoDataFrame
+        >>>
+        >>> data = {'geometry': [Point(0, 0), Point(1, 1)], 'value': [1, 2]}
+        >>> gdf = GeoDataFrame(data)
+        >>> gdf['value']
+        0    1
+        1    2
+        Name: value, dtype: int64
+        """
+        from sedona.geopandas import GeoSeries
+
+        # Handle column access by name
+        if isinstance(key, str):
+            # Access column directly from the spark DataFrame
+            column_name = key
+
+            # Check if column exists
+            if column_name not in self.columns:
+                raise KeyError(f"Column '{column_name}' does not exist")
+
+            # Get column data from spark_frame
+            spark_df = self._internal.spark_frame.select(column_name)
+            pandas_df = spark_df.toPandas()
+
+            # Check if this is a geometry column
+            field = next(
+                (f for f in self._internal.spark_frame.schema.fields if f.name 
== key),
+                None,
+            )
+
+            if field and (
+                field.dataType.typeName() == "geometrytype"
+                or field.dataType.typeName() == "binary"
+            ):
+                # Return as GeoSeries for geometry columns
+                return GeoSeries(pandas_df[column_name])
+            else:
+                # Return as regular pandas Series for non-geometry columns
+                from pyspark.pandas import Series
+
+                return Series(pandas_df[column_name])
+
+        # Handle list of column names
+        elif isinstance(key, list) and all(isinstance(k, str) for k in key):
+            # Check if all columns exist
+            missing_cols = [k for k in key if k not in self.columns]
+            if missing_cols:
+                raise KeyError(f"Columns {missing_cols} do not exist")
+
+            # Select columns from the spark DataFrame
+            spark_df = self._internal.spark_frame.select(*key)
+            pandas_df = spark_df.toPandas()
+
+            # Return as GeoDataFrame
+            return GeoDataFrame(pandas_df)
+
+        # Handle row selection via slice or boolean indexing
+        else:
+            # For now, convert to pandas first for row-based operations
+            # This could be optimized later for better performance
+            pandas_df = self._internal.spark_frame.toPandas()
+            selected_rows = pandas_df[key]
+            return GeoDataFrame(selected_rows)
+
+    def __init__(
+        self,
+        data=None,
+        index=None,
+        columns=None,
+        dtype=None,
+        copy=False,
+        geometry: Any | None = None,
+        crs: Any | None = None,
+        **kwargs,
+    ):
+        assert data is not None
+
+        self._anchor: GeoDataFrame
+        self._col_label: Label
+
+        from sedona.geopandas import GeoSeries
+
+        if isinstance(
+            data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, 
PandasOnSparkDataFrame)
+        ):
+            assert dtype is None
+            assert not copy
+
+            self._anchor = data
+            self._col_label = index
+        else:
+            if isinstance(data, pd.DataFrame):
+                assert index is None
+                assert dtype is None
+                assert not copy
+                df = data
+            else:
+                df = pd.DataFrame(
+                    data=data,
+                    index=index,
+                    dtype=dtype,
+                    copy=copy,
+                )
+            gdf = gpd.GeoDataFrame(df)
+            # convert each geometry column to wkb type
+            for col in gdf.columns:
+                if isinstance(gdf[col], gpd.GeoSeries):
+                    gdf[col] = gdf[col].apply(lambda geom: geom.wkb)
+            pdf = pd.DataFrame(gdf)
+            # initialize the parent class pyspark Dataframe with the pandas 
Series
+            super().__init__(
+                data=pdf,
+                index=index,
+                dtype=dtype,
+                copy=copy,
+            )
+
+    def _reduce_for_geostat_function(
+        self,
+        sfun: Callable[["GeoSeries"], Column],
+        name: str,
+        axis: Optional[Axis] = None,
+        numeric_only: bool = True,
+        skipna: bool = True,
+        **kwargs: Any,
+    ) -> Union["GeoSeries", Scalar]:
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def dtypes(self) -> Union[gpd.GeoSeries, pd.Series, Dtype]:
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def to_geopandas(self) -> Union[gpd.GeoDataFrame, pd.Series]:
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def _to_geopandas(self) -> Union[gpd.GeoDataFrame, pd.Series]:
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def geoindex(self) -> "GeoIndex":
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def copy(self, deep=False):
+        """
+        Make a copy of this GeoDataFrame object.
+
+        Parameters:
+        - deep: bool, default False
+            If True, a deep copy of the data is made. Otherwise, a shallow 
copy is made.
+
+        Returns:
+        - GeoDataFrame: A copy of this GeoDataFrame object.
+
+        Examples:
+        >>> from shapely.geometry import Point
+        >>> import geopandas as gpd
+        >>> from sedona.geopandas import GeoDataFrame
+
+        >>> gdf = GeoDataFrame([{"geometry": Point(1, 1), "value1": 2, 
"value2": 3}])
+        >>> gdf_copy = gdf.copy()
+        >>> print(gdf_copy)
+           geometry  value1  value2
+        0  POINT (1 1)       2       3
+        """
+        if deep:
+            return GeoDataFrame(
+                self._anchor.copy(), dtype=self.dtypes, index=self._col_label
+            )
+        else:
+            return self
+
+    @property
+    def area(self) -> "GeoDataFrame":
+        """
+        Returns a GeoDataFrame containing the area of each geometry expressed 
in the units of the CRS.
+
+        Returns
+        -------
+        GeoDataFrame
+            A GeoDataFrame with the areas of the geometries.
+
+        Examples
+        --------
+        >>> from shapely.geometry import Polygon
+        >>> from sedona.geopandas import GeoDataFrame
+        >>>
+        >>> data = {
+        ...     'geometry': [Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]), 
Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])],
+        ...     'value': [1, 2]
+        ... }
+        >>> gdf = GeoDataFrame(data)
+        >>> gdf.area
+           geometry_area  value
+        0           1.0      1
+        1           4.0      2
+        """
+        # Create a list of all column expressions for the new dataframe
+        select_expressions = []
+
+        # Process geometry columns to calculate areas
+        for field in self._internal.spark_frame.schema.fields:
+            col_name = field.name
+
+            # Skip index column to avoid duplication
+            if col_name == "__index_level_0__" or col_name == 
"__natural_order__":
+                continue
+
+            if (
+                field.dataType.typeName() == "geometrytype"
+                or field.dataType.typeName() == "binary"
+            ):
+                # Calculate the area for each geometry column
+                if field.dataType.typeName() == "binary":
+                    area_expr = (
+                        f"ST_Area(ST_GeomFromWKB(`{col_name}`)) as 
{col_name}_area"
+                    )
+                else:
+                    area_expr = f"ST_Area(`{col_name}`) as {col_name}_area"
+                select_expressions.append(area_expr)
+            else:
+                # Keep non-geometry columns as they are
+                select_expressions.append(f"`{col_name}`")
+
+        # Execute the query to get all data in one go
+        result_df = self._internal.spark_frame.selectExpr(*select_expressions)
+
+        # Convert to pandas DataFrame
+        pandas_df = result_df.toPandas()
+
+        # Create a new GeoDataFrame with the result
+        # Note: This avoids the need to manipulate the index columns separately
+        return GeoDataFrame(pandas_df)
+
+    @property
+    def crs(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @crs.setter
+    def crs(self, value):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def geom_type(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def type(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def length(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def is_valid(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def is_valid_reason(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def is_empty(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def count_coordinates(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def count_geometries(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def count_interior_rings(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def is_simple(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def is_ring(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def is_ccw(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def is_closed(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def has_z(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def get_precision(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def get_geometry(self, index):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def boundary(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def centroid(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def concave_hull(self, ratio=0.0, allow_holes=False):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    @property
+    def convex_hull(self):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
+    def delaunay_triangles(self, tolerance=0.0, only_edges=False):
+        # Implementation of the abstract method
+        raise NotImplementedError("This method is not implemented yet.")
+
    def voronoi_polygons(self, tolerance=0.0, extend_to=None, only_edges=False):
        """Placeholder for geopandas' ``voronoi_polygons``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def envelope(self):
        """Placeholder for geopandas' ``envelope``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_rotated_rectangle(self):
        """Placeholder for geopandas' ``minimum_rotated_rectangle``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def exterior(self):
        """Placeholder for geopandas' ``exterior``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def extract_unique_points(self):
        """Placeholder for geopandas' ``extract_unique_points``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def offset_curve(self, distance, quad_segs=8, join_style="round", mitre_limit=5.0):
        """Placeholder for geopandas' ``offset_curve``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def interiors(self):
        """Placeholder for geopandas' ``interiors``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def remove_repeated_points(self, tolerance=0.0):
        """Placeholder for geopandas' ``remove_repeated_points``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def set_precision(self, grid_size, mode="valid_output"):
        """Placeholder for geopandas' ``set_precision``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def representative_point(self):
        """Placeholder for geopandas' ``representative_point``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_bounding_circle(self):
        """Placeholder for geopandas' ``minimum_bounding_circle``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_bounding_radius(self):
        """Placeholder for geopandas' ``minimum_bounding_radius``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_clearance(self):
        """Placeholder for geopandas' ``minimum_clearance``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def normalize(self):
        """Placeholder for geopandas' ``normalize``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def make_valid(self):
        """Placeholder for geopandas' ``make_valid``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def reverse(self):
        """Placeholder for geopandas' ``reverse``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def segmentize(self, max_segment_length):
        """Placeholder for geopandas' ``segmentize``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def transform(self, transformation, include_z=False):
        """Placeholder for geopandas' ``transform``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def force_2d(self):
        """Placeholder for geopandas' ``force_2d``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def force_3d(self, z=0):
        """Placeholder for geopandas' ``force_3d``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def line_merge(self, directed=False):
        """Placeholder for geopandas' ``line_merge``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def unary_union(self):
        """Placeholder for geopandas' ``unary_union``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def union_all(self, method="unary", grid_size=None):
        """Placeholder for geopandas' ``union_all``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def intersection_all(self):
        """Placeholder for geopandas' ``intersection_all``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def contains(self, other, align=None):
        """Placeholder for geopandas' ``contains``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def contains_properly(self, other, align=None):
        """Placeholder for geopandas' ``contains_properly``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")
+
+    def buffer(
+        self,
+        distance,
+        resolution=16,
+        cap_style="round",
+        join_style="round",
+        mitre_limit=5.0,
+        single_sided=False,
+        **kwargs,
+    ) -> "GeoDataFrame":
+        """
+        Returns a GeoDataFrame with all geometries buffered by the specified 
distance.
+
+        Parameters
+        ----------
+        distance : float
+            The distance to buffer by. Negative distances will create inward 
buffers.
+        resolution : int, default 16
+            The number of segments used to approximate curves.
+        cap_style : str, default "round"
+            The style of the buffer cap. One of 'round', 'flat', 'square'.
+        join_style : str, default "round"
+            The style of the buffer join. One of 'round', 'mitre', 'bevel'.
+        mitre_limit : float, default 5.0
+            The mitre limit ratio for joins when join_style='mitre'.
+        single_sided : bool, default False
+            Whether to create a single-sided buffer.
+
+        Returns
+        -------
+        GeoDataFrame
+            A new GeoDataFrame with buffered geometries.
+
+        Examples
+        --------
+        >>> from shapely.geometry import Point
+        >>> from sedona.geopandas import GeoDataFrame
+        >>>
+        >>> data = {
+        ...     'geometry': [Point(0, 0), Point(1, 1)],
+        ...     'value': [1, 2]
+        ... }
+        >>> gdf = GeoDataFrame(data)
+        >>> buffered = gdf.buffer(0.5)
+        """
+        # Create a list of all column expressions for the new dataframe
+        select_expressions = []
+
+        # Process each field in the schema
+        for field in self._internal.spark_frame.schema.fields:
+            col_name = field.name
+
+            # Skip index and order columns
+            if col_name == "__index_level_0__" or col_name == 
"__natural_order__":
+                continue
+
+            # Apply buffer to geometry columns
+            if (
+                field.dataType.typeName() == "geometrytype"
+                or field.dataType.typeName() == "binary"
+            ):
+
+                if field.dataType.typeName() == "binary":
+                    # For binary geometry columns (WKB)
+                    buffer_expr = f"ST_Buffer(ST_GeomFromWKB(`{col_name}`), 
{distance}) as {col_name}"
+                else:
+                    # For native geometry columns
+                    buffer_expr = f"ST_Buffer(`{col_name}`, {distance}) as 
{col_name}"
+                select_expressions.append(buffer_expr)
+            else:
+                # Keep non-geometry columns as they are
+                select_expressions.append(f"`{col_name}`")
+
+        # Execute the query to get all data in one go
+        result_df = self._internal.spark_frame.selectExpr(*select_expressions)
+
+        # Convert to pandas DataFrame and create a new GeoDataFrame
+        pandas_df = result_df.toPandas()
+        return GeoDataFrame(pandas_df)
+
+    def sjoin(
+        self,
+        other,
+        how="inner",
+        predicate="intersects",
+        lsuffix="left",
+        rsuffix="right",
+        distance=None,
+        on_attribute=None,
+        **kwargs,
+    ):
+        """
+        Spatial join of two GeoDataFrames.
+
+        Parameters
+        ----------
+        other : GeoDataFrame
+            The right GeoDataFrame to join with.
+        how : str, default 'inner'
+            The type of join:
+            * 'left': use keys from left_df; retain only left_df geometry 
column
+            * 'right': use keys from right_df; retain only right_df geometry 
column
+            * 'inner': use intersection of keys from both dfs; retain only
+              left_df geometry column
+        predicate : str, default 'intersects'
+            Binary predicate. Valid values: 'intersects', 'contains', 
'within', 'dwithin'
+        lsuffix : str, default 'left'
+            Suffix to apply to overlapping column names (left GeoDataFrame).
+        rsuffix : str, default 'right'
+            Suffix to apply to overlapping column names (right GeoDataFrame).
+        distance : float, optional
+            Distance for 'dwithin' predicate. Required if predicate='dwithin'.
+        on_attribute : str, list or tuple, optional
+            Column name(s) to join on as an additional join restriction.
+            These must be found in both DataFrames.
+
+        Returns
+        -------
+        GeoDataFrame
+            A GeoDataFrame with the results of the spatial join.
+
+        Examples
+        --------
+        >>> from shapely.geometry import Point, Polygon
+        >>> from sedona.geopandas import GeoDataFrame
+        >>>
+        >>> polygons = GeoDataFrame({
+        ...     'geometry': [Polygon([(0, 0), (0, 1), (1, 1), (1, 0)])],
+        ...     'value': [1]
+        ... })
+        >>> points = GeoDataFrame({
+        ...     'geometry': [Point(0.5, 0.5), Point(2, 2)],
+        ...     'value': [1, 2]
+        ... })
+        >>> joined = points.sjoin(polygons)
+        """
+        from sedona.geopandas.tools.sjoin import sjoin as sjoin_tool
+
+        return sjoin_tool(
+            self,
+            other,
+            how=how,
+            predicate=predicate,
+            lsuffix=lsuffix,
+            rsuffix=rsuffix,
+            distance=distance,
+            on_attribute=on_attribute,
+            **kwargs,
+        )
+
    def to_parquet(self, path, **kwargs):
        """
        Write the GeoDataFrame to a GeoParquet file.

        Parameters:
        - path: str
            The file path where the GeoParquet file will be written.
        - kwargs: Any
            Additional arguments passed through to the Spark DataFrameWriter's
            ``save`` call.
        """
        # Delegate to Spark's writer using Sedona's "geoparquet" output format.
        self._internal.spark_frame.write.format("geoparquet").save(path, **kwargs)
diff --git a/python/sedona/geopandas/geoindex.py 
b/python/sedona/geopandas/geoindex.py
new file mode 100644
index 0000000000..60c6991e83
--- /dev/null
+++ b/python/sedona/geopandas/geoindex.py
@@ -0,0 +1,28 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+
class GeoIndex:
    """
    Placeholder spatial index type for the Sedona geopandas API.

    The class exists only to reserve the name; every entry point raises
    :class:`NotImplementedError` until a real implementation lands.
    """

    def __init__(self):
        # Constructing a GeoIndex is intentionally unsupported for now.
        raise NotImplementedError("This method is not implemented yet.")

    def some_method(self):
        # Placeholder method with no defined behavior yet.
        raise NotImplementedError("This method is not implemented yet.")
diff --git a/python/sedona/geopandas/geoseries.py 
b/python/sedona/geopandas/geoseries.py
new file mode 100644
index 0000000000..67f65c71e4
--- /dev/null
+++ b/python/sedona/geopandas/geoseries.py
@@ -0,0 +1,615 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from typing import Any, Callable, Optional, Union
+
+from pyspark.pandas._typing import Axis, Dtype, Scalar
+from pyspark.pandas.internal import InternalFrame
+from pyspark.pandas.series import first_series
+from pyspark.pandas.utils import scol_for
+from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
+from pyspark.pandas import Series as PandasOnSparkSeries
+from pyspark.sql import Column
+
+import pandas as pd
+import geopandas as gpd
+import pyspark.pandas as pspd
+from pyspark.sql.types import BinaryType
+
+from sedona.geopandas.geodataframe import GeoDataFrame
+from sedona.geopandas.base import GeoFrame
+from sedona.geopandas._typing import Label
+from sedona.geopandas.geoindex import GeoIndex
+
+
class GeoSeries(GeoFrame, pspd.Series):
    """
    A Series of geometries backed by Spark, inheriting from GeoFrame and
    pyspark.pandas.Series.
    """

    def __getitem__(self, key: Any) -> Any:
        # Item access is not wired up yet.
        raise NotImplementedError("This method is not implemented yet.")
+
    def __init__(
        self,
        data=None,
        index=None,
        dtype=None,
        name=None,
        copy=False,
        fastpath=False,
        crs=None,
        **kwargs,
    ):
        """
        Initialize a GeoSeries object.

        Parameters:
        - data: The input data for the GeoSeries. It can be a GeoDataFrame,
          GeoSeries, pandas-on-Spark Series/DataFrame, a pandas Series, or any
          sequence accepted by ``pandas.Series``.
        - index: The index for the GeoSeries. On the Spark-backed ("anchored")
          path below it is stored as the column label instead.
        - crs: Coordinate Reference System for the GeoSeries.
          NOTE(review): accepted but not applied anywhere in this body.
        - dtype: Data type for the GeoSeries (plain-data path only).
        - name: Name of the GeoSeries (plain-data path only).
        - copy: Whether to copy the input data (plain-data path only).
        - fastpath: Internal parameter for fast initialization.

        Examples:
        >>> from shapely.geometry import Point
        >>> import geopandas as gpd
        >>> from sedona.geopandas import GeoSeries

        # Example 1: Initialize with GeoDataFrame
        >>> gdf = gpd.GeoDataFrame({'geometry': [Point(1, 1), Point(2, 2)]})
        >>> gs = GeoSeries(data=gdf)
        >>> print(gs)
        0    POINT (1 1)
        1    POINT (2 2)
        Name: geometry, dtype: geometry

        # Example 2: Initialize with GeoSeries
        >>> gseries = gpd.GeoSeries([Point(1, 1), Point(2, 2)])
        >>> gs = GeoSeries(data=gseries)
        >>> print(gs)
        0    POINT (1 1)
        1    POINT (2 2)
        dtype: geometry

        # Example 3: Initialize with pandas Series
        >>> pseries = pd.Series([Point(1, 1), Point(2, 2)])
        >>> gs = GeoSeries(data=pseries)
        >>> print(gs)
        0    POINT (1 1)
        1    POINT (2 2)
        dtype: geometry
        """
        assert data is not None

        self._anchor: GeoDataFrame
        self._col_label: Label

        if isinstance(
            data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame)
        ):
            # Anchored path: reuse the existing Spark-backed object directly;
            # the plain-data-only options must not be set here.
            assert dtype is None
            assert name is None
            assert not copy
            assert not fastpath

            self._anchor = data
            # NOTE(review): ``index`` is reused as the column label here,
            # mirroring pyspark.pandas Series internals — confirm callers pass
            # a label (not a row index) on this path.
            self._col_label = index
        else:
            if isinstance(data, pd.Series):
                assert index is None
                assert dtype is None
                assert name is None
                assert not copy
                assert not fastpath
                s = data
            else:
                s = pd.Series(
                    data=data,
                    index=index,
                    dtype=dtype,
                    name=name,
                    copy=copy,
                    fastpath=fastpath,
                )
            # Serialize each shapely geometry to WKB bytes so the values can
            # live in a Spark binary column.
            # NOTE(review): ``geom.wkb`` raises for missing (None) geometries —
            # confirm inputs are guaranteed non-null.
            gs = gpd.GeoSeries(s)
            pdf = pd.Series(gs.apply(lambda geom: geom.wkb))
            # initialize the parent class pyspark Series with the pandas Series
            super().__init__(
                data=pdf,
                index=index,
                dtype=dtype,
                name=name,
                copy=copy,
                fastpath=fastpath,
            )
+
    def _reduce_for_geostat_function(
        self,
        sfun: Callable[["GeoSeries"], Column],
        name: str,
        axis: Optional[Axis] = None,
        numeric_only: bool = True,
        skipna: bool = True,
        **kwargs: Any,
    ) -> Union["GeoSeries", Scalar]:
        # Hook for geospatial reductions; not implemented yet.
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def dtypes(self) -> Union[gpd.GeoSeries, pd.Series, Dtype]:
        # Dtype reporting is not implemented yet.
        raise NotImplementedError("This method is not implemented yet.")

    def to_geopandas(self) -> Union[gpd.GeoDataFrame, pd.Series]:
        # Conversion to a driver-side geopandas object; not implemented yet.
        raise NotImplementedError("This method is not implemented yet.")

    def _to_geopandas(self) -> Union[gpd.GeoDataFrame, pd.Series]:
        # Internal conversion helper; not implemented yet.
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def geometry(self) -> "GeoSeries":
        # A GeoSeries is its own geometry accessor.
        return self

    @property
    def geoindex(self) -> "GeoIndex":
        # Spatial index accessor; not implemented yet.
        raise NotImplementedError("This method is not implemented yet.")
+
+    def copy(self, deep=False):
+        """
+        Make a copy of this GeoSeries object.
+
+        Parameters:
+        - deep: bool, default False
+            If True, a deep copy of the data is made. Otherwise, a shallow 
copy is made.
+
+        Returns:
+        - GeoSeries: A copy of this GeoSeries object.
+
+        Examples:
+        >>> from shapely.geometry import Point
+        >>> import geopandas as gpd
+        >>> from sedona.geopandas import GeoSeries
+
+        >>> gs = GeoSeries([Point(1, 1), Point(2, 2)])
+        >>> gs_copy = gs.copy()
+        >>> print(gs_copy)
+        0    POINT (1 1)
+        1    POINT (2 2)
+        dtype: geometry
+        """
+        if deep:
+            return GeoSeries(
+                self._anchor.copy(), dtype=self.dtype, index=self._col_label
+            )
+        else:
+            return self
+
+    @property
+    def area(self) -> "GeoSeries":
+        """
+        Returns a Series containing the area of each geometry in the GeoSeries 
expressed in the units of the CRS.
+
+        Returns
+        -------
+        Series
+            A Series containing the area of each geometry.
+
+        Examples
+        --------
+        >>> from shapely.geometry import Polygon
+        >>> import geopandas as gpd
+        >>> from sedona.geopandas import GeoSeries
+
+        >>> gs = GeoSeries([Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]), 
Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])])
+        >>> gs.area
+        0    1.0
+        1    4.0
+        dtype: float64
+        """
+
+        # Find the first column with BinaryType or GeometryType
+        first_col = self.get_first_geometry_column()
+
+        if self.get_first_geometry_column():
+            data_type = self._internal.spark_frame.schema[first_col].dataType
+            if isinstance(data_type, BinaryType):
+                sql_expr = f"ST_Area(ST_GeomFromWKB(`{first_col}`)) as area"
+            else:
+                sql_expr = f"ST_Area(`{first_col}`) as area"
+
+        sdf = self._internal.spark_frame.selectExpr(sql_expr)
+        internal = InternalFrame(
+            spark_frame=sdf,
+            index_spark_columns=None,
+            column_labels=[self._column_label],
+            data_spark_columns=[scol_for(sdf, "area")],
+            data_fields=[self._internal.data_fields[0]],
+            column_label_names=self._internal.column_label_names,
+        )
+        return _to_geo_series(first_series(PandasOnSparkDataFrame(internal)))
+
    @property
    def crs(self):
        """Coordinate reference system accessor; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @crs.setter
    def crs(self, value):
        # CRS assignment; not implemented yet.
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def geom_type(self):
        """Placeholder for geopandas' ``geom_type``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def type(self):
        """Placeholder for geopandas' ``type``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def length(self):
        """Placeholder for geopandas' ``length``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def is_valid(self):
        """Placeholder for geopandas' ``is_valid``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def is_valid_reason(self):
        """Placeholder for geopandas' ``is_valid_reason``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def is_empty(self):
        """Placeholder for geopandas' ``is_empty``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def count_coordinates(self):
        """Placeholder for geopandas' ``count_coordinates``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def count_geometries(self):
        """Placeholder for geopandas' ``count_geometries``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def count_interior_rings(self):
        """Placeholder for geopandas' ``count_interior_rings``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def is_simple(self):
        """Placeholder for geopandas' ``is_simple``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def is_ring(self):
        """Placeholder for geopandas' ``is_ring``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def is_ccw(self):
        """Placeholder for geopandas' ``is_ccw``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def is_closed(self):
        """Placeholder for geopandas' ``is_closed``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def has_z(self):
        """Placeholder for geopandas' ``has_z``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def get_precision(self):
        """Placeholder for geopandas' ``get_precision``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def get_geometry(self, index):
        """Placeholder for geopandas' ``get_geometry``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def boundary(self):
        """Placeholder for geopandas' ``boundary``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def centroid(self):
        """Placeholder for geopandas' ``centroid``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def concave_hull(self, ratio=0.0, allow_holes=False):
        """Placeholder for geopandas' ``concave_hull``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def convex_hull(self):
        """Placeholder for geopandas' ``convex_hull``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def delaunay_triangles(self, tolerance=0.0, only_edges=False):
        """Placeholder for geopandas' ``delaunay_triangles``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def voronoi_polygons(self, tolerance=0.0, extend_to=None, only_edges=False):
        """Placeholder for geopandas' ``voronoi_polygons``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def envelope(self):
        """Placeholder for geopandas' ``envelope``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_rotated_rectangle(self):
        """Placeholder for geopandas' ``minimum_rotated_rectangle``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def exterior(self):
        """Placeholder for geopandas' ``exterior``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def extract_unique_points(self):
        """Placeholder for geopandas' ``extract_unique_points``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def offset_curve(self, distance, quad_segs=8, join_style="round", mitre_limit=5.0):
        """Placeholder for geopandas' ``offset_curve``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def interiors(self):
        """Placeholder for geopandas' ``interiors``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def remove_repeated_points(self, tolerance=0.0):
        """Placeholder for geopandas' ``remove_repeated_points``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def set_precision(self, grid_size, mode="valid_output"):
        """Placeholder for geopandas' ``set_precision``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def representative_point(self):
        """Placeholder for geopandas' ``representative_point``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_bounding_circle(self):
        """Placeholder for geopandas' ``minimum_bounding_circle``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_bounding_radius(self):
        """Placeholder for geopandas' ``minimum_bounding_radius``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_clearance(self):
        """Placeholder for geopandas' ``minimum_clearance``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def normalize(self):
        """Placeholder for geopandas' ``normalize``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def make_valid(self):
        """Placeholder for geopandas' ``make_valid``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def reverse(self):
        """Placeholder for geopandas' ``reverse``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def segmentize(self, max_segment_length):
        """Placeholder for geopandas' ``segmentize``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def transform(self, transformation, include_z=False):
        """Placeholder for geopandas' ``transform``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def force_2d(self):
        """Placeholder for geopandas' ``force_2d``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def force_3d(self, z=0):
        """Placeholder for geopandas' ``force_3d``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def line_merge(self, directed=False):
        """Placeholder for geopandas' ``line_merge``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def unary_union(self):
        """Placeholder for geopandas' ``unary_union``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def union_all(self, method="unary", grid_size=None):
        """Placeholder for geopandas' ``union_all``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def intersection_all(self):
        """Placeholder for geopandas' ``intersection_all``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def contains(self, other, align=None):
        """Placeholder for geopandas' ``contains``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")

    def contains_properly(self, other, align=None):
        """Placeholder for geopandas' ``contains_properly``; not implemented yet."""
        raise NotImplementedError("This method is not implemented yet.")
+
+    def buffer(
+        self,
+        distance,
+        resolution=16,
+        cap_style="round",
+        join_style="round",
+        mitre_limit=5.0,
+        single_sided=False,
+        **kwargs,
+    ):
+        """
+        Returns a GeoSeries of geometries representing all points within a 
given distance of each geometric object.
+
+        Parameters
+        ----------
+        distance : float
+            The distance to buffer around each geometry.
+        resolution : int, optional, default 16
+            The resolution of the buffer around each geometry.
+        cap_style : str, optional, default "round"
+            The style of the buffer's cap (round, flat, or square).
+        join_style : str, optional, default "round"
+            The style of the buffer's join (round, mitre, or bevel).
+        mitre_limit : float, optional, default 5.0
+            The mitre limit for the buffer's join style.
+        single_sided : bool, optional, default False
+            Whether to create a single-sided buffer.
+
+        Returns
+        -------
+        GeoSeries
+            A GeoSeries of buffered geometries.
+        """
+        # Find the first column with BinaryType or GeometryType
+        first_col = self.get_first_geometry_column()
+
+        if self.get_first_geometry_column():
+            data_type = self._internal.spark_frame.schema[first_col].dataType
+            if isinstance(data_type, BinaryType):
+                sql_expr = (
+                    f"ST_Buffer(ST_GeomFromWKB(`{first_col}`), {distance}) as 
buffer"
+                )
+            else:
+                sql_expr = f"ST_Buffer(`{first_col}`, {distance}) as buffer"
+
+        sdf = self._internal.spark_frame.selectExpr(sql_expr)
+        internal = InternalFrame(
+            spark_frame=sdf,
+            index_spark_columns=None,
+            column_labels=[self._column_label],
+            data_spark_columns=[scol_for(sdf, "buffer")],
+            data_fields=[self._internal.data_fields[0]],
+            column_label_names=self._internal.column_label_names,
+        )
+        return _to_geo_series(first_series(PandasOnSparkDataFrame(internal)))
+
    def to_parquet(self, path, **kwargs):
        """
        Write the GeoSeries to a GeoParquet file.

        Parameters:
        - path: str
            The file path where the GeoParquet file will be written.
        - kwargs: Any
            Additional arguments passed through to the Spark DataFrameWriter's
            ``save`` call.
        """
        # Delegate to Spark's writer using Sedona's "geoparquet" output format.
        self._internal.spark_frame.write.format("geoparquet").save(path, **kwargs)
+
+    def sjoin(
+        self,
+        other,
+        how="inner",
+        predicate="intersects",
+        lsuffix="left",
+        rsuffix="right",
+        distance=None,
+        on_attribute=None,
+        **kwargs,
+    ):
+        """
+        Perform a spatial join between two GeoSeries.
+        Parameters:
+        - other: GeoSeries
+        - how: str, default 'inner'
+            The type of join to perform.
+        - predicate: str, default 'intersects'
+            The spatial predicate to use for the join.
+        - lsuffix: str, default 'left'
+            Suffix to apply to the left GeoSeries' column names.
+        - rsuffix: str, default 'right'
+            Suffix to apply to the right GeoSeries' column names.
+        - distance: float, optional
+            The distance threshold for the join.
+        - on_attribute: str, optional
+            The attribute to join on.
+        - kwargs: Any
+            Additional arguments to pass to the join function.
+        Returns:
+        - GeoSeries
+        """
+        from sedona.geopandas import sjoin
+
+        # Implementation of the abstract method
+        return sjoin(
+            self,
+            other,
+            how,
+            predicate,
+            lsuffix,
+            rsuffix,
+            distance,
+            on_attribute,
+            **kwargs,
+        )
+
+    # 
-----------------------------------------------------------------------------
+    # # Utils
+    # 
-----------------------------------------------------------------------------
+
+    def get_first_geometry_column(self):
+        first_binary_or_geometry_col = next(
+            (
+                field.name
+                for field in self._internal.spark_frame.schema.fields
+                if isinstance(field.dataType, BinaryType)
+                or field.dataType.typeName() == "geometrytype"
+            ),
+            None,
+        )
+        return first_binary_or_geometry_col
+
+
# -----------------------------------------------------------------------------
# # Utils
# -----------------------------------------------------------------------------


def _to_geo_series(df: PandasOnSparkSeries) -> GeoSeries:
    """
    Wrap a pandas-on-Spark Series in a :class:`GeoSeries`.

    Parameters:
    - df: The pandas-on-Spark Series to wrap.

    Returns:
    - GeoSeries: the input series re-anchored as a GeoSeries.
    """
    return GeoSeries(data=df)
diff --git a/python/sedona/geopandas/internal.py 
b/python/sedona/geopandas/internal.py
new file mode 100644
index 0000000000..e1d4e3fd47
--- /dev/null
+++ b/python/sedona/geopandas/internal.py
@@ -0,0 +1,33 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import pandas as pd
+from pyspark._typing import F
+from pyspark.pandas.internal import InternalFrame as InternalPySparkFrame
+
+
class InternalGeoFrame(InternalPySparkFrame):
    """Internal frame for GeoDataFrame, extending pandas-on-Spark's
    InternalFrame with a placeholder ``geometry`` column."""

    @staticmethod
    def from_pandas(pdf: pd.DataFrame) -> "InternalGeoFrame":
        """Build an InternalGeoFrame from a plain pandas DataFrame.

        A null ``geometry`` column is appended to the Spark frame so the
        geo-aware layers always find a geometry slot.
        """
        # NOTE(review): the module-level `from pyspark._typing import F`
        # binds F to a TypeVar, not to pyspark.sql.functions, so `F.lit`
        # would raise AttributeError at runtime. Import the real functions
        # module locally so lit() resolves correctly.
        from pyspark.sql import functions as F

        internal_frame = InternalPySparkFrame.from_pandas(pdf)
        sdf = internal_frame.spark_frame.withColumn("geometry", F.lit(None))
        return InternalGeoFrame(
            spark_frame=sdf,
            index_spark_columns=internal_frame.index_spark_columns,
            data_spark_columns=internal_frame.data_spark_columns,
        )
diff --git a/python/sedona/geopandas/tools/__init__.py 
b/python/sedona/geopandas/tools/__init__.py
new file mode 100644
index 0000000000..f097cc879b
--- /dev/null
+++ b/python/sedona/geopandas/tools/__init__.py
@@ -0,0 +1,22 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from .sjoin import sjoin
+
+__all__ = [
+    "sjoin",
+]
diff --git a/python/sedona/geopandas/tools/sjoin.py 
b/python/sedona/geopandas/tools/sjoin.py
new file mode 100644
index 0000000000..6cc6bbce6e
--- /dev/null
+++ b/python/sedona/geopandas/tools/sjoin.py
@@ -0,0 +1,183 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from pyspark.pandas.internal import InternalFrame
+from pyspark.pandas.series import first_series
+from pyspark.pandas.utils import scol_for
+from pyspark.sql.functions import expr
+
+from sedona.geopandas import GeoDataFrame, GeoSeries
+from sedona.geopandas.geoseries import _to_geo_series
+from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
+
+
def _frame_join(left_df, right_df):
    """Join two GeoSeries at the Spark DataFrame level using ST_Intersects.

    Parameters
    ----------
    left_df : GeoSeries
        Left side of the join (callers enforce GeoSeries via _basic_checks).
    right_df : GeoSeries
        Right side of the join.

    Returns
    -------
    GeoSeries
        One row per intersecting left/right pair, carrying the left geometry.

    TODO: Implement this function with more details and parameters
    (the caller's `predicate`, `how`, suffixes etc. are not consulted here yet).
    """
    # Get the internal Spark DataFrames backing each pandas-on-Spark object.
    left_sdf = left_df._internal.spark_frame
    right_sdf = right_df._internal.spark_frame

    # Convert WKB bytes to geometry values.
    # NOTE(review): the data column is addressed by the hard-coded label `0`,
    # i.e. this assumes a single unnamed WKB column — confirm against how
    # GeoSeries lays out its internal frame.
    left_geo_df = left_sdf.selectExpr("ST_GeomFromWKB(`0`) as l_geometry")
    right_geo_df = right_sdf.selectExpr("ST_GeomFromWKB(`0`) as r_geometry")

    # Perform the spatial join; the predicate is fixed to ST_Intersects for now.
    spatial_join_df = left_geo_df.alias("l").join(
        right_geo_df.alias("r"), expr("ST_Intersects(l_geometry, r_geometry)")
    )

    # Wrap the joined Spark frame back into a pandas-on-Spark InternalFrame,
    # keeping only the left geometry column, and expose it as a GeoSeries.
    internal = InternalFrame(
        spark_frame=spatial_join_df,
        index_spark_columns=None,  # presumably yields a default index — verify
        column_labels=[left_df._col_label],
        data_spark_columns=[scol_for(spatial_join_df, "l_geometry")],
        data_fields=[left_df._internal.data_fields[0]],
        column_label_names=left_df._internal.column_label_names,
    )
    return _to_geo_series(first_series(PandasOnSparkDataFrame(internal)))
+
+
def sjoin(
    left_df,
    right_df,
    how="inner",
    predicate="intersects",
    lsuffix="left",
    rsuffix="right",
    distance=None,
    on_attribute=None,
    **kwargs,
):
    """Spatial join of two GeoDataFrames.

    Parameters
    ----------
    left_df, right_df : GeoDataFrames
    how : string, default 'inner'
        The type of join. Only 'inner' (intersection of keys from both dfs,
        retaining the left geometry column) is currently accepted.
    predicate : string, default 'intersects'
        Binary predicate. Valid values are determined by the spatial index
        used; check ``left_df.sindex.valid_query_predicates`` or
        ``right_df.sindex.valid_query_predicates``. Replaces the deprecated
        ``op`` parameter.
    lsuffix : string, default 'left'
        Suffix to apply to overlapping column names (left GeoDataFrame).
    rsuffix : string, default 'right'
        Suffix to apply to overlapping column names (right GeoDataFrame).
    distance : number or array_like, optional
        Distance(s) around each input geometry within which to query the
        tree for the 'dwithin' predicate. If array_like, must be
        one-dimensional with length equal to the length of the left
        GeoDataFrame. Required if ``predicate='dwithin'``.
    on_attribute : string, list or tuple
        Column name(s) to join on as an additional restriction on top of
        the spatial predicate. Must exist in both DataFrames; rows join
        only when the predicate holds and these column values match.

    Returns
    -------
    GeoSeries
        The joined result.

    Notes
    -----
    Every operation in GeoPandas is planar, i.e. the potential third
    dimension is not taken into account.
    """
    # Reject unknown keyword arguments up front, mirroring CPython's wording.
    for extra in kwargs:
        raise TypeError(f"sjoin() got an unexpected keyword argument '{extra}'")

    normalized_on_attribute = _maybe_make_list(on_attribute)

    _basic_checks(
        left_df, right_df, how, lsuffix, rsuffix, on_attribute=normalized_on_attribute
    )

    return _frame_join(left_df, right_df)
+
+
+def _maybe_make_list(obj):
+    if isinstance(obj, tuple):
+        return list(obj)
+    if obj is not None and not isinstance(obj, list):
+        return [obj]
+    return obj
+
+
def _basic_checks(left_df, right_df, how, lsuffix, rsuffix, on_attribute=None):
    """Checks the validity of join input parameters.

    Note: despite the ``_df`` names and the geopandas-style docstring, both
    inputs are currently required to be GeoSeries, and only ``how='inner'``
    is accepted.

    Parameters
    ------------
    left_df : GeoSeries
    right_df : GeoSeries
    how : str
        join type; must be 'inner' for now
    lsuffix : str
        left index suffix (accepted but not validated here)
    rsuffix : str
        right index suffix (accepted but not validated here)
    on_attribute : list, default None
        list of column names to merge on along with geometry
        (accepted but not validated here)

    Raises
    ------
    ValueError
        If either input is not a GeoSeries, or `how` is unsupported.
    """
    if not isinstance(left_df, GeoSeries):
        raise ValueError(f"'left_df' should be GeoSeries, got {type(left_df)}")

    if not isinstance(right_df, GeoSeries):
        raise ValueError(f"'right_df' should be GeoSeries, got {type(right_df)}")

    allowed_hows = ["inner"]
    if how not in allowed_hows:
        raise ValueError(f'`how` was "{how}" but is expected to be in {allowed_hows}')
diff --git a/python/tests/geopandas/__init__.py 
b/python/tests/geopandas/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/python/tests/geopandas/__init__.py
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
diff --git a/python/tests/geopandas/test_geodataframe.py 
b/python/tests/geopandas/test_geodataframe.py
new file mode 100644
index 0000000000..502b521526
--- /dev/null
+++ b/python/tests/geopandas/test_geodataframe.py
@@ -0,0 +1,187 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+import shutil
+import tempfile
+
+from shapely.geometry import (
+    Point,
+)
+
+from sedona.geopandas import GeoDataFrame
+from tests.test_base import TestBase
+import pyspark.pandas as ps
+
+
class TestDataframe(TestBase):
    """Tests for the Sedona GeoDataFrame wrapper."""

    def test_constructor(self):
        """A plain list of geometries constructs a GeoDataFrame."""
        df = GeoDataFrame([Point(x, x) for x in range(3)])
        check_geodataframe(df)

    def test_psdf(self):
        # Sanity check that the Spark session works with the pandas-on-Spark API.
        psdf = ps.DataFrame(
            {
                "a": [1, 2, 3, 4, 5, 6],
                "b": [100, 200, 300, 400, 500, 600],
                "c": ["one", "two", "three", "four", "five", "six"],
            },
            index=[10, 20, 30, 40, 50, 60],
        )
        # Compare with == rather than `is`: identity tests against int
        # literals rely on CPython's small-int cache and emit a SyntaxWarning.
        assert psdf.count().count() == 3

    def test_type_single_geometry_column(self):
        # Create a GeoDataFrame with a single geometry column and attributes.
        points = [Point(x, x) for x in range(3)]
        data = {"geometry1": points, "id": [1, 2, 3], "value": ["a", "b", "c"]}

        df = GeoDataFrame(data)

        # Verify the GeoDataFrame type.
        assert type(df) is GeoDataFrame

        # Check the underlying Spark DataFrame schema.
        schema = df._internal.spark_frame.schema

        # The geometry column has the correct type and is not nullable.
        geometry_field = schema["geometry1"]
        assert geometry_field.dataType.typeName() == "geometrytype"
        assert not geometry_field.nullable

        # Non-geometry columns are present with the expected types.
        assert schema["id"].dataType.typeName().startswith("long")
        assert schema["value"].dataType.typeName().startswith("string")

        # 3 data columns plus the internal index/order columns.
        assert len(schema.fields) == 5

    def test_type_multiple_geometry_columns(self):
        # Create points for two geometry columns.
        points1 = [Point(x, x) for x in range(3)]
        points2 = [Point(x + 5, x + 5) for x in range(3)]

        # A dictionary with two geometry columns and one attribute column.
        data = {"geometry1": points1, "geometry2": points2, "attribute": [1, 2, 3]}

        df = GeoDataFrame(data)
        assert type(df) is GeoDataFrame

        schema = df._internal.spark_frame.schema
        # Both geometry columns have the correct type.
        geometry_field1 = schema["geometry1"]
        assert geometry_field1.dataType.typeName() == "geometrytype"
        assert not geometry_field1.nullable

        geometry_field2 = schema["geometry2"]
        assert geometry_field2.dataType.typeName() == "geometrytype"
        assert not geometry_field2.nullable

        # The non-geometry column must not be a geometry.
        attribute_field = schema["attribute"]
        assert attribute_field.dataType.typeName() != "geometrytype"

    def test_copy(self):
        df = GeoDataFrame([Point(x, x) for x in range(3)], name="test_df")
        df_copy = df.copy()
        assert type(df_copy) is GeoDataFrame

    def test_area(self):
        # Create a GeoDataFrame with polygons to test area calculation.
        from shapely.geometry import Polygon

        # Polygons with known areas: 1.0 and 4.0 square units.
        poly1 = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])  # 1 square unit
        poly2 = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])  # 4 square units

        data = {"geometry1": [poly1, poly2], "id": [1, 2], "value": ["a", "b"]}

        df = GeoDataFrame(data)

        # Calculate area.
        area_df = df.area

        # Verify the result is a GeoDataFrame.
        assert type(area_df) is GeoDataFrame

        # The geometry column was converted to area values.
        assert "geometry1_area" in area_df.columns

        # Non-geometry columns were preserved.
        assert "id" in area_df.columns
        assert "value" in area_df.columns

        # Check the actual area values.
        area_values = area_df["geometry1_area"].to_list()
        assert len(area_values) == 2
        self.assert_almost_equal(area_values[0], 1.0)
        self.assert_almost_equal(area_values[1], 4.0)

    def test_buffer(self):
        # Create a GeoDataFrame with geometries to test the buffer operation.
        from shapely.geometry import Polygon, Point

        # Input geometries: a point and a unit square.
        point = Point(0, 0)
        square = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])

        data = {"geometry1": [point, square], "id": [1, 2], "value": ["a", "b"]}
        df = GeoDataFrame(data)

        # Apply buffer with distance 0.5.
        buffer_df = df.buffer(0.5)

        # Verify the result is a GeoDataFrame.
        assert type(buffer_df) is GeoDataFrame

        # The original columns are preserved.
        assert "geometry1" in buffer_df.columns
        assert "id" in buffer_df.columns
        assert "value" in buffer_df.columns

        # Convert to pandas to extract individual geometries.
        pandas_df = buffer_df._internal.spark_frame.select("geometry1").toPandas()

        # Point buffer of radius 0.5 has area ~ pi * 0.5**2 ~= 0.785;
        # the square buffer expands the 1x1 square with rounded corners.
        areas = [geom.area for geom in pandas_df["geometry1"]]

        # The buffered square must be larger than the original (area 1.0).
        assert areas[1] > 1.0
+
+
+# -----------------------------------------------------------------------------
+# # Utils
+# -----------------------------------------------------------------------------
+
+
def check_geodataframe(df):
    """Assert that *df* is a sedona.geopandas.GeoDataFrame instance."""
    assert isinstance(df, GeoDataFrame)
diff --git a/python/tests/geopandas/test_geoseries.py 
b/python/tests/geopandas/test_geoseries.py
new file mode 100644
index 0000000000..52e60ab3f7
--- /dev/null
+++ b/python/tests/geopandas/test_geoseries.py
@@ -0,0 +1,114 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+import os
+import shutil
+import tempfile
+
+from shapely.geometry import (
+    Point,
+    Polygon,
+)
+
+from sedona.geopandas import GeoSeries
+from tests.test_base import TestBase
+import pyspark.pandas as ps
+
+
class TestSeries(TestBase):
    """Tests for the Sedona GeoSeries wrapper."""

    def setup_method(self):
        # Fresh temp dir and a small set of triangles/squares per test.
        self.tempdir = tempfile.mkdtemp()
        self.t1 = Polygon([(0, 0), (1, 0), (1, 1)])
        self.t2 = Polygon([(0, 0), (1, 1), (0, 1)])
        self.sq = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
        self.g1 = GeoSeries([self.t1, self.t2])
        self.g2 = GeoSeries([self.sq, self.t1])
        self.g3 = GeoSeries([self.t1, self.t2], crs="epsg:4326")
        self.g4 = GeoSeries([self.t2, self.t1])

    def teardown_method(self):
        shutil.rmtree(self.tempdir)

    def test_constructor(self):
        s = GeoSeries([Point(x, x) for x in range(3)])
        check_geoseries(s)

    def test_psdf(self):
        # Sanity check that the Spark session works with the pandas-on-Spark API.
        psdf = ps.DataFrame(
            {
                "a": [1, 2, 3, 4, 5, 6],
                "b": [100, 200, 300, 400, 500, 600],
                "c": ["one", "two", "three", "four", "five", "six"],
            },
            index=[10, 20, 30, 40, 50, 60],
        )
        # Compare with == rather than `is`: identity tests against int
        # literals rely on CPython's small-int cache and emit a SyntaxWarning.
        assert psdf.count().count() == 3

    def test_internal_st_function(self):
        # Sanity check that the Spark session works with internal Sedona UDFs.
        baseDf = self.spark.sql(
            "SELECT ST_GeomFromWKT('POLYGON ((50 50 1, 50 80 2, 80 80 3, 80 50 2, 50 50 1))') as geom"
        )
        actual = baseDf.selectExpr("ST_AsText(ST_Expand(geom, 10))").first()[0]
        expected = "POLYGON Z((40 40 -9, 40 90 -9, 90 90 13, 90 40 13, 40 40 -9))"
        assert expected == actual

    def test_type(self):
        assert type(self.g1) is GeoSeries
        assert type(self.g2) is GeoSeries
        assert type(self.g3) is GeoSeries
        assert type(self.g4) is GeoSeries

    def test_copy(self):
        gc = self.g3.copy()
        assert type(gc) is GeoSeries
        assert self.g3.name == gc.name

    def test_area(self):
        area = self.g1.area
        assert area is not None
        assert type(area) is GeoSeries
        assert area.count() == 2

    def test_buffer(self):
        buffer = self.g1.buffer(0.2)
        assert buffer is not None
        assert type(buffer) is GeoSeries
        assert buffer.count() == 2

    def test_buffer_then_area(self):
        area = self.g1.buffer(0.2).area
        assert area is not None
        assert type(area) is GeoSeries
        assert area.count() == 2

    def test_buffer_then_geoparquet(self):
        # Use a public path construction instead of the private
        # tempfile._get_candidate_names(); self.tempdir is unique per test.
        temp_file_path = os.path.join(self.tempdir, "buffered.parquet")
        self.g1.buffer(0.2).to_parquet(temp_file_path)
        assert os.path.exists(temp_file_path)
+
+
+# -----------------------------------------------------------------------------
+# # Utils
+# -----------------------------------------------------------------------------
+
+
def check_geoseries(s):
    """Assert that *s* is a GeoSeries and that its geometry accessor is too."""
    assert isinstance(s, GeoSeries)
    assert isinstance(s.geometry, GeoSeries)
diff --git a/python/tests/geopandas/test_sjoin.py 
b/python/tests/geopandas/test_sjoin.py
new file mode 100644
index 0000000000..f9e1c6f680
--- /dev/null
+++ b/python/tests/geopandas/test_sjoin.py
@@ -0,0 +1,53 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+import shutil
+import tempfile
+
+from shapely.geometry import Polygon
+from sedona.geopandas import GeoSeries, sjoin
+from tests.test_base import TestBase
+
+
class TestSpatialJoin(TestBase):
    """Tests for the module-level sjoin() and GeoSeries.sjoin()."""

    def setup_method(self):
        # Fresh temp dir and a small set of triangles/squares per test.
        self.tempdir = tempfile.mkdtemp()
        self.t1 = Polygon([(0, 0), (1, 0), (1, 1)])
        self.t2 = Polygon([(0, 0), (1, 1), (0, 1)])
        self.sq = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
        self.g1 = GeoSeries([self.t1, self.t2])
        self.g2 = GeoSeries([self.sq, self.t1])
        self.g3 = GeoSeries([self.t1, self.t2], crs="epsg:4326")
        self.g4 = GeoSeries([self.t2, self.t1])

    def teardown_method(self):
        shutil.rmtree(self.tempdir)

    def test_sjoin_method1(self):
        # Every geometry in g1 intersects every geometry in g2 -> 2 x 2 rows.
        left = self.g1
        right = self.g2
        joined = sjoin(left, right)
        assert joined is not None
        assert type(joined) is GeoSeries
        # Compare with == rather than `is`: identity tests against int
        # literals rely on CPython's small-int cache and emit a SyntaxWarning.
        assert joined.count() == 4

    def test_sjoin_method2(self):
        left = self.g1
        right = self.g2
        joined = left.sjoin(right)
        assert joined is not None
        assert type(joined) is GeoSeries
        assert joined.count() == 4


Reply via email to