This is an automated email from the ASF dual-hosted git repository.

manirajv06 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dccd1c [YUNIKORN-3269] Respect Kubernetes node cordon state (#1020)
39dccd1c is described below

commit 39dccd1c7a047c581da5d7981745c631398e95bb
Author: Xi Chen <[email protected]>
AuthorDate: Tue May 5 13:38:32 2026 +0530

    [YUNIKORN-3269] Respect Kubernetes node cordon state (#1020)
    
    Closes: #1020
    
    Signed-off-by: Manikandan R <[email protected]>
---
 pkg/cache/context.go      |  35 +++++++++--
 pkg/cache/context_test.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 176 insertions(+), 4 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 8a60786f..ec6f7c3b 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -219,9 +219,11 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, 
register bool) {
                        }
                }
 
-               // if node was registered in-line, enable it in the core
-               if err := ctx.enableNode(node); err != nil {
-                       log.Log(log.ShimContext).Warn("Failed to enable node", 
zap.Error(err))
+               // if node is not cordoned, enable it in the core
+               if !node.Spec.Unschedulable {
+                       if err := ctx.enableNode(node); err != nil {
+                               log.Log(log.ShimContext).Warn("Failed to enable 
node", zap.Error(err))
+                       }
                }
        } else {
                // existing node
@@ -236,9 +238,24 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, 
register bool) {
                                log.Log(log.ShimContext).Warn("Failed to update 
cached node capacity", zap.String("nodeName", node.Name))
                        }
                }
+               if err := ctx.updateNodeSchedulability(prevNode, node); err != 
nil {
+                       log.Log(log.ShimContext).Warn("Failed to update node 
schedulability", zap.Error(err))
+               }
        }
 }
 
+// updateNodeSchedulability forwards a change in the node's cordon state
+// (Spec.Unschedulable) to the scheduler core. If the flag is identical on
+// prevNode and node, nothing is sent and nil is returned. A node that became
+// cordoned is moved to DRAIN_NODE; a node that became uncordoned is moved
+// back with DRAIN_TO_SCHEDULABLE. The error from the scheduler API call is
+// returned unchanged. Both arguments are dereferenced and must be non-nil.
+func (ctx *Context) updateNodeSchedulability(prevNode, node *v1.Node) error {
+       wasCordoned := prevNode.Spec.Unschedulable
+       isCordoned := node.Spec.Unschedulable
+       if wasCordoned == isCordoned {
+               // cordon state unchanged: nothing to tell the core
+               return nil
+       }
+
+       action := si.NodeInfo_DRAIN_NODE
+       if !isCordoned {
+               action = si.NodeInfo_DRAIN_TO_SCHEDULABLE
+       }
+       req := common.CreateUpdateRequestForDeleteOrRestoreNode(node.Name, action)
+       return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(req)
+}
+
 func (ctx *Context) deleteNode(obj interface{}) {
        ctx.lock.Lock()
        defer ctx.lock.Unlock()
@@ -1405,7 +1422,7 @@ func (ctx *Context) InitializeState() error {
 
        // Step 4: Enable nodes. At this point all allocations and asks have 
been processed, so it is safe to allow the
        // core to begin scheduling.
-       err = ctx.enableNodes(acceptedNodes)
+       err = ctx.enableNodes(filterSchedulableNodes(acceptedNodes))
        if err != nil {
                log.Log(log.ShimContext).Error("failed to enable nodes", 
zap.Error(err))
                return err
@@ -1646,6 +1663,16 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
        return nil
 }
 
+// filterSchedulableNodes returns a new slice holding, in their original
+// order, the nodes from the input that are not cordoned (Spec.Unschedulable
+// is false). Nil entries are skipped. The input slice is not modified, and
+// the returned slice is never nil, even when it is empty.
+func filterSchedulableNodes(nodes []*v1.Node) []*v1.Node {
+       kept := make([]*v1.Node, 0, len(nodes))
+       for i := range nodes {
+               candidate := nodes[i]
+               if candidate == nil || candidate.Spec.Unschedulable {
+                       continue
+               }
+               kept = append(kept, candidate)
+       }
+       return kept
+}
+
 func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
        // list all nodes via the informer
        nodes, err := 
ctx.apiProvider.GetAPIs().NodeInformer.Lister().List(labels.Everything())
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 26dc7ed8..7732d7d2 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -212,6 +212,110 @@ func TestUpdateNodes(t *testing.T) {
        ctx.updateNode(&oldNode, &newNode)
 }
 
+// TestAddCordonedNodeDoesNotEnable verifies that adding a node whose
+// Spec.Unschedulable flag is set only registers it with the core
+// (CREATE_DRAIN) and does not follow up with an enable request.
+func TestAddCordonedNodeDoesNotEnable(t *testing.T) {
+       ctx, apiProvider := initContextAndAPIProviderForTest()
+       dispatcher.Start()
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       // record every node action sent to the core and acknowledge
+       // registrations so the shim sees the node as accepted
+       recorded := []si.NodeInfo_ActionFromRM{}
+       recordActions := func(request *si.NodeRequest) error {
+               for _, info := range request.Nodes {
+                       recorded = append(recorded, info.Action)
+                       if info.Action != si.NodeInfo_CREATE_DRAIN {
+                               continue
+                       }
+                       dispatcher.Dispatch(CachedSchedulerNodeEvent{
+                               NodeID: info.NodeID,
+                               Event:  NodeAccepted,
+                       })
+               }
+               return nil
+       }
+       apiProvider.MockSchedulerAPIUpdateNodeFn(recordActions)
+
+       cordonedNode := &v1.Node{
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      Host1,
+                       Namespace: "default",
+                       UID:       uid1,
+               },
+               Spec: v1.NodeSpec{Unschedulable: true},
+       }
+       ctx.addNode(cordonedNode)
+
+       assert.Equal(t, len(recorded), 1)
+       assert.Equal(t, recorded[0], si.NodeInfo_CREATE_DRAIN)
+}
+
+// TestUpdateNodeSchedulability walks a node through cordon/uncordon
+// transitions and checks which node actions reach the core:
+//   - a schedulable node is registered and enabled;
+//   - cordoning sends DRAIN_NODE;
+//   - an update that keeps the node cordoned sends nothing;
+//   - uncordoning sends DRAIN_TO_SCHEDULABLE.
+func TestUpdateNodeSchedulability(t *testing.T) {
+       ctx, apiProvider := initContextAndAPIProviderForTest()
+       dispatcher.Start()
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       sent := []si.NodeInfo_ActionFromRM{}
+       apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
+               for _, info := range request.Nodes {
+                       sent = append(sent, info.Action)
+                       if info.Action != si.NodeInfo_CREATE_DRAIN {
+                               continue
+                       }
+                       dispatcher.Dispatch(CachedSchedulerNodeEvent{
+                               NodeID: info.NodeID,
+                               Event:  NodeAccepted,
+                       })
+               }
+               return nil
+       })
+
+       schedulable := &v1.Node{
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      Host1,
+                       Namespace: "default",
+                       UID:       uid1,
+               },
+       }
+
+       // new, schedulable node: registered in drain state, then enabled
+       ctx.addNode(schedulable)
+       assert.Equal(t, len(sent), 2)
+       assert.Equal(t, sent[0], si.NodeInfo_CREATE_DRAIN)
+       assert.Equal(t, sent[1], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
+
+       // cordon: the core is told to drain the node
+       cordoned := schedulable.DeepCopy()
+       cordoned.Spec.Unschedulable = true
+       ctx.updateNode(schedulable, cordoned)
+       assert.Equal(t, len(sent), 3)
+       assert.Equal(t, sent[2], si.NodeInfo_DRAIN_NODE)
+
+       // update without a cordon change: no new action
+       ctx.updateNode(cordoned, cordoned.DeepCopy())
+       assert.Equal(t, len(sent), 3)
+
+       // uncordon: the core is told the node is schedulable again
+       uncordoned := cordoned.DeepCopy()
+       uncordoned.Spec.Unschedulable = false
+       ctx.updateNode(cordoned, uncordoned)
+       assert.Equal(t, len(sent), 4)
+       assert.Equal(t, sent[3], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
+}
+
+// TestFilterSchedulableNodes checks that filterSchedulableNodes drops nil
+// entries and cordoned nodes while keeping schedulable ones.
+func TestFilterSchedulableNodes(t *testing.T) {
+       schedulable := &v1.Node{ObjectMeta: apis.ObjectMeta{Name: Host1}}
+       cordoned := &v1.Node{
+               ObjectMeta: apis.ObjectMeta{Name: Host2},
+               Spec:       v1.NodeSpec{Unschedulable: true},
+       }
+       input := []*v1.Node{schedulable, nil, cordoned}
+
+       got := filterSchedulableNodes(input)
+
+       assert.Equal(t, len(got), 1)
+       assert.Equal(t, got[0].Name, Host1)
+}
+
 func TestDeleteNodes(t *testing.T) {
        ctx, apiProvider := initContextAndAPIProviderForTest()
        dispatcher.Start()
@@ -2000,6 +2104,47 @@ func TestInitializeState(t *testing.T) {
        assert.Assert(t, task3 == nil, "pod3 was found")
 }
 
+// TestInitializeStateDoesNotEnableCordonedNodes checks that InitializeState
+// registers every node returned by the lister but only enables the
+// schedulable ones:
+//   - the cordoned node must see CREATE_DRAIN and nothing else;
+//   - the schedulable node must be registered and then enabled.
+func TestInitializeStateDoesNotEnableCordonedNodes(t *testing.T) {
+       context, apiProvider := initContextAndAPIProviderForTest()
+       apiProvider.RunEventHandler()
+       nodeLister, ok := apiProvider.GetAPIs().NodeInformer.Lister().(*test.NodeListerMock)
+       assert.Assert(t, ok, "unable to get mock node lister")
+
+       dispatcher.Start()
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       // record the actions sent to the core per node and acknowledge
+       // registrations so the shim treats the nodes as accepted
+       actionsByNode := make(map[string][]si.NodeInfo_ActionFromRM)
+       apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
+               for _, node := range request.Nodes {
+                       actionsByNode[node.NodeID] = append(actionsByNode[node.NodeID], node.Action)
+                       if node.Action == si.NodeInfo_CREATE_DRAIN {
+                               dispatcher.Dispatch(CachedSchedulerNodeEvent{
+                                       NodeID: node.NodeID,
+                                       Event:  NodeAccepted,
+                               })
+                       }
+               }
+               return nil
+       })
+
+       schedulableNode := nodeForTest(nodeName1, "10G", "4")
+       cordonedNode := nodeForTest(nodeName2, "10G", "4")
+       cordonedNode.Spec.Unschedulable = true
+       nodeLister.AddNode(schedulableNode)
+       nodeLister.AddNode(cordonedNode)
+
+       err := context.InitializeState()
+       assert.NilError(t, err, "InitializeState failed")
+
+       assert.Assert(t, context.schedulerCache.GetNode(nodeName1) != nil, "schedulable node was not cached")
+       assert.Assert(t, context.schedulerCache.GetNode(nodeName2) != nil, "cordoned node was not cached")
+
+       // the schedulable node must still be enabled; without this check a
+       // filter that dropped every node would also pass the cordoned checks
+       schedulableActions := actionsByNode[nodeName1]
+       assert.Equal(t, len(schedulableActions), 2)
+       assert.Equal(t, schedulableActions[0], si.NodeInfo_CREATE_DRAIN)
+       assert.Equal(t, schedulableActions[1], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
+
+       // the cordoned node is registered but never enabled
+       cordonedActions := actionsByNode[nodeName2]
+       assert.Equal(t, len(cordonedActions), 1)
+       assert.Equal(t, cordonedActions[0], si.NodeInfo_CREATE_DRAIN)
+}
+
 func TestPodAdoption(t *testing.T) {
        ctx, apiProvider := initContextAndAPIProviderForTest()
        dispatcher.Start()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to