This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43d332d2b9d5 [SPARK-55228][SPARK-55230][SQL][CONNECT] Implement 
Dataset.zipWithIndex in Scala API
43d332d2b9d5 is described below

commit 43d332d2b9d5939c99321b3cdbebf9e6d5d91d5a
Author: Fangchen Li <[email protected]>
AuthorDate: Thu Feb 5 16:20:27 2026 +0800

    [SPARK-55228][SPARK-55230][SQL][CONNECT] Implement Dataset.zipWithIndex in 
Scala API
    
    ### What changes were proposed in this pull request?
    
    Implement Dataset.zipWithIndex in Scala API
    
    ### Why are the changes needed?
    
    Align Dataset and RDD api
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR added Dataset.zipWithIndex() and 
Dataset.zipWithIndex(indexColName: String)
    
    ### How was this patch tested?
    
    Unittests added
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.5
    
    Closes #54014 from fangchenli/dataset-zip-with-index.
    
    Authored-by: Fangchen Li <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 project/MimaExcludes.scala                         |   4 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  31 ++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   8 ++
 .../spark/sql/connect/ClientE2ETestSuite.scala     |  36 +++++++++
 .../explain-results/zipWithIndex.explain           |   4 +
 .../zipWithIndex_custom_column.explain             |   4 +
 .../query-tests/queries/zipWithIndex.json          |  81 +++++++++++++++++++++
 .../query-tests/queries/zipWithIndex.proto.bin     | Bin 0 -> 644 bytes
 .../queries/zipWithIndex_custom_column.json        |  81 +++++++++++++++++++++
 .../queries/zipWithIndex_custom_column.proto.bin   | Bin 0 -> 647 bytes
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  42 +++++++++++
 11 files changed, 290 insertions(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c4e39e070f1a..9add6542841e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -40,7 +40,9 @@ object MimaExcludes {
     // [SPARK-47086][BUILD][CORE][WEBUI] Upgrade Jetty to 12.1.4
     
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.ProxyRedirectHandler$ResponseWrapper"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.sendRedirect"),
-    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this")
+    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this"),
+    // [SPARK-55228][SQL] Implement Dataset.zipWithIndex in Scala API
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zipWithIndex")
   )
 
   // Exclude rules for 4.1.x from 4.0.0
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b06ce58df6b..0f1fe314c350 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2010,6 +2010,37 @@ abstract class Dataset[T] extends Serializable {
    */
   def exceptAll(other: Dataset[T]): Dataset[T]
 
+  /**
+   * Returns a new `Dataset` by appending a column containing consecutive 
0-based Long indices,
+   * similar to `RDD.zipWithIndex()`.
+   *
+   * The index column is appended as the last column of the resulting 
`DataFrame`.
+   *
+   * @group untypedrel
+   * @since 4.2.0
+   */
+  def zipWithIndex(): DataFrame = zipWithIndex("index")
+
+  /**
+   * Returns a new `Dataset` by appending a column containing consecutive 
0-based Long indices,
+   * similar to `RDD.zipWithIndex()`.
+   *
+   * The index column is appended as the last column of the resulting 
`DataFrame`.
+   *
+   * @note
+   *   If a column with `indexColName` already exists in the schema, the 
resulting `DataFrame`
+   *   will have duplicate column names. Selecting the duplicate column by 
name will throw
+   *   `AMBIGUOUS_REFERENCE`, and writing the `DataFrame` will throw 
`COLUMN_ALREADY_EXISTS`.
+   *
+   * @param indexColName
+   *   The name of the index column to append.
+   * @group untypedrel
+   * @since 4.2.0
+   */
+  def zipWithIndex(indexColName: String): DataFrame = {
+    select(col("*"), 
Column.internalFn("distributed_sequence_id").alias(indexColName))
+  }
+
   /**
    * Returns a new [[Dataset]] by sampling a fraction of rows (without 
replacement), using a
    * user-supplied seed.
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index a21de2dbfaa6..852e832ded85 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -684,6 +684,14 @@ class PlanGenerationTestSuite extends ConnectFunSuite with 
Logging {
     simple.withMetadata("id", builder.build())
   }
 
+  test("zipWithIndex") {
+    simple.zipWithIndex()
+  }
+
+  test("zipWithIndex custom column") {
+    simple.zipWithIndex("my_index")
+  }
+
   test("drop single string") {
     simple.drop("a")
   }
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index 5ff030b56d4c..52d87087805f 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -772,6 +772,42 @@ class ClientE2ETestSuite
     assert(spark.range(10).count() === 10)
   }
 
+  test("Dataset zipWithIndex") {
+    val df = spark.range(5).repartition(3)
+    val result = df.zipWithIndex()
+    assert(result.columns === Array("id", "index"))
+    assert(result.schema.last.dataType === LongType)
+    val indices = result.collect().map(_.getLong(1)).sorted
+    assert(indices === (0L until 5L).toArray)
+  }
+
+  test("Dataset zipWithIndex with custom column name") {
+    val result = spark.range(3).zipWithIndex("row_num")
+    assert(result.columns === Array("id", "row_num"))
+    val indices = result.collect().map(_.getLong(1)).sorted
+    assert(indices === Array(0L, 1L, 2L))
+  }
+
+  test("Dataset zipWithIndex should throw AMBIGUOUS_REFERENCE when selecting 
duplicate column") {
+    val df = spark.range(3).withColumnRenamed("id", "index")
+    val result = df.zipWithIndex() // Creates df with two "index" columns
+    val ex = intercept[AnalysisException] {
+      result.select("index").collect()
+    }
+    assert(ex.getCondition == "AMBIGUOUS_REFERENCE")
+  }
+
+  test("Dataset zipWithIndex should throw COLUMN_ALREADY_EXISTS when writing 
duplicate columns") {
+    val df = spark.range(3).withColumnRenamed("id", "index")
+    val result = df.zipWithIndex() // Creates df with two "index" columns
+    withTempPath { path =>
+      val ex = intercept[AnalysisException] {
+        result.write.parquet(path.getAbsolutePath)
+      }
+      assert(ex.getCondition == "COLUMN_ALREADY_EXISTS")
+    }
+  }
+
   test("Dataset collect tuple") {
     val session = spark
     import session.implicits._
diff --git 
a/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex.explain
 
b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex.explain
new file mode 100644
index 000000000000..c1ee3b77d979
--- /dev/null
+++ 
b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex.explain
@@ -0,0 +1,4 @@
+Project [id#0L, a#0, b#0, index#0L]
++- Project [id#0L, a#0, b#0, distributed_sequence_id#0L AS index#0L]
+   +- AttachDistributedSequence[distributed_sequence_id#0L, id#0L, a#0, b#0] 
Index: distributed_sequence_id#0L
+      +- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git 
a/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex_custom_column.explain
 
b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex_custom_column.explain
new file mode 100644
index 000000000000..638f222caa67
--- /dev/null
+++ 
b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex_custom_column.explain
@@ -0,0 +1,4 @@
+Project [id#0L, a#0, b#0, my_index#0L]
++- Project [id#0L, a#0, b#0, distributed_sequence_id#0L AS my_index#0L]
+   +- AttachDistributedSequence[distributed_sequence_id#0L, id#0L, a#0, b#0] 
Index: distributed_sequence_id#0L
+      +- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json
new file mode 100644
index 000000000000..9c0278561a61
--- /dev/null
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json
@@ -0,0 +1,81 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedStar": {
+        "planId": "0"
+      },
+      "common": {
+        "origin": {
+          "jvmOrigin": {
+            "stackTrace": [{
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.Dataset",
+              "methodName": "zipWithIndex",
+              "fileName": "Dataset.scala"
+            }, {
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+              "methodName": "~~trimmed~anonfun~~",
+              "fileName": "PlanGenerationTestSuite.scala"
+            }]
+          }
+        }
+      }
+    }, {
+      "alias": {
+        "expr": {
+          "unresolvedFunction": {
+            "functionName": "distributed_sequence_id",
+            "isInternal": true
+          },
+          "common": {
+            "origin": {
+              "jvmOrigin": {
+                "stackTrace": [{
+                  "classLoaderName": "app",
+                  "declaringClass": "org.apache.spark.sql.Dataset",
+                  "methodName": "zipWithIndex",
+                  "fileName": "Dataset.scala"
+                }, {
+                  "classLoaderName": "app",
+                  "declaringClass": 
"org.apache.spark.sql.PlanGenerationTestSuite",
+                  "methodName": "~~trimmed~anonfun~~",
+                  "fileName": "PlanGenerationTestSuite.scala"
+                }]
+              }
+            }
+          }
+        },
+        "name": ["index"]
+      },
+      "common": {
+        "origin": {
+          "jvmOrigin": {
+            "stackTrace": [{
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.Dataset",
+              "methodName": "zipWithIndex",
+              "fileName": "Dataset.scala"
+            }, {
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+              "methodName": "~~trimmed~anonfun~~",
+              "fileName": "PlanGenerationTestSuite.scala"
+            }]
+          }
+        }
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin
new file mode 100644
index 000000000000..2109d63291ee
Binary files /dev/null and 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json
 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json
new file mode 100644
index 000000000000..80229004d7a8
--- /dev/null
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json
@@ -0,0 +1,81 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedStar": {
+        "planId": "0"
+      },
+      "common": {
+        "origin": {
+          "jvmOrigin": {
+            "stackTrace": [{
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.Dataset",
+              "methodName": "zipWithIndex",
+              "fileName": "Dataset.scala"
+            }, {
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+              "methodName": "~~trimmed~anonfun~~",
+              "fileName": "PlanGenerationTestSuite.scala"
+            }]
+          }
+        }
+      }
+    }, {
+      "alias": {
+        "expr": {
+          "unresolvedFunction": {
+            "functionName": "distributed_sequence_id",
+            "isInternal": true
+          },
+          "common": {
+            "origin": {
+              "jvmOrigin": {
+                "stackTrace": [{
+                  "classLoaderName": "app",
+                  "declaringClass": "org.apache.spark.sql.Dataset",
+                  "methodName": "zipWithIndex",
+                  "fileName": "Dataset.scala"
+                }, {
+                  "classLoaderName": "app",
+                  "declaringClass": 
"org.apache.spark.sql.PlanGenerationTestSuite",
+                  "methodName": "~~trimmed~anonfun~~",
+                  "fileName": "PlanGenerationTestSuite.scala"
+                }]
+              }
+            }
+          }
+        },
+        "name": ["my_index"]
+      },
+      "common": {
+        "origin": {
+          "jvmOrigin": {
+            "stackTrace": [{
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.Dataset",
+              "methodName": "zipWithIndex",
+              "fileName": "Dataset.scala"
+            }, {
+              "classLoaderName": "app",
+              "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+              "methodName": "~~trimmed~anonfun~~",
+              "fileName": "PlanGenerationTestSuite.scala"
+            }]
+          }
+        }
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin
new file mode 100644
index 000000000000..0b60290a4241
Binary files /dev/null and 
b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin
 differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2aefd011851e..ef053d638701 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2997,6 +2997,48 @@ class DatasetSuite extends QueryTest
     val metrics = observation.get
     assert(metrics.isEmpty)
   }
+
+  test("zipWithIndex should append consecutive 0-based indices") {
+    val ds = Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 
5)).toDS().repartition(3)
+    val result = ds.zipWithIndex()
+
+    // Index column should be the last column
+    assert(result.columns === Array("_1", "_2", "index"))
+    assert(result.schema.last.dataType === LongType)
+
+    // Indices should be consecutive 0-based
+    val indices = result.collect().map(_.getLong(2)).sorted
+    assert(indices === (0L until 5L).toArray)
+  }
+
+  test("zipWithIndex with custom column name") {
+    val ds = Seq(1, 2, 3, 4, 5).toDS()
+    val result = ds.zipWithIndex("row_num")
+
+    assert(result.columns === Array("value", "row_num"))
+    val indices = result.collect().map(_.getLong(1)).sorted
+    assert(indices === (0L until 5L).toArray)
+  }
+
+  test("zipWithIndex should throw AMBIGUOUS_REFERENCE when selecting duplicate 
column") {
+    val ds = Seq(("a", 1), ("b", 2)).toDF("_1", "index")
+    val result = ds.zipWithIndex() // Creates df with two "index" columns
+    val ex = intercept[AnalysisException] {
+      result.select("index").collect()
+    }
+    assert(ex.getCondition == "AMBIGUOUS_REFERENCE")
+  }
+
+  test("zipWithIndex should throw COLUMN_ALREADY_EXISTS when writing duplicate 
columns") {
+    val ds = Seq(("a", 1), ("b", 2)).toDF("_1", "index")
+    val result = ds.zipWithIndex() // Creates df with two "index" columns
+    withTempPath { path =>
+      val ex = intercept[AnalysisException] {
+        result.write.parquet(path.getAbsolutePath)
+      }
+      assert(ex.getCondition == "COLUMN_ALREADY_EXISTS")
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to