This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new acbc9668 [YUNIKORN-2629] Adding a node can result in a deadlock (#859)
acbc9668 is described below
commit acbc9668baab890b51f8c3f40cb1947dcd6aebab
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Aug 2 15:21:58 2024 +0200
[YUNIKORN-2629] Adding a node can result in a deadlock (#859)
Closes: #859
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/cache/context.go | 65 ++++++------------------------------
pkg/cache/context_test.go | 6 ++--
pkg/cache/scheduler_callback_test.go | 1 -
3 files changed, 14 insertions(+), 58 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index c608b98e..0c0dd30a 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -166,8 +166,6 @@ func (ctx *Context) addNode(obj interface{}) {
}
func (ctx *Context) updateNode(_, obj interface{}) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
node, err := convertToNode(obj)
if err != nil {
log.Log(log.ShimContext).Error("node conversion failed",
zap.Error(err))
@@ -229,8 +227,6 @@ func (ctx *Context) updateNodeInternal(node *v1.Node,
register bool) {
}
func (ctx *Context) deleteNode(obj interface{}) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
@@ -250,9 +246,6 @@ func (ctx *Context) deleteNode(obj interface{}) {
}
func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
-
for _, node := range nodes {
ctx.updateNodeInternal(node, false)
}
@@ -288,9 +281,6 @@ func (ctx *Context) AddPod(obj interface{}) {
}
func (ctx *Context) UpdatePod(_, newObj interface{}) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
-
pod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod",
zap.Error(err))
@@ -338,9 +328,9 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
}
// add app if it doesn't already exist
- app := ctx.getApplication(appMeta.ApplicationID)
+ app := ctx.GetApplication(appMeta.ApplicationID)
if app == nil {
- app = ctx.addApplication(&AddApplicationRequest{
+ app = ctx.AddApplication(&AddApplicationRequest{
Metadata: appMeta,
})
}
@@ -444,10 +434,8 @@ func (ctx *Context) DeletePod(obj interface{}) {
}
func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
if taskMeta, ok := getTaskMetadata(pod); ok {
-		if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
+		if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID,
taskMeta.TaskID)
}
}
@@ -457,9 +445,6 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
}
func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
-
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
@@ -590,8 +575,6 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
}
func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass
!= nil {
ctx.updatePriorityClassInternal(priorityClass)
}
@@ -602,9 +585,6 @@ func (ctx *Context)
updatePriorityClassInternal(priorityClass *schedulingv1.Prio
}
func (ctx *Context) deletePriorityClass(obj interface{}) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
-
log.Log(log.ShimContext).Debug("priorityClass deleted")
var priorityClass *schedulingv1.PriorityClass
switch t := obj.(type) {
@@ -670,8 +650,6 @@ func (ctx *Context) EventsToRegister(queueingHintFn
framework.QueueingHintFn) []
// IsPodFitNode evaluates given predicates based on current context
func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
- ctx.lock.RLock()
- defer ctx.lock.RUnlock()
pod := ctx.schedulerCache.GetPod(name)
if pod == nil {
return ErrorPodNotFound
@@ -692,8 +670,6 @@ func (ctx *Context) IsPodFitNode(name, node string,
allocate bool) error {
}
func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations
[]string, startIndex int) (int, bool) {
- ctx.lock.RLock()
- defer ctx.lock.RUnlock()
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// if pod exists in cache, try to run predicates
if targetNode := ctx.schedulerCache.GetNode(node); targetNode
!= nil {
@@ -802,8 +778,6 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
// this way, the core can make allocation decisions with consideration of
// other assumed pods before they are actually bound to the node (bound is
slow).
func (ctx *Context) AssumePod(name, node string) error {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// when add assumed pod, we make a copy of the pod to avoid
// modifying its original reference. otherwise, it may have
@@ -863,9 +837,6 @@ func (ctx *Context) AssumePod(name, node string) error {
// forget pod must be called when a pod is assumed to be running on a node,
// but then for some reason it is failed to bind or released.
func (ctx *Context) ForgetPod(name string) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
-
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
log.Log(log.ShimContext).Debug("forget pod", zap.String("pod",
pod.Name))
ctx.schedulerCache.ForgetPod(pod)
@@ -908,17 +879,11 @@ func (ctx *Context) StartPodAllocation(podKey string,
nodeID string) bool {
return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
}
-func (ctx *Context) NotifyTaskComplete(appID, taskID string) {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
- ctx.notifyTaskComplete(appID, taskID)
-}
-
func (ctx *Context) notifyTaskComplete(appID, taskID string) {
log.Log(log.ShimContext).Debug("NotifyTaskComplete",
zap.String("appID", appID),
zap.String("taskID", taskID))
- if app := ctx.getApplication(appID); app != nil {
+ if app := ctx.GetApplication(appID); app != nil {
log.Log(log.ShimContext).Debug("release allocation",
zap.String("appID", appID),
zap.String("taskID", taskID))
@@ -994,10 +959,6 @@ func (ctx *Context) AddApplication(request
*AddApplicationRequest) *Application
ctx.lock.Lock()
defer ctx.lock.Unlock()
- return ctx.addApplication(request)
-}
-
-func (ctx *Context) addApplication(request *AddApplicationRequest)
*Application {
log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request",
request))
if app := ctx.getApplication(request.Metadata.ApplicationID); app !=
nil {
return app
@@ -1065,25 +1026,28 @@ func (ctx *Context) getApplication(appID string)
*Application {
func (ctx *Context) RemoveApplication(appID string) error {
ctx.lock.Lock()
- defer ctx.lock.Unlock()
if app, exist := ctx.applications[appID]; exist {
// get the non-terminated task alias
nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
// check there are any non-terminated task or not
if len(nonTerminatedTaskAlias) > 0 {
-			return fmt.Errorf("failed to remove application %s because it still has task in non-terminated task, tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
+			ctx.lock.Unlock()
+			return fmt.Errorf("failed to remove application %s because it still has task in non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
}
+ delete(ctx.applications, appID)
+ ctx.lock.Unlock()
// send the update request to scheduler core
rr :=
common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
if err :=
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil {
log.Log(log.ShimContext).Error("failed to send remove
application request to core", zap.Error(err))
}
- delete(ctx.applications, appID)
+
log.Log(log.ShimContext).Info("app removed",
zap.String("appID", appID))
return nil
}
+ ctx.lock.Unlock()
return fmt.Errorf("application %s is not found in the context", appID)
}
@@ -1099,8 +1063,6 @@ func (ctx *Context) RemoveApplicationInternal(appID
string) {
// this implements ApplicationManagementProtocol
func (ctx *Context) AddTask(request *AddTaskRequest) *Task {
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
return ctx.addTask(request)
}
@@ -1160,9 +1122,7 @@ func (ctx *Context) RemoveTask(appID, taskID string) {
}
func (ctx *Context) getTask(appID string, taskID string) *Task {
- ctx.lock.RLock()
- defer ctx.lock.RUnlock()
- app := ctx.getApplication(appID)
+ app := ctx.GetApplication(appID)
if app == nil {
log.Log(log.ShimContext).Debug("application is not found in the
context",
zap.String("appID", appID))
@@ -1684,9 +1644,6 @@ func (ctx *Context) finalizeNodes(existingNodes
[]*v1.Node) error {
nodeMap[node.Name] = node
}
- ctx.lock.Lock()
- defer ctx.lock.Unlock()
-
// find any existing nodes that no longer exist
for _, node := range existingNodes {
if _, ok := nodeMap[node.Name]; !ok {
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index c4802d4a..717c4eb0 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -368,7 +368,7 @@ func TestRemoveApplication(t *testing.T) {
assert.Equal(t, len(context.applications), 3)
err := context.RemoveApplication(appID1)
assert.Assert(t, err != nil)
-	assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated task, tasks: /remove-test-00001")
+	assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated tasks: /remove-test-00001")
app := context.GetApplication(appID1)
assert.Assert(t, app != nil)
@@ -1103,7 +1103,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
assert.Equal(t, len(app.GetBoundTasks()), 2)
// release one of the tasks
- context.NotifyTaskComplete(appID, pod2UID)
+ context.notifyTaskComplete(appID, pod2UID)
// wait for release
err = utils.WaitForCondition(func() bool {
@@ -2144,7 +2144,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) {
assert.NilError(t, err)
// mark completion
- context.NotifyTaskComplete(appID, taskUID1)
+ context.notifyTaskComplete(appID, taskUID1)
err = utils.WaitForCondition(func() bool {
return task.GetTaskState() == TaskStates().Completed
}, 100*time.Millisecond, time.Second)
diff --git a/pkg/cache/scheduler_callback_test.go
b/pkg/cache/scheduler_callback_test.go
index 47d1b146..59bd875e 100644
--- a/pkg/cache/scheduler_callback_test.go
+++ b/pkg/cache/scheduler_callback_test.go
@@ -85,7 +85,6 @@ func TestUpdateAllocation_NewTask_TaskNotFound(t *testing.T) {
}
func TestUpdateAllocation_NewTask_AssumePodFails(t *testing.T) {
-	t.Skip("disabled until YUNIKORN-2629 is resolved") // test can randomly trigger a deadlock, resulting in a failed build
callback, context := initCallbackTest(t, false, false)
defer dispatcher.UnregisterAllEventHandlers()
defer dispatcher.Stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]