This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch branch-0.2 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 7693140bf8130cb0f1ad1c24248957aefc997ff2 Author: Angerszhuuuu <[email protected]> AuthorDate: Thu Feb 2 14:03:45 2023 +0800 [CELEBORN-266] Fix wrong old version configurations (#1198) (cherry picked from commit ced08a1d89740a1eaace03fde91ee9cb6319a22f) --- .../celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala | 4 ++-- .../scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala | 2 +- .../scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala | 4 +--- .../test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala | 4 ++-- toolkit/scripts/genConfs.py | 1 - 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala index a29c5248..62b7fd5a 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala @@ -62,8 +62,8 @@ class AppDiskUsageMetricSuite extends AnyFunSuite Thread.sleep(5000) val conf = new CelebornConf() - conf.set("rss.metrics.app.topDiskUsage.windowSize", "5") - conf.set("rss.metrids.app.topDiskUsage.interval", "2s") + conf.set("celeborn.metrics.app.topDiskUsage.windowSize", "5") + conf.set("celeborn.metrics.app.topDiskUsage.interval", "2s") val usageMetric = new AppDiskUsageMetric(conf) val map1 = new util.HashMap[String, java.lang.Long]() diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala index 74eb2d60..48630209 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushdataTimeoutTest.scala @@ -29,7 +29,7 @@ class PushdataTimeoutTest extends AnyFunSuite with BeforeAndAfterEach { override def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") + logInfo("test initialized , setup celeborn mini cluster") val workerConf = Map( "celeborn.test.pushdataTimeout" -> s"true") setUpMiniCluster(masterConfs = null, workerConfs = workerConf) diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala index 6aa529df..92b036fa 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala @@ -41,8 +41,7 @@ class ShuffleFallbackSuite extends AnyFunSuite private def enableRss(conf: SparkConf) = { conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.RssShuffleManager") - .set("spark.rss.master.address", masterInfo._1.rpcEnv.address.toString) - .set("spark.rss.shuffle.split.threshold", "10MB") + .set("spark.celeborn.master.endpoints", masterInfo._1.rpcEnv.address.toString) } test(s"celeborn spark integration test - fallback") { @@ -53,7 +52,6 @@ class ShuffleFallbackSuite extends AnyFunSuite enableRss(sparkConf) val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - import sparkSession.implicits._ val df = sparkSession.sparkContext.parallelize(1 to 120000, 8) .repartition(100) df.collect() diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala index 69f3768f..d74cdef1 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala @@ -41,8 +41,8 @@ class SkewJoinSuite extends AnyFunSuite private def enableRss(conf: SparkConf) = { conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.RssShuffleManager") - .set("spark.rss.master.address", masterInfo._1.rpcEnv.address.toString) - .set("spark.rss.shuffle.split.threshold", "10MB") + .set("spark.celeborn.master.endpoints", masterInfo._1.rpcEnv.address.toString) + .set("spark.celeborn.shuffle.partitionSplit.threshold", "10MB") } CompressionCodec.values.foreach { codec => diff --git a/toolkit/scripts/genConfs.py b/toolkit/scripts/genConfs.py index a3830888..8ab51a82 100644 --- a/toolkit/scripts/genConfs.py +++ b/toolkit/scripts/genConfs.py @@ -96,7 +96,6 @@ def set_skew_join_confs(conf): def set_rss_confs(conf, replicate=False): conf["spark.shuffle.manager"] = "org.apache.spark.shuffle.celeborn.RssShuffleManager" conf["spark.serializer"] = "org.apache.spark.serializer.KryoSerializer" - conf["spark.rss.master.address"] = "master-1-1:9097" conf["spark.celeborn.master.endpoints"] = "master-1-1:9097" conf["spark.shuffle.service.enabled"] = "false"
