zwangsheng opened a new pull request, #2192:
URL: https://github.com/apache/celeborn/pull/2192

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     - Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] 
Your PR title ...'.
     - Be sure to keep the PR description updated to reflect all changes.
     - Please write your PR title to summarize what this PR proposes.
     - If possible, provide a concise example to reproduce the issue for a 
faster review.
   -->
   
   ### What changes were proposed in this pull request?
   
   Currently, a Spark job using Celeborn cannot schedule reduce tasks to the correct nodes, for two reasons:
   - Celeborn shuffle writes report a `MapStatus` carrying executor info, while all shuffle data is actually stored on Celeborn Workers.
   - The Spark framework only calls `MapOutputTrackerMaster` to compute the preferred locations of a `ShuffledRDD` or `ShuffledRowRDD`.
   
   Based on the above issues, we modified Celeborn and a small part of Spark:
   - Make the Celeborn Master aware of each Celeborn Worker's topology location (in the current implementation, the topology location is the node the worker is deployed on).
   - Make the Celeborn LifecycleManager aware of each Spark executor's topology location.
   - Open an interface in the Spark framework that allows a custom shuffle service to provide preferred-location info.
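
   To illustrate the shape of the last point, here is a minimal sketch of such an extension point. All names (`ShufflePreferredLocations`, `CelebornShuffleHandleStub`) and the in-memory mapping are assumptions for illustration, not the actual patch:

   ```scala
   // Hypothetical interface a custom shuffle implementation could expose so
   // that MapOutputTrackerMaster can delegate preferred-location queries to it.
   trait ShufflePreferredLocations {
     // Candidate hosts for one reduce partition of one shuffle.
     def preferredLocations(shuffleId: Int, reduceId: Int): Seq[String]
   }

   // Toy in-memory stand-in for the Celeborn-side handler: it maps
   // (shuffleId, reduceId) to the nodes holding that partition's data.
   class CelebornShuffleHandleStub(
       partitionHosts: Map[(Int, Int), Seq[String]]
   ) extends ShufflePreferredLocations {
     def preferredLocations(shuffleId: Int, reduceId: Int): Seq[String] =
       partitionHosts.getOrElse((shuffleId, reduceId), Seq.empty)
   }

   val handler = new CelebornShuffleHandleStub(
     Map((0, 0) -> Seq("node-a"), (0, 1) -> Seq("node-b"))
   )
   ```

   With such an interface, the scheduler side only needs to ask the registered shuffle handler for hosts instead of reading executor info out of `MapStatus`.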
   
   See more details in the design doc: [CELEBORN-1200 Spark Reduce Task aware locality with Celeborn Cluster](https://docs.google.com/document/d/1hO_W5eaqrOPfuCJsy_5jg_PtTw1-OW0aO1vcbWEIt5U/edit?usp=sharing)
   
   ```mermaid
   sequenceDiagram 
   Celeborn Worker->>Celeborn Master : Register self with TopologyLocation
   
   Celeborn Master->>LifecycleManager: Reply Request Slots with TopologyLocation
   
   LifecycleManager->>LifecycleManager: Handle Commit File to Build 
PartitionLocation with TopologyLocation
   
   CelebornShuffleClient->>LifecycleManager:  Register Shuffle Client with 
TopologyLocation and Client Address when set up LifecycleManagerRef
   
   SparkDAGScheduler->> MapOutputTrackerMaster: Call to get Seq of host to 
allocate reduce task
   
   MapOutputTrackerMaster->>ShuffleHandler(CelebornShuffleHandler): Get Seq of 
host
   
   CelebornShuffleHandler->>LifecycleManager: Get Seq of host
   
   LifecycleManager->>LifecycleManager: Get Partition File TopologyLocation and 
Find Corresponding Shuffle Client Address
   ```
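
   The last step of the diagram, resolving a partition's topology location to a co-located shuffle client, can be sketched as below. The data structures and names here are illustrative assumptions, not the actual LifecycleManager code:

   ```scala
   // Topology location reported by a Celeborn Worker at registration
   // (currently just the Kubernetes node it is deployed on).
   final case class TopologyLocation(node: String)

   // Committed partition files, keyed by reduce partition id, each carrying
   // the topology location of the worker holding the data.
   val partitionLocations: Map[Int, TopologyLocation] =
     Map(0 -> TopologyLocation("node-a"), 1 -> TopologyLocation("node-b"))

   // Shuffle clients (executors) registered with the LifecycleManager, keyed
   // by the topology location they reported when setting up the manager ref.
   val clientsByLocation: Map[TopologyLocation, Seq[String]] =
     Map(
       TopologyLocation("node-a") -> Seq("executor-1@node-a"),
       TopologyLocation("node-b") -> Seq("executor-2@node-b")
     )

   // For one reduce partition, resolve the addresses of the executors that
   // are co-located with the partition data; empty means no locality hint.
   def hostsForPartition(reduceId: Int): Seq[String] =
     partitionLocations
       .get(reduceId)
       .flatMap(clientsByLocation.get)
       .getOrElse(Seq.empty)
   ```

   Returning an empty sequence when no co-located client exists lets Spark fall back to its normal scheduling instead of waiting on an impossible locality level.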
   
   ### Why are the changes needed?
   
   Based on production practice, when Celeborn Workers and Spark Executors run on the same batch of Kubernetes nodes and both mount disks via `hostPath`, we have observed that under limited network IO the Shuffle Fetch Wait Time grows significantly.
   
   We expect the Spark reduce task to be scheduled to an executor on the node holding the corresponding partition data, following the idea of moving computation to the data rather than moving the data.
   
   So we propose this PR to make a Celeborn cluster running on Kubernetes aware of each Celeborn Worker's topology info (node information in the current implementation), and then simply rely on Spark's RDD preferred-location feature to schedule reduce tasks to the nodes holding the data.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The change to the task assignment logic is not directly perceived by the user, but the change in Task Locality Level is clearly visible in the Spark UI.
   
   ### How was this patch tested?
   - ✅ Pass CI & Unit Test
   - ✅ Functional Test
   - 🚧 Performance Test

