This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 75afa8ee3c80 [SPARK-55229][SPARK-55231][PYTHON] Implement DataFrame.zipWithIndex in PySpark
75afa8ee3c80 is described below
commit 75afa8ee3c80189750f6721fa93a1025b4edbf5b
Author: Fangchen Li <[email protected]>
AuthorDate: Mon Feb 9 15:51:27 2026 +0800
[SPARK-55229][SPARK-55231][PYTHON] Implement DataFrame.zipWithIndex in PySpark
### What changes were proposed in this pull request?
Implement DataFrame.zipWithIndex in PySpark Classic
### Why are the changes needed?
This method was added on the Scala side earlier. We need to add it to PySpark Classic so that users can call it from PySpark.
### Does this PR introduce _any_ user-facing change?
Yes, users can see and use this API in PySpark.
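For example (a minimal usage sketch, assuming an active SparkSession named `spark`, mirroring the docstring examples in this patch):

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
    df.zipWithIndex().show()          # appends a 0-based "index" column as the last column
    df.zipWithIndex("row_id").show()  # custom name for the index column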
### How was this patch tested?
Unittests added.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6
Closes #54195 from fangchenli/pyspark-zip-with-index.
Authored-by: Fangchen Li <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/classic/dataframe.py    |  3 ++
 python/pyspark/sql/connect/dataframe.py    |  5 +++
 python/pyspark/sql/dataframe.py            | 52 ++++++++++++++++++++++++++++++
 python/pyspark/sql/tests/test_dataframe.py | 30 +++++++++++++++++
 4 files changed, 90 insertions(+)
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index 41d9d4166727..854ab0ff89a0 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -280,6 +280,9 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def exceptAll(self, other: ParentDataFrame) -> ParentDataFrame:
         return DataFrame(self._jdf.exceptAll(other._jdf), self.sparkSession)
 
+    def zipWithIndex(self, indexColName: str = "index") -> ParentDataFrame:
+        return DataFrame(self._jdf.zipWithIndex(indexColName), self.sparkSession)
+
     def isLocal(self) -> bool:
         return self._jdf.isLocal()
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 7efb0887e573..4c40f9512ce3 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1212,6 +1212,11 @@ class DataFrame(ParentDataFrame):
         res._cached_schema = self._merge_cached_schema(other)
         return res
 
+    def zipWithIndex(self, indexColName: str = "index") -> ParentDataFrame:
+        return self.select(
+            F.col("*"), F._invoke_function("distributed_sequence_id").alias(indexColName)
+        )
+
     def intersect(self, other: ParentDataFrame) -> ParentDataFrame:
         self._check_same_session(other)
         res = DataFrame(
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c6f348ce600a..a1607df4ecef 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -782,6 +782,58 @@ class DataFrame:
"""
...
+    @dispatch_df_method
+    def zipWithIndex(self, indexColName: str = "index") -> "DataFrame":
+        """Returns a new :class:`DataFrame` by appending a column containing consecutive
+        0-based Long indices, similar to :meth:`RDD.zipWithIndex`.
+
+        The index column is appended as the last column of the resulting :class:`DataFrame`.
+
+        .. versionadded:: 4.2.0
+
+        Parameters
+        ----------
+        indexColName : str, default "index"
+            The name of the index column to append.
+
+        Returns
+        -------
+        :class:`DataFrame`
+            A new DataFrame with an appended index column.
+
+        Notes
+        -----
+        If a column with `indexColName` already exists in the schema, the resulting
+        :class:`DataFrame` will have duplicate column names. Selecting the duplicate column
+        by name will throw `AMBIGUOUS_REFERENCE`, and writing the :class:`DataFrame` will
+        throw `COLUMN_ALREADY_EXISTS`.
+
+        Examples
+        --------
+        >>> df = spark.createDataFrame(
+        ...     [("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
+        >>> df.zipWithIndex().show()
+        +------+------+-----+
+        |letter|number|index|
+        +------+------+-----+
+        |     a|     1|    0|
+        |     b|     2|    1|
+        |     c|     3|    2|
+        +------+------+-----+
+
+        Custom index column name:
+
+        >>> df.zipWithIndex("row_id").show()
+        +------+------+------+
+        |letter|number|row_id|
+        +------+------+------+
+        |     a|     1|     0|
+        |     b|     2|     1|
+        |     c|     3|     2|
+        +------+------+------+
+        """
+        ...
+
     @dispatch_df_method
     def isLocal(self) -> bool:
         """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d850f2598a2d..7aa25605b2ee 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1214,6 +1214,36 @@ class DataFrameTestsMixin:
         self.assertIsInstance(df, DataFrame)
         self.assertEqual(df.select("value").count(), 10)
 
+    def test_zip_with_index(self):
+        df = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
+
+        # Default column name "index"
+        result = df.zipWithIndex()
+        self.assertEqual(result.columns, ["letter", "number", "index"])
+        rows = result.collect()
+        self.assertEqual(len(rows), 3)
+        indices = [row["index"] for row in rows]
+        self.assertEqual(sorted(indices), [0, 1, 2])
+
+        # Custom column name
+        result = df.zipWithIndex("row_id")
+        self.assertEqual(result.columns, ["letter", "number", "row_id"])
+        rows = result.collect()
+        indices = [row["row_id"] for row in rows]
+        self.assertEqual(sorted(indices), [0, 1, 2])
+
+        # Duplicate column name causes AMBIGUOUS_REFERENCE on select
+        result = df.zipWithIndex("letter")
+        with self.assertRaises(AnalysisException) as ctx:
+            result.select("letter").collect()
+        self.assertEqual(ctx.exception.getCondition(), "AMBIGUOUS_REFERENCE")
+
+        # Duplicate column name causes COLUMN_ALREADY_EXISTS on write
+        with tempfile.TemporaryDirectory() as d:
+            with self.assertRaises(AnalysisException) as ctx:
+                result.write.parquet(d)
+            self.assertEqual(ctx.exception.getCondition(), "COLUMN_ALREADY_EXISTS")
+
 
 class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
     pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]