This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 389314e0 [YUNIKORN-2629] Adding a node can result in a deadlock
389314e0 is described below
commit 389314e002a150bf4285056dc4fad486b9d888b9
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu May 16 22:22:27 2024 +0200
[YUNIKORN-2629] Adding a node can result in a deadlock
---
pkg/cache/context.go | 57 +++++++++++++++++++++++++++++++++++-----------------
1 file changed, 39 insertions(+), 18 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ff6e89ed..bea59346 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -1394,7 +1394,7 @@ func (ctx *Context) InitializeState() error {
log.Log(log.ShimContext).Error("failed to load nodes",
zap.Error(err))
return err
}
- acceptedNodes, err := ctx.registerNodes(nodes)
+ acceptedNodes, err := ctx.RegisterNodes(nodes)
if err != nil {
log.Log(log.ShimContext).Error("failed to register nodes",
zap.Error(err))
return err
@@ -1510,11 +1510,17 @@ func (ctx *Context) registerNode(node *v1.Node) error {
return nil
}
+func (ctx *Context) RegisterNodes(nodes []*v1.Node) ([]*v1.Node, error) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+ return ctx.registerNodes(nodes)
+}
+
+// registerNodes registers the nodes to the scheduler core.
+// This method must be called while holding the Context write lock.
func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
nodesToRegister := make([]*si.NodeInfo, 0)
pendingNodes := make(map[string]*v1.Node)
- acceptedNodes := make([]*v1.Node, 0)
- rejectedNodes := make([]*v1.Node, 0)
// Generate a NodeInfo object for each node and add to the registration request
for _, node := range nodes {
@@ -1535,12 +1541,34 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
pendingNodes[node.Name] = node
}
- var wg sync.WaitGroup
+ acceptedNodes, rejectedNodes, err := ctx.registerNodesInternal(nodesToRegister, pendingNodes)
+ if err != nil {
+ log.Log(log.ShimContext).Error("Failed to register nodes", zap.Error(err))
+ return nil, err
+ }
+
+ for _, node := range acceptedNodes {
+ // post a successful event to the node
+ events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted",
+ fmt.Sprintf("node %s is accepted by the scheduler", node.Name))
+ }
+ for _, node := range rejectedNodes {
+ // post a failure event to the node
+ events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected",
+ fmt.Sprintf("node %s is rejected by the scheduler", node.Name))
+ }
+ return acceptedNodes, nil
+}
+
+func (ctx *Context) registerNodesInternal(nodesToRegister []*si.NodeInfo, pendingNodes map[string]*v1.Node) ([]*v1.Node, []*v1.Node, error) {
+ acceptedNodes := make([]*v1.Node, 0)
+ rejectedNodes := make([]*v1.Node, 0)
+
+ var wg sync.WaitGroup
// initialize wait group with the number of responses we expect
wg.Add(len(pendingNodes))
- // register with the dispatcher so that we can track our response
handlerID := fmt.Sprintf("%s-%d", registerNodeContextHandler, ctx.txnID.Add(1))
dispatcher.RegisterEventHandler(handlerID, dispatcher.EventTypeNode, func(event interface{}) {
nodeEvent, ok := event.(CachedSchedulerNodeEvent)
@@ -1572,24 +1600,17 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
RmID: schedulerconf.GetSchedulerConf().ClusterID,
}); err != nil {
log.Log(log.ShimContext).Error("Failed to register nodes", zap.Error(err))
- return nil, err
+ return nil, nil, err
}
+ // write lock must always be held at this point, releasing it while waiting to avoid any potential deadlocks
+ ctx.lock.Unlock()
+ defer ctx.lock.Lock()
+
// wait for all responses to accumulate
wg.Wait()
- for _, node := range acceptedNodes {
- // post a successful event to the node
- events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted",
- fmt.Sprintf("node %s is accepted by the scheduler", node.Name))
- }
- for _, node := range rejectedNodes {
- // post a failure event to the node
- events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected",
- fmt.Sprintf("node %s is rejected by the scheduler", node.Name))
- }
-
- return acceptedNodes, nil
+ return acceptedNodes, rejectedNodes, nil
}
func (ctx *Context) decommissionNode(node *v1.Node) error {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]