reswqa commented on code in PR #2716:
URL: https://github.com/apache/celeborn/pull/2716#discussion_r1743255104


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -786,8 +787,12 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
 
   private def handleGetReducerFileGroup(
       context: RpcCallContext,
-      shuffleId: Int): Unit = {
-    if (!registeredShuffle.contains(shuffleId)) {
+      shuffleId: Int,
+      isSegmentGranularityVisible: Boolean): Unit = {
+    // If isSegmentGranularityVisible is set to true, the downstream reduce task may start earlier than the upstream map task, e.g. Flink hybrid shuffle.
+    // In that case, the shuffle might not yet be registered when the downstream reduce task sends the GetReducerFileGroup request,
+    // so we shouldn't send a SHUFFLE_NOT_REGISTERED response directly; instead, we should enqueue this request in a pending list and respond to the downstream reduce task with the reducer file groups once the upstream map task has finished registering the shuffle.
+    if (!registeredShuffle.contains(shuffleId) && !isSegmentGranularityVisible) {

Review Comment:
   It is set to `true` when creating the `bufferStream` for hybrid shuffle. However, all of the read logic lands in a later PR, so for now you only see `false` (for non-hybrid RSS) in this PR.
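For readers following along, here is a minimal standalone sketch of the pending-list behaviour described in the diff comment. It is not Celeborn code: `FileGroupRegistry`, `FileGroupResponse`, `ShuffleNotRegistered`, `FileGroups` and `registerShuffle` are all hypothetical names, and the `reply` callback merely stands in for `RpcCallContext`. It assumes replies are sent as soon as the shuffle is registered, as the diff comment states.

```scala
import scala.collection.mutable

// Hypothetical reply types standing in for the real GetReducerFileGroup response.
sealed trait FileGroupResponse
case object ShuffleNotRegistered extends FileGroupResponse
final case class FileGroups(shuffleId: Int, groups: Map[Int, Seq[String]]) extends FileGroupResponse

// Hypothetical registry, NOT Celeborn's LifecycleManager API.
final class FileGroupRegistry {
  type Reply = FileGroupResponse => Unit

  // shuffleId -> (partitionId -> file locations)
  private val registered = mutable.Map.empty[Int, Map[Int, Seq[String]]]
  // Requests parked because their shuffle was not registered yet.
  private val pending = mutable.Map.empty[Int, mutable.Buffer[Reply]]

  def handleGetReducerFileGroup(
      shuffleId: Int,
      isSegmentGranularityVisible: Boolean,
      reply: Reply): Unit = synchronized {
    registered.get(shuffleId) match {
      case Some(groups) =>
        reply(FileGroups(shuffleId, groups))
      case None if !isSegmentGranularityVisible =>
        // Non-hybrid shuffle: fail fast, as before this PR.
        reply(ShuffleNotRegistered)
      case None =>
        // Hybrid-style shuffle: the reducer may legitimately start first, so park it.
        pending.getOrElseUpdate(shuffleId, mutable.Buffer.empty) += reply
    }
  }

  def registerShuffle(shuffleId: Int, groups: Map[Int, Seq[String]]): Unit = {
    val waiters: List[Reply] = synchronized {
      registered(shuffleId) = groups
      pending.remove(shuffleId).map(_.toList).getOrElse(Nil)
    }
    // Answer parked requests outside the lock so slow callbacks don't block new requests.
    waiters.foreach(reply => reply(FileGroups(shuffleId, groups)))
  }
}
```

Collecting the waiters under the lock but invoking them afterwards is one reasonable choice here; whether the real implementation does the same is not shown in this PR.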



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
