This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new acbc9668 [YUNIKORN-2629] Adding a node can result in a deadlock (#859)
acbc9668 is described below

commit acbc9668baab890b51f8c3f40cb1947dcd6aebab
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Aug 2 15:21:58 2024 +0200

    [YUNIKORN-2629] Adding a node can result in a deadlock (#859)
    
    Closes: #859
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/cache/context.go                 | 65 ++++++------------------------------
 pkg/cache/context_test.go            |  6 ++--
 pkg/cache/scheduler_callback_test.go |  1 -
 3 files changed, 14 insertions(+), 58 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index c608b98e..0c0dd30a 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -166,8 +166,6 @@ func (ctx *Context) addNode(obj interface{}) {
 }
 
 func (ctx *Context) updateNode(_, obj interface{}) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        node, err := convertToNode(obj)
        if err != nil {
                log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err))
@@ -229,8 +227,6 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
 }
 
 func (ctx *Context) deleteNode(obj interface{}) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        var node *v1.Node
        switch t := obj.(type) {
        case *v1.Node:
@@ -250,9 +246,6 @@ func (ctx *Context) deleteNode(obj interface{}) {
 }
 
 func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-
        for _, node := range nodes {
                ctx.updateNodeInternal(node, false)
        }
@@ -288,9 +281,6 @@ func (ctx *Context) AddPod(obj interface{}) {
 }
 
 func (ctx *Context) UpdatePod(_, newObj interface{}) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-
        pod, err := utils.Convert2Pod(newObj)
        if err != nil {
                log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
@@ -338,9 +328,9 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
        }
 
        // add app if it doesn't already exist
-       app := ctx.getApplication(appMeta.ApplicationID)
+       app := ctx.GetApplication(appMeta.ApplicationID)
        if app == nil {
-               app = ctx.addApplication(&AddApplicationRequest{
+               app = ctx.AddApplication(&AddApplicationRequest{
                        Metadata: appMeta,
                })
        }
@@ -444,10 +434,8 @@ func (ctx *Context) DeletePod(obj interface{}) {
 }
 
 func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        if taskMeta, ok := getTaskMetadata(pod); ok {
-               if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
+               if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
                        ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
                }
        }
@@ -457,9 +445,6 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
 }
 
 func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-
        oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
        if oldPod == nil {
                // if pod is not in scheduler cache, no node updates are needed
@@ -590,8 +575,6 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
 }
 
 func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass != nil {
                ctx.updatePriorityClassInternal(priorityClass)
        }
@@ -602,9 +585,6 @@ func (ctx *Context) updatePriorityClassInternal(priorityClass *schedulingv1.Prio
 }
 
 func (ctx *Context) deletePriorityClass(obj interface{}) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-
        log.Log(log.ShimContext).Debug("priorityClass deleted")
        var priorityClass *schedulingv1.PriorityClass
        switch t := obj.(type) {
@@ -670,8 +650,6 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) []
 
 // IsPodFitNode evaluates given predicates based on current context
 func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
-       ctx.lock.RLock()
-       defer ctx.lock.RUnlock()
        pod := ctx.schedulerCache.GetPod(name)
        if pod == nil {
                return ErrorPodNotFound
@@ -692,8 +670,6 @@ func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
 }
 
 func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []string, startIndex int) (int, bool) {
-       ctx.lock.RLock()
-       defer ctx.lock.RUnlock()
        if pod := ctx.schedulerCache.GetPod(name); pod != nil {
                // if pod exists in cache, try to run predicates
                if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
@@ -802,8 +778,6 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
 // this way, the core can make allocation decisions with consideration of
 // other assumed pods before they are actually bound to the node (bound is slow).
 func (ctx *Context) AssumePod(name, node string) error {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        if pod := ctx.schedulerCache.GetPod(name); pod != nil {
                // when add assumed pod, we make a copy of the pod to avoid
                // modifying its original reference. otherwise, it may have
@@ -863,9 +837,6 @@ func (ctx *Context) AssumePod(name, node string) error {
 // forget pod must be called when a pod is assumed to be running on a node,
 // but then for some reason it is failed to bind or released.
 func (ctx *Context) ForgetPod(name string) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-
        if pod := ctx.schedulerCache.GetPod(name); pod != nil {
                log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name))
                ctx.schedulerCache.ForgetPod(pod)
@@ -908,17 +879,11 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
        return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
 }
 
-func (ctx *Context) NotifyTaskComplete(appID, taskID string) {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-       ctx.notifyTaskComplete(appID, taskID)
-}
-
 func (ctx *Context) notifyTaskComplete(appID, taskID string) {
        log.Log(log.ShimContext).Debug("NotifyTaskComplete",
                zap.String("appID", appID),
                zap.String("taskID", taskID))
-       if app := ctx.getApplication(appID); app != nil {
+       if app := ctx.GetApplication(appID); app != nil {
                log.Log(log.ShimContext).Debug("release allocation",
                        zap.String("appID", appID),
                        zap.String("taskID", taskID))
@@ -994,10 +959,6 @@ func (ctx *Context) AddApplication(request *AddApplicationRequest) *Application
        ctx.lock.Lock()
        defer ctx.lock.Unlock()
 
-       return ctx.addApplication(request)
-}
-
-func (ctx *Context) addApplication(request *AddApplicationRequest) *Application {
        log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request", request))
        if app := ctx.getApplication(request.Metadata.ApplicationID); app != nil {
                return app
@@ -1065,25 +1026,28 @@ func (ctx *Context) getApplication(appID string) *Application {
 
 func (ctx *Context) RemoveApplication(appID string) error {
        ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        if app, exist := ctx.applications[appID]; exist {
                // get the non-terminated task alias
                nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
                // check there are any non-terminated task or not
                if len(nonTerminatedTaskAlias) > 0 {
-                       return fmt.Errorf("failed to remove application %s because it still has task in non-terminated task, tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
+                       ctx.lock.Unlock()
+                       return fmt.Errorf("failed to remove application %s because it still has task in non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
                }
+               delete(ctx.applications, appID)
+               ctx.lock.Unlock()
                // send the update request to scheduler core
                rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
                if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil {
                        log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err))
                }
-               delete(ctx.applications, appID)
+
                log.Log(log.ShimContext).Info("app removed",
                        zap.String("appID", appID))
 
                return nil
        }
+       ctx.lock.Unlock()
        return fmt.Errorf("application %s is not found in the context", appID)
 }
 
@@ -1099,8 +1063,6 @@ func (ctx *Context) RemoveApplicationInternal(appID string) {
 
 // this implements ApplicationManagementProtocol
 func (ctx *Context) AddTask(request *AddTaskRequest) *Task {
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
        return ctx.addTask(request)
 }
 
@@ -1160,9 +1122,7 @@ func (ctx *Context) RemoveTask(appID, taskID string) {
 }
 
 func (ctx *Context) getTask(appID string, taskID string) *Task {
-       ctx.lock.RLock()
-       defer ctx.lock.RUnlock()
-       app := ctx.getApplication(appID)
+       app := ctx.GetApplication(appID)
        if app == nil {
                log.Log(log.ShimContext).Debug("application is not found in the context",
                        zap.String("appID", appID))
@@ -1684,9 +1644,6 @@ func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
                nodeMap[node.Name] = node
        }
 
-       ctx.lock.Lock()
-       defer ctx.lock.Unlock()
-
        // find any existing nodes that no longer exist
        for _, node := range existingNodes {
                if _, ok := nodeMap[node.Name]; !ok {
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index c4802d4a..717c4eb0 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -368,7 +368,7 @@ func TestRemoveApplication(t *testing.T) {
        assert.Equal(t, len(context.applications), 3)
        err := context.RemoveApplication(appID1)
        assert.Assert(t, err != nil)
-       assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated task, tasks: /remove-test-00001")
+       assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated tasks: /remove-test-00001")
 
        app := context.GetApplication(appID1)
        assert.Assert(t, app != nil)
@@ -1103,7 +1103,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
        assert.Equal(t, len(app.GetBoundTasks()), 2)
 
        // release one of the tasks
-       context.NotifyTaskComplete(appID, pod2UID)
+       context.notifyTaskComplete(appID, pod2UID)
 
        // wait for release
        err = utils.WaitForCondition(func() bool {
@@ -2144,7 +2144,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) {
        assert.NilError(t, err)
 
        // mark completion
-       context.NotifyTaskComplete(appID, taskUID1)
+       context.notifyTaskComplete(appID, taskUID1)
        err = utils.WaitForCondition(func() bool {
                return task.GetTaskState() == TaskStates().Completed
        }, 100*time.Millisecond, time.Second)
diff --git a/pkg/cache/scheduler_callback_test.go b/pkg/cache/scheduler_callback_test.go
index 47d1b146..59bd875e 100644
--- a/pkg/cache/scheduler_callback_test.go
+++ b/pkg/cache/scheduler_callback_test.go
@@ -85,7 +85,6 @@ func TestUpdateAllocation_NewTask_TaskNotFound(t *testing.T) {
 }
 
 func TestUpdateAllocation_NewTask_AssumePodFails(t *testing.T) {
-       t.Skip("disabled until YUNIKORN-2629 is resolved") // test can randomly trigger a deadlock, resulting in a failed build
        callback, context := initCallbackTest(t, false, false)
        defer dispatcher.UnregisterAllEventHandlers()
        defer dispatcher.Stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to