This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a9048fd [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with
count
a9048fd is described below
commit a9048fdeb4d266fe3f1c14a33d8bedba4b88e6d2
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 28 15:27:07 2021 +0900
[SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with count
### What changes were proposed in this pull request?
Use `count` to simplify the `filter + size` (or `filter + length`) pattern. It is
semantically equivalent, reads more simply, and avoids building an intermediate filtered collection.
**Before**
```
seq.filter(p).size
```
**After**
```
seq.count(p)
```
### Why are the changes needed?
Code simplification.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed the existing Jenkins / GitHub Actions test builds.
Closes #31374 from LuciferYang/SPARK-34275.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 15445a8d9e8dd8660aa668a5b82ba2cbc6a5a233)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
.../org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 6 +++---
core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 4 ++--
.../spark/storage/BlockManagerDecommissionIntegrationSuite.scala | 8 ++++----
.../org/apache/spark/ml/classification/NaiveBayesSuite.scala | 4 ++--
.../org/apache/spark/sql/TypedImperativeAggregateSuite.scala | 4 ++--
6 files changed, 14 insertions(+), 14 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index a83762f..bdb768e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -908,7 +908,7 @@ private[spark] class ExecutorAllocationManager(
*/
def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp,
Set.empty).toSeq
- attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+ attempts.count(attempt => unschedulableTaskSets.contains(attempt))
}
def hasPendingTasks: Boolean = {
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 8dbdc84..b244475 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -183,7 +183,7 @@ private[spark] class ExecutorMonitor(
def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) =>
exec.pendingRemoval }
def pendingRemovalCountPerResourceProfileId(id: Int): Int = {
- executors.asScala.filter { case (k, v) => v.resourceProfileId == id &&
v.pendingRemoval }.size
+ executors.asScala.count { case (k, v) => v.resourceProfileId == id &&
v.pendingRemoval }
}
def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
@@ -191,9 +191,9 @@ private[spark] class ExecutorMonitor(
}
def decommissioningPerResourceProfileId(id: Int): Int = {
- executors.asScala.filter { case (k, v) =>
+ executors.asScala.count { case (k, v) =>
v.resourceProfileId == id && v.decommissioning
- }.size
+ }
}
override def onJobStart(event: SparkListenerJobStart): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 8c9c217..a728108 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -154,7 +154,7 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
}
x
}).count()
- assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+ assert(sc.listFiles().count(_.contains("somesuffix1")) == 1)
} finally {
sc.stop()
}
@@ -245,7 +245,7 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
try {
sc = new SparkContext(new
SparkConf().setAppName("test").setMaster("local"))
sc.addJar(jarPath.toString)
- assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1)
+ assert(sc.listJars().count(_.contains("TestUDTF.jar")) == 1)
} finally {
sc.stop()
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 672bd8c..e461474 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -288,14 +288,14 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
// If we're migrating shuffles we look for any shuffle block updates
// as there is no block update on the initial shuffle block write.
if (shuffle) {
- val numDataLocs = blocksUpdated.filter { update =>
+ val numDataLocs = blocksUpdated.count { update =>
val blockId = update.blockUpdatedInfo.blockId
blockId.isInstanceOf[ShuffleDataBlockId]
- }.size
- val numIndexLocs = blocksUpdated.filter { update =>
+ }
+ val numIndexLocs = blocksUpdated.count { update =>
val blockId = update.blockUpdatedInfo.blockId
blockId.isInstanceOf[ShuffleIndexBlockId]
- }.size
+ }
assert(numDataLocs === 1, s"Expect shuffle data block updates in
${blocksUpdated}")
assert(numIndexLocs === 1, s"Expect shuffle index block updates in
${blocksUpdated}")
}
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
index af76f04..6742d61 100644
---
a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
+++
b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
@@ -80,10 +80,10 @@ class NaiveBayesSuite extends MLTest with
DefaultReadWriteTest {
}
def validatePrediction(predictionAndLabels: Seq[Row]): Unit = {
- val numOfErrorPredictions = predictionAndLabels.filter {
+ val numOfErrorPredictions = predictionAndLabels.count {
case Row(prediction: Double, label: Double) =>
prediction != label
- }.length
+ }
// At least 80% of the predictions should be on.
assert(numOfErrorPredictions < predictionAndLabels.length / 5)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index f2b608b..54fc090 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -147,9 +147,9 @@ class TypedImperativeAggregateSuite extends QueryTest with
SharedSparkSession {
val query = df.select(typedMax($"key"), count($"key"), typedMax($"value"),
count($"value"))
val maxKey = nullableData.map(_._1).filter(_ != null).max
- val countKey = nullableData.map(_._1).filter(_ != null).size
+ val countKey = nullableData.map(_._1).count(_ != null)
val maxValue = nullableData.map(_._2).filter(_ != null).max
- val countValue = nullableData.map(_._2).filter(_ != null).size
+ val countValue = nullableData.map(_._2).count(_ != null)
val expected = Seq(Row(maxKey, countKey, maxValue, countValue))
checkAnswer(query, expected)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]