This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 9f42d49e [YUNIKORN-2543] Fix locking in RMProxy (#837)
9f42d49e is described below
commit 9f42d49e2b8182c0799dbe00e84f4455936bdb78
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Apr 10 20:44:49 2024 +0200
[YUNIKORN-2543] Fix locking in RMProxy (#837)
Closes: #837
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/entrypoint/entrypoint.go | 5 ++---
pkg/rmproxy/rmproxy.go | 52 +++++++++++++++-----------------------------
2 files changed, 19 insertions(+), 38 deletions(-)
diff --git a/pkg/entrypoint/entrypoint.go b/pkg/entrypoint/entrypoint.go
index 2df5a64c..fe8e3cd9 100644
--- a/pkg/entrypoint/entrypoint.go
+++ b/pkg/entrypoint/entrypoint.go
@@ -80,8 +80,7 @@ func startAllServicesWithParameters(opts startupOptions) *ServiceContext {
events.GetEventSystem().StartService()
sched := scheduler.NewScheduler()
- proxy := rmproxy.NewRMProxy()
-
+ proxy := rmproxy.NewRMProxy(sched)
eventHandler := handler.EventHandlers{
SchedulerEventHandler: sched,
RMProxyEventHandler: proxy,
@@ -90,7 +89,7 @@ func startAllServicesWithParameters(opts startupOptions) *ServiceContext {
// start services
log.Log(log.Entrypoint).Info("ServiceContext start scheduling services")
sched.StartService(eventHandler, opts.manualScheduleFlag)
- proxy.StartService(eventHandler)
+ proxy.StartService()
context := &ServiceContext{
RMProxy: proxy,
diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index 3215072a..97ccbcdd 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -39,8 +39,8 @@ import (
// Gateway to talk to ResourceManager (behind grpc/API of scheduler-interface)
type RMProxy struct {
- EventHandlers handler.EventHandlers
- stop chan struct{}
+ schedulerEventHandler handler.EventHandler // read-only, no lock needed
to access it
+ stop chan struct{}
// Internal fields
pendingRMEvents chan interface{}
@@ -71,18 +71,17 @@ func (rmp *RMProxy) HandleEvent(ev interface{}) {
enqueueAndCheckFull(rmp.pendingRMEvents, ev)
}
-func NewRMProxy() *RMProxy {
+func NewRMProxy(schedulerEventHandler handler.EventHandler) *RMProxy {
rm := &RMProxy{
- rmIDToCallback: make(map[string]api.ResourceManagerCallback),
- pendingRMEvents: make(chan interface{}, 1024*1024),
- stop: make(chan struct{}),
+ rmIDToCallback:
make(map[string]api.ResourceManagerCallback),
+ pendingRMEvents: make(chan interface{}, 1024*1024),
+ stop: make(chan struct{}),
+ schedulerEventHandler: schedulerEventHandler,
}
return rm
}
-func (rmp *RMProxy) StartService(handlers handler.EventHandlers) {
- rmp.EventHandlers = handlers
-
+func (rmp *RMProxy) StartService() {
go rmp.handleRMEvents()
}
@@ -95,8 +94,6 @@ func (rmp *RMProxy) handleUpdateResponseError(rmID string, err error) {
func (rmp *RMProxy) processAllocationUpdateEvent(event
*rmevent.RMNewAllocationsEvent) {
allocationsCount := len(event.Allocations)
if allocationsCount != 0 {
- rmp.RLock()
- defer rmp.RUnlock()
response := &si.AllocationResponse{
New: event.Allocations,
}
@@ -111,8 +108,6 @@ func (rmp *RMProxy) processAllocationUpdateEvent(event *rmevent.RMNewAllocations
}
func (rmp *RMProxy) processApplicationUpdateEvent(event
*rmevent.RMApplicationUpdateEvent) {
- rmp.RLock()
- defer rmp.RUnlock()
if len(event.RejectedApplications) == 0 &&
len(event.AcceptedApplications) == 0 && len(event.UpdatedApplications) == 0 {
return
}
@@ -121,7 +116,7 @@ func (rmp *RMProxy) processApplicationUpdateEvent(event *rmevent.RMApplicationUp
Accepted: event.AcceptedApplications,
Updated: event.UpdatedApplications,
}
- if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
+ if callback := rmp.GetResourceManagerCallback(event.RmID); callback !=
nil {
if err := callback.UpdateApplication(response); err != nil {
rmp.handleUpdateResponseError(event.RmID, err)
}
@@ -142,8 +137,6 @@ func (rmp *RMProxy) processApplicationUpdateEvent(event *rmevent.RMApplicationUp
func (rmp *RMProxy) processRMReleaseAllocationEvent(event
*rmevent.RMReleaseAllocationEvent) {
allocationsCount := len(event.ReleasedAllocations)
if allocationsCount != 0 {
- rmp.RLock()
- defer rmp.RUnlock()
response := &si.AllocationResponse{
Released: event.ReleasedAllocations,
}
@@ -159,7 +152,7 @@ func (rmp *RMProxy) processRMReleaseAllocationEvent(event *rmevent.RMReleaseAllo
}
func (rmp *RMProxy) triggerUpdateAllocation(rmID string, response
*si.AllocationResponse) {
- if callback := rmp.rmIDToCallback[rmID]; callback != nil {
+ if callback := rmp.GetResourceManagerCallback(rmID); callback != nil {
if err := callback.UpdateAllocation(response); err != nil {
rmp.handleUpdateResponseError(rmID, err)
}
@@ -170,8 +163,6 @@ func (rmp *RMProxy) triggerUpdateAllocation(rmID string, response *si.Allocation
}
func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event
*rmevent.RMReleaseAllocationAskEvent) {
- rmp.RLock()
- defer rmp.RUnlock()
if len(event.ReleasedAllocationAsks) == 0 {
return
}
@@ -182,8 +173,6 @@ func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event *rmevent.RMReleaseA
}
func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event
*rmevent.RMRejectedAllocationAskEvent) {
- rmp.RLock()
- defer rmp.RUnlock()
if len(event.RejectedAllocationAsks) == 0 {
return
}
@@ -195,8 +184,6 @@ func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event *rmevent.RMRejecte
}
func (rmp *RMProxy) processRMRejectedAllocationEvent(event
*rmevent.RMRejectedAllocationEvent) {
- rmp.RLock()
- defer rmp.RUnlock()
if len(event.RejectedAllocations) == 0 {
return
}
@@ -208,8 +195,6 @@ func (rmp *RMProxy) processRMRejectedAllocationEvent(event *rmevent.RMRejectedAl
}
func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent)
{
- rmp.RLock()
- defer rmp.RUnlock()
if len(event.RejectedNodes) == 0 && len(event.AcceptedNodes) == 0 {
return
}
@@ -218,7 +203,7 @@ func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent) {
Accepted: event.AcceptedNodes,
}
- if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
+ if callback := rmp.GetResourceManagerCallback(event.RmID); callback !=
nil {
if err := callback.UpdateNode(response); err != nil {
rmp.handleUpdateResponseError(event.RmID, err)
}
@@ -264,7 +249,7 @@ func (rmp *RMProxy) RegisterResourceManager(request *si.RegisterResourceManagerR
// If this is a re-register we need to clean up first
if rmp.rmIDToCallback[request.RmID] != nil {
go func() {
- rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
+ rmp.schedulerEventHandler.HandleEvent(
&rmevent.RMPartitionsRemoveEvent{
RmID: request.RmID,
Channel: c,
@@ -282,7 +267,7 @@ func (rmp *RMProxy) RegisterResourceManager(request *si.RegisterResourceManagerR
// Add new RM.
go func() {
- rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
+ rmp.schedulerEventHandler.HandleEvent(
&rmevent.RMRegistrationEvent{
Registration: request,
Channel: c,
@@ -333,7 +318,7 @@ func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
rel.PartitionName =
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
}
-
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request:
request})
+
rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request:
request})
return nil
}
@@ -352,7 +337,7 @@ func (rmp *RMProxy) UpdateApplication(request *si.ApplicationRequest) error {
app.PartitionName =
common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}
-
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request:
request})
+
rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request:
request})
return nil
}
@@ -369,18 +354,15 @@ func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) error {
node.Attributes[siCommon.NodePartition] =
common.GetNormalizedPartitionName(partition, request.RmID)
}
-
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request:
request})
+
rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request:
request})
return nil
}
// Triggers scheduler to reload configuration and apply the changes on-the-fly
to the scheduler itself.
func (rmp *RMProxy) UpdateConfiguration(request
*si.UpdateConfigurationRequest) error {
- rmp.RLock()
- defer rmp.RUnlock()
-
c := make(chan *rmevent.Result)
go func() {
-
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMConfigUpdateEvent{
+
rmp.schedulerEventHandler.HandleEvent(&rmevent.RMConfigUpdateEvent{
RmID: request.RmID,
PolicyGroup: request.PolicyGroup,
Config: request.Config,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]