This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a9048fd  [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with 
count
a9048fd is described below

commit a9048fdeb4d266fe3f1c14a33d8bedba4b88e6d2
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 28 15:27:07 2021 +0900

    [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with count
    
    ### What changes were proposed in this pull request?
    Use `count` to simplify `find + size(or length)` operation, it's 
semantically consistent, but looks simpler.
    
    **Before**
    ```
    seq.filter(p).size
    ```
    
    **After**
    ```
    seq.count(p)
    ```
    
    ### Why are the changes needed?
    Code Simpilefications.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass the Jenkins or GitHub Action
    
    Closes #31374 from LuciferYang/SPARK-34275.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit 15445a8d9e8dd8660aa668a5b82ba2cbc6a5a233)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala   | 2 +-
 .../org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala     | 6 +++---
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala      | 4 ++--
 .../spark/storage/BlockManagerDecommissionIntegrationSuite.scala  | 8 ++++----
 .../org/apache/spark/ml/classification/NaiveBayesSuite.scala      | 4 ++--
 .../org/apache/spark/sql/TypedImperativeAggregateSuite.scala      | 4 ++--
 6 files changed, 14 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index a83762f..bdb768e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -908,7 +908,7 @@ private[spark] class ExecutorAllocationManager(
      */
     def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
       val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, 
Set.empty).toSeq
-      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+      attempts.count(attempt => unschedulableTaskSets.contains(attempt))
     }
 
     def hasPendingTasks: Boolean = {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 8dbdc84..b244475 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -183,7 +183,7 @@ private[spark] class ExecutorMonitor(
   def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => 
exec.pendingRemoval }
 
   def pendingRemovalCountPerResourceProfileId(id: Int): Int = {
-    executors.asScala.filter { case (k, v) => v.resourceProfileId == id && 
v.pendingRemoval }.size
+    executors.asScala.count { case (k, v) => v.resourceProfileId == id && 
v.pendingRemoval }
   }
 
   def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
@@ -191,9 +191,9 @@ private[spark] class ExecutorMonitor(
   }
 
   def decommissioningPerResourceProfileId(id: Int): Int = {
-    executors.asScala.filter { case (k, v) =>
+    executors.asScala.count { case (k, v) =>
       v.resourceProfileId == id && v.decommissioning
-    }.size
+    }
   }
 
   override def onJobStart(event: SparkListenerJobStart): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 8c9c217..a728108 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -154,7 +154,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
           }
           x
         }).count()
-        assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+        assert(sc.listFiles().count(_.contains("somesuffix1")) == 1)
       } finally {
         sc.stop()
       }
@@ -245,7 +245,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
     try {
       sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local"))
       sc.addJar(jarPath.toString)
-      assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1)
+      assert(sc.listJars().count(_.contains("TestUDTF.jar")) == 1)
     } finally {
       sc.stop()
     }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 672bd8c..e461474 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -288,14 +288,14 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
       // If we're migrating shuffles we look for any shuffle block updates
       // as there is no block update on the initial shuffle block write.
       if (shuffle) {
-        val numDataLocs = blocksUpdated.filter { update =>
+        val numDataLocs = blocksUpdated.count { update =>
           val blockId = update.blockUpdatedInfo.blockId
           blockId.isInstanceOf[ShuffleDataBlockId]
-        }.size
-        val numIndexLocs = blocksUpdated.filter { update =>
+        }
+        val numIndexLocs = blocksUpdated.count { update =>
           val blockId = update.blockUpdatedInfo.blockId
           blockId.isInstanceOf[ShuffleIndexBlockId]
-        }.size
+        }
         assert(numDataLocs === 1, s"Expect shuffle data block updates in 
${blocksUpdated}")
         assert(numIndexLocs === 1, s"Expect shuffle index block updates in 
${blocksUpdated}")
       }
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
index af76f04..6742d61 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
@@ -80,10 +80,10 @@ class NaiveBayesSuite extends MLTest with 
DefaultReadWriteTest {
   }
 
   def validatePrediction(predictionAndLabels: Seq[Row]): Unit = {
-    val numOfErrorPredictions = predictionAndLabels.filter {
+    val numOfErrorPredictions = predictionAndLabels.count {
       case Row(prediction: Double, label: Double) =>
         prediction != label
-    }.length
+    }
     // At least 80% of the predictions should be on.
     assert(numOfErrorPredictions < predictionAndLabels.length / 5)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index f2b608b..54fc090 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -147,9 +147,9 @@ class TypedImperativeAggregateSuite extends QueryTest with 
SharedSparkSession {
     val query = df.select(typedMax($"key"), count($"key"), typedMax($"value"),
       count($"value"))
     val maxKey = nullableData.map(_._1).filter(_ != null).max
-    val countKey = nullableData.map(_._1).filter(_ != null).size
+    val countKey = nullableData.map(_._1).count(_ != null)
     val maxValue = nullableData.map(_._2).filter(_ != null).max
-    val countValue = nullableData.map(_._2).filter(_ != null).size
+    val countValue = nullableData.map(_._2).count(_ != null)
     val expected = Seq(Row(maxKey, countKey, maxValue, countValue))
     checkAnswer(query, expected)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to