mridulm commented on code in PR #2997:
URL: https://github.com/apache/celeborn/pull/2997#discussion_r1894387607


##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala:
##########
@@ -79,4 +81,28 @@ class RetryReviveTest extends AnyFunSuite
     assert(result.size == 1000)
     ss.stop()
   }
+
+  test("celeborn spark integration test - revive test replicate enabled when 
workers are randomly killed") {
+    setupMiniClusterWithRandomPorts()
+    ShuffleClient.reset()
+    val sparkConf = new SparkConf()
+      .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
+      .set(s"spark.${CelebornConf.CLIENT_STAGE_RERUN_ENABLED.key}", "false")
+      .setAppName("celeborn-demo").setMaster("local[2]")
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
+
+    val startTime = System.currentTimeMillis()
+    val result = ss.sparkContext.parallelize(1 to 800, 100)
+      .flatMap(_ => (1 to 15000).iterator.map(num => num)).repartition(100).count()
+    val taskTime = System.currentTimeMillis() - startTime
+    val random = new Random()
+    val workerKillTime = random.nextInt(taskTime.toInt)
+    workerKiller(workerKillTime)
+    val result1 = ss.sparkContext.parallelize(1 to 800, 100)
+      .flatMap(_ => (1 to 15000).iterator.map(num => num)).repartition(100).count()
+    assert(result1 == result)

Review Comment:
   There is an inherent risk here that we don't actually end up triggering the behavior we need to test.
   
   Instead, do we want to introduce some delay (for specific partitions) to ensure failures happen while the application is running?
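   Something along these lines, for example (a rough sketch only; it assumes `workerKiller` kills a worker asynchronously after the given delay in milliseconds, and the delay constants here are arbitrary):
   
   ```scala
   // Schedule the kill at a fixed offset into the job instead of a random one.
   workerKiller(2000) // assumption: kills a worker asynchronously ~2s after this call
   val result1 = ss.sparkContext.parallelize(1 to 800, 100)
     .mapPartitionsWithIndex { (idx, iter) =>
       // Hold back a few specific partitions so their tasks are still
       // pushing shuffle data when the worker goes down.
       if (idx % 10 == 0) Thread.sleep(5000)
       iter.flatMap(_ => (1 to 15000).iterator)
     }
     .repartition(100)
     .count()
   assert(result1 == result)
   ```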



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
