This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b70c88bbef0c [SPARK-45384][CORE][SQL][SS][DSTREAM][CONNECT] Replace `TraversableOnce` with `IterableOnce`
b70c88bbef0c is described below

commit b70c88bbef0cedc6a4b591f92cbb7fcc869667d5
Author: yangjie01 <[email protected]>
AuthorDate: Sun Oct 1 21:41:21 2023 +0800

    [SPARK-45384][CORE][SQL][SS][DSTREAM][CONNECT] Replace `TraversableOnce` with `IterableOnce`
    
    ### What changes were proposed in this pull request?
    This PR replaces `TraversableOnce` with `IterableOnce` because `TraversableOnce` has been deprecated since Scala 2.13.0.
    
    ```
    deprecated("Use IterableOnce instead of TraversableOnce", "2.13.0")
    type TraversableOnce[+A] = scala.collection.IterableOnce[A]
    ```
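    
    For illustration only (not part of this patch): because `TraversableOnce` is just a
    deprecated alias for `IterableOnce` on Scala 2.13, the change is a drop-in rename at
    the source level. A minimal before/after sketch:
    
    ```
    // deprecated alias, emits a warning under Scala 2.13
    def lettersOld(s: String): TraversableOnce[Char] = s.iterator
    // replacement used throughout this PR
    def lettersNew(s: String): IterableOnce[Char] = s.iterator
    ```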
    
    Additionally, this PR renames two functions:
    - rename `UdfUtils#traversableOnceToSeq` to `UdfUtils#iterableOnceToSeq` (see the sketch after this list)
    - rename `GenerateExec#codeGenTraversableOnce` to `GenerateExec#codeGenIterableOnce`
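    
    As a hedged usage sketch (the helper is `private[sql]`, so this is a standalone
    copy rather than a call into Spark internals), the renamed adaptor simply
    materializes the `IterableOnce` produced by a user function into a `Seq`:
    
    ```
    // standalone copy of the internal helper, for illustration only
    def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] =
      value => f(value).iterator.toSeq
    
    val explodeWords = iterableOnceToSeq((s: String) => s.split(" ").iterator)
    assert(explodeWords("a b c") == Seq("a", "b", "c"))
    ```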
    
    ### Why are the changes needed?
    Clean up deprecated Scala API usage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43186 from LuciferYang/SPARK-45384.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala    | 10 +++++-----
 .../apache/spark/sql/KeyValueGroupedDataset.scala    | 12 ++++++------
 .../org/apache/spark/sql/ClientE2ETestSuite.scala    |  2 +-
 .../apache/spark/sql/connect/common/UdfUtils.scala   | 20 ++++++++++----------
 .../sql/connect/planner/SparkConnectPlanner.scala    |  4 ++--
 .../org/apache/spark/api/java/JavaPairRDD.scala      |  4 ++--
 .../org/apache/spark/api/java/JavaRDDLike.scala      |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala      |  8 ++++----
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   |  4 ++--
 .../scala/org/apache/spark/util/StatCounter.scala    |  6 +++---
 .../src/main/scala/org/apache/spark/util/Utils.scala |  2 +-
 .../apache/spark/util/collection/CompactBuffer.scala |  2 +-
 .../org/apache/spark/util/collection/Utils.scala     |  2 +-
 .../spark/sql/catalyst/analysis/unresolved.scala     |  4 ++--
 .../org/apache/spark/sql/catalyst/dsl/package.scala  |  2 +-
 .../spark/sql/catalyst/expressions/PythonUDF.scala   |  2 +-
 .../spark/sql/catalyst/expressions/generators.scala  | 20 ++++++++++----------
 .../sql/catalyst/expressions/jsonExpressions.scala   |  2 +-
 .../spark/sql/catalyst/plans/logical/object.scala    | 12 ++++++------
 .../apache/spark/sql/catalyst/trees/TreeNode.scala   |  2 +-
 .../expressions/GeneratorExpressionSuite.scala       |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala    |  8 ++++----
 .../apache/spark/sql/KeyValueGroupedDataset.scala    |  8 ++++----
 .../apache/spark/sql/execution/GenerateExec.scala    |  8 ++++----
 .../execution/adaptive/AdaptiveSparkPlanHelper.scala |  2 +-
 .../org/apache/spark/sql/execution/objects.scala     |  6 +++---
 .../spark/sql/execution/streaming/memory.scala       |  4 ++--
 .../streaming/sources/ContinuousMemoryStream.scala   |  2 +-
 .../apache/spark/sql/GeneratorFunctionSuite.scala    |  2 +-
 .../scala/org/apache/spark/sql/hive/hiveUDFs.scala   |  4 ++--
 .../org/apache/spark/streaming/dstream/DStream.scala |  2 +-
 .../streaming/dstream/FlatMapValuedDStream.scala     |  2 +-
 .../spark/streaming/dstream/FlatMappedDStream.scala  |  2 +-
 .../streaming/dstream/PairDStreamFunctions.scala     |  2 +-
 34 files changed, 88 insertions(+), 88 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index eb5460cbc86a..a1e57226e530 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2733,7 +2733,7 @@ class Dataset[T] private[sql] (
    * @group typedrel
    * @since 3.5.0
    */
-  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+  def flatMap[U: Encoder](func: T => IterableOnce[U]): Dataset[U] =
     mapPartitions(UdfUtils.flatMapFuncToMapPartitionsAdaptor(func))
 
   /**
@@ -2775,9 +2775,9 @@ class Dataset[T] private[sql] (
    * @since 3.5.0
    */
   @deprecated("use flatMap() or select() with functions.explode() instead", 
"3.5.0")
-  def explode[A <: Product: TypeTag](input: Column*)(f: Row => 
TraversableOnce[A]): DataFrame = {
+  def explode[A <: Product: TypeTag](input: Column*)(f: Row => 
IterableOnce[A]): DataFrame = {
     val generator = ScalarUserDefinedFunction(
-      UdfUtils.traversableOnceToSeq(f),
+      UdfUtils.iterableOnceToSeq(f),
       UnboundRowEncoder :: Nil,
       ScalaReflection.encoderFor[Seq[A]])
     select(col("*"), functions.inline(generator(struct(input: _*))))
@@ -2807,9 +2807,9 @@ class Dataset[T] private[sql] (
    */
   @deprecated("use flatMap() or select() with functions.explode() instead", 
"3.5.0")
   def explode[A, B: TypeTag](inputColumn: String, outputColumn: String)(
-      f: A => TraversableOnce[B]): DataFrame = {
+      f: A => IterableOnce[B]): DataFrame = {
     val generator = ScalarUserDefinedFunction(
-      UdfUtils.traversableOnceToSeq(f),
+      UdfUtils.iterableOnceToSeq(f),
       Nil,
       ScalaReflection.encoderFor[Seq[B]])
     select(col("*"), 
functions.explode(generator(col(inputColumn))).as((outputColumn)))
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 9e7d8e1320ef..dac89bf3eb5a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -111,7 +111,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    *
    * @since 3.5.0
    */
-  def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): 
Dataset[U] = {
+  def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => IterableOnce[U]): 
Dataset[U] = {
     flatMapSortedGroups()(f)
   }
 
@@ -162,7 +162,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    * @since 3.5.0
    */
   def flatMapSortedGroups[U: Encoder](sortExprs: Column*)(
-      f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
     throw new UnsupportedOperationException
   }
 
@@ -397,7 +397,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    * @since 3.5.0
    */
   def cogroup[U, R: Encoder](other: KeyValueGroupedDataset[K, U])(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     cogroupSorted(other)()()(f)
   }
 
@@ -433,7 +433,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends 
Serializable {
    */
   def cogroupSorted[U, R: Encoder](other: KeyValueGroupedDataset[K, 
U])(thisSortExprs: Column*)(
       otherSortExprs: Column*)(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     throw new UnsupportedOperationException
   }
 
@@ -865,7 +865,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
   }
 
   override def flatMapSortedGroups[U: Encoder](sortExprs: Column*)(
-      f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
     // Apply mapValues changes to the udf
     val nf =
       if (valueMapFunc == UdfUtils.identical()) f else 
UdfUtils.mapValuesAdaptor(f, valueMapFunc)
@@ -881,7 +881,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
 
   override def cogroupSorted[U, R: Encoder](other: KeyValueGroupedDataset[K, 
U])(
       thisSortExprs: Column*)(otherSortExprs: Column*)(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     assert(other.isInstanceOf[KeyValueGroupedDatasetImpl[K, U, _, _]])
     val otherImpl = other.asInstanceOf[KeyValueGroupedDatasetImpl[K, U, _, _]]
     // Apply mapValues changes to the udf
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 45a3eee55565..85d98babcf9a 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -627,7 +627,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper with PrivateM
   }
 
   test("Dataset result collection") {
-    def checkResult(rows: TraversableOnce[java.lang.Long], expectedValues: 
Long*): Unit = {
+    def checkResult(rows: IterableOnce[java.lang.Long], expectedValues: 
Long*): Unit = {
       rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach {
         case (actual, expected) => assert(actual === expected)
       }
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index d7be199f033e..7050e62d549c 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -45,7 +45,7 @@ private[sql] object UdfUtils extends Serializable {
   }
 
   def flatMapFuncToMapPartitionsAdaptor[T, U](
-      f: T => TraversableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
+      f: T => IterableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
 
   def filterFuncToScalaFunc[T](f: FilterFunction[T]): T => Boolean = f.call
 
@@ -62,28 +62,28 @@ private[sql] object UdfUtils extends Serializable {
   def foreachBatchFuncToScalaFunc[D](f: VoidFunction2[D, java.lang.Long]): (D, 
Long) => Unit =
     (d, i) => f.call(d, i)
 
-  def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T => 
TraversableOnce[U] = x =>
+  def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T => 
IterableOnce[U] = x =>
     f.call(x).asScala
 
   def flatMapGroupsFuncToScalaFunc[K, V, U](
-      f: FlatMapGroupsFunction[K, V, U]): (K, Iterator[V]) => 
TraversableOnce[U] = (key, data) =>
+      f: FlatMapGroupsFunction[K, V, U]): (K, Iterator[V]) => IterableOnce[U] 
= (key, data) =>
     f.call(key, data.asJava).asScala
 
   def mapGroupsFuncToScalaFunc[K, V, U](f: MapGroupsFunction[K, V, U]): (K, 
Iterator[V]) => U =
     (key, data) => f.call(key, data.asJava)
 
   def coGroupFunctionToScalaFunc[K, V, U, R](
-      f: CoGroupFunction[K, V, U, R]): (K, Iterator[V], Iterator[U]) => 
TraversableOnce[R] =
+      f: CoGroupFunction[K, V, U, R]): (K, Iterator[V], Iterator[U]) => 
IterableOnce[R] =
     (key, left, right) => f.call(key, left.asJava, right.asJava).asScala
 
   def mapGroupsFuncToFlatMapAdaptor[K, V, U](
-      f: (K, Iterator[V]) => U): (K, Iterator[V]) => TraversableOnce[U] = {
+      f: (K, Iterator[V]) => U): (K, Iterator[V]) => IterableOnce[U] = {
     (key: K, it: Iterator[V]) => Iterator(f(key, it))
   }
 
   def mapValuesAdaptor[K, V, U, IV](
-      f: (K, Iterator[V]) => TraversableOnce[U],
-      valueMapFunc: IV => V): (K, Iterator[IV]) => TraversableOnce[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U],
+      valueMapFunc: IV => V): (K, Iterator[IV]) => IterableOnce[U] = {
     (k: K, itr: Iterator[IV]) =>
       {
         f(k, itr.map(v => valueMapFunc(v)))
@@ -91,9 +91,9 @@ private[sql] object UdfUtils extends Serializable {
   }
 
   def mapValuesAdaptor[K, V, U, R, IV, IU](
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R],
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R],
       valueMapFunc: IV => V,
-      otherValueMapFunc: IU => U): (K, Iterator[IV], Iterator[IU]) => 
TraversableOnce[R] = {
+      otherValueMapFunc: IU => U): (K, Iterator[IV], Iterator[IU]) => 
IterableOnce[R] = {
     (k: K, itr: Iterator[IV], otherItr: Iterator[IU]) =>
       {
         f(k, itr.map(v => valueMapFunc(v)), otherItr.map(u => 
otherValueMapFunc(u)))
@@ -131,7 +131,7 @@ private[sql] object UdfUtils extends Serializable {
 
   def noOp[V, K](): V => K = _ => null.asInstanceOf[K]
 
-  def traversableOnceToSeq[A, B](f: A => TraversableOnce[B]): A => Seq[B] = { 
value =>
+  def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value 
=>
     f(value).toSeq
   }
 
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 9018b3d80dc5..3dd2be9c54cf 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -664,7 +664,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) 
extends Logging {
       SerializeFromObject(udf.outputNamedExpression, flatMapGroupsWithState)
     } else {
       val mapped = new MapGroups(
-        udf.function.asInstanceOf[(Any, Iterator[Any]) => 
TraversableOnce[Any]],
+        udf.function.asInstanceOf[(Any, Iterator[Any]) => IterableOnce[Any]],
         udf.inputDeserializer(ds.groupingAttributes),
         ds.valueDeserializer,
         ds.groupingAttributes,
@@ -721,7 +721,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) 
extends Logging {
       rel.getOtherSortingExpressionsList)
 
     val mapped = CoGroup(
-      udf.function.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) => 
TraversableOnce[Any]],
+      udf.function.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) => 
IterableOnce[Any]],
       // The `leftGroup` and `rightGroup` are guaranteed te be of same schema, 
so it's safe to
       // resolve the `keyDeserializer` based on either of them, here we pick 
the left one.
       udf.inputDeserializer(left.groupingAttributes),
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a41cbb058e9e..7d11e4b157af 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -332,7 +332,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a 
neutral "zero value".
    * This function can return a different result type, U, than the type of the 
values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation 
for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging 
values within a
+   * as in scala.IterableOnce. The former operation is used for merging values 
within a
    * partition, and the latter is used for merging values between partitions. 
To avoid memory
    * allocation, both of these functions are allowed to modify and return 
their first argument
    * instead of creating a new U.
@@ -347,7 +347,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a 
neutral "zero value".
    * This function can return a different result type, U, than the type of the 
values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation 
for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging 
values within a
+   * as in scala.IterableOnce. The former operation is used for merging values 
within a
    * partition, and the latter is used for merging values between partitions. 
To avoid memory
    * allocation, both of these functions are allowed to modify and return 
their first argument
    * instead of creating a new U.
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 30189b233aab..af1cc127bcad 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -418,7 +418,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * Aggregate the elements of each partition, and then the results for all 
the partitions, using
    * given combine functions and a neutral "zero value". This function can 
return a different result
    * type, U, than the type of this RDD, T. Thus, we need one operation for 
merging a T into an U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both 
of these functions are
+   * and one operation for merging two U's, as in scala.IterableOnce. Both of 
these functions are
    * allowed to modify and return their first argument instead of creating a 
new U to avoid memory
    * allocation.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index ba4402f5e88d..0739367ec79d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -149,7 +149,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a 
neutral "zero value".
    * This function can return a different result type, U, than the type of the 
values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation 
for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging 
values within a
+   * as in scala.IterableOnce. The former operation is used for merging values 
within a
    * partition, and the latter is used for merging values between partitions. 
To avoid memory
    * allocation, both of these functions are allowed to modify and return 
their first argument
    * instead of creating a new U.
@@ -174,7 +174,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a 
neutral "zero value".
    * This function can return a different result type, U, than the type of the 
values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation 
for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging 
values within a
+   * as in scala.IterableOnce. The former operation is used for merging values 
within a
    * partition, and the latter is used for merging values between partitions. 
To avoid memory
    * allocation, both of these functions are allowed to modify and return 
their first argument
    * instead of creating a new U.
@@ -188,7 +188,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Aggregate the values of each key, using given combine functions and a 
neutral "zero value".
    * This function can return a different result type, U, than the type of the 
values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation 
for merging two U's,
-   * as in scala.TraversableOnce. The former operation is used for merging 
values within a
+   * as in scala.IterableOnce. The former operation is used for merging values 
within a
    * partition, and the latter is used for merging values between partitions. 
To avoid memory
    * allocation, both of these functions are allowed to modify and return 
their first argument
    * instead of creating a new U.
@@ -757,7 +757,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Pass each value in the key-value pair RDD through a flatMap function 
without changing the
    * keys; this also retains the original RDD's partitioning.
    */
-  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = 
self.withScope {
+  def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)] = self.withScope {
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.flatMap { case (k, v) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a21d2ae77396..e74e4eac1b87 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -420,7 +420,7 @@ abstract class RDD[T: ClassTag](
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
+  def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
   }
@@ -1204,7 +1204,7 @@ abstract class RDD[T: ClassTag](
    * Aggregate the elements of each partition, and then the results for all 
the partitions, using
    * given combine functions and a neutral "zero value". This function can 
return a different result
    * type, U, than the type of this RDD, T. Thus, we need one operation for 
merging a T into an U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both 
of these functions are
+   * and one operation for merging two U's, as in scala.IterableOnce. Both of 
these functions are
    * allowed to modify and return their first argument instead of creating a 
new U to avoid memory
    * allocation.
    *
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala 
b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 1e02638591f8..0960a4741558 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.Since
  *
  * @constructor Initialize the StatCounter with the given values.
  */
-class StatCounter(values: TraversableOnce[Double]) extends Serializable {
+class StatCounter(values: IterableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - 
mean)^2)
@@ -51,7 +51,7 @@ class StatCounter(values: TraversableOnce[Double]) extends 
Serializable {
   }
 
   /** Add multiple values into this StatCounter, updating the internal 
statistics. */
-  def merge(values: TraversableOnce[Double]): StatCounter = {
+  def merge(values: IterableOnce[Double]): StatCounter = {
     values.foreach(v => merge(v))
     this
   }
@@ -155,7 +155,7 @@ class StatCounter(values: TraversableOnce[Double]) extends 
Serializable {
 
 object StatCounter {
   /** Build a StatCounter from a list of values. */
-  def apply(values: TraversableOnce[Double]): StatCounter = new 
StatCounter(values)
+  def apply(values: IterableOnce[Double]): StatCounter = new 
StatCounter(values)
 
   /** Build a StatCounter from a list of values passed as variable-length 
arguments. */
   def apply(values: Double*): StatCounter = new StatCounter(values)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index dcffa99dc64c..f8decbcff5f4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -916,7 +916,7 @@ private[spark] object Utils
    * result in a new collection. Unlike scala.util.Random.shuffle, this method
    * uses a local random number generator, avoiding inter-thread contention.
    */
-  def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
+  def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
     randomizeInPlace(seq.toArray)
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 9d5f1aac3391..8d9fb85309b1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -81,7 +81,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends 
Seq[T] with Serializable
     this
   }
 
-  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+  def ++= (values: IterableOnce[T]): CompactBuffer[T] = {
     values match {
       // Optimize merging of CompactBuffers, used in cogroup and groupByKey
       case compactBuf: CompactBuffer[T] =>
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index 151d6c8268d4..436899448d63 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -50,7 +50,7 @@ private[spark] object Utils extends SparkCollectionUtils {
    * Callers must ensure that all the input iterators are already sorted by
    * the same ordering `ord`, otherwise the result is likely to be incorrect.
    */
-  def mergeOrdered[T](inputs: Iterable[TraversableOnce[T]])(
+  def mergeOrdered[T](inputs: Iterable[IterableOnce[T]])(
     implicit ord: Ordering[T]): Iterator[T] = {
     val ordering = new GuavaOrdering[T] {
       override def compare(l: T, r: T): Int = ord.compare(l, r)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index b1dcb465b477..e232dfc05faa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -275,13 +275,13 @@ case class UnresolvedGenerator(name: FunctionIdentifier, 
children: Seq[Expressio
   override def prettyName: String = name.unquotedString
   override def toString: String = s"'$name(${children.mkString(", ")})"
 
-  override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
+  override def eval(input: InternalRow = null): IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
     throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
 
-  override def terminate(): TraversableOnce[InternalRow] =
+  override def terminate(): IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotTerminateGeneratorError(this)
 
   override protected def withNewChildrenInternal(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 39e0f836e240..5f85716fa283 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -415,7 +415,7 @@ package object dsl {
 
       def cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: 
Encoder](
           otherPlan: LogicalPlan,
-          func: (Key, Iterator[Left], Iterator[Right]) => 
TraversableOnce[Result],
+          func: (Key, Iterator[Left], Iterator[Right]) => IterableOnce[Result],
           leftGroup: Seq[Attribute],
           rightGroup: Seq[Attribute],
           leftAttr: Seq[Attribute],
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index bc74572444c8..539505543a40 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -146,7 +146,7 @@ case class PythonUDAF(
 }
 
 abstract class UnevaluableGenerator extends Generator {
-  final override def eval(input: InternalRow): TraversableOnce[InternalRow] =
+  final override def eval(input: InternalRow): IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
 
   final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 3b44add3b1f0..49cf01d472eb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -66,13 +66,13 @@ trait Generator extends Expression {
   def elementSchema: StructType
 
   /** Should be implemented by child classes to perform specific Generators. */
-  override def eval(input: InternalRow): TraversableOnce[InternalRow]
+  override def eval(input: InternalRow): IterableOnce[InternalRow]
 
   /**
    * Notifies that there are no more rows to process, clean up code, and 
additional
    * rows can be made here.
    */
-  def terminate(): TraversableOnce[InternalRow] = Nil
+  def terminate(): IterableOnce[InternalRow] = Nil
 
   /**
    * Check if this generator supports code generation.
@@ -100,7 +100,7 @@ trait CollectionGenerator extends Generator {
  */
 case class UserDefinedGenerator(
     elementSchema: StructType,
-    function: Row => TraversableOnce[InternalRow],
+    function: Row => IterableOnce[InternalRow],
     children: Seq[Expression])
   extends Generator with CodegenFallback {
 
@@ -117,7 +117,7 @@ case class UserDefinedGenerator(
     }.asInstanceOf[InternalRow => Row]
   }
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     if (inputRow == null) {
       initializeConverters()
     }
@@ -232,7 +232,7 @@ case class Stack(children: Seq[Expression]) extends 
Generator {
       case (e, index) => StructField(s"col$index", e.dataType)
     })
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val values = children.tail.map(_.eval(input)).toArray
     for (row <- 0 until numRows) yield {
       val fields = new Array[Any](numFields)
@@ -290,7 +290,7 @@ case class ReplicateRows(children: Seq[Expression]) extends 
Generator with Codeg
       case (e, index) => StructField(s"col$index", e.dataType)
     })
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val numRows = children.head.eval(input).asInstanceOf[Long]
     val values = children.tail.map(_.eval(input)).toArray
     Range.Long(0, numRows, 1).map { _ =>
@@ -311,7 +311,7 @@ case class ReplicateRows(children: Seq[Expression]) extends 
Generator with Codeg
  * such as explode_outer. This expression gets replaced during analysis.
  */
 case class GeneratorOuter(child: Generator) extends UnaryExpression with 
Generator {
-  final override def eval(input: InternalRow = null): 
TraversableOnce[InternalRow] =
+  final override def eval(input: InternalRow = null): 
IterableOnce[InternalRow] =
     throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
 
   final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode =
@@ -369,7 +369,7 @@ abstract class ExplodeBase extends UnaryExpression with 
CollectionGenerator with
       }
   }
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     child.dataType match {
       case ArrayType(et, _) =>
         val inputArray = child.eval(input).asInstanceOf[ArrayData]
@@ -571,7 +571,7 @@ case class Inline(child: Expression) extends 
UnaryExpression with CollectionGene
 
   private lazy val generatorNullRow = new 
GenericInternalRow(elementSchema.length)
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val inputArray = child.eval(input).asInstanceOf[ArrayData]
     if (inputArray == null) {
       Nil
@@ -605,7 +605,7 @@ case class SQLKeywords() extends LeafExpression with 
Generator with CodegenFallb
     .add("keyword", StringType, nullable = false)
     .add("reserved", BooleanType, nullable = false)
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val reservedList = getReservedList()
     keywords.zip(reservedList).map { case (keyword, isReserved) =>
       InternalRow(UTF8String.fromString(keyword), isReserved)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 435c27a6e00d..e7df542ddab8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -407,7 +407,7 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
     if (json == null) {
       return nullRow
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 9bf8db0b4faf..772984549694 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -401,13 +401,13 @@ case class AppendColumnsWithObject(
 /** Factory for constructing new `MapGroups` nodes. */
 object MapGroups {
   def apply[K : Encoder, T : Encoder, U : Encoder](
-      func: (K, Iterator[T]) => TraversableOnce[U],
+      func: (K, Iterator[T]) => IterableOnce[U],
       groupingAttributes: Seq[Attribute],
       dataAttributes: Seq[Attribute],
       dataOrder: Seq[SortOrder],
       child: LogicalPlan): LogicalPlan = {
     val mapped = new MapGroups(
-      func.asInstanceOf[(Any, Iterator[Any]) => TraversableOnce[Any]],
+      func.asInstanceOf[(Any, Iterator[Any]) => IterableOnce[Any]],
       UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes),
       UnresolvedDeserializer(encoderFor[T].deserializer, dataAttributes),
       groupingAttributes,
@@ -436,7 +436,7 @@ object MapGroups {
  * @param valueDeserializer used to extract the items in the iterator from an 
input row.
  */
 case class MapGroups(
-    func: (Any, Iterator[Any]) => TraversableOnce[Any],
+    func: (Any, Iterator[Any]) => IterableOnce[Any],
     keyDeserializer: Expression,
     valueDeserializer: Expression,
     groupingAttributes: Seq[Attribute],
@@ -662,7 +662,7 @@ case class FlatMapGroupsInRWithArrow(
 /** Factory for constructing new `CoGroup` nodes. */
 object CoGroup {
   def apply[K : Encoder, L : Encoder, R : Encoder, OUT : Encoder](
-      func: (K, Iterator[L], Iterator[R]) => TraversableOnce[OUT],
+      func: (K, Iterator[L], Iterator[R]) => IterableOnce[OUT],
       leftGroup: Seq[Attribute],
       rightGroup: Seq[Attribute],
       leftAttr: Seq[Attribute],
@@ -674,7 +674,7 @@ object CoGroup {
     require(DataTypeUtils.fromAttributes(leftGroup) == 
DataTypeUtils.fromAttributes(rightGroup))
 
     val cogrouped = CoGroup(
-      func.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) => 
TraversableOnce[Any]],
+      func.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) => 
IterableOnce[Any]],
       // The `leftGroup` and `rightGroup` are guaranteed te be of same schema, 
so it's safe to
       // resolve the `keyDeserializer` based on either of them, here we pick 
the left one.
       UnresolvedDeserializer(encoderFor[K].deserializer, leftGroup),
@@ -698,7 +698,7 @@ object CoGroup {
  * right children.
  */
 case class CoGroup(
-    func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
+    func: (Any, Iterator[Any], Iterator[Any]) => IterableOnce[Any],
     keyDeserializer: Expression,
     leftDeserializer: Expression,
     rightDeserializer: Expression,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 39ce5e77e28a..a34ad10f36ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -261,7 +261,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    * Returns a Seq by applying a function to all nodes in this tree and using 
the elements of the
    * resulting collections.
    */
-  def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
+  def flatMap[A](f: BaseType => IterableOnce[A]): Seq[A] = {
     val ret = new collection.mutable.ArrayBuffer[A]()
     foreach(ret ++= f(_))
     ret.toSeq
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
index 03ab04f23487..49c6625851dd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
 
 class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper 
{
   private def checkTuple(actual: Expression, expected: Seq[InternalRow]): Unit 
= {
-    assert(actual.eval(null).asInstanceOf[TraversableOnce[InternalRow]].toSeq 
=== expected)
+    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].toSeq === 
expected)
   }
 
   private final val empty_array = CreateArray(Seq.empty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 174f47a3b82b..a3f8a2cf4a05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2676,7 +2676,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @deprecated("use flatMap() or select() with functions.explode() instead", 
"2.0.0")
-  def explode[A <: Product : TypeTag](input: Column*)(f: Row => 
TraversableOnce[A]): DataFrame = {
+  def explode[A <: Product : TypeTag](input: Column*)(f: Row => 
IterableOnce[A]): DataFrame = {
     val elementSchema = 
ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
     val convert = 
CatalystTypeConverters.createToCatalystConverter(elementSchema)
@@ -2713,14 +2713,14 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @deprecated("use flatMap() or select() with functions.explode() instead", 
"2.0.0")
-  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A 
=> TraversableOnce[B])
+  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A 
=> IterableOnce[B])
     : DataFrame = {
     val dataType = ScalaReflection.schemaFor[B].dataType
     val attributes = AttributeReference(outputColumn, dataType)() :: Nil
     // TODO handle the metadata?
     val elementSchema = attributes.toStructType
 
-    def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+    def rowFunction(row: Row): IterableOnce[InternalRow] = {
       val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
       f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
     }
@@ -3510,7 +3510,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 1.6.0
    */
-  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+  def flatMap[U : Encoder](func: T => IterableOnce[U]): Dataset[U] =
     mapPartitions(_.flatMap(func))
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 2239a128a2c4..ef0a3e0266c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -138,7 +138,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    *
    * @since 1.6.0
    */
-  def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): 
Dataset[U] = {
+  def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => IterableOnce[U]): 
Dataset[U] = {
     Dataset[U](
       sparkSession,
       MapGroups(
@@ -198,7 +198,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    */
   def flatMapSortedGroups[U : Encoder](
       sortExprs: Column*)(
-      f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+      f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
     val sortOrder: Seq[SortOrder] = MapGroups.sortOrder(sortExprs.map(_.expr))
 
     Dataset[U](
@@ -807,7 +807,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    */
   def cogroup[U, R : Encoder](
       other: KeyValueGroupedDataset[K, U])(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     implicit val uEncoder = other.vExprEnc
     Dataset[R](
       sparkSession,
@@ -857,7 +857,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
       other: KeyValueGroupedDataset[K, U])(
       thisSortExprs: Column*)(
       otherSortExprs: Column*)(
-      f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
     def toSortOrder(col: Column): SortOrder = col.expr match {
       case expr: SortOrder => expr
       case expr: Expression => SortOrder(expr, Ascending)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index b99361437e0d..11d0448e026f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
  * For lazy computing, be sure the generator.terminate() called in the very 
last
  * TODO reusing the CompletionIterator?
  */
-private[execution] sealed case class LazyIterator(func: () => 
TraversableOnce[InternalRow])
+private[execution] sealed case class LazyIterator(func: () => 
IterableOnce[InternalRow])
   extends Iterator[InternalRow] {
 
   lazy val results: Iterator[InternalRow] = func().toIterator
@@ -147,7 +147,7 @@ case class GenerateExec(
     }.map(_._2)
     boundGenerator match {
       case e: CollectionGenerator => codeGenCollection(ctx, e, requiredInput)
-      case g => codeGenTraversableOnce(ctx, g, requiredInput)
+      case g => codeGenIterableOnce(ctx, g, requiredInput)
     }
   }
 
@@ -239,9 +239,9 @@ case class GenerateExec(
   }
 
   /**
-   * Generate code for a regular [[TraversableOnce]] returning [[Generator]].
+   * Generate code for a regular [[IterableOnce]] returning [[Generator]].
    */
-  private def codeGenTraversableOnce(
+  private def codeGenIterableOnce(
       ctx: CodegenContext,
       e: Expression,
       requiredInput: Seq[ExprCode]): String = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
index c58d925f28e5..afbb1f0e5a37 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
@@ -70,7 +70,7 @@ trait AdaptiveSparkPlanHelper {
    * Returns a Seq by applying a function to all nodes in this tree and using 
the elements of the
    * resulting collections.
    */
-  def flatMap[A](p: SparkPlan)(f: SparkPlan => TraversableOnce[A]): Seq[A] = {
+  def flatMap[A](p: SparkPlan)(f: SparkPlan => IterableOnce[A]): Seq[A] = {
     val ret = new collection.mutable.ArrayBuffer[A]()
     foreach(p)(ret ++= f(_))
     ret.toSeq
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 10878c810690..31bfa5aff351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -392,7 +392,7 @@ case class AppendColumnsWithObjectExec(
  * The result of this function is flattened before being output.
  */
 case class MapGroupsExec(
-    func: (Any, Iterator[Any]) => TraversableOnce[Any],
+    func: (Any, Iterator[Any]) => IterableOnce[Any],
     keyDeserializer: Expression,
     valueDeserializer: Expression,
     groupingAttributes: Seq[Attribute],
@@ -432,7 +432,7 @@ case class MapGroupsExec(
 
 object MapGroupsExec {
   def apply(
-      func: (Any, Iterator[Any], LogicalGroupState[Any]) => 
TraversableOnce[Any],
+      func: (Any, Iterator[Any], LogicalGroupState[Any]) => IterableOnce[Any],
       keyDeserializer: Expression,
       valueDeserializer: Expression,
       groupingAttributes: Seq[Attribute],
@@ -615,7 +615,7 @@ case class FlatMapGroupsInRWithArrowExec(
  * The result of this function is flattened before being output.
  */
 case class CoGroupExec(
-    func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
+    func: (Any, Iterator[Any], Iterator[Any]) => IterableOnce[Any],
     keyDeserializer: Expression,
     leftDeserializer: Expression,
     rightDeserializer: Expression,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index fa0744dc19b2..f81fac8e892b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -72,7 +72,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: 
SQLContext) extends Spa
     addData(data.toTraversable)
   }
 
-  def addData(data: TraversableOnce[A]): OffsetV2
+  def addData(data: IterableOnce[A]): OffsetV2
 
   def fullSchema(): StructType = encoder.schema
 
@@ -188,7 +188,7 @@ case class MemoryStream[A : Encoder](
   @GuardedBy("this")
   protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
 
-  def addData(data: TraversableOnce[A]): Offset = {
+  def addData(data: IterableOnce[A]): Offset = {
     val objects = data.toSeq
     val rows = objects.iterator.map(d => 
toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
     logDebug(s"Adding: $objects")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index d0cf602c7cca..028884e8f34c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -56,7 +56,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext, numPa
   private val recordEndpoint = new ContinuousRecordEndpoint(records, this)
   @volatile private var endpointRef: RpcEndpointRef = _
 
-  def addData(data: TraversableOnce[A]): Offset = synchronized {
+  def addData(data: IterableOnce[A]): Offset = synchronized {
     // Distribute data evenly among partition lists.
     data.toSeq.zipWithIndex.map {
       case (item, index) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 68f63feb5c51..e228d4792d14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -547,7 +547,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSparkSession {
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {
   override def elementSchema: StructType = new StructType().add("id", 
IntegerType)
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = 
Seq.empty
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = Seq.empty
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
     val iteratorClass = classOf[Iterator[_]].getName
     ev.copy(code =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 9452209e3184..6efdb676ccbd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -239,7 +239,7 @@ private[hive] case class HiveGenericUDTF(
   @transient
   private lazy val inputProjection = new InterpretedProjection(children)
 
-  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+  override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     outputInspector // Make sure initialized.
     function.process(wrap(inputProjection(input), wrappers, udtInput, 
inputDataTypes))
     collector.collectRows()
@@ -262,7 +262,7 @@ private[hive] case class HiveGenericUDTF(
     }
   }
 
-  override def terminate(): TraversableOnce[InternalRow] = {
+  override def terminate(): IterableOnce[InternalRow] = {
     outputInspector // Make sure initialized.
     function.close()
     collector.collectRows()
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index ca4f3670d5ad..ffa65a739de2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -552,7 +552,7 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream by applying a function to all elements of this 
DStream,
    * and then flattening the results
    */
-  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = 
ssc.withScope {
+  def flatMap[U: ClassTag](flatMapFunc: T => IterableOnce[U]): DStream[U] = 
ssc.withScope {
     new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
   }
 
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index f5b1e5f3a145..aca33295fbb1 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
 private[streaming]
 class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     parent: DStream[(K, V)],
-    flatMapValueFunc: V => TraversableOnce[U]
+    flatMapValueFunc: V => IterableOnce[U]
   ) extends DStream[(K, U)](parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index d60a6179782e..b87ba7dc1207 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
 private[streaming]
 class FlatMappedDStream[T: ClassTag, U: ClassTag](
     parent: DStream[T],
-    flatMapFunc: T => TraversableOnce[U]
+    flatMapFunc: T => IterableOnce[U]
   ) extends DStream[U](parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index a5bed752bd66..81904ea633a4 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -548,7 +548,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * 'this' DStream without changing the key.
    */
   def flatMapValues[U: ClassTag](
-      flatMapValuesFunc: V => TraversableOnce[U]
+      flatMapValuesFunc: V => IterableOnce[U]
     ): DStream[(K, U)] = ssc.withScope {
     new FlatMapValuedDStream[K, V, U](self, 
sparkContext.clean(flatMapValuesFunc))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
