This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 85c6cf41b [CELEBORN-1121] Improve WorkerInfo#hashCode method
85c6cf41b is described below
commit 85c6cf41b9f3c186c3ba1778ccd0feb14af151f1
Author: onebox-li <[email protected]>
AuthorDate: Fri Nov 17 10:31:57 2023 +0800
[CELEBORN-1121] Improve WorkerInfo#hashCode method
### What changes were proposed in this pull request?
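Change WorkerInfo#hashCode() from map+foldLeft to an explicitly unrolled
multiply-and-add computation (the hand-unrolled equivalent of the `while`
variant benchmarked below), which yields the same value without allocating
intermediate collections.
Each way of computing the hash was benchmarked; the code and results are shown below: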
```
val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
// origin
val originHash = state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
// for
var forHash = 0
for (i <- state) {
forHash = 31 * forHash + i.hashCode()
}
// while
var whileHash = 0
var i = 0
while (i < state.size) {
whileHash = 31 * whileHash + state(i).hashCode()
i = i + 1
}
```
Result:
```
java version "1.8.0_261"
origin hash result = -831724440, costs 1103914 ns
for hash result = -831724440, costs 444588 ns (2.5x)
while hash result = -831724440, costs 46510 ns (23x)
```
### Why are the changes needed?
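The current WorkerInfo#hashCode() is relatively expensive: every call builds a
Seq of the fields and a mapped copy of it before folding. Since WorkerInfo is
widely used in hash maps, this method is called often and is worth optimizing.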
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
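Added a unit test to WorkerInfoSuite that checks the new hashCode() returns the
same value as the original map+foldLeft computation and is stable across calls.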
Closes #2086 from onebox-li/improve-worker-hash.
Authored-by: onebox-li <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
(cherry picked from commit b5c5aa6d9d97949bda790076d218fccbc8c9bb4d)
Signed-off-by: Fu Chen <[email protected]>
---
.../apache/celeborn/common/meta/WorkerInfo.scala | 8 ++++--
.../celeborn/common/ComputeIfAbsentBenchmark.scala | 4 +--
.../celeborn/common/meta/WorkerInfoSuite.scala | 33 ++++++++++++++++++++--
3 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 597784816..65dc22918 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -232,8 +232,12 @@ class WorkerInfo(
}
override def hashCode(): Int = {
- val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
- state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ var result = host.hashCode()
+ result = 31 * result + rpcPort.hashCode()
+ result = 31 * result + pushPort.hashCode()
+ result = 31 * result + fetchPort.hashCode()
+ result = 31 * result + replicatePort.hashCode()
+ result
}
}
diff --git a/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala b/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
index eeee4d78a..eb347e64f 100644
--- a/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
@@ -30,9 +30,9 @@ import org.apache.celeborn.benchmark.{Benchmark, BenchmarkBase}
* ComputeIfAbsent benchmark.
* To run this benchmark:
* {{{
- * 1. build/sbt "common/test:runMain <this class>"
+ * 1. build/sbt "celeborn-common/test:runMain <this class>"
* 2. generate result:
- * CELEBORN_GENERATE_BENCHMARK_FILES=1 build/sbt "common/test:runMain <this class>"
+ * CELEBORN_GENERATE_BENCHMARK_FILES=1 build/sbt "celeborn-common/test:runMain <this class>"
* Results will be written to "benchmarks/ComputeIfAbsentBenchmark-results.txt".
* }}}
*/
diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 93f210f1f..b42862924 100644
--- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.reflect.ClassTag
+import scala.util.Random
import org.junit.Assert.{assertEquals, assertNotEquals, assertNotNull}
@@ -32,8 +32,7 @@ import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpointAddress,
RpcEndpointRef, RpcEnv, RpcTimeout}
-import org.apache.celeborn.common.rpc.netty.{NettyRpcEndpointRef, NettyRpcEnv}
+import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}
class WorkerInfoSuite extends CelebornFunSuite {
@@ -303,4 +302,32 @@ class WorkerInfoSuite extends CelebornFunSuite {
}
}
}
+
+ def generateRandomIPv4Address: String = {
+ val ipAddress = new StringBuilder
+ for (i <- 0 until 4) {
+ ipAddress.append(Random.nextInt(256))
+ if (i < 3) ipAddress.append(".")
+ }
+ ipAddress.toString
+ }
+
+ test("Test WorkerInfo hashcode") {
+ val host = generateRandomIPv4Address
+ val rpcPort = Random.nextInt(65536)
+ val pushPort = Random.nextInt(65536)
+ val fetchPort = Random.nextInt(65536)
+ val replicatePort = Random.nextInt(65536)
+ val workerInfo = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
+
+ // origin hashCode() logic
+ val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
+ val originHash = state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+
+ val hashCode1 = workerInfo.hashCode()
+ assert(originHash === hashCode1)
+
+ val hashCode2 = workerInfo.hashCode()
+ assert(hashCode1 === hashCode2)
+ }
}