This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ced08a1d [CELEBORN-266] Fix wrong old version configurations (#1198)
ced08a1d is described below
commit ced08a1d89740a1eaace03fde91ee9cb6319a22f
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Feb 2 14:03:45 2023 +0800
[CELEBORN-266] Fix wrong old version configurations (#1198)
---
.../celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala | 4 ++--
.../scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala | 2 +-
.../scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala | 4 +---
.../test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala | 4 ++--
toolkit/scripts/genConfs.py | 1 -
5 files changed, 6 insertions(+), 9 deletions(-)
diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
index a29c5248..62b7fd5a 100644
--- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
+++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
@@ -62,8 +62,8 @@ class AppDiskUsageMetricSuite extends AnyFunSuite
Thread.sleep(5000)
val conf = new CelebornConf()
- conf.set("rss.metrics.app.topDiskUsage.windowSize", "5")
- conf.set("rss.metrids.app.topDiskUsage.interval", "2s")
+ conf.set("celeborn.metrics.app.topDiskUsage.windowSize", "5")
+ conf.set("celeborn.metrics.app.topDiskUsage.interval", "2s")
val usageMetric = new AppDiskUsageMetric(conf)
val map1 = new util.HashMap[String, java.lang.Long]()
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala
index 74eb2d60..48630209 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala
@@ -29,7 +29,7 @@ class PushdataTimeoutTest extends AnyFunSuite
with BeforeAndAfterEach {
override def beforeAll(): Unit = {
- logInfo("test initialized , setup rss mini cluster")
+ logInfo("test initialized , setup celeborn mini cluster")
val workerConf = Map(
"celeborn.test.pushdataTimeout" -> s"true")
setUpMiniCluster(masterConfs = null, workerConfs = workerConf)
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
index 6aa529df..92b036fa 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
@@ -41,8 +41,7 @@ class ShuffleFallbackSuite extends AnyFunSuite
private def enableRss(conf: SparkConf) = {
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.RssShuffleManager")
- .set("spark.rss.master.address", masterInfo._1.rpcEnv.address.toString)
- .set("spark.rss.shuffle.split.threshold", "10MB")
+ .set("spark.celeborn.master.endpoints", masterInfo._1.rpcEnv.address.toString)
}
test(s"celeborn spark integration test - fallback") {
@@ -53,7 +52,6 @@ class ShuffleFallbackSuite extends AnyFunSuite
enableRss(sparkConf)
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
- import sparkSession.implicits._
val df = sparkSession.sparkContext.parallelize(1 to 120000, 8)
.repartition(100)
df.collect()
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
index 69f3768f..d74cdef1 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
@@ -41,8 +41,8 @@ class SkewJoinSuite extends AnyFunSuite
private def enableRss(conf: SparkConf) = {
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.RssShuffleManager")
- .set("spark.rss.master.address", masterInfo._1.rpcEnv.address.toString)
- .set("spark.rss.shuffle.split.threshold", "10MB")
+ .set("spark.celeborn.master.endpoints", masterInfo._1.rpcEnv.address.toString)
+ .set("spark.celeborn.shuffle.partitionSplit.threshold", "10MB")
}
CompressionCodec.values.foreach { codec =>
diff --git a/toolkit/scripts/genConfs.py b/toolkit/scripts/genConfs.py
index a3830888..8ab51a82 100644
--- a/toolkit/scripts/genConfs.py
+++ b/toolkit/scripts/genConfs.py
@@ -96,7 +96,6 @@ def set_skew_join_confs(conf):
def set_rss_confs(conf, replicate=False):
conf["spark.shuffle.manager"] = "org.apache.spark.shuffle.celeborn.RssShuffleManager"
conf["spark.serializer"] = "org.apache.spark.serializer.KryoSerializer"
- conf["spark.rss.master.address"] = "master-1-1:9097"
conf["spark.celeborn.master.endpoints"] = "master-1-1:9097"
conf["spark.shuffle.service.enabled"] = "false"