zaynt4606 commented on code in PR #3601:
URL: https://github.com/apache/celeborn/pull/3601#discussion_r2851544254
##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala:
##########
@@ -126,6 +132,68 @@ class LifecycleManagerSuite extends WithShuffleClientSuite with MiniClusterFeatu
}
}
+ test("CELEBORN-2264: Support cancel shuffle when write bytes exceeds threshold") {
+ val conf = celebornConf.clone
+ conf.set(CelebornConf.SHUFFLE_WRITE_LIMIT_ENABLED.key, "true")
+ .set(CelebornConf.SHUFFLE_WRITE_LIMIT_THRESHOLD.key, "2000")
+ val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
+ val ctx = Mockito.mock(classOf[RpcCallContext])
+
+ // Custom BiConsumer callback to track if cancelShuffle is invoked
+ var isCancelShuffleInvoked = false
+ val cancelShuffleCallback = new BiConsumer[Integer, String] {
+ override def accept(shuffleId: Integer, reason: String): Unit = {
+ isCancelShuffleInvoked = true
+ }
+ }
+ lifecycleManager.registerCancelShuffleCallback(cancelShuffleCallback)
+
+ // Scenario 1: Same mapper with multiple attempts (total bytes exceed threshold but no cancel)
+ val shuffleId = 0
+ val mapId1 = 0
+ lifecycleManager.receiveAndReply(ctx)(MapperEnd(
+ shuffleId = shuffleId,
+ mapId = mapId1,
+ attemptId = 0,
+ 2,
+ 1,
+ Collections.emptyMap(),
+ 1,
+ Array.emptyIntArray,
+ Array.emptyLongArray,
+ SerdeVersion.V1,
+ bytesWritten = 1500))
+ lifecycleManager.receiveAndReply(ctx)(MapperEnd(
+ shuffleId = shuffleId,
+ mapId = mapId1,
+ attemptId = 1,
+ 2,
+ 1,
+ Collections.emptyMap(),
+ 1,
+ Array.emptyIntArray,
+ Array.emptyLongArray,
+ SerdeVersion.V1,
+ bytesWritten = 1500))
+ assert(!isCancelShuffleInvoked)
+
+ // Scenario 2: Total bytes of mapId1 + mapId2 exceed threshold (trigger cancel)
+ val mapId2 = 1
+ lifecycleManager.receiveAndReply(ctx)(MapperEnd(
+ shuffleId = shuffleId,
+ mapId = mapId2,
+ attemptId = 0,
+ 2,
+ 1,
+ Collections.emptyMap(),
+ 1,
+ Array.emptyIntArray,
+ Array.emptyLongArray,
+ SerdeVersion.V1,
+ bytesWritten = 1000))
+ assert(isCancelShuffleInvoked)
+ }
+
Review Comment:
We could also add some test cases with SHUFFLE_WRITE_LIMIT_ENABLED set to false.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]