This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new bce75ba3 [YUNIKORN-2329] Ensure RMProxy events are processed in-order 
(#776)
bce75ba3 is described below

commit bce75ba326840030d34bec5c953db40642cbca8d
Author: Yu-Lin Chen <[email protected]>
AuthorDate: Fri Jan 19 12:50:10 2024 -0600

    [YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)
    
    Remove the unnecessary goroutine invocations in the UpdateApplication(),
    UpdateAllocation() and UpdateNode() functions, ensuring that events are
    passed to the scheduler's HandleEvent() function in the order received.
    
    Closes: #776
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/rmproxy/rmproxy.go | 91 ++++++++++++++++++++------------------------------
 1 file changed, 37 insertions(+), 54 deletions(-)

diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index c607eea3..ecf06bea 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -314,37 +314,26 @@ func (rmp *RMProxy) UpdateAllocation(request 
*si.AllocationRequest) error {
        if rmp.GetResourceManagerCallback(request.RmID) == nil {
                return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" 
not registered", request.RmID)
        }
-       go func() {
-               // Update allocations
-               if len(request.Allocations) > 0 {
-                       for _, alloc := range request.Allocations {
-                               alloc.PartitionName = 
common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
-                       }
-               }
-
-               // Update asks
-               if len(request.Asks) > 0 {
-                       for _, ask := range request.Asks {
-                               ask.PartitionName = 
common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
-                       }
-               }
+       // Update allocations
+       for _, alloc := range request.Allocations {
+               alloc.PartitionName = 
common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
+       }
 
-               // Update releases
-               if request.Releases != nil {
-                       if len(request.Releases.AllocationsToRelease) > 0 {
-                               for _, rel := range 
request.Releases.AllocationsToRelease {
-                                       rel.PartitionName = 
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
-                               }
-                       }
+       // Update asks
+       for _, ask := range request.Asks {
+               ask.PartitionName = 
common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
+       }
 
-                       if len(request.Releases.AllocationAsksToRelease) > 0 {
-                               for _, rel := range 
request.Releases.AllocationAsksToRelease {
-                                       rel.PartitionName = 
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
-                               }
-                       }
+       // Update releases
+       if request.Releases != nil {
+               for _, rel := range request.Releases.AllocationsToRelease {
+                       rel.PartitionName = 
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
                }
-               
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request:
 request})
-       }()
+               for _, rel := range request.Releases.AllocationAsksToRelease {
+                       rel.PartitionName = 
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
+               }
+       }
+       
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request:
 request})
        return nil
 }
 
@@ -353,21 +342,17 @@ func (rmp *RMProxy) UpdateApplication(request 
*si.ApplicationRequest) error {
                return fmt.Errorf("received ApplicationRequest, but RmID=\"%s\" 
not registered", request.RmID)
        }
 
-       go func() {
-               // Update New apps
-               if len(request.New) > 0 {
-                       for _, app := range request.New {
-                               app.PartitionName = 
common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
-                       }
-               }
-               // Update Remove apps
-               if len(request.Remove) > 0 {
-                       for _, app := range request.Remove {
-                               app.PartitionName = 
common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
-                       }
-               }
-               
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request:
 request})
-       }()
+       // Update New apps
+       for _, app := range request.New {
+               app.PartitionName = 
common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
+       }
+
+       // Update Remove apps
+       for _, app := range request.Remove {
+               app.PartitionName = 
common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
+       }
+
+       
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request:
 request})
        return nil
 }
 
@@ -375,18 +360,16 @@ func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) 
error {
        if rmp.GetResourceManagerCallback(request.RmID) == nil {
                return fmt.Errorf("received NodeRequest, but RmID=\"%s\" not 
registered", request.RmID)
        }
-       go func() {
-               if len(request.Nodes) > 0 {
-                       for _, node := range request.Nodes {
-                               if len(node.GetAttributes()) == 0 {
-                                       node.Attributes = map[string]string{}
-                               }
-                               partition := 
node.Attributes[siCommon.NodePartition]
-                               node.Attributes[siCommon.NodePartition] = 
common.GetNormalizedPartitionName(partition, request.RmID)
-                       }
+
+       for _, node := range request.Nodes {
+               if len(node.GetAttributes()) == 0 {
+                       node.Attributes = map[string]string{}
                }
-               
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request:
 request})
-       }()
+               partition := node.Attributes[siCommon.NodePartition]
+               node.Attributes[siCommon.NodePartition] = 
common.GetNormalizedPartitionName(partition, request.RmID)
+       }
+
+       
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request:
 request})
        return nil
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to