This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b70c88bbef0c [SPARK-45384][CORE][SQL][SS][DSTREAM][CONNECT] Replace
`TraversableOnce` with `IterableOnce`
b70c88bbef0c is described below
commit b70c88bbef0cedc6a4b591f92cbb7fcc869667d5
Author: yangjie01 <[email protected]>
AuthorDate: Sun Oct 1 21:41:21 2023 +0800
[SPARK-45384][CORE][SQL][SS][DSTREAM][CONNECT] Replace `TraversableOnce`
with `IterableOnce`
### What changes were proposed in this pull request?
This PR replaces `TraversableOnce` with `IterableOnce` because
`TraversableOnce` has been deprecated since Scala 2.13.0:
```
@deprecated("Use IterableOnce instead of TraversableOnce", "2.13.0")
type TraversableOnce[+A] = scala.collection.IterableOnce[A]
```
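Since `TraversableOnce` is now only a deprecated alias for `IterableOnce`, the replacement is a mechanical signature change with identical behavior. A minimal, self-contained sketch (illustrative names only, not code taken from this patch):
```scala
object IterableOnceMigration {
  // Before (emits a deprecation warning since Scala 2.13.0):
  //   def flatMapAdaptor[T, U](f: T => TraversableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
  // After: behavior is unchanged, because TraversableOnce[+A] is defined as scala.collection.IterableOnce[A].
  def flatMapAdaptor[T, U](f: T => IterableOnce[U]): Iterator[T] => Iterator[U] =
    _.flatMap(f)

  def main(args: Array[String]): Unit = {
    val duplicated = flatMapAdaptor[Int, Int](x => Seq(x, x))(Iterator(1, 2, 3))
    assert(duplicated.toSeq == Seq(1, 1, 2, 2, 3, 3))
  }
}
```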
Additionally, this PR renames two functions (a sketch of the adaptor pattern follows the list):
- rename `UdfUtils#traversableOnceToSeq` to `UdfUtils#iterableOnceToSeq`
- rename `GenerateExec#codeGenTraversableOnce` to
`GenerateExec#codeGenIterableOnce`
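The renamed helper keeps its old semantics: it adapts a function returning an `IterableOnce` into one returning a materialized `Seq`. A hypothetical sketch of that pattern (not the exact Spark implementation):
```scala
object AdaptorSketch {
  // Hypothetical stand-in for a helper like UdfUtils#iterableOnceToSeq:
  // materialize whatever single-pass collection f produces into a Seq.
  def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] =
    value => f(value).iterator.toSeq

  def main(args: Array[String]): Unit = {
    val words = iterableOnceToSeq[String, String](_.split(" ").toList)
    assert(words("replace traversable once") == Seq("replace", "traversable", "once"))
  }
}
```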
### Why are the changes needed?
Clean up deprecated Scala API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43186 from LuciferYang/SPARK-45384.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 10 +++++-----
.../apache/spark/sql/KeyValueGroupedDataset.scala | 12 ++++++------
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 2 +-
.../apache/spark/sql/connect/common/UdfUtils.scala | 20 ++++++++++----------
.../sql/connect/planner/SparkConnectPlanner.scala | 4 ++--
.../org/apache/spark/api/java/JavaPairRDD.scala | 4 ++--
.../org/apache/spark/api/java/JavaRDDLike.scala | 2 +-
.../org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++----
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++--
.../scala/org/apache/spark/util/StatCounter.scala | 6 +++---
.../src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
.../apache/spark/util/collection/CompactBuffer.scala | 2 +-
.../org/apache/spark/util/collection/Utils.scala | 2 +-
.../spark/sql/catalyst/analysis/unresolved.scala | 4 ++--
.../org/apache/spark/sql/catalyst/dsl/package.scala | 2 +-
.../spark/sql/catalyst/expressions/PythonUDF.scala | 2 +-
.../spark/sql/catalyst/expressions/generators.scala | 20 ++++++++++----------
.../sql/catalyst/expressions/jsonExpressions.scala | 2 +-
.../spark/sql/catalyst/plans/logical/object.scala | 12 ++++++------
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
.../expressions/GeneratorExpressionSuite.scala | 2 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 8 ++++----
.../apache/spark/sql/KeyValueGroupedDataset.scala | 8 ++++----
.../apache/spark/sql/execution/GenerateExec.scala | 8 ++++----
.../execution/adaptive/AdaptiveSparkPlanHelper.scala | 2 +-
.../org/apache/spark/sql/execution/objects.scala | 6 +++---
.../spark/sql/execution/streaming/memory.scala | 4 ++--
.../streaming/sources/ContinuousMemoryStream.scala | 2 +-
.../apache/spark/sql/GeneratorFunctionSuite.scala | 2 +-
.../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 4 ++--
.../org/apache/spark/streaming/dstream/DStream.scala | 2 +-
.../streaming/dstream/FlatMapValuedDStream.scala | 2 +-
.../spark/streaming/dstream/FlatMappedDStream.scala | 2 +-
.../streaming/dstream/PairDStreamFunctions.scala | 2 +-
34 files changed, 88 insertions(+), 88 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index eb5460cbc86a..a1e57226e530 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2733,7 +2733,7 @@ class Dataset[T] private[sql] (
* @group typedrel
* @since 3.5.0
*/
- def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+ def flatMap[U: Encoder](func: T => IterableOnce[U]): Dataset[U] =
mapPartitions(UdfUtils.flatMapFuncToMapPartitionsAdaptor(func))
/**
@@ -2775,9 +2775,9 @@ class Dataset[T] private[sql] (
* @since 3.5.0
*/
@deprecated("use flatMap() or select() with functions.explode() instead",
"3.5.0")
- def explode[A <: Product: TypeTag](input: Column*)(f: Row =>
TraversableOnce[A]): DataFrame = {
+ def explode[A <: Product: TypeTag](input: Column*)(f: Row =>
IterableOnce[A]): DataFrame = {
val generator = ScalarUserDefinedFunction(
- UdfUtils.traversableOnceToSeq(f),
+ UdfUtils.iterableOnceToSeq(f),
UnboundRowEncoder :: Nil,
ScalaReflection.encoderFor[Seq[A]])
select(col("*"), functions.inline(generator(struct(input: _*))))
@@ -2807,9 +2807,9 @@ class Dataset[T] private[sql] (
*/
@deprecated("use flatMap() or select() with functions.explode() instead",
"3.5.0")
def explode[A, B: TypeTag](inputColumn: String, outputColumn: String)(
- f: A => TraversableOnce[B]): DataFrame = {
+ f: A => IterableOnce[B]): DataFrame = {
val generator = ScalarUserDefinedFunction(
- UdfUtils.traversableOnceToSeq(f),
+ UdfUtils.iterableOnceToSeq(f),
Nil,
ScalaReflection.encoderFor[Seq[B]])
select(col("*"),
functions.explode(generator(col(inputColumn))).as((outputColumn)))
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 9e7d8e1320ef..dac89bf3eb5a 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -111,7 +111,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
*
* @since 3.5.0
*/
- def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => TraversableOnce[U]):
Dataset[U] = {
+ def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => IterableOnce[U]):
Dataset[U] = {
flatMapSortedGroups()(f)
}
@@ -162,7 +162,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
* @since 3.5.0
*/
def flatMapSortedGroups[U: Encoder](sortExprs: Column*)(
- f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+ f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
throw new UnsupportedOperationException
}
@@ -397,7 +397,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
* @since 3.5.0
*/
def cogroup[U, R: Encoder](other: KeyValueGroupedDataset[K, U])(
- f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+ f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
cogroupSorted(other)()()(f)
}
@@ -433,7 +433,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
*/
def cogroupSorted[U, R: Encoder](other: KeyValueGroupedDataset[K,
U])(thisSortExprs: Column*)(
otherSortExprs: Column*)(
- f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+ f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
throw new UnsupportedOperationException
}
@@ -865,7 +865,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
}
override def flatMapSortedGroups[U: Encoder](sortExprs: Column*)(
- f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+ f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
// Apply mapValues changes to the udf
val nf =
if (valueMapFunc == UdfUtils.identical()) f else
UdfUtils.mapValuesAdaptor(f, valueMapFunc)
@@ -881,7 +881,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
override def cogroupSorted[U, R: Encoder](other: KeyValueGroupedDataset[K,
U])(
thisSortExprs: Column*)(otherSortExprs: Column*)(
- f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+ f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
assert(other.isInstanceOf[KeyValueGroupedDatasetImpl[K, U, _, _]])
val otherImpl = other.asInstanceOf[KeyValueGroupedDatasetImpl[K, U, _, _]]
// Apply mapValues changes to the udf
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 45a3eee55565..85d98babcf9a 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -627,7 +627,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with
SQLHelper with PrivateM
}
test("Dataset result collection") {
- def checkResult(rows: TraversableOnce[java.lang.Long], expectedValues:
Long*): Unit = {
+ def checkResult(rows: IterableOnce[java.lang.Long], expectedValues:
Long*): Unit = {
rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach {
case (actual, expected) => assert(actual === expected)
}
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index d7be199f033e..7050e62d549c 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -45,7 +45,7 @@ private[sql] object UdfUtils extends Serializable {
}
def flatMapFuncToMapPartitionsAdaptor[T, U](
- f: T => TraversableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
+ f: T => IterableOnce[U]): Iterator[T] => Iterator[U] = _.flatMap(f)
def filterFuncToScalaFunc[T](f: FilterFunction[T]): T => Boolean = f.call
@@ -62,28 +62,28 @@ private[sql] object UdfUtils extends Serializable {
def foreachBatchFuncToScalaFunc[D](f: VoidFunction2[D, java.lang.Long]): (D,
Long) => Unit =
(d, i) => f.call(d, i)
- def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T =>
TraversableOnce[U] = x =>
+ def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T =>
IterableOnce[U] = x =>
f.call(x).asScala
def flatMapGroupsFuncToScalaFunc[K, V, U](
- f: FlatMapGroupsFunction[K, V, U]): (K, Iterator[V]) =>
TraversableOnce[U] = (key, data) =>
+ f: FlatMapGroupsFunction[K, V, U]): (K, Iterator[V]) => IterableOnce[U]
= (key, data) =>
f.call(key, data.asJava).asScala
def mapGroupsFuncToScalaFunc[K, V, U](f: MapGroupsFunction[K, V, U]): (K,
Iterator[V]) => U =
(key, data) => f.call(key, data.asJava)
def coGroupFunctionToScalaFunc[K, V, U, R](
- f: CoGroupFunction[K, V, U, R]): (K, Iterator[V], Iterator[U]) =>
TraversableOnce[R] =
+ f: CoGroupFunction[K, V, U, R]): (K, Iterator[V], Iterator[U]) =>
IterableOnce[R] =
(key, left, right) => f.call(key, left.asJava, right.asJava).asScala
def mapGroupsFuncToFlatMapAdaptor[K, V, U](
- f: (K, Iterator[V]) => U): (K, Iterator[V]) => TraversableOnce[U] = {
+ f: (K, Iterator[V]) => U): (K, Iterator[V]) => IterableOnce[U] = {
(key: K, it: Iterator[V]) => Iterator(f(key, it))
}
def mapValuesAdaptor[K, V, U, IV](
- f: (K, Iterator[V]) => TraversableOnce[U],
- valueMapFunc: IV => V): (K, Iterator[IV]) => TraversableOnce[U] = {
+ f: (K, Iterator[V]) => IterableOnce[U],
+ valueMapFunc: IV => V): (K, Iterator[IV]) => IterableOnce[U] = {
(k: K, itr: Iterator[IV]) =>
{
f(k, itr.map(v => valueMapFunc(v)))
@@ -91,9 +91,9 @@ private[sql] object UdfUtils extends Serializable {
}
def mapValuesAdaptor[K, V, U, R, IV, IU](
- f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R],
+ f: (K, Iterator[V], Iterator[U]) => IterableOnce[R],
valueMapFunc: IV => V,
- otherValueMapFunc: IU => U): (K, Iterator[IV], Iterator[IU]) =>
TraversableOnce[R] = {
+ otherValueMapFunc: IU => U): (K, Iterator[IV], Iterator[IU]) =>
IterableOnce[R] = {
(k: K, itr: Iterator[IV], otherItr: Iterator[IU]) =>
{
f(k, itr.map(v => valueMapFunc(v)), otherItr.map(u =>
otherValueMapFunc(u)))
@@ -131,7 +131,7 @@ private[sql] object UdfUtils extends Serializable {
def noOp[V, K](): V => K = _ => null.asInstanceOf[K]
- def traversableOnceToSeq[A, B](f: A => TraversableOnce[B]): A => Seq[B] = {
value =>
+ def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value
=>
f(value).toSeq
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 9018b3d80dc5..3dd2be9c54cf 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -664,7 +664,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder)
extends Logging {
SerializeFromObject(udf.outputNamedExpression, flatMapGroupsWithState)
} else {
val mapped = new MapGroups(
- udf.function.asInstanceOf[(Any, Iterator[Any]) =>
TraversableOnce[Any]],
+ udf.function.asInstanceOf[(Any, Iterator[Any]) => IterableOnce[Any]],
udf.inputDeserializer(ds.groupingAttributes),
ds.valueDeserializer,
ds.groupingAttributes,
@@ -721,7 +721,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder)
extends Logging {
rel.getOtherSortingExpressionsList)
val mapped = CoGroup(
- udf.function.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) =>
TraversableOnce[Any]],
+ udf.function.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) =>
IterableOnce[Any]],
// The `leftGroup` and `rightGroup` are guaranteed te be of same schema,
so it's safe to
// resolve the `keyDeserializer` based on either of them, here we pick
the left one.
udf.inputDeserializer(left.groupingAttributes),
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a41cbb058e9e..7d11e4b157af 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -332,7 +332,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Aggregate the values of each key, using given combine functions and a
neutral "zero value".
* This function can return a different result type, U, than the type of the
values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's,
- * as in scala.TraversableOnce. The former operation is used for merging
values within a
+ * as in scala.IterableOnce. The former operation is used for merging values
within a
* partition, and the latter is used for merging values between partitions.
To avoid memory
* allocation, both of these functions are allowed to modify and return
their first argument
* instead of creating a new U.
@@ -347,7 +347,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Aggregate the values of each key, using given combine functions and a
neutral "zero value".
* This function can return a different result type, U, than the type of the
values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's,
- * as in scala.TraversableOnce. The former operation is used for merging
values within a
+ * as in scala.IterableOnce. The former operation is used for merging values
within a
* partition, and the latter is used for merging values between partitions.
To avoid memory
* allocation, both of these functions are allowed to modify and return
their first argument
* instead of creating a new U.
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 30189b233aab..af1cc127bcad 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -418,7 +418,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends
Serializable {
* Aggregate the elements of each partition, and then the results for all
the partitions, using
* given combine functions and a neutral "zero value". This function can
return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for
merging a T into an U
- * and one operation for merging two U's, as in scala.TraversableOnce. Both
of these functions are
+ * and one operation for merging two U's, as in scala.IterableOnce. Both of
these functions are
* allowed to modify and return their first argument instead of creating a
new U to avoid memory
* allocation.
*/
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index ba4402f5e88d..0739367ec79d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -149,7 +149,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Aggregate the values of each key, using given combine functions and a
neutral "zero value".
* This function can return a different result type, U, than the type of the
values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's,
- * as in scala.TraversableOnce. The former operation is used for merging
values within a
+ * as in scala.IterableOnce. The former operation is used for merging values
within a
* partition, and the latter is used for merging values between partitions.
To avoid memory
* allocation, both of these functions are allowed to modify and return
their first argument
* instead of creating a new U.
@@ -174,7 +174,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Aggregate the values of each key, using given combine functions and a
neutral "zero value".
* This function can return a different result type, U, than the type of the
values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's,
- * as in scala.TraversableOnce. The former operation is used for merging
values within a
+ * as in scala.IterableOnce. The former operation is used for merging values
within a
* partition, and the latter is used for merging values between partitions.
To avoid memory
* allocation, both of these functions are allowed to modify and return
their first argument
* instead of creating a new U.
@@ -188,7 +188,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Aggregate the values of each key, using given combine functions and a
neutral "zero value".
* This function can return a different result type, U, than the type of the
values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation
for merging two U's,
- * as in scala.TraversableOnce. The former operation is used for merging
values within a
+ * as in scala.IterableOnce. The former operation is used for merging values
within a
* partition, and the latter is used for merging values between partitions.
To avoid memory
* allocation, both of these functions are allowed to modify and return
their first argument
* instead of creating a new U.
@@ -757,7 +757,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Pass each value in the key-value pair RDD through a flatMap function
without changing the
* keys; this also retains the original RDD's partitioning.
*/
- def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] =
self.withScope {
+ def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a21d2ae77396..e74e4eac1b87 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -420,7 +420,7 @@ abstract class RDD[T: ClassTag](
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
+ def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}
@@ -1204,7 +1204,7 @@ abstract class RDD[T: ClassTag](
* Aggregate the elements of each partition, and then the results for all
the partitions, using
* given combine functions and a neutral "zero value". This function can
return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for
merging a T into an U
- * and one operation for merging two U's, as in scala.TraversableOnce. Both
of these functions are
+ * and one operation for merging two U's, as in scala.IterableOnce. Both of
these functions are
* allowed to modify and return their first argument instead of creating a
new U to avoid memory
* allocation.
*
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 1e02638591f8..0960a4741558 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.Since
*
* @constructor Initialize the StatCounter with the given values.
*/
-class StatCounter(values: TraversableOnce[Double]) extends Serializable {
+class StatCounter(values: IterableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x -
mean)^2)
@@ -51,7 +51,7 @@ class StatCounter(values: TraversableOnce[Double]) extends
Serializable {
}
/** Add multiple values into this StatCounter, updating the internal
statistics. */
- def merge(values: TraversableOnce[Double]): StatCounter = {
+ def merge(values: IterableOnce[Double]): StatCounter = {
values.foreach(v => merge(v))
this
}
@@ -155,7 +155,7 @@ class StatCounter(values: TraversableOnce[Double]) extends
Serializable {
object StatCounter {
/** Build a StatCounter from a list of values. */
- def apply(values: TraversableOnce[Double]): StatCounter = new
StatCounter(values)
+ def apply(values: IterableOnce[Double]): StatCounter = new
StatCounter(values)
/** Build a StatCounter from a list of values passed as variable-length
arguments. */
def apply(values: Double*): StatCounter = new StatCounter(values)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index dcffa99dc64c..f8decbcff5f4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -916,7 +916,7 @@ private[spark] object Utils
* result in a new collection. Unlike scala.util.Random.shuffle, this method
* uses a local random number generator, avoiding inter-thread contention.
*/
- def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
+ def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
randomizeInPlace(seq.toArray)
}
diff --git
a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 9d5f1aac3391..8d9fb85309b1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -81,7 +81,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends
Seq[T] with Serializable
this
}
- def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+ def ++= (values: IterableOnce[T]): CompactBuffer[T] = {
values match {
// Optimize merging of CompactBuffers, used in cogroup and groupByKey
case compactBuf: CompactBuffer[T] =>
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index 151d6c8268d4..436899448d63 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -50,7 +50,7 @@ private[spark] object Utils extends SparkCollectionUtils {
* Callers must ensure that all the input iterators are already sorted by
* the same ordering `ord`, otherwise the result is likely to be incorrect.
*/
- def mergeOrdered[T](inputs: Iterable[TraversableOnce[T]])(
+ def mergeOrdered[T](inputs: Iterable[IterableOnce[T]])(
implicit ord: Ordering[T]): Iterator[T] = {
val ordering = new GuavaOrdering[T] {
override def compare(l: T, r: T): Int = ord.compare(l, r)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index b1dcb465b477..e232dfc05faa 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -275,13 +275,13 @@ case class UnresolvedGenerator(name: FunctionIdentifier,
children: Seq[Expressio
override def prettyName: String = name.unquotedString
override def toString: String = s"'$name(${children.mkString(", ")})"
- override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
+ override def eval(input: InternalRow = null): IterableOnce[InternalRow] =
throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode =
throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
- override def terminate(): TraversableOnce[InternalRow] =
+ override def terminate(): IterableOnce[InternalRow] =
throw QueryExecutionErrors.cannotTerminateGeneratorError(this)
override protected def withNewChildrenInternal(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 39e0f836e240..5f85716fa283 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -415,7 +415,7 @@ package object dsl {
def cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result:
Encoder](
otherPlan: LogicalPlan,
- func: (Key, Iterator[Left], Iterator[Right]) =>
TraversableOnce[Result],
+ func: (Key, Iterator[Left], Iterator[Right]) => IterableOnce[Result],
leftGroup: Seq[Attribute],
rightGroup: Seq[Attribute],
leftAttr: Seq[Attribute],
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index bc74572444c8..539505543a40 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -146,7 +146,7 @@ case class PythonUDAF(
}
abstract class UnevaluableGenerator extends Generator {
- final override def eval(input: InternalRow): TraversableOnce[InternalRow] =
+ final override def eval(input: InternalRow): IterableOnce[InternalRow] =
throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode =
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 3b44add3b1f0..49cf01d472eb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -66,13 +66,13 @@ trait Generator extends Expression {
def elementSchema: StructType
/** Should be implemented by child classes to perform specific Generators. */
- override def eval(input: InternalRow): TraversableOnce[InternalRow]
+ override def eval(input: InternalRow): IterableOnce[InternalRow]
/**
* Notifies that there are no more rows to process, clean up code, and
additional
* rows can be made here.
*/
- def terminate(): TraversableOnce[InternalRow] = Nil
+ def terminate(): IterableOnce[InternalRow] = Nil
/**
* Check if this generator supports code generation.
@@ -100,7 +100,7 @@ trait CollectionGenerator extends Generator {
*/
case class UserDefinedGenerator(
elementSchema: StructType,
- function: Row => TraversableOnce[InternalRow],
+ function: Row => IterableOnce[InternalRow],
children: Seq[Expression])
extends Generator with CodegenFallback {
@@ -117,7 +117,7 @@ case class UserDefinedGenerator(
}.asInstanceOf[InternalRow => Row]
}
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
if (inputRow == null) {
initializeConverters()
}
@@ -232,7 +232,7 @@ case class Stack(children: Seq[Expression]) extends
Generator {
case (e, index) => StructField(s"col$index", e.dataType)
})
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val values = children.tail.map(_.eval(input)).toArray
for (row <- 0 until numRows) yield {
val fields = new Array[Any](numFields)
@@ -290,7 +290,7 @@ case class ReplicateRows(children: Seq[Expression]) extends
Generator with Codeg
case (e, index) => StructField(s"col$index", e.dataType)
})
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val numRows = children.head.eval(input).asInstanceOf[Long]
val values = children.tail.map(_.eval(input)).toArray
Range.Long(0, numRows, 1).map { _ =>
@@ -311,7 +311,7 @@ case class ReplicateRows(children: Seq[Expression]) extends
Generator with Codeg
* such as explode_outer. This expression gets replaced during analysis.
*/
case class GeneratorOuter(child: Generator) extends UnaryExpression with
Generator {
- final override def eval(input: InternalRow = null):
TraversableOnce[InternalRow] =
+ final override def eval(input: InternalRow = null):
IterableOnce[InternalRow] =
throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode =
@@ -369,7 +369,7 @@ abstract class ExplodeBase extends UnaryExpression with
CollectionGenerator with
}
}
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
child.dataType match {
case ArrayType(et, _) =>
val inputArray = child.eval(input).asInstanceOf[ArrayData]
@@ -571,7 +571,7 @@ case class Inline(child: Expression) extends
UnaryExpression with CollectionGene
private lazy val generatorNullRow = new
GenericInternalRow(elementSchema.length)
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val inputArray = child.eval(input).asInstanceOf[ArrayData]
if (inputArray == null) {
Nil
@@ -605,7 +605,7 @@ case class SQLKeywords() extends LeafExpression with
Generator with CodegenFallb
.add("keyword", StringType, nullable = false)
.add("reserved", BooleanType, nullable = false)
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val reservedList = getReservedList()
keywords.zip(reservedList).map { case (keyword, isReserved) =>
InternalRow(UTF8String.fromString(keyword), isReserved)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 435c27a6e00d..e7df542ddab8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -407,7 +407,7 @@ case class JsonTuple(children: Seq[Expression])
}
}
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
if (json == null) {
return nullRow
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 9bf8db0b4faf..772984549694 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -401,13 +401,13 @@ case class AppendColumnsWithObject(
/** Factory for constructing new `MapGroups` nodes. */
object MapGroups {
def apply[K : Encoder, T : Encoder, U : Encoder](
- func: (K, Iterator[T]) => TraversableOnce[U],
+ func: (K, Iterator[T]) => IterableOnce[U],
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
dataOrder: Seq[SortOrder],
child: LogicalPlan): LogicalPlan = {
val mapped = new MapGroups(
- func.asInstanceOf[(Any, Iterator[Any]) => TraversableOnce[Any]],
+ func.asInstanceOf[(Any, Iterator[Any]) => IterableOnce[Any]],
UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes),
UnresolvedDeserializer(encoderFor[T].deserializer, dataAttributes),
groupingAttributes,
@@ -436,7 +436,7 @@ object MapGroups {
* @param valueDeserializer used to extract the items in the iterator from an
input row.
*/
case class MapGroups(
- func: (Any, Iterator[Any]) => TraversableOnce[Any],
+ func: (Any, Iterator[Any]) => IterableOnce[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
@@ -662,7 +662,7 @@ case class FlatMapGroupsInRWithArrow(
/** Factory for constructing new `CoGroup` nodes. */
object CoGroup {
def apply[K : Encoder, L : Encoder, R : Encoder, OUT : Encoder](
- func: (K, Iterator[L], Iterator[R]) => TraversableOnce[OUT],
+ func: (K, Iterator[L], Iterator[R]) => IterableOnce[OUT],
leftGroup: Seq[Attribute],
rightGroup: Seq[Attribute],
leftAttr: Seq[Attribute],
@@ -674,7 +674,7 @@ object CoGroup {
require(DataTypeUtils.fromAttributes(leftGroup) ==
DataTypeUtils.fromAttributes(rightGroup))
val cogrouped = CoGroup(
- func.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) =>
TraversableOnce[Any]],
+ func.asInstanceOf[(Any, Iterator[Any], Iterator[Any]) =>
IterableOnce[Any]],
// The `leftGroup` and `rightGroup` are guaranteed te be of same schema,
so it's safe to
// resolve the `keyDeserializer` based on either of them, here we pick
the left one.
UnresolvedDeserializer(encoderFor[K].deserializer, leftGroup),
@@ -698,7 +698,7 @@ object CoGroup {
* right children.
*/
case class CoGroup(
- func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
+ func: (Any, Iterator[Any], Iterator[Any]) => IterableOnce[Any],
keyDeserializer: Expression,
leftDeserializer: Expression,
rightDeserializer: Expression,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 39ce5e77e28a..a34ad10f36ab 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -261,7 +261,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
* Returns a Seq by applying a function to all nodes in this tree and using
the elements of the
* resulting collections.
*/
- def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
+ def flatMap[A](f: BaseType => IterableOnce[A]): Seq[A] = {
val ret = new collection.mutable.ArrayBuffer[A]()
foreach(ret ++= f(_))
ret.toSeq
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
index 03ab04f23487..49c6625851dd 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
{
private def checkTuple(actual: Expression, expected: Seq[InternalRow]): Unit
= {
- assert(actual.eval(null).asInstanceOf[TraversableOnce[InternalRow]].toSeq
=== expected)
+ assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].toSeq ===
expected)
}
private final val empty_array = CreateArray(Seq.empty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 174f47a3b82b..a3f8a2cf4a05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2676,7 +2676,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
@deprecated("use flatMap() or select() with functions.explode() instead",
"2.0.0")
- def explode[A <: Product : TypeTag](input: Column*)(f: Row =>
TraversableOnce[A]): DataFrame = {
+ def explode[A <: Product : TypeTag](input: Column*)(f: Row =>
IterableOnce[A]): DataFrame = {
val elementSchema =
ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val convert =
CatalystTypeConverters.createToCatalystConverter(elementSchema)
@@ -2713,14 +2713,14 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
@deprecated("use flatMap() or select() with functions.explode() instead",
"2.0.0")
- def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A
=> TraversableOnce[B])
+ def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A
=> IterableOnce[B])
: DataFrame = {
val dataType = ScalaReflection.schemaFor[B].dataType
val attributes = AttributeReference(outputColumn, dataType)() :: Nil
// TODO handle the metadata?
val elementSchema = attributes.toStructType
- def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+ def rowFunction(row: Row): IterableOnce[InternalRow] = {
val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
}
@@ -3510,7 +3510,7 @@ class Dataset[T] private[sql](
* @group typedrel
* @since 1.6.0
*/
- def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+ def flatMap[U : Encoder](func: T => IterableOnce[U]): Dataset[U] =
mapPartitions(_.flatMap(func))
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 2239a128a2c4..ef0a3e0266c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -138,7 +138,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
- def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]):
Dataset[U] = {
+ def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => IterableOnce[U]):
Dataset[U] = {
Dataset[U](
sparkSession,
MapGroups(
@@ -198,7 +198,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
def flatMapSortedGroups[U : Encoder](
sortExprs: Column*)(
- f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+ f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] = {
val sortOrder: Seq[SortOrder] = MapGroups.sortOrder(sortExprs.map(_.expr))
Dataset[U](
@@ -807,7 +807,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
def cogroup[U, R : Encoder](
other: KeyValueGroupedDataset[K, U])(
- f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+ f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
implicit val uEncoder = other.vExprEnc
Dataset[R](
sparkSession,
@@ -857,7 +857,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
other: KeyValueGroupedDataset[K, U])(
thisSortExprs: Column*)(
otherSortExprs: Column*)(
- f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
+ f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] = {
def toSortOrder(col: Column): SortOrder = col.expr match {
case expr: SortOrder => expr
case expr: Expression => SortOrder(expr, Ascending)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index b99361437e0d..11d0448e026f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
* For lazy computing, be sure the generator.terminate() called in the very
last
* TODO reusing the CompletionIterator?
*/
-private[execution] sealed case class LazyIterator(func: () =>
TraversableOnce[InternalRow])
+private[execution] sealed case class LazyIterator(func: () =>
IterableOnce[InternalRow])
extends Iterator[InternalRow] {
lazy val results: Iterator[InternalRow] = func().toIterator
@@ -147,7 +147,7 @@ case class GenerateExec(
}.map(_._2)
boundGenerator match {
case e: CollectionGenerator => codeGenCollection(ctx, e, requiredInput)
- case g => codeGenTraversableOnce(ctx, g, requiredInput)
+ case g => codeGenIterableOnce(ctx, g, requiredInput)
}
}
@@ -239,9 +239,9 @@ case class GenerateExec(
}
/**
- * Generate code for a regular [[TraversableOnce]] returning [[Generator]].
+ * Generate code for a regular [[IterableOnce]] returning [[Generator]].
*/
- private def codeGenTraversableOnce(
+ private def codeGenIterableOnce(
ctx: CodegenContext,
e: Expression,
requiredInput: Seq[ExprCode]): String = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
index c58d925f28e5..afbb1f0e5a37 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
@@ -70,7 +70,7 @@ trait AdaptiveSparkPlanHelper {
* Returns a Seq by applying a function to all nodes in this tree and using
the elements of the
* resulting collections.
*/
- def flatMap[A](p: SparkPlan)(f: SparkPlan => TraversableOnce[A]): Seq[A] = {
+ def flatMap[A](p: SparkPlan)(f: SparkPlan => IterableOnce[A]): Seq[A] = {
val ret = new collection.mutable.ArrayBuffer[A]()
foreach(p)(ret ++= f(_))
ret.toSeq
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 10878c810690..31bfa5aff351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -392,7 +392,7 @@ case class AppendColumnsWithObjectExec(
* The result of this function is flattened before being output.
*/
case class MapGroupsExec(
- func: (Any, Iterator[Any]) => TraversableOnce[Any],
+ func: (Any, Iterator[Any]) => IterableOnce[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
@@ -432,7 +432,7 @@ case class MapGroupsExec(
object MapGroupsExec {
def apply(
- func: (Any, Iterator[Any], LogicalGroupState[Any]) =>
TraversableOnce[Any],
+ func: (Any, Iterator[Any], LogicalGroupState[Any]) => IterableOnce[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
@@ -615,7 +615,7 @@ case class FlatMapGroupsInRWithArrowExec(
* The result of this function is flattened before being output.
*/
case class CoGroupExec(
- func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
+ func: (Any, Iterator[Any], Iterator[Any]) => IterableOnce[Any],
keyDeserializer: Expression,
leftDeserializer: Expression,
rightDeserializer: Expression,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index fa0744dc19b2..f81fac8e892b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -72,7 +72,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext:
SQLContext) extends Spa
addData(data.toTraversable)
}
- def addData(data: TraversableOnce[A]): OffsetV2
+ def addData(data: IterableOnce[A]): OffsetV2
def fullSchema(): StructType = encoder.schema
@@ -188,7 +188,7 @@ case class MemoryStream[A : Encoder](
@GuardedBy("this")
protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
- def addData(data: TraversableOnce[A]): Offset = {
+ def addData(data: IterableOnce[A]): Offset = {
val objects = data.toSeq
val rows = objects.iterator.map(d =>
toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
logDebug(s"Adding: $objects")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index d0cf602c7cca..028884e8f34c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -56,7 +56,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int,
sqlContext: SQLContext, numPa
private val recordEndpoint = new ContinuousRecordEndpoint(records, this)
@volatile private var endpointRef: RpcEndpointRef = _
- def addData(data: TraversableOnce[A]): Offset = synchronized {
+ def addData(data: IterableOnce[A]): Offset = synchronized {
// Distribute data evenly among partition lists.
data.toSeq.zipWithIndex.map {
case (item, index) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 68f63feb5c51..e228d4792d14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -547,7 +547,7 @@ class GeneratorFunctionSuite extends QueryTest with
SharedSparkSession {
case class EmptyGenerator() extends Generator with LeafLike[Expression] {
override def elementSchema: StructType = new StructType().add("id",
IntegerType)
- override def eval(input: InternalRow): TraversableOnce[InternalRow] =
Seq.empty
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = Seq.empty
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode = {
val iteratorClass = classOf[Iterator[_]].getName
ev.copy(code =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 9452209e3184..6efdb676ccbd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -239,7 +239,7 @@ private[hive] case class HiveGenericUDTF(
@transient
private lazy val inputProjection = new InterpretedProjection(children)
- override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
outputInspector // Make sure initialized.
function.process(wrap(inputProjection(input), wrappers, udtInput,
inputDataTypes))
collector.collectRows()
@@ -262,7 +262,7 @@ private[hive] case class HiveGenericUDTF(
}
}
- override def terminate(): TraversableOnce[InternalRow] = {
+ override def terminate(): IterableOnce[InternalRow] = {
outputInspector // Make sure initialized.
function.close()
collector.collectRows()
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index ca4f3670d5ad..ffa65a739de2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -552,7 +552,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this
DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] =
ssc.withScope {
+ def flatMap[U: ClassTag](flatMapFunc: T => IterableOnce[U]): DStream[U] =
ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index f5b1e5f3a145..aca33295fbb1 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
- flatMapValueFunc: V => TraversableOnce[U]
+ flatMapValueFunc: V => IterableOnce[U]
) extends DStream[(K, U)](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index d60a6179782e..b87ba7dc1207 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
- flatMapFunc: T => TraversableOnce[U]
+ flatMapFunc: T => IterableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index a5bed752bd66..81904ea633a4 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -548,7 +548,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
* 'this' DStream without changing the key.
*/
def flatMapValues[U: ClassTag](
- flatMapValuesFunc: V => TraversableOnce[U]
+ flatMapValuesFunc: V => IterableOnce[U]
): DStream[(K, U)] = ssc.withScope {
new FlatMapValuedDStream[K, V, U](self,
sparkContext.clean(flatMapValuesFunc))
}