This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new bb5ba4429 [CELEBORN-653][TEST] Fix invalid configuration key in
SparkTestBase
bb5ba4429 is described below
commit bb5ba4429f44694ab10345e0df64f992a38ee8b6
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 9 10:31:25 2023 +0800
[CELEBORN-653][TEST] Fix invalid configuration key in SparkTestBase
### What changes were proposed in this pull request?
A dot is missing after `spark` in the configuration key.
### Why are the changes needed?
Correct the configuration key.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes #1563 from pan3793/CELEBORN-653.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 588dbdfbe0fdef14e41d0e16f2247e8e6a4b6099)
Signed-off-by: Cheng Pan <[email protected]>
---
.../scala/org/apache/celeborn/common/CelebornConf.scala | 8 ++++----
.../apache/celeborn/tests/client/ShuffleClientSuite.scala | 2 +-
.../org/apache/celeborn/tests/spark/HugeDataTest.scala | 5 ++++-
.../apache/celeborn/tests/spark/PushDataTimeoutTest.scala | 7 +++++--
.../apache/celeborn/tests/spark/RetryCommitFilesTest.scala | 5 ++++-
.../org/apache/celeborn/tests/spark/RetryReviveTest.scala | 5 ++++-
.../apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala | 4 +++-
.../org/apache/celeborn/tests/spark/RssHashSuite.scala | 4 +++-
.../celeborn/tests/spark/RssNonPipelineSortSuite.scala | 6 ++++--
.../apache/celeborn/tests/spark/RssPipelineSortSuite.scala | 7 +++++--
.../org/apache/celeborn/tests/spark/SparkTestBase.scala | 13 +++++--------
11 files changed, 42 insertions(+), 24 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 86818f74d..2128a2218 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -710,7 +710,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
// Shuffle Client Push //
// //////////////////////////////////////////////////////
def clientPushReplicateEnabled: Boolean = get(CLIENT_PUSH_REPLICATE_ENABLED)
- def clientPushBufferInitialSize: Int =
get(client_PUSH_BUFFER_INITIAL_SIZE).toInt
+ def clientPushBufferInitialSize: Int =
get(CLIENT_PUSH_BUFFER_INITIAL_SIZE).toInt
def clientPushBufferMaxSize: Int = get(CLIENT_PUSH_BUFFER_MAX_SIZE).toInt
def clientPushQueueCapacity: Int = get(CLIENT_PUSH_QUEUE_CAPACITY)
def clientPushMaxReqsInFlight: Int = get(CLIENT_PUSH_MAX_REQS_IN_FLIGHT)
@@ -718,7 +718,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def clientPushSortMemoryThreshold: Long =
get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
def clientPushSortPipelineEnabled: Boolean =
get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
def clientPushSortRandomizePartitionIdEnabled: Boolean =
- get(CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED)
+ get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
def clientPushStageEndTimeout: Long = get(CLIENT_PUSH_STAGE_END_TIMEOUT)
def clientPushUnsafeRowFastWrite: Boolean =
get(CLIENT_PUSH_UNSAFEROW_FASTWRITE_ENABLED)
@@ -2510,7 +2510,7 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
- val client_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
+ val CLIENT_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.client.push.buffer.initial.size")
.withAlternative("celeborn.push.buffer.initial.size")
.categories("client")
@@ -2647,7 +2647,7 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
- val CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED: ConfigEntry[Boolean] =
+ val CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.push.sort.randomizePartitionId.enabled")
.withAlternative("celeborn.push.sort.randomizePartitionId.enabled")
.categories("client")
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
index 61372cbe1..a38cece56 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
@@ -53,7 +53,7 @@ class ShuffleClientSuite extends WithShuffleClientSuite with
MiniClusterFeature
}
assertThrows[IOException] {
- () -> shuffleClient.registerMapPartitionTask(APP, 1, 1, 0, 0, 1)
+ shuffleClient.registerMapPartitionTask(APP, 1, 1, 0, 0, 1)
}
lifecycleManager.stop()
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
index 2918a3288..fbf825509 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HugeDataTest.scala
@@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.protocol.ShuffleMode
class HugeDataTest extends AnyFunSuite
with SparkTestBase
@@ -38,7 +39,9 @@ class HugeDataTest extends AnyFunSuite
test("celeborn spark integration test - huge data") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]")
- val ss = SparkSession.builder().config(updateSparkConf(sparkConf,
false)).getOrCreate()
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
val value = Range(1, 10000).mkString(",")
val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
.map { i => (i, value) }.groupByKey(16).collect()
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index dd35001e0..0e49c0d36 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
class PushDataTimeoutTest extends AnyFunSuite
with SparkTestBase
@@ -63,7 +64,8 @@ class PushDataTimeoutTest extends AnyFunSuite
sparkSession.stop()
val rssSparkSession = SparkSession.builder()
- .config(updateSparkConf(sparkConf, false)).getOrCreate()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
@@ -86,7 +88,8 @@ class PushDataTimeoutTest extends AnyFunSuite
.set(s"spark.celeborn.data.push.timeoutCheck.interval", "2s")
.set(s"spark.${CelebornConf.CLIENT_BLACKLIST_SLAVE_ENABLED.key}",
"false")
val rssSparkSession = SparkSession.builder()
- .config(updateSparkConf(sparkConf, false)).getOrCreate()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
try {
combine(rssSparkSession)
} catch {
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
index 433e4d4dd..f1fba73b9 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryCommitFilesTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
class RetryCommitFilesTest extends AnyFunSuite
with SparkTestBase
@@ -48,7 +49,9 @@ class RetryCommitFilesTest extends AnyFunSuite
val sparkConf = new SparkConf()
.set(s"spark.${CelebornConf.TEST_CLIENT_RETRY_COMMIT_FILE.key}", "true")
.setAppName("rss-demo").setMaster("local[2]")
- val ss = SparkSession.builder().config(updateSparkConf(sparkConf,
false)).getOrCreate()
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
ss.sparkContext.parallelize(1 to 1000, 2)
.map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
ss.stop()
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
index 10d98e893..791ac05df 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
class RetryReviveTest extends AnyFunSuite
with SparkTestBase
@@ -46,7 +47,9 @@ class RetryReviveTest extends AnyFunSuite
val sparkConf = new SparkConf()
.set(s"spark.${CelebornConf.TEST_CLIENT_RETRY_REVIVE.key}", "true")
.setAppName("rss-demo").setMaster("local[2]")
- val ss = SparkSession.builder().config(updateSparkConf(sparkConf,
false)).getOrCreate()
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
ss.sparkContext.parallelize(1 to 1000, 2)
.map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
ss.stop()
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
index 02d507d4b..d07a96463 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.worker.Worker
class RssHashCheckDiskSuite extends AnyFunSuite
@@ -62,7 +63,8 @@ class RssHashCheckDiskSuite extends AnyFunSuite
sparkSession.stop()
val rssSparkSession = SparkSession.builder()
- .config(updateSparkConf(sparkConf, false)).getOrCreate()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
index d3fa885d3..c1ac94ca0 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.protocol.ShuffleMode
class RssHashSuite extends AnyFunSuite
with SparkTestBase
@@ -48,7 +49,8 @@ class RssHashSuite extends AnyFunSuite
sparkSession.stop()
val rssSparkSession = SparkSession.builder()
- .config(updateSparkConf(sparkConf, false)).getOrCreate()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
index 598b5c97f..47f768494 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssNonPipelineSortSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
class RssNonPipelineSortSuite extends AnyFunSuite
with SparkTestBase
@@ -40,7 +41,7 @@ class RssNonPipelineSortSuite extends AnyFunSuite
test("celeborn spark integration test - non pipeline sort") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]")
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"false")
-
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED.key}",
"false")
+
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"false")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val combineResult = combine(sparkSession)
val groupbyResult = groupBy(sparkSession)
@@ -51,7 +52,8 @@ class RssNonPipelineSortSuite extends AnyFunSuite
sparkSession.stop()
val rssSparkSession = SparkSession.builder()
- .config(updateSparkConf(sparkConf, true)).getOrCreate()
+ .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
+ .getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
index 520053b6d..4cc280641 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssPipelineSortSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
class RssPipelineSortSuite extends AnyFunSuite
with SparkTestBase
@@ -40,8 +41,10 @@ class RssPipelineSortSuite extends AnyFunSuite
test("celeborn spark integration test - pipeline sort") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]")
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"true")
-
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARITION_ENABLED.key}",
"true")
- val ss = SparkSession.builder().config(updateSparkConf(sparkConf,
true)).getOrCreate()
+
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"true")
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
+ .getOrCreate()
val value = Range(1, 10000).mkString(",")
val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
.map { i => (i, value) }.groupByKey(16).collect()
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
index a9a968907..69da6a731 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
-import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf._
import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.MiniClusterFeature
trait SparkTestBase extends AnyFunSuite
@@ -45,7 +46,7 @@ trait SparkTestBase extends AnyFunSuite
shutdownMiniCluster()
}
- def updateSparkConf(sparkConf: SparkConf, sort: Boolean): SparkConf = {
+ def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
sparkConf.set("spark.shuffle.useOldFetchProtocol", "true")
@@ -53,12 +54,8 @@ trait SparkTestBase extends AnyFunSuite
sparkConf.set("spark.shuffle.service.enabled", "false")
sparkConf.set("spark.sql.adaptive.skewJoin.enabled", "false")
sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false")
- sparkConf.set(
- s"spark${CelebornConf.MASTER_ENDPOINTS.key}",
- masterInfo._1.rpcEnv.address.toString)
- if (sort) {
- sparkConf.set(s"spark.${CelebornConf.SPARK_SHUFFLE_WRITER_MODE.key}",
"sort")
- }
+ sparkConf.set(s"spark.${MASTER_ENDPOINTS.key}",
masterInfo._1.rpcEnv.address.toString)
+ sparkConf.set(s"spark.${SPARK_SHUFFLE_WRITER_MODE.key}", mode.toString)
sparkConf
}