This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 75afa8ee3c80 [SPARK-55229][SPARK-55231][PYTHON] Implement DataFrame.zipWithIndex in PySpark
75afa8ee3c80 is described below

commit 75afa8ee3c80189750f6721fa93a1025b4edbf5b
Author: Fangchen Li <[email protected]>
AuthorDate: Mon Feb 9 15:51:27 2026 +0800

    [SPARK-55229][SPARK-55231][PYTHON] Implement DataFrame.zipWithIndex in PySpark
    
    ### What changes were proposed in this pull request?
    
    Implement DataFrame.zipWithIndex in PySpark Classic and PySpark Connect.
    
    ### Why are the changes needed?
    
    This method was added in Scala earlier. We need to add it to PySpark so users can call it from Python as well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users can see and use this API in PySpark.
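
    For example (mirroring the example in the new docstring; assumes an active `spark` session):

    ```python
    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
    df.zipWithIndex().show()
    # +------+------+-----+
    # |letter|number|index|
    # +------+------+-----+
    # |     a|     1|    0|
    # |     b|     2|    1|
    # |     c|     3|    2|
    # +------+------+-----+

    df.zipWithIndex("row_id")  # appends the index under a custom column name
    ```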
    
    ### How was this patch tested?
    
    Unit tests added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.6
    
    Closes #54195 from fangchenli/pyspark-zip-with-index.
    
    Authored-by: Fangchen Li <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/classic/dataframe.py    |  3 ++
 python/pyspark/sql/connect/dataframe.py    |  5 +++
 python/pyspark/sql/dataframe.py            | 52 ++++++++++++++++++++++++++++++
 python/pyspark/sql/tests/test_dataframe.py | 30 +++++++++++++++++
 4 files changed, 90 insertions(+)

diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index 41d9d4166727..854ab0ff89a0 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -280,6 +280,9 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def exceptAll(self, other: ParentDataFrame) -> ParentDataFrame:
         return DataFrame(self._jdf.exceptAll(other._jdf), self.sparkSession)
 
+    def zipWithIndex(self, indexColName: str = "index") -> ParentDataFrame:
+        return DataFrame(self._jdf.zipWithIndex(indexColName), self.sparkSession)
+
     def isLocal(self) -> bool:
         return self._jdf.isLocal()
 
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 7efb0887e573..4c40f9512ce3 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1212,6 +1212,11 @@ class DataFrame(ParentDataFrame):
         res._cached_schema = self._merge_cached_schema(other)
         return res
 
+    def zipWithIndex(self, indexColName: str = "index") -> ParentDataFrame:
+        return self.select(
+            F.col("*"), F._invoke_function("distributed_sequence_id").alias(indexColName)
+        )
+
     def intersect(self, other: ParentDataFrame) -> ParentDataFrame:
         self._check_same_session(other)
         res = DataFrame(
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c6f348ce600a..a1607df4ecef 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -782,6 +782,58 @@ class DataFrame:
         """
         ...
 
+    @dispatch_df_method
+    def zipWithIndex(self, indexColName: str = "index") -> "DataFrame":
+        """Returns a new :class:`DataFrame` by appending a column containing 
consecutive
+        0-based Long indices, similar to :meth:`RDD.zipWithIndex`.
+
+        The index column is appended as the last column of the resulting :class:`DataFrame`.
+
+        .. versionadded:: 4.2.0
+
+        Parameters
+        ----------
+        indexColName : str, default "index"
+            The name of the index column to append.
+
+        Returns
+        -------
+        :class:`DataFrame`
+            A new DataFrame with an appended index column.
+
+        Notes
+        -----
+        If a column with `indexColName` already exists in the schema, the resulting
+        :class:`DataFrame` will have duplicate column names. Selecting the duplicate column
+        by name will throw `AMBIGUOUS_REFERENCE`, and writing the :class:`DataFrame` will
+        throw `COLUMN_ALREADY_EXISTS`.
+
+        Examples
+        --------
+        >>> df = spark.createDataFrame(
+        ...     [("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
+        >>> df.zipWithIndex().show()
+        +------+------+-----+
+        |letter|number|index|
+        +------+------+-----+
+        |     a|     1|    0|
+        |     b|     2|    1|
+        |     c|     3|    2|
+        +------+------+-----+
+
+        Custom index column name:
+
+        >>> df.zipWithIndex("row_id").show()
+        +------+------+------+
+        |letter|number|row_id|
+        +------+------+------+
+        |     a|     1|     0|
+        |     b|     2|     1|
+        |     c|     3|     2|
+        +------+------+------+
+        """
+        ...
+
     @dispatch_df_method
     def isLocal(self) -> bool:
         """Returns ``True`` if the :func:`collect` and :func:`take` methods 
can be run locally
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d850f2598a2d..7aa25605b2ee 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1214,6 +1214,36 @@ class DataFrameTestsMixin:
             self.assertIsInstance(df, DataFrame)
             self.assertEqual(df.select("value").count(), 10)
 
+    def test_zip_with_index(self):
+        df = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
+
+        # Default column name "index"
+        result = df.zipWithIndex()
+        self.assertEqual(result.columns, ["letter", "number", "index"])
+        rows = result.collect()
+        self.assertEqual(len(rows), 3)
+        indices = [row["index"] for row in rows]
+        self.assertEqual(sorted(indices), [0, 1, 2])
+
+        # Custom column name
+        result = df.zipWithIndex("row_id")
+        self.assertEqual(result.columns, ["letter", "number", "row_id"])
+        rows = result.collect()
+        indices = [row["row_id"] for row in rows]
+        self.assertEqual(sorted(indices), [0, 1, 2])
+
+        # Duplicate column name causes AMBIGUOUS_REFERENCE on select
+        result = df.zipWithIndex("letter")
+        with self.assertRaises(AnalysisException) as ctx:
+            result.select("letter").collect()
+        self.assertEqual(ctx.exception.getCondition(), "AMBIGUOUS_REFERENCE")
+
+        # Duplicate column name causes COLUMN_ALREADY_EXISTS on write
+        with tempfile.TemporaryDirectory() as d:
+            with self.assertRaises(AnalysisException) as ctx:
+                result.write.parquet(d)
+            self.assertEqual(ctx.exception.getCondition(), "COLUMN_ALREADY_EXISTS")
+
 
 class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
     pass

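As the Notes in the new docstring point out, reusing an existing column name produces duplicate columns, which later fail with AMBIGUOUS_REFERENCE on select or COLUMN_ALREADY_EXISTS on write. A small defensive sketch (a hypothetical helper, not part of this patch) that picks an unused name before calling the new API:

```python
from pyspark.sql import DataFrame


def zip_with_unique_index(df: DataFrame, base: str = "index") -> DataFrame:
    # Choose a column name that is not already in the schema, so the
    # appended index column never collides with an existing one.
    name, i = base, 0
    while name in df.columns:
        i += 1
        name = f"{base}_{i}"
    return df.zipWithIndex(name)
```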

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
