This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new bb5ba4429 [CELEBORN-653][TEST] Fix invalid configuration key in 
SparkTestBase
bb5ba4429 is described below

commit bb5ba4429f44694ab10345e0df64f992a38ee8b6
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 9 10:31:25 2023 +0800

    [CELEBORN-653][TEST] Fix invalid configuration key in SparkTestBase
    
    ### What changes were proposed in this pull request?
    
    A dot is missing after `spark` in the configuration key.
    
    ### Why are the changes needed?
    
    Correct the configuration key.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #1563 from pan3793/CELEBORN-653.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 588dbdfbe0fdef14e41d0e16f2247e8e6a4b6099)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../scala/org/apache/celeborn/common/CelebornConf.scala     |  8 ++++----
 .../apache/celeborn/tests/client/ShuffleClientSuite.scala   |  2 +-
 .../org/apache/celeborn/tests/spark/HugeDataTest.scala      |  5 ++++-
 .../apache/celeborn/tests/spark/PushDataTimeoutTest.scala   |  7 +++++--
 .../apache/celeborn/tests/spark/RetryCommitFilesTest.scala  |  5 ++++-
 .../org/apache/celeborn/tests/spark/RetryReviveTest.scala   |  5 ++++-
 .../apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala |  4 +++-
 .../org/apache/celeborn/tests/spark/RssHashSuite.scala      |  4 +++-
 .../celeborn/tests/spark/RssNonPipelineSortSuite.scala      |  6 ++++--
 .../apache/celeborn/tests/spark/RssPipelineSortSuite.scala  |  7 +++++--
 .../org/apache/celeborn/tests/spark/SparkTestBase.scala     | 13 +++++--------
 11 files changed, 42 insertions(+), 24 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 86818f74d..2128a2218 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -710,7 +710,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   //               Shuffle Client Push                   //
   // //////////////////////////////////////////////////////
   def clientPushReplicateEnabled: Boolean = get(CLIENT_PUSH_REPLICATE_ENABLED)
-  def clientPushBufferInitialSize: Int = 
get(client_PUSH_BUFFER_INITIAL_SIZE).toInt
+  def clientPushBufferInitialSize: Int = 
get(CLIENT_PUSH_BUFFER_INITIAL_SIZE).toInt
   def clientPushBufferMaxSize: Int = get(CLIENT_PUSH_BUFFER_MAX_SIZE).toInt
   def clientPushQueueCapacity: Int = get(CLIENT_PUSH_QUEUE_CAPACITY)
   def clientPushMaxReqsInFlight: Int = get(CLIENT_PUSH_MAX_REQS_IN_FLIGHT)
@@ -718,7 +718,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def clientPushSortMemoryThreshold: Long = 
get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
   def clientPushSortPipelineEnabled: Boolean = 
get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
   def clientPushSortRandomizePartitionIdEnabled: Boolean =
-    get(CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED)
+    get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
   def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
   def clientPushStageEndTimeout: Long = get(CLIENT_PUSH_STAGE_END_TIMEOUT)
   def clientPushUnsafeRowFastWrite: Boolean = 
get(CLIENT_PUSH_UNSAFEROW_FASTWRITE_ENABLED)
@@ -2510,7 +2510,7 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val client_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
+  val CLIENT_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.client.push.buffer.initial.size")
       .withAlternative("celeborn.push.buffer.initial.size")
       .categories("client")
@@ -2647,7 +2647,7 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("50ms")
 
-  val CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED: ConfigEntry[Boolean] =
+  val CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.client.push.sort.randomizePartitionId.enabled")
       .withAlternative("celeborn.push.sort.randomizePartitionId.enabled")
       .categories("client")
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
index 61372cbe1..a38cece56 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
@@ -53,7 +53,7 @@ class ShuffleClientSuite extends WithShuffleClientSuite with 
MiniClusterFeature
     }
 
     assertThrows[IOException] {
-      () -> shuffleClient.registerMapPartitionTask(APP, 1, 1, 0, 0, 1)
+      shuffleClient.registerMapPartitionTask(APP, 1, 1, 0, 0, 1)
     }
 
     lifecycleManager.stop()
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
index 2918a3288..fbf825509 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
@@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class HugeDataTest extends AnyFunSuite
   with SparkTestBase
@@ -38,7 +39,9 @@ class HugeDataTest extends AnyFunSuite
 
   test("celeborn spark integration test - huge data") {
     val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2]")
-    val ss = SparkSession.builder().config(updateSparkConf(sparkConf, 
false)).getOrCreate()
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
     val value = Range(1, 10000).mkString(",")
     val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
       .map { i => (i, value) }.groupByKey(16).collect()
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index dd35001e0..0e49c0d36 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class PushDataTimeoutTest extends AnyFunSuite
   with SparkTestBase
@@ -63,7 +64,8 @@ class PushDataTimeoutTest extends AnyFunSuite
       sparkSession.stop()
 
       val rssSparkSession = SparkSession.builder()
-        .config(updateSparkConf(sparkConf, false)).getOrCreate()
+        .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+        .getOrCreate()
       val rssCombineResult = combine(rssSparkSession)
       val rssGroupbyResult = groupBy(rssSparkSession)
       val rssRepartitionResult = repartition(rssSparkSession)
@@ -86,7 +88,8 @@ class PushDataTimeoutTest extends AnyFunSuite
       .set(s"spark.celeborn.data.push.timeoutCheck.interval", "2s")
       .set(s"spark.${CelebornConf.CLIENT_BLACKLIST_SLAVE_ENABLED.key}", 
"false")
     val rssSparkSession = SparkSession.builder()
-      .config(updateSparkConf(sparkConf, false)).getOrCreate()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
     try {
       combine(rssSparkSession)
     } catch {
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
index 433e4d4dd..f1fba73b9 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class RetryCommitFilesTest extends AnyFunSuite
   with SparkTestBase
@@ -48,7 +49,9 @@ class RetryCommitFilesTest extends AnyFunSuite
     val sparkConf = new SparkConf()
       .set(s"spark.${CelebornConf.TEST_CLIENT_RETRY_COMMIT_FILE.key}", "true")
       .setAppName("rss-demo").setMaster("local[2]")
-    val ss = SparkSession.builder().config(updateSparkConf(sparkConf, 
false)).getOrCreate()
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
     ss.sparkContext.parallelize(1 to 1000, 2)
       .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
     ss.stop()
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
index 10d98e893..791ac05df 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class RetryReviveTest extends AnyFunSuite
   with SparkTestBase
@@ -46,7 +47,9 @@ class RetryReviveTest extends AnyFunSuite
     val sparkConf = new SparkConf()
       .set(s"spark.${CelebornConf.TEST_CLIENT_RETRY_REVIVE.key}", "true")
       .setAppName("rss-demo").setMaster("local[2]")
-    val ss = SparkSession.builder().config(updateSparkConf(sparkConf, 
false)).getOrCreate()
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
     ss.sparkContext.parallelize(1 to 1000, 2)
       .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
     ss.stop()
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
index 02d507d4b..d07a96463 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.protocol.ShuffleMode
 import org.apache.celeborn.service.deploy.worker.Worker
 
 class RssHashCheckDiskSuite extends AnyFunSuite
@@ -62,7 +63,8 @@ class RssHashCheckDiskSuite extends AnyFunSuite
     sparkSession.stop()
 
     val rssSparkSession = SparkSession.builder()
-      .config(updateSparkConf(sparkConf, false)).getOrCreate()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
     val rssCombineResult = combine(rssSparkSession)
     val rssGroupbyResult = groupBy(rssSparkSession)
     val rssRepartitionResult = repartition(rssSparkSession)
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
index d3fa885d3..c1ac94ca0 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class RssHashSuite extends AnyFunSuite
   with SparkTestBase
@@ -48,7 +49,8 @@ class RssHashSuite extends AnyFunSuite
     sparkSession.stop()
 
     val rssSparkSession = SparkSession.builder()
-      .config(updateSparkConf(sparkConf, false)).getOrCreate()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
     val rssCombineResult = combine(rssSparkSession)
     val rssGroupbyResult = groupBy(rssSparkSession)
     val rssRepartitionResult = repartition(rssSparkSession)
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
index 598b5c97f..47f768494 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class RssNonPipelineSortSuite extends AnyFunSuite
   with SparkTestBase
@@ -40,7 +41,7 @@ class RssNonPipelineSortSuite extends AnyFunSuite
   test("celeborn spark integration test - non pipeline sort") {
     val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2]")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", 
"false")
-      
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED.key}", 
"false")
+      
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", 
"false")
     val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
     val combineResult = combine(sparkSession)
     val groupbyResult = groupBy(sparkSession)
@@ -51,7 +52,8 @@ class RssNonPipelineSortSuite extends AnyFunSuite
     sparkSession.stop()
 
     val rssSparkSession = SparkSession.builder()
-      .config(updateSparkConf(sparkConf, true)).getOrCreate()
+      .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
+      .getOrCreate()
     val rssCombineResult = combine(rssSparkSession)
     val rssGroupbyResult = groupBy(rssSparkSession)
     val rssRepartitionResult = repartition(rssSparkSession)
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
index 520053b6d..4cc280641 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
 
 class RssPipelineSortSuite extends AnyFunSuite
   with SparkTestBase
@@ -40,8 +41,10 @@ class RssPipelineSortSuite extends AnyFunSuite
   test("celeborn spark integration test - pipeline sort") {
     val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2]")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", 
"true")
-      
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED.key}", 
"true")
-    val ss = SparkSession.builder().config(updateSparkConf(sparkConf, 
true)).getOrCreate()
+      
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", 
"true")
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
+      .getOrCreate()
     val value = Range(1, 10000).mkString(",")
     val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
       .map { i => (i, value) }.groupByKey(16).collect()
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
index a9a968907..69da6a731 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.SparkSession
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
-import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf._
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.ShuffleMode
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 
 trait SparkTestBase extends AnyFunSuite
@@ -45,7 +46,7 @@ trait SparkTestBase extends AnyFunSuite
     shutdownMiniCluster()
   }
 
-  def updateSparkConf(sparkConf: SparkConf, sort: Boolean): SparkConf = {
+  def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
     sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
     sparkConf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
     sparkConf.set("spark.shuffle.useOldFetchProtocol", "true")
@@ -53,12 +54,8 @@ trait SparkTestBase extends AnyFunSuite
     sparkConf.set("spark.shuffle.service.enabled", "false")
     sparkConf.set("spark.sql.adaptive.skewJoin.enabled", "false")
     sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false")
-    sparkConf.set(
-      s"spark${CelebornConf.MASTER_ENDPOINTS.key}",
-      masterInfo._1.rpcEnv.address.toString)
-    if (sort) {
-      sparkConf.set(s"spark.${CelebornConf.SPARK_SHUFFLE_WRITER_MODE.key}", 
"sort")
-    }
+    sparkConf.set(s"spark.${MASTER_ENDPOINTS.key}", 
masterInfo._1.rpcEnv.address.toString)
+    sparkConf.set(s"spark.${SPARK_SHUFFLE_WRITER_MODE.key}", mode.toString)
     sparkConf
   }
 

Reply via email to