This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a3146c83d98 [SPARK-45663][CORE][MLLIB] Replace 
`IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft`
a3146c83d98 is described below

commit a3146c83d98fe76aeb6880a40b61fcdd257685ce
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Thu Oct 26 13:20:56 2023 +0800

    [SPARK-45663][CORE][MLLIB] Replace `IterableOnceOps#aggregate` with 
`IterableOnceOps#foldLeft`
    
    ### What changes were proposed in this pull request?
    This pr replace `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft` 
due to `aggregate` has been marked as deprecated since Scala 2.13.0.
    
    ```scala
      @deprecated("`aggregate` is not relevant for sequential collections. Use 
`foldLeft(z)(seqop)` instead.", "2.13.0")
      def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B = 
foldLeft(z)(seqop)
    ```
    
    ### Why are the changes needed?
    Clean up deprecated API usage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43527 from LuciferYang/SPARK-45663.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala                   | 5 ++---
 .../scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala | 2 +-
 .../scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala     | 5 ++---
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index c6770c77b92..5dc666c62d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1219,8 +1219,7 @@ abstract class RDD[T: ClassTag](
     // Clone the zero value since we will also be serializing it as part of 
tasks
     var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
-    val cleanCombOp = sc.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => 
it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+    val aggregatePartition = (it: Iterator[T]) => 
it.foldLeft(zeroValue)(cleanSeqOp)
     val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, 
taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
     jobResult
@@ -1258,7 +1257,7 @@ abstract class RDD[T: ClassTag](
       val cleanSeqOp = context.clean(seqOp)
       val cleanCombOp = context.clean(combOp)
       val aggregatePartition =
-        (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+        (it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp)
       var partiallyAggregated: RDD[U] = mapPartitions(it => 
Iterator(aggregatePartition(it)))
       var numPartitions = partiallyAggregated.partitions.length
       val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / 
depth)).toInt, 2)
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
 
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
index ce46fc8f201..f08cf44e4e1 100644
--- 
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -69,7 +69,7 @@ private[spark] object StratifiedSamplingUtils extends Logging 
{
       val rng = new RandomDataGenerator()
       rng.reSeed(seed + partition)
       val seqOp = getSeqOp(withReplacement, fractions, rng, counts)
-      Iterator(iter.aggregate(zeroU)(seqOp, combOp))
+      Iterator(iter.foldLeft(zeroU)(seqOp))
     }
     mappedPartitionRDD.reduce(combOp)
   }
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
index cbe2776f664..2b86c7cd344 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
@@ -78,9 +78,8 @@ private[evaluation] object AreaUnderCurve {
    * @param curve an iterator over ordered 2D points stored in pairs 
representing a curve
    */
   def of(curve: Iterable[(Double, Double)]): Double = {
-    curve.iterator.sliding(2).withPartial(false).aggregate(0.0)(
-      seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + 
trapezoid(points),
-      combop = _ + _
+    curve.iterator.sliding(2).withPartial(false).foldLeft(0.0)(
+      op = (auc: Double, points: Seq[(Double, Double)]) => auc + 
trapezoid(points)
     )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to