This is an automated email from the ASF dual-hosted git repository.
manirajv06 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 39dccd1c [YUNIKORN-3269] Respect Kubernetes node cordon state (#1020)
39dccd1c is described below
commit 39dccd1c7a047c581da5d7981745c631398e95bb
Author: Xi Chen <[email protected]>
AuthorDate: Tue May 5 13:38:32 2026 +0530
[YUNIKORN-3269] Respect Kubernetes node cordon state (#1020)
Closes: #1020
Signed-off-by: Manikandan R <[email protected]>
---
pkg/cache/context.go | 35 +++++++++--
pkg/cache/context_test.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 176 insertions(+), 4 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 8a60786f..ec6f7c3b 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -219,9 +219,11 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
}
}
- // if node was registered in-line, enable it in the core
- if err := ctx.enableNode(node); err != nil {
- log.Log(log.ShimContext).Warn("Failed to enable node", zap.Error(err))
+ // if node is not cordoned, enable it in the core
+ if !node.Spec.Unschedulable {
+ if err := ctx.enableNode(node); err != nil {
+ log.Log(log.ShimContext).Warn("Failed to enable node", zap.Error(err))
+ }
}
} else {
// existing node
@@ -236,9 +238,24 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
}
}
+ if err := ctx.updateNodeSchedulability(prevNode, node); err != nil {
+ log.Log(log.ShimContext).Warn("Failed to update node schedulability", zap.Error(err))
+ }
}
}
+func (ctx *Context) updateNodeSchedulability(prevNode, node *v1.Node) error {
+ if prevNode.Spec.Unschedulable == node.Spec.Unschedulable {
+ return nil
+ }
+ action := si.NodeInfo_DRAIN_TO_SCHEDULABLE
+ if node.Spec.Unschedulable {
+ action = si.NodeInfo_DRAIN_NODE
+ }
+ request := common.CreateUpdateRequestForDeleteOrRestoreNode(node.Name, action)
+ return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
+}
+
func (ctx *Context) deleteNode(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
@@ -1405,7 +1422,7 @@ func (ctx *Context) InitializeState() error {
// Step 4: Enable nodes. At this point all allocations and asks have been processed, so it is safe to allow the
// core to begin scheduling.
- err = ctx.enableNodes(acceptedNodes)
+ err = ctx.enableNodes(filterSchedulableNodes(acceptedNodes))
if err != nil {
log.Log(log.ShimContext).Error("failed to enable nodes", zap.Error(err))
return err
@@ -1646,6 +1663,16 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
return nil
}
+func filterSchedulableNodes(nodes []*v1.Node) []*v1.Node {
+ schedulableNodes := make([]*v1.Node, 0, len(nodes))
+ for _, node := range nodes {
+ if node != nil && !node.Spec.Unschedulable {
+ schedulableNodes = append(schedulableNodes, node)
+ }
+ }
+ return schedulableNodes
+}
+
func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
// list all nodes via the informer
nodes, err := ctx.apiProvider.GetAPIs().NodeInformer.Lister().List(labels.Everything())
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 26dc7ed8..7732d7d2 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -212,6 +212,110 @@ func TestUpdateNodes(t *testing.T) {
ctx.updateNode(&oldNode, &newNode)
}
+func TestAddCordonedNodeDoesNotEnable(t *testing.T) {
+ ctx, apiProvider := initContextAndAPIProviderForTest()
+ dispatcher.Start()
+ defer dispatcher.UnregisterAllEventHandlers()
+ defer dispatcher.Stop()
+
+ actions := make([]si.NodeInfo_ActionFromRM, 0)
+ apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
+ for _, node := range request.Nodes {
+ actions = append(actions, node.Action)
+ if node.Action == si.NodeInfo_CREATE_DRAIN {
+ dispatcher.Dispatch(CachedSchedulerNodeEvent{
+ NodeID: node.NodeID,
+ Event: NodeAccepted,
+ })
+ }
+ }
+ return nil
+ })
+
+ node := v1.Node{
+ ObjectMeta: apis.ObjectMeta{
+ Name: Host1,
+ Namespace: "default",
+ UID: uid1,
+ },
+ Spec: v1.NodeSpec{
+ Unschedulable: true,
+ },
+ }
+
+ ctx.addNode(&node)
+
+ assert.Equal(t, len(actions), 1)
+ assert.Equal(t, actions[0], si.NodeInfo_CREATE_DRAIN)
+}
+
+func TestUpdateNodeSchedulability(t *testing.T) {
+ ctx, apiProvider := initContextAndAPIProviderForTest()
+ dispatcher.Start()
+ defer dispatcher.UnregisterAllEventHandlers()
+ defer dispatcher.Stop()
+
+ actions := make([]si.NodeInfo_ActionFromRM, 0)
+ apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
+ for _, node := range request.Nodes {
+ actions = append(actions, node.Action)
+ if node.Action == si.NodeInfo_CREATE_DRAIN {
+ dispatcher.Dispatch(CachedSchedulerNodeEvent{
+ NodeID: node.NodeID,
+ Event: NodeAccepted,
+ })
+ }
+ }
+ return nil
+ })
+
+ node := v1.Node{
+ ObjectMeta: apis.ObjectMeta{
+ Name: Host1,
+ Namespace: "default",
+ UID: uid1,
+ },
+ }
+
+ ctx.addNode(&node)
+ assert.Equal(t, len(actions), 2)
+ assert.Equal(t, actions[0], si.NodeInfo_CREATE_DRAIN)
+ assert.Equal(t, actions[1], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
+
+ cordoned := node.DeepCopy()
+ cordoned.Spec.Unschedulable = true
+ ctx.updateNode(&node, cordoned)
+ assert.Equal(t, len(actions), 3)
+ assert.Equal(t, actions[2], si.NodeInfo_DRAIN_NODE)
+
+ cordonedCopy := cordoned.DeepCopy()
+ ctx.updateNode(cordoned, cordonedCopy)
+ assert.Equal(t, len(actions), 3)
+
+ uncordoned := cordoned.DeepCopy()
+ uncordoned.Spec.Unschedulable = false
+ ctx.updateNode(cordoned, uncordoned)
+ assert.Equal(t, len(actions), 4)
+ assert.Equal(t, actions[3], si.NodeInfo_DRAIN_TO_SCHEDULABLE)
+}
+
+func TestFilterSchedulableNodes(t *testing.T) {
+ node1 := &v1.Node{
+ ObjectMeta: apis.ObjectMeta{Name: Host1},
+ }
+ node2 := &v1.Node{
+ ObjectMeta: apis.ObjectMeta{Name: Host2},
+ Spec: v1.NodeSpec{
+ Unschedulable: true,
+ },
+ }
+
+ nodes := filterSchedulableNodes([]*v1.Node{node1, nil, node2})
+
+ assert.Equal(t, len(nodes), 1)
+ assert.Equal(t, nodes[0].Name, Host1)
+}
+
func TestDeleteNodes(t *testing.T) {
ctx, apiProvider := initContextAndAPIProviderForTest()
dispatcher.Start()
@@ -2000,6 +2104,47 @@ func TestInitializeState(t *testing.T) {
assert.Assert(t, task3 == nil, "pod3 was found")
}
+func TestInitializeStateDoesNotEnableCordonedNodes(t *testing.T) {
+ context, apiProvider := initContextAndAPIProviderForTest()
+ apiProvider.RunEventHandler()
+ nodeLister, ok := apiProvider.GetAPIs().NodeInformer.Lister().(*test.NodeListerMock)
+ assert.Assert(t, ok, "unable to get mock node lister")
+
+ dispatcher.Start()
+ defer dispatcher.UnregisterAllEventHandlers()
+ defer dispatcher.Stop()
+
+ actionsByNode := make(map[string][]si.NodeInfo_ActionFromRM)
+ apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
+ for _, node := range request.Nodes {
+ actionsByNode[node.NodeID] = append(actionsByNode[node.NodeID], node.Action)
+ if node.Action == si.NodeInfo_CREATE_DRAIN {
+ dispatcher.Dispatch(CachedSchedulerNodeEvent{
+ NodeID: node.NodeID,
+ Event: NodeAccepted,
+ })
+ }
+ }
+ return nil
+ })
+
+ schedulableNode := nodeForTest(nodeName1, "10G", "4")
+ cordonedNode := nodeForTest(nodeName2, "10G", "4")
+ cordonedNode.Spec.Unschedulable = true
+ nodeLister.AddNode(schedulableNode)
+ nodeLister.AddNode(cordonedNode)
+
+ err := context.InitializeState()
+ assert.NilError(t, err, "InitializeState failed")
+
+ assert.Assert(t, context.schedulerCache.GetNode(nodeName1) != nil, "schedulable node was not cached")
+ assert.Assert(t, context.schedulerCache.GetNode(nodeName2) != nil, "cordoned node was not cached")
+
+ cordonedActions := actionsByNode[nodeName2]
+ assert.Equal(t, len(cordonedActions), 1)
+ assert.Equal(t, cordonedActions[0], si.NodeInfo_CREATE_DRAIN)
+}
+
func TestPodAdoption(t *testing.T) {
ctx, apiProvider := initContextAndAPIProviderForTest()
dispatcher.Start()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]