This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0f145f514e3 [SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated
API usage related to `IterableOnceExtensionMethods`
0f145f514e3 is described below
commit 0f145f514e35b134e3f16ba0f1131a0f01546db0
Author: yangjie01 <[email protected]>
AuthorDate: Fri Oct 27 10:52:26 2023 +0800
[SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated API usage related
to `IterableOnceExtensionMethods`
### What changes were proposed in this pull request?
This PR cleans up the use of the following APIs in
`IterableOnceExtensionMethods`, which have been deprecated since Scala 2.13.0:
- `.toSeq` -> `.iterator.to(Seq)`
- `.toIterable` -> `.iterator.to(Iterable)`
- `.toTraversable` -> `.iterator.to(Iterable)`
- `.toArray` -> `.iterator.toArray`
- `.map` -> `.iterator.map`
- `.foreach` -> `.iterator.foreach`
- `.isEmpty` -> `.iterator.isEmpty`
```scala
deprecated("Use .iterator.to(Seq) instead", "2.13.0")
`inline` def toSeq: immutable.Seq[A] = immutable.Seq.from(it)
deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
`inline` final def toIterable: Iterable[A] = Iterable.from(it)
deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
`inline` final def toTraversable: Traversable[A] = toIterable
deprecated("Use .iterator.toArray", "2.13.0")
def toArray[B >: A: ClassTag]: Array[B] = it match {
case it: Iterable[B] => it.toArray[B]
case _ => it.iterator.toArray[B]
}
deprecated("Use .iterator.map instead or consider requiring an Iterable",
"2.13.0")
def map[B](f: A => B): IterableOnce[B] = it match {
case it: Iterable[A] => it.map(f)
case _ => it.iterator.map(f)
}
deprecated("Use .iterator.foreach(...) instead", "2.13.0")
`inline` def foreach[U](f: A => U): Unit = it match {
case it: Iterable[A] => it.foreach(f)
case _ => it.iterator.foreach(f)
}
deprecated("Use .iterator.isEmpty instead", "2.13.0")
def isEmpty: Boolean = it match {
case it: Iterable[A] => it.isEmpty
case _ => it.iterator.isEmpty
}
```
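For illustration only (not part of this PR's diff), below is a minimal call-site sketch of the same migration; the object name and sample data are made up:
```scala
// Hypothetical example, not taken from the Spark code base: migrating a call site
// that receives a value typed as IterableOnce[A] (Scala 2.13+).
object IterableOnceMigrationSketch {
  def main(args: Array[String]): Unit = {
    val xs: IterableOnce[Int] = Seq(1, 2, 3)

    val asSeq   = xs.iterator.to(Seq)      // was: xs.toSeq
    val asArray = xs.iterator.toArray      // was: xs.toArray
    val doubled = xs.iterator.map(_ * 2)   // was: xs.map(_ * 2)
    val isEmpty = xs.iterator.isEmpty      // was: xs.isEmpty
    xs.iterator.foreach(println)           // was: xs.foreach(println)

    // Calling .iterator repeatedly is only safe here because xs is backed by a Seq;
    // a genuine one-shot Iterator can be traversed at most once.
    println((asSeq, asArray.toSeq, doubled.toSeq, isEmpty))
  }
}
```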
### Why are the changes needed?
Clean up deprecated API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43532 from LuciferYang/SPARK-45667.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 2 +-
core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala | 2 +-
core/src/main/scala/org/apache/spark/util/StatCounter.scala | 2 +-
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
.../main/scala/org/apache/spark/util/collection/CompactBuffer.scala | 2 +-
.../spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala | 2 +-
.../apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala | 4 ++--
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala | 4 ++--
.../main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 2 +-
.../sql/execution/streaming/sources/ContinuousMemoryStream.scala | 2 +-
.../scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala | 2 +-
.../org/apache/spark/streaming/dstream/MapWithStateDStream.scala | 2 +-
15 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index 7050e62d549..adcc6e3d6bf 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -132,7 +132,7 @@ private[sql] object UdfUtils extends Serializable {
def noOp[V, K](): V => K = _ => null.asInstanceOf[K]
def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value =>
- f(value).toSeq
+ f(value).iterator.to(Seq)
}
// (1 to 22).foreach { i =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0739367ec79..a40d6636ff0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -761,7 +761,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
- cleanF(v).map(x => (k, x))
+ cleanF(v).iterator.map(x => (k, x))
},
preservesPartitioning = true)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3c1451a0185..7c6eaebbc6b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -84,7 +84,7 @@ class UnionRDD[T: ClassTag](
} else {
rdds
}
- val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
+ val array = new Array[Partition](parRDDs.iterator.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index 074dab84726..ccb4d2063ff 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -42,7 +42,7 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
override def knownSize: Int = size
override def addAll(xs: IterableOnce[A]): this.type = {
- xs.foreach { this += _ }
+ xs.iterator.foreach { this += _ }
this
}
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 0960a474155..7df25fd78c4 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -52,7 +52,7 @@ class StatCounter(values: IterableOnce[Double]) extends Serializable {
/** Add multiple values into this StatCounter, updating the internal statistics. */
def merge(values: IterableOnce[Double]): StatCounter = {
- values.foreach(v => merge(v))
+ values.iterator.foreach(v => merge(v))
this
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f22bec5c2be..6e3f42bd16d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -838,7 +838,7 @@ private[spark] object Utils
* uses a local random number generator, avoiding inter-thread contention.
*/
def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
- randomizeInPlace(seq.toArray)
+ randomizeInPlace(seq.iterator.toArray)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 8d9fb85309b..f667aea63c7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -105,7 +105,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
}
case _ =>
- values.foreach(e => this += e)
+ values.iterator.foreach(e => this += e)
}
this
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
index 49c6625851d..dd512b0d83e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
private def checkTuple(actual: Expression, expected: Seq[InternalRow]): Unit = {
- assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].toSeq === expected)
+ assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].iterator.to(Seq) === expected)
}
private final val empty_array = CreateArray(Seq.empty)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 94e40b98065..9b651ef8762 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -269,7 +269,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
Nil
private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = {
- assert(jt.eval(null).toSeq.head === expected)
+ assert(jt.eval(null).iterator.to(Seq).head === expected)
}
test("json_tuple escaping") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 987c0668d94..49546935da9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -97,7 +97,7 @@ case class GenerateExec(
// we should always set the left (required child output)
joinedRow.withLeft(pruneChildForResult(row))
val outputRows = boundGenerator.eval(row)
- if (outer && outputRows.isEmpty) {
+ if (outer && outputRows.iterator.isEmpty) {
joinedRow.withRight(generatorNullRow) :: Nil
} else {
outputRows.iterator.map(joinedRow.withRight)
@@ -110,7 +110,7 @@ case class GenerateExec(
} else {
iter.flatMap { row =>
val outputRows = boundGenerator.eval(row)
- if (outer && outputRows.isEmpty) {
+ if (outer && outputRows.iterator.isEmpty) {
Seq(generatorNullRow)
} else {
outputRows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 31bfa5aff35..2fefd8f70cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -421,7 +421,7 @@ case class MapGroupsExec(
val result = func(
getKey(key),
rowIter.map(getValue))
- result.map(outputObject)
+ result.iterator.map(outputObject)
}
}
}
@@ -653,7 +653,7 @@ case class CoGroupExec(
getKey(key),
leftResult.map(getLeft),
rightResult.map(getRight))
- result.map(outputObject)
+ result.iterator.map(outputObject)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index f81fac8e892..8fe5319ef4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -189,7 +189,7 @@ case class MemoryStream[A : Encoder](
protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
def addData(data: IterableOnce[A]): Offset = {
- val objects = data.toSeq
+ val objects = data.iterator.to(Seq)
val rows = objects.iterator.map(d => toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
logDebug(s"Adding: $objects")
this.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 028884e8f34..dc97386d8fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -58,7 +58,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
def addData(data: IterableOnce[A]): Offset = synchronized {
// Distribute data evenly among partition lists.
- data.toSeq.zipWithIndex.map {
+ data.iterator.to(Seq).zipWithIndex.map {
case (item, index) =>
records(index % numPartitions) += toRow(item).copy().asInstanceOf[UnsafeRow]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index f8dde124b31..0f1a45319af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -122,7 +122,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
withClue(s"Expression class '$className'") {
val exprExamples = info.getOriginalExamples
if (!exprExamples.isEmpty && !ignoreSet.contains(className)) {
- assert(exampleRe.findAllIn(exprExamples).toIterable
+ assert(exampleRe.findAllIn(exprExamples).iterator.to(Iterable)
.filter(setStmtRe.findFirstIn(_).isEmpty) // Ignore SET commands
.forall(_.contains("_FUNC_")))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
index 3368382a552..eae5f1a27b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
@@ -74,7 +74,7 @@ private[streaming] class MapWithStateDStreamImpl[
/** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
def stateSnapshots(): DStream[(KeyType, StateType)] = {
internalStream.flatMap {
- _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+ _.stateMap.getAll().map { case (k, s, _) => (k, s) }.iterator.to(Iterable) }
}
def keyClass: Class[_] = implicitly[ClassTag[KeyType]].runtimeClass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]