This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2a667fbff7e [MINOR] Improve to update some mutable hash maps
2a667fbff7e is described below
commit 2a667fbff7e7fd21bf69668ef78b175f60a24dba
Author: weixiuli <[email protected]>
AuthorDate: Mon Apr 18 09:22:24 2022 -0500
[MINOR] Improve to update some mutable hash maps
### What changes were proposed in this pull request?
Update some mutable hash maps and arrays with compound assignment operators (`+=`/`-=`) instead of explicit lookup-and-reassignment.
### Why are the changes needed?
Reduce repeated mutable hash map access expressions and clean up the code.
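As an illustration only (not part of this patch; the map name `counts` below is hypothetical), compound assignment on a Scala `mutable.Map` is pure syntactic sugar: `counts(k) -= 1` desugars to `counts(k) = counts(k) - 1`, so behavior is unchanged and the statements simply become shorter:

```scala
import scala.collection.mutable

object CompoundAssignmentSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical counter map, standing in for fields like numBlocksInFlightPerAddress.
    val counts = mutable.HashMap("host-a" -> 2, "host-b" -> 5)

    // Style used before this patch: explicit lookup and re-assignment.
    counts("host-a") = counts("host-a") - 1

    // Style used after this patch: compound assignment.
    // The compiler rewrites this to counts("host-b") = counts("host-b") - 1.
    counts("host-b") -= 1

    println(counts)  // e.g. HashMap(host-a -> 1, host-b -> 4); entry order is unspecified
  }
}
```

The diff below applies the same rewrite to existing maps and arrays (e.g. `probArray(j) -= logSumExp`) without changing semantics.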
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #36179 from weixiuli/update-numBlocksInFlightPerAddress.
Authored-by: weixiuli <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++--
.../scala/org/apache/spark/resource/ResourceAllocator.scala | 4 ++--
.../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala | 2 +-
.../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 11 +++++------
.../scala/org/apache/spark/ml/classification/NaiveBayes.scala | 2 +-
.../src/main/scala/org/apache/spark/ml/stat/Summarizer.scala | 2 +-
6 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2c7021bdcb9..b26a0f82d69 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -264,13 +264,13 @@ private[deploy] class Worker(
private def addResourcesUsed(deltaInfo: Map[String, ResourceInformation]): Unit = {
deltaInfo.foreach { case (rName, rInfo) =>
- resourcesUsed(rName) = resourcesUsed(rName) + rInfo
+ resourcesUsed(rName) += rInfo
}
}
private def removeResourcesUsed(deltaInfo: Map[String, ResourceInformation]): Unit = {
deltaInfo.foreach { case (rName, rInfo) =>
- resourcesUsed(rName) = resourcesUsed(rName) - rInfo
+ resourcesUsed(rName) -= rInfo
}
}
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 7605e8c44b9..10cf0402d5f 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -82,7 +82,7 @@ private[spark] trait ResourceAllocator {
}
val isAvailable = addressAvailabilityMap(address)
if (isAvailable > 0) {
- addressAvailabilityMap(address) = addressAvailabilityMap(address) - 1
+ addressAvailabilityMap(address) -= 1
} else {
throw new SparkException("Try to acquire an address that is not
available. " +
s"$resourceName address $address is not available.")
@@ -103,7 +103,7 @@ private[spark] trait ResourceAllocator {
}
val isAvailable = addressAvailabilityMap(address)
if (isAvailable < slotsPerAddress) {
- addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
+ addressAvailabilityMap(address) += 1
} else {
throw new SparkException(s"Try to release an address that is not
assigned. $resourceName " +
s"address $address is not assigned.")
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 230ec7efdb1..15a9ddd40e6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -317,7 +317,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
pushResult: PushResult): Boolean = synchronized {
remainingBlocks -= pushResult.blockId
bytesInFlight -= bytesPushed
- numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+ numBlocksInFlightPerAddress(address) -= 1
if (remainingBlocks.isEmpty) {
reqsInFlight -= 1
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e2fc5389091..c91aaa8ddb7 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -763,7 +763,7 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.incLocalBlocksFetched(1)
shuffleMetrics.incLocalBytesRead(buf.size)
} else {
- numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+ numBlocksInFlightPerAddress(address) -= 1
shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
@@ -905,8 +905,7 @@ final class ShuffleBlockFetcherIterator(
case DeferFetchRequestResult(request) =>
val address = request.address
- numBlocksInFlightPerAddress(address) =
- numBlocksInFlightPerAddress(address) - request.blocks.size
+ numBlocksInFlightPerAddress(address) -= request.blocks.size
bytesInFlight -= request.size
reqsInFlight -= 1
logDebug("Number of requests in flight " + reqsInFlight)
@@ -924,7 +923,7 @@ final class ShuffleBlockFetcherIterator(
// 3. Failure to get the push-merged-local directories from the external shuffle service.
// In this case, the blockId is ShuffleBlockId.
if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
- numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+ numBlocksInFlightPerAddress(address) -= 1
bytesInFlight -= size
}
if (isNetworkReqDone) {
@@ -975,7 +974,7 @@ final class ShuffleBlockFetcherIterator(
// The original meta request is processed so we decrease numBlocksToFetch and
// numBlocksInFlightPerAddress by 1. We will collect new shuffle chunks request and the
// count of this is added to numBlocksToFetch in collectFetchReqsFromMergedBlocks.
- numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+ numBlocksInFlightPerAddress(address) -= 1
numBlocksToFetch -= 1
val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps)
@@ -988,7 +987,7 @@ final class ShuffleBlockFetcherIterator(
case PushMergedRemoteMetaFailedFetchResult(
shuffleId, shuffleMergeId, reduceId, address) =>
// The original meta request failed so we decrease numBlocksInFlightPerAddress by 1.
- numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+ numBlocksInFlightPerAddress(address) -= 1
// If we fail to fetch the meta of a push-merged block, we fall back to fetching the
// original blocks.
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index dde4234a2e8..16176136a7e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -502,7 +502,7 @@ class NaiveBayesModel private[ml] (
j = 0
while (j < probArray.length) {
- probArray(j) = probArray(j) - logSumExp
+ probArray(j) -= logSumExp
j += 1
}
Vectors.dense(probArray)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
index a3dd133a4ce..7fd99faf0c8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -596,7 +596,7 @@ private[spark] class SummarizerBuffer(
// merge max and min
if (currMax != null) { currMax(i) = math.max(currMax(i), other.currMax(i)) }
if (currMin != null) { currMin(i) = math.min(currMin(i), other.currMin(i)) }
- if (nnz != null) { nnz(i) = nnz(i) + other.nnz(i) }
+ if (nnz != null) { nnz(i) += other.nnz(i) }
i += 1
}
} else if (totalWeightSum == 0.0 && other.totalWeightSum != 0.0) {