This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 9f42d49e [YUNIKORN-2543] Fix locking in RMProxy (#837)
9f42d49e is described below

commit 9f42d49e2b8182c0799dbe00e84f4455936bdb78
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Apr 10 20:44:49 2024 +0200

    [YUNIKORN-2543] Fix locking in RMProxy (#837)
    
    Closes: #837
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/entrypoint/entrypoint.go |  5 ++---
 pkg/rmproxy/rmproxy.go       | 52 +++++++++++++++-----------------------------
 2 files changed, 19 insertions(+), 38 deletions(-)

diff --git a/pkg/entrypoint/entrypoint.go b/pkg/entrypoint/entrypoint.go
index 2df5a64c..fe8e3cd9 100644
--- a/pkg/entrypoint/entrypoint.go
+++ b/pkg/entrypoint/entrypoint.go
@@ -80,8 +80,7 @@ func startAllServicesWithParameters(opts startupOptions) *ServiceContext {
        events.GetEventSystem().StartService()
 
        sched := scheduler.NewScheduler()
-       proxy := rmproxy.NewRMProxy()
-
+       proxy := rmproxy.NewRMProxy(sched)
        eventHandler := handler.EventHandlers{
                SchedulerEventHandler: sched,
                RMProxyEventHandler:   proxy,
@@ -90,7 +89,7 @@ func startAllServicesWithParameters(opts startupOptions) *ServiceContext {
        // start services
        log.Log(log.Entrypoint).Info("ServiceContext start scheduling services")
        sched.StartService(eventHandler, opts.manualScheduleFlag)
-       proxy.StartService(eventHandler)
+       proxy.StartService()
 
        context := &ServiceContext{
                RMProxy:   proxy,
diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index 3215072a..97ccbcdd 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -39,8 +39,8 @@ import (
 
 // Gateway to talk to ResourceManager (behind grpc/API of scheduler-interface)
 type RMProxy struct {
-       EventHandlers handler.EventHandlers
-       stop          chan struct{}
+       schedulerEventHandler handler.EventHandler // read-only, no lock needed to access it
+       stop                  chan struct{}
 
        // Internal fields
        pendingRMEvents chan interface{}
@@ -71,18 +71,17 @@ func (rmp *RMProxy) HandleEvent(ev interface{}) {
        enqueueAndCheckFull(rmp.pendingRMEvents, ev)
 }
 
-func NewRMProxy() *RMProxy {
+func NewRMProxy(schedulerEventHandler handler.EventHandler) *RMProxy {
        rm := &RMProxy{
-               rmIDToCallback:  make(map[string]api.ResourceManagerCallback),
-               pendingRMEvents: make(chan interface{}, 1024*1024),
-               stop:            make(chan struct{}),
+               rmIDToCallback:        make(map[string]api.ResourceManagerCallback),
+               pendingRMEvents:       make(chan interface{}, 1024*1024),
+               stop:                  make(chan struct{}),
+               schedulerEventHandler: schedulerEventHandler,
        }
        return rm
 }
 
-func (rmp *RMProxy) StartService(handlers handler.EventHandlers) {
-       rmp.EventHandlers = handlers
-
+func (rmp *RMProxy) StartService() {
        go rmp.handleRMEvents()
 }
 
@@ -95,8 +94,6 @@ func (rmp *RMProxy) handleUpdateResponseError(rmID string, err error) {
 func (rmp *RMProxy) processAllocationUpdateEvent(event *rmevent.RMNewAllocationsEvent) {
        allocationsCount := len(event.Allocations)
        if allocationsCount != 0 {
-               rmp.RLock()
-               defer rmp.RUnlock()
                response := &si.AllocationResponse{
                        New: event.Allocations,
                }
@@ -111,8 +108,6 @@ func (rmp *RMProxy) processAllocationUpdateEvent(event 
*rmevent.RMNewAllocations
 }
 
 func (rmp *RMProxy) processApplicationUpdateEvent(event 
*rmevent.RMApplicationUpdateEvent) {
-       rmp.RLock()
-       defer rmp.RUnlock()
        if len(event.RejectedApplications) == 0 && 
len(event.AcceptedApplications) == 0 && len(event.UpdatedApplications) == 0 {
                return
        }
@@ -121,7 +116,7 @@ func (rmp *RMProxy) processApplicationUpdateEvent(event 
*rmevent.RMApplicationUp
                Accepted: event.AcceptedApplications,
                Updated:  event.UpdatedApplications,
        }
-       if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
+       if callback := rmp.GetResourceManagerCallback(event.RmID); callback != nil {
                if err := callback.UpdateApplication(response); err != nil {
                        rmp.handleUpdateResponseError(event.RmID, err)
                }
@@ -142,8 +137,6 @@ func (rmp *RMProxy) processApplicationUpdateEvent(event 
*rmevent.RMApplicationUp
 func (rmp *RMProxy) processRMReleaseAllocationEvent(event 
*rmevent.RMReleaseAllocationEvent) {
        allocationsCount := len(event.ReleasedAllocations)
        if allocationsCount != 0 {
-               rmp.RLock()
-               defer rmp.RUnlock()
                response := &si.AllocationResponse{
                        Released: event.ReleasedAllocations,
                }
@@ -159,7 +152,7 @@ func (rmp *RMProxy) processRMReleaseAllocationEvent(event 
*rmevent.RMReleaseAllo
 }
 
 func (rmp *RMProxy) triggerUpdateAllocation(rmID string, response 
*si.AllocationResponse) {
-       if callback := rmp.rmIDToCallback[rmID]; callback != nil {
+       if callback := rmp.GetResourceManagerCallback(rmID); callback != nil {
                if err := callback.UpdateAllocation(response); err != nil {
                        rmp.handleUpdateResponseError(rmID, err)
                }
@@ -170,8 +163,6 @@ func (rmp *RMProxy) triggerUpdateAllocation(rmID string, 
response *si.Allocation
 }
 
 func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event 
*rmevent.RMReleaseAllocationAskEvent) {
-       rmp.RLock()
-       defer rmp.RUnlock()
        if len(event.ReleasedAllocationAsks) == 0 {
                return
        }
@@ -182,8 +173,6 @@ func (rmp *RMProxy) 
processRMReleaseAllocationAskEvent(event *rmevent.RMReleaseA
 }
 
 func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event 
*rmevent.RMRejectedAllocationAskEvent) {
-       rmp.RLock()
-       defer rmp.RUnlock()
        if len(event.RejectedAllocationAsks) == 0 {
                return
        }
@@ -195,8 +184,6 @@ func (rmp *RMProxy) 
processRMRejectedAllocationAskEvent(event *rmevent.RMRejecte
 }
 
 func (rmp *RMProxy) processRMRejectedAllocationEvent(event 
*rmevent.RMRejectedAllocationEvent) {
-       rmp.RLock()
-       defer rmp.RUnlock()
        if len(event.RejectedAllocations) == 0 {
                return
        }
@@ -208,8 +195,6 @@ func (rmp *RMProxy) processRMRejectedAllocationEvent(event 
*rmevent.RMRejectedAl
 }
 
 func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent) 
{
-       rmp.RLock()
-       defer rmp.RUnlock()
        if len(event.RejectedNodes) == 0 && len(event.AcceptedNodes) == 0 {
                return
        }
@@ -218,7 +203,7 @@ func (rmp *RMProxy) processRMNodeUpdateEvent(event 
*rmevent.RMNodeUpdateEvent) {
                Accepted: event.AcceptedNodes,
        }
 
-       if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
+       if callback := rmp.GetResourceManagerCallback(event.RmID); callback != nil {
                if err := callback.UpdateNode(response); err != nil {
                        rmp.handleUpdateResponseError(event.RmID, err)
                }
@@ -264,7 +249,7 @@ func (rmp *RMProxy) RegisterResourceManager(request 
*si.RegisterResourceManagerR
        // If this is a re-register we need to clean up first
        if rmp.rmIDToCallback[request.RmID] != nil {
                go func() {
-                       rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
+                       rmp.schedulerEventHandler.HandleEvent(
                                &rmevent.RMPartitionsRemoveEvent{
                                        RmID:    request.RmID,
                                        Channel: c,
@@ -282,7 +267,7 @@ func (rmp *RMProxy) RegisterResourceManager(request 
*si.RegisterResourceManagerR
 
        // Add new RM.
        go func() {
-               rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
+               rmp.schedulerEventHandler.HandleEvent(
                        &rmevent.RMRegistrationEvent{
                                Registration: request,
                                Channel:      c,
@@ -333,7 +318,7 @@ func (rmp *RMProxy) UpdateAllocation(request 
*si.AllocationRequest) error {
                        rel.PartitionName = 
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
                }
        }
-       rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
+       rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
        return nil
 }
 
@@ -352,7 +337,7 @@ func (rmp *RMProxy) UpdateApplication(request 
*si.ApplicationRequest) error {
                app.PartitionName = 
common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
        }
 
-       rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
+       rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
        return nil
 }
 
@@ -369,18 +354,15 @@ func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) 
error {
                node.Attributes[siCommon.NodePartition] = 
common.GetNormalizedPartitionName(partition, request.RmID)
        }
 
-       rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
+       rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
        return nil
 }
 
 // Triggers scheduler to reload configuration and apply the changes on-the-fly to the scheduler itself.
 func (rmp *RMProxy) UpdateConfiguration(request *si.UpdateConfigurationRequest) error {
-       rmp.RLock()
-       defer rmp.RUnlock()
-
        c := make(chan *rmevent.Result)
        go func() {
-               rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMConfigUpdateEvent{
+               rmp.schedulerEventHandler.HandleEvent(&rmevent.RMConfigUpdateEvent{
                        RmID:        request.RmID,
                        PolicyGroup: request.PolicyGroup,
                        Config:      request.Config,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to