This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f145f514e3 [SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated API usage related to `IterableOnceExtensionMethods`
0f145f514e3 is described below

commit 0f145f514e35b134e3f16ba0f1131a0f01546db0
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Oct 27 10:52:26 2023 +0800

    [SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated API usage related to `IterableOnceExtensionMethods`
    
    ### What changes were proposed in this pull request?
    This PR cleans up the use of the following APIs in `IterableOnceExtensionMethods`, as they have been deprecated since Scala 2.13.0 (a small before/after sketch follows the quoted definitions below):
    
    - `.toSeq` -> `.iterator.to(Seq)`
    - `.toIterable` -> `.iterator.to(Iterable)`
    - `.toTraversable` -> `.iterator.to(Iterable)`
    - `.toArray` -> `.iterator.toArray`
    - `.map` -> `.iterator.map`
    - `.foreach` -> `.iterator.foreach`
    - `.isEmpty` -> `.iterator.isEmpty`
    
    ```scala
      deprecated("Use .iterator.to(Seq) instead", "2.13.0")
      `inline` def toSeq: immutable.Seq[A] = immutable.Seq.from(it)
    
      deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
      `inline` final def toIterable: Iterable[A] = Iterable.from(it)
    
      deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
      `inline` final def toTraversable: Traversable[A] = toIterable
    
      deprecated("Use .iterator.toArray", "2.13.0")
      def toArray[B >: A: ClassTag]: Array[B] = it match {
        case it: Iterable[B] => it.toArray[B]
        case _ => it.iterator.toArray[B]
      }
    
      deprecated("Use .iterator.map instead or consider requiring an Iterable", 
"2.13.0")
      def map[B](f: A => B): IterableOnce[B] = it match {
        case it: Iterable[A] => it.map(f)
        case _ => it.iterator.map(f)
      }
    
      deprecated("Use .iterator.foreach(...) instead", "2.13.0")
      `inline` def foreach[U](f: A => U): Unit = it match {
        case it: Iterable[A] => it.foreach(f)
        case _ => it.iterator.foreach(f)
      }
    
      deprecated("Use .iterator.isEmpty instead", "2.13.0")
      def isEmpty: Boolean = it match {
        case it: Iterable[A] => it.isEmpty
        case _ => it.iterator.isEmpty
      }
    ```
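
    For illustration only (not code from this PR), here is a minimal sketch of the migration pattern, using made-up helper names (`MigrationSketch`, `firstBatch`, `isBlank`, `doubled`): calls that previously went through the deprecated `IterableOnce` extension methods are routed through an explicit `.iterator`.

    ```scala
    object MigrationSketch {
      // Before (deprecated since Scala 2.13.0 via IterableOnceExtensionMethods):
      //   xs.toSeq, xs.isEmpty, xs.map(f)
      // After: make the single-pass iterator explicit at each call site.

      def firstBatch[A](xs: IterableOnce[A]): Seq[A] =
        xs.iterator.to(Seq)      // was: xs.toSeq

      def isBlank[A](xs: IterableOnce[A]): Boolean =
        xs.iterator.isEmpty      // was: xs.isEmpty

      def doubled(xs: IterableOnce[Int]): Iterator[Int] =
        xs.iterator.map(_ * 2)   // was: xs.map(_ * 2)

      def main(args: Array[String]): Unit = {
        println(firstBatch(List(1, 2, 3)))    // List(1, 2, 3)
        println(isBlank(Iterator.empty))      // true
        println(doubled(List(4, 5)).to(Seq))  // List(8, 10)
      }
    }
    ```

    Routing through `.iterator` keeps the single-pass nature of `IterableOnce` visible at the call site, which is the replacement each deprecation message above suggests.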
    
    ### Why are the changes needed?
    Clean up deprecated API usage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43532 from LuciferYang/SPARK-45667.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala       | 2 +-
 core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala               | 2 +-
 core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala  | 2 +-
 core/src/main/scala/org/apache/spark/util/StatCounter.scala           | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala                 | 2 +-
 .../main/scala/org/apache/spark/util/collection/CompactBuffer.scala   | 2 +-
 .../spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala     | 2 +-
 .../apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala  | 2 +-
 .../src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala  | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala  | 4 ++--
 .../main/scala/org/apache/spark/sql/execution/streaming/memory.scala  | 2 +-
 .../sql/execution/streaming/sources/ContinuousMemoryStream.scala      | 2 +-
 .../scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala  | 2 +-
 .../org/apache/spark/streaming/dstream/MapWithStateDStream.scala      | 2 +-
 15 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index 7050e62d549..adcc6e3d6bf 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -132,7 +132,7 @@ private[sql] object UdfUtils extends Serializable {
   def noOp[V, K](): V => K = _ => null.asInstanceOf[K]
 
   def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value =>
-    f(value).toSeq
+    f(value).iterator.to(Seq)
   }
 
   //  (1 to 22).foreach { i =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0739367ec79..a40d6636ff0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -761,7 +761,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.flatMap { case (k, v) =>
-        cleanF(v).map(x => (k, x))
+        cleanF(v).iterator.map(x => (k, x))
       },
       preservesPartitioning = true)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3c1451a0185..7c6eaebbc6b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -84,7 +84,7 @@ class UnionRDD[T: ClassTag](
     } else {
       rdds
     }
-    val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
+    val array = new Array[Partition](parRDDs.iterator.map(_.partitions.length).sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index 074dab84726..ccb4d2063ff 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -42,7 +42,7 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
   override def knownSize: Int = size
 
   override def addAll(xs: IterableOnce[A]): this.type = {
-    xs.foreach { this += _ }
+    xs.iterator.foreach { this += _ }
     this
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 0960a474155..7df25fd78c4 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -52,7 +52,7 @@ class StatCounter(values: IterableOnce[Double]) extends Serializable {
 
   /** Add multiple values into this StatCounter, updating the internal statistics. */
   def merge(values: IterableOnce[Double]): StatCounter = {
-    values.foreach(v => merge(v))
+    values.iterator.foreach(v => merge(v))
     this
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f22bec5c2be..6e3f42bd16d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -838,7 +838,7 @@ private[spark] object Utils
    * uses a local random number generator, avoiding inter-thread contention.
    */
   def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
-    randomizeInPlace(seq.toArray)
+    randomizeInPlace(seq.iterator.toArray)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 8d9fb85309b..f667aea63c7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -105,7 +105,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
         }
 
       case _ =>
-        values.foreach(e => this += e)
+        values.iterator.foreach(e => this += e)
     }
     this
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
index 49c6625851d..dd512b0d83e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
 
 class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
   private def checkTuple(actual: Expression, expected: Seq[InternalRow]): Unit = {
-    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].toSeq === expected)
+    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].iterator.to(Seq) === expected)
   }
 
   private final val empty_array = CreateArray(Seq.empty)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 94e40b98065..9b651ef8762 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -269,7 +269,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     Nil
 
   private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = {
-    assert(jt.eval(null).toSeq.head === expected)
+    assert(jt.eval(null).iterator.to(Seq).head === expected)
   }
 
   test("json_tuple escaping") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 987c0668d94..49546935da9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -97,7 +97,7 @@ case class GenerateExec(
           // we should always set the left (required child output)
           joinedRow.withLeft(pruneChildForResult(row))
           val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
+          if (outer && outputRows.iterator.isEmpty) {
             joinedRow.withRight(generatorNullRow) :: Nil
           } else {
             outputRows.iterator.map(joinedRow.withRight)
@@ -110,7 +110,7 @@ case class GenerateExec(
       } else {
         iter.flatMap { row =>
           val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
+          if (outer && outputRows.iterator.isEmpty) {
             Seq(generatorNullRow)
           } else {
             outputRows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 31bfa5aff35..2fefd8f70cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -421,7 +421,7 @@ case class MapGroupsExec(
         val result = func(
           getKey(key),
           rowIter.map(getValue))
-        result.map(outputObject)
+        result.iterator.map(outputObject)
       }
     }
   }
@@ -653,7 +653,7 @@ case class CoGroupExec(
             getKey(key),
             leftResult.map(getLeft),
             rightResult.map(getRight))
-          result.map(outputObject)
+          result.iterator.map(outputObject)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index f81fac8e892..8fe5319ef4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -189,7 +189,7 @@ case class MemoryStream[A : Encoder](
   protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
 
   def addData(data: IterableOnce[A]): Offset = {
-    val objects = data.toSeq
+    val objects = data.iterator.to(Seq)
     val rows = objects.iterator.map(d => toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
     logDebug(s"Adding: $objects")
     this.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 028884e8f34..dc97386d8fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -58,7 +58,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
 
   def addData(data: IterableOnce[A]): Offset = synchronized {
     // Distribute data evenly among partition lists.
-    data.toSeq.zipWithIndex.map {
+    data.iterator.to(Seq).zipWithIndex.map {
       case (item, index) =>
         records(index % numPartitions) += toRow(item).copy().asInstanceOf[UnsafeRow]
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index f8dde124b31..0f1a45319af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -122,7 +122,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       withClue(s"Expression class '$className'") {
         val exprExamples = info.getOriginalExamples
         if (!exprExamples.isEmpty && !ignoreSet.contains(className)) {
-          assert(exampleRe.findAllIn(exprExamples).toIterable
+          assert(exampleRe.findAllIn(exprExamples).iterator.to(Iterable)
             .filter(setStmtRe.findFirstIn(_).isEmpty) // Ignore SET commands
             .forall(_.contains("_FUNC_")))
         }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
index 3368382a552..eae5f1a27b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
@@ -74,7 +74,7 @@ private[streaming] class MapWithStateDStreamImpl[
   /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
   def stateSnapshots(): DStream[(KeyType, StateType)] = {
     internalStream.flatMap {
-      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.iterator.to(Iterable) }
   }
 
   def keyClass: Class[_] = implicitly[ClassTag[KeyType]].runtimeClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
