This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0f145f514e3  [SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated API usage related to `IterableOnceExtensionMethods`
0f145f514e3 is described below

commit 0f145f514e35b134e3f16ba0f1131a0f01546db0
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Oct 27 10:52:26 2023 +0800

[SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated API usage related to `IterableOnceExtensionMethods`

### What changes were proposed in this pull request?
This PR cleans up the use of the following APIs in `IterableOnceExtensionMethods`, as they have been deprecated since Scala 2.13.0:

- `.toSeq` -> `.iterator.to(Seq)`
- `.toIterable` -> `.iterator.to(Iterable)`
- `.toTraversable` -> `.iterator.to(Iterable)`
- `.toArray` -> `.iterator.toArray`
- `.map` -> `.iterator.map`
- `.foreach` -> `.iterator.foreach`
- `.isEmpty` -> `.iterator.isEmpty`

```scala
@deprecated("Use .iterator.to(Seq) instead", "2.13.0")
@`inline` def toSeq: immutable.Seq[A] = immutable.Seq.from(it)

@deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
@`inline` final def toIterable: Iterable[A] = Iterable.from(it)

@deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
@`inline` final def toTraversable: Traversable[A] = toIterable

@deprecated("Use .iterator.toArray", "2.13.0")
def toArray[B >: A: ClassTag]: Array[B] = it match {
  case it: Iterable[B] => it.toArray[B]
  case _ => it.iterator.toArray[B]
}

@deprecated("Use .iterator.map instead or consider requiring an Iterable", "2.13.0")
def map[B](f: A => B): IterableOnce[B] = it match {
  case it: Iterable[A] => it.map(f)
  case _ => it.iterator.map(f)
}

@deprecated("Use .iterator.foreach(...) instead", "2.13.0")
@`inline` def foreach[U](f: A => U): Unit = it match {
  case it: Iterable[A] => it.foreach(f)
  case _ => it.iterator.foreach(f)
}

@deprecated("Use .iterator.isEmpty instead", "2.13.0")
def isEmpty: Boolean = it match {
  case it: Iterable[A] => it.isEmpty
  case _ => it.iterator.isEmpty
}
```

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43532 from LuciferYang/SPARK-45667.
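For readers unfamiliar with the pattern, the snippet below is a minimal sketch of the same migration outside Spark; it is not code from this patch, and the value `nums` and the bindings are made up for illustration.

```scala
// Illustrative only: any IterableOnce can be migrated the same way.
val nums: IterableOnce[Int] = List(1, 2, 3)

// Before (deprecated since 2.13.0): nums.toSeq, nums.map(_ * 2), nums.isEmpty
// After: go through the iterator explicitly.
val asSeq: Seq[Int] = nums.iterator.to(Seq)
val doubled: Iterator[Int] = nums.iterator.map(_ * 2)
val noElements: Boolean = nums.iterator.isEmpty
```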
Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/StatCounter.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 .../main/scala/org/apache/spark/util/collection/CompactBuffer.scala | 2 +-
 .../spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala | 2 +-
 .../apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 2 +-
 .../src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala | 4 ++--
 .../main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 2 +-
 .../sql/execution/streaming/sources/ContinuousMemoryStream.scala | 2 +-
 .../scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala | 2 +-
 .../org/apache/spark/streaming/dstream/MapWithStateDStream.scala | 2 +-
 15 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index 7050e62d549..adcc6e3d6bf 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -132,7 +132,7 @@ private[sql] object UdfUtils extends Serializable {
   def noOp[V, K](): V => K = _ => null.asInstanceOf[K]
   def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value =>
-    f(value).toSeq
+    f(value).iterator.to(Seq)
   }
   // (1 to 22).foreach { i =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0739367ec79..a40d6636ff0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -761,7 +761,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.flatMap { case (k, v) =>
-        cleanF(v).map(x => (k, x))
+        cleanF(v).iterator.map(x => (k, x))
       },
       preservesPartitioning = true)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3c1451a0185..7c6eaebbc6b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -84,7 +84,7 @@ class UnionRDD[T: ClassTag](
     } else {
       rdds
     }
-    val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
+    val array = new Array[Partition](parRDDs.iterator.map(_.partitions.length).sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index 074dab84726..ccb4d2063ff 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -42,7 +42,7 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
   override def knownSize: Int = size
   override def addAll(xs: IterableOnce[A]): this.type = {
-    xs.foreach { this += _ }
+    xs.iterator.foreach { this += _ }
     this
   }
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 0960a474155..7df25fd78c4 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -52,7 +52,7 @@ class StatCounter(values: IterableOnce[Double]) extends Serializable {
   /** Add multiple values into this StatCounter, updating the internal statistics. */
   def merge(values: IterableOnce[Double]): StatCounter = {
-    values.foreach(v => merge(v))
+    values.iterator.foreach(v => merge(v))
     this
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f22bec5c2be..6e3f42bd16d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -838,7 +838,7 @@ private[spark] object Utils
   * uses a local random number generator, avoiding inter-thread contention.
   */
  def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
-    randomizeInPlace(seq.toArray)
+    randomizeInPlace(seq.iterator.toArray)
  }
  /**
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 8d9fb85309b..f667aea63c7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -105,7 +105,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
         }
       case _ =>
-        values.foreach(e => this += e)
+        values.iterator.foreach(e => this += e)
     }
     this
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
index 49c6625851d..dd512b0d83e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
 class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
   private def checkTuple(actual: Expression, expected: Seq[InternalRow]): Unit = {
-    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].toSeq === expected)
+    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].iterator.to(Seq) === expected)
   }
   private final val empty_array = CreateArray(Seq.empty)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 94e40b98065..9b651ef8762 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -269,7 +269,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
       Nil
   private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = {
-    assert(jt.eval(null).toSeq.head === expected)
+    assert(jt.eval(null).iterator.to(Seq).head === expected)
   }
   test("json_tuple escaping") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 987c0668d94..49546935da9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -97,7 +97,7 @@ case class GenerateExec(
           // we should always set the left (required child output)
           joinedRow.withLeft(pruneChildForResult(row))
           val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
+          if (outer && outputRows.iterator.isEmpty) {
             joinedRow.withRight(generatorNullRow) :: Nil
           } else {
             outputRows.iterator.map(joinedRow.withRight)
@@ -110,7 +110,7 @@ case class GenerateExec(
       } else {
         iter.flatMap { row =>
           val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
+          if (outer && outputRows.iterator.isEmpty) {
             Seq(generatorNullRow)
           } else {
             outputRows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 31bfa5aff35..2fefd8f70cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -421,7 +421,7 @@ case class MapGroupsExec(
         val result = func(
           getKey(key),
           rowIter.map(getValue))
-        result.map(outputObject)
+        result.iterator.map(outputObject)
       }
     }
   }
@@ -653,7 +653,7 @@ case class CoGroupExec(
           getKey(key),
           leftResult.map(getLeft),
           rightResult.map(getRight))
-        result.map(outputObject)
+        result.iterator.map(outputObject)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index f81fac8e892..8fe5319ef4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -189,7 +189,7 @@ case class MemoryStream[A : Encoder](
   protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
   def addData(data: IterableOnce[A]): Offset = {
-    val objects = data.toSeq
+    val objects = data.iterator.to(Seq)
     val rows = objects.iterator.map(d => toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
     logDebug(s"Adding: $objects")
     this.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 028884e8f34..dc97386d8fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -58,7 +58,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
   def addData(data: IterableOnce[A]): Offset = synchronized {
     // Distribute data evenly among partition lists.
-    data.toSeq.zipWithIndex.map {
+    data.iterator.to(Seq).zipWithIndex.map {
       case (item, index) => records(index % numPartitions) += toRow(item).copy().asInstanceOf[UnsafeRow]
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index f8dde124b31..0f1a45319af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -122,7 +122,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       withClue(s"Expression class '$className'") {
         val exprExamples = info.getOriginalExamples
         if (!exprExamples.isEmpty && !ignoreSet.contains(className)) {
-          assert(exampleRe.findAllIn(exprExamples).toIterable
+          assert(exampleRe.findAllIn(exprExamples).iterator.to(Iterable)
             .filter(setStmtRe.findFirstIn(_).isEmpty) // Ignore SET commands
             .forall(_.contains("_FUNC_")))
         }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
index 3368382a552..eae5f1a27b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
@@ -74,7 +74,7 @@ private[streaming] class MapWithStateDStreamImpl[
   /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
   def stateSnapshots(): DStream[(KeyType, StateType)] = {
     internalStream.flatMap {
-      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.iterator.to(Iterable) }
   }
   def keyClass: Class[_] = implicitly[ClassTag[KeyType]].runtimeClass

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org