This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 57ab4d59 [YUNIKORN-2530] Remove node readiness logic from shim (#812)
57ab4d59 is described below

commit 57ab4d59d43ab357e4a677b8973c70d5b151c7e4
Author: Craig Condit <[email protected]>
AuthorDate: Tue Apr 2 18:00:10 2024 -0500

    [YUNIKORN-2530] Remove node readiness logic from shim (#812)
    
    Closes: #812
    
    Signed-off-by: Craig Condit <[email protected]>
---
 go.mod                          |  4 ++--
 go.sum                          |  8 ++++----
 pkg/cache/context.go            | 39 +++++++--------------------------------
 pkg/cache/context_test.go       | 17 +++++------------
 pkg/common/si_helper.go         | 14 +++++---------
 pkg/common/si_helper_test.go    | 13 ++++---------
 pkg/shim/scheduler_mock_test.go |  2 +-
 7 files changed, 28 insertions(+), 69 deletions(-)

diff --git a/go.mod b/go.mod
index 040e248e..9498a248 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
 go 1.21
 
 require (
-       github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c
-       github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191
+       github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
+       github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd
        github.com/google/go-cmp v0.6.0
        github.com/google/uuid v1.6.0
        github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index ae34d7e7..6c2622fb 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c h1:xabRyfVmPle5gAnppiY3jeoS5t9+lVgbfoOlS24E5e4=
-github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c/go.mod h1:eLQ4wT62D3L05Fu+0OHh7hXMga6EXU1aMb4aOBwzEEM=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191 h1:EfDQhLaxdM6LxPK6BTKG+fAzj67sLMi576DWnDjnNgU=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
+github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c h1:WoO71GKblZEKBOuWviJMD5f1W6tdbJp5Pv/utd4zYqw=
+github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c/go.mod h1:RZCBSMe6UZ04b45ZzwvuhhkY2f7f8ZW7ERvVMUM6dy4=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 46a23001..1ad4adf3 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -189,27 +189,16 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
                // existing node
                prevCapacity := common.GetNodeResource(&prevNode.Status)
                newCapacity := common.GetNodeResource(&node.Status)
-               prevReady := hasReadyCondition(prevNode)
-               newReady := hasReadyCondition(node)
 
                if !common.Equals(prevCapacity, newCapacity) {
                        // update capacity
                        if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
-                               if err := ctx.updateNodeResources(node, capacity, occupied, newReady); err != nil {
+                               if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
                                        log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
                                }
                        } else {
                                log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
                        }
-               } else if newReady != prevReady {
-                       // update readiness
-                       if capacity, occupied, ok := ctx.schedulerCache.SnapshotResources(node.Name); ok {
-                               if err := ctx.updateNodeResources(node, capacity, occupied, newReady); err != nil {
-                                       log.Log(log.ShimContext).Warn("Failed to update node readiness", zap.Error(err))
-                               }
-                       } else {
-                               log.Log(log.ShimContext).Warn("Failed to snapshot cached node capacity", zap.String("nodeName", node.Name))
-                       }
                }
        }
 }
@@ -481,7 +470,7 @@ func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace strin
                return
        }
        if node, capacity, occupied, ok := ctx.schedulerCache.UpdateOccupiedResource(nodeName, namespace, podName, resource, opt); ok {
-               if err := ctx.updateNodeResources(node, capacity, occupied, hasReadyCondition(node)); err != nil {
+               if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
                        log.Log(log.ShimContext).Warn("scheduler rejected update to node occupied resources", zap.Error(err))
                }
        } else {
@@ -1527,7 +1516,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
                        Attributes: map[string]string{
                                constants.DefaultNodeAttributeHostNameKey: node.Name,
                                constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
-                               siCommon.NodeReadyAttribute:               strconv.FormatBool(hasReadyCondition(node)),
                        },
                        SchedulableResource: common.GetNodeResource(&nodeStatus),
                        OccupiedResource:    common.NewResourceBuilder().Build(),
@@ -1598,8 +1586,8 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
        return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
 
-func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource, ready bool) error {
-       request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied, ready)
+func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
+       request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
        return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
 
@@ -1614,11 +1602,9 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
        for _, node := range nodes {
                log.Log(log.ShimContext).Info("Enabling node", zap.String("name", node.Name))
                nodesToEnable = append(nodesToEnable, &si.NodeInfo{
-                       NodeID: node.Name,
-                       Action: si.NodeInfo_DRAIN_TO_SCHEDULABLE,
-                       Attributes: map[string]string{
-                               siCommon.NodeReadyAttribute: strconv.FormatBool(hasReadyCondition(node)),
-                       },
+                       NodeID:     node.Name,
+                       Action:     si.NodeInfo_DRAIN_TO_SCHEDULABLE,
+                       Attributes: map[string]string{},
                })
        }
 
@@ -1757,14 +1743,3 @@ func convertToNode(obj interface{}) (*v1.Node, error) {
        }
        return nil, fmt.Errorf("cannot convert to *v1.Node: %v", obj)
 }
-
-func hasReadyCondition(node *v1.Node) bool {
-       if node != nil {
-               for _, condition := range node.Status.Conditions {
-                       if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
-                               return true
-                       }
-               }
-       }
-       return false
-}
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 4c279e36..1a081765 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1325,7 +1325,7 @@ func TestFilteredEventsNotPublished(t *testing.T) {
        err := waitForNodeAcceptedEvent(recorder)
        assert.NilError(t, err, "node accepted event was not sent")
 
-       eventRecords := make([]*si.EventRecord, 7)
+       eventRecords := make([]*si.EventRecord, 6)
        eventRecords[0] = &si.EventRecord{
                Type:              si.EventRecord_NODE,
                EventChangeType:   si.EventRecord_SET,
@@ -1334,41 +1334,34 @@ func TestFilteredEventsNotPublished(t *testing.T) {
                Message:           "",
        }
        eventRecords[1] = &si.EventRecord{
-               Type:              si.EventRecord_NODE,
-               EventChangeType:   si.EventRecord_SET,
-               EventChangeDetail: si.EventRecord_NODE_READY,
-               ObjectID:          "host0001",
-               Message:           "",
-       }
-       eventRecords[2] = &si.EventRecord{
                Type:              si.EventRecord_NODE,
                EventChangeType:   si.EventRecord_SET,
                EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
                ObjectID:          "host0001",
                Message:           "",
        }
-       eventRecords[3] = &si.EventRecord{
+       eventRecords[2] = &si.EventRecord{
                Type:              si.EventRecord_NODE,
                EventChangeType:   si.EventRecord_SET,
                EventChangeDetail: si.EventRecord_NODE_CAPACITY,
                ObjectID:          "host0001",
                Message:           "",
        }
-       eventRecords[4] = &si.EventRecord{
+       eventRecords[3] = &si.EventRecord{
                Type:              si.EventRecord_NODE,
                EventChangeType:   si.EventRecord_ADD,
                EventChangeDetail: si.EventRecord_NODE_ALLOC,
                ObjectID:          "host0001",
                Message:           "",
        }
-       eventRecords[5] = &si.EventRecord{
+       eventRecords[4] = &si.EventRecord{
                Type:              si.EventRecord_APP,
                EventChangeType:   si.EventRecord_ADD,
                EventChangeDetail: si.EventRecord_APP_RUNNING,
                ObjectID:          "app-1",
                Message:           "",
        }
-       eventRecords[6] = &si.EventRecord{
+       eventRecords[5] = &si.EventRecord{
                Type:              si.EventRecord_QUEUE,
                EventChangeType:   si.EventRecord_ADD,
                EventChangeDetail: si.EventRecord_DETAILS_NONE,
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index 4434a82b..336002df 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -173,7 +173,7 @@ func CreateReleaseAllocationRequestForTask(appID, taskID, allocationID, partitio
 
 // CreateUpdateRequestForNewNode builds a NodeRequest for new node addition and restoring existing node
 func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string, capacity *si.Resource, occupied *si.Resource,
-       existingAllocations []*si.Allocation, ready bool) *si.NodeRequest {
+       existingAllocations []*si.Allocation) *si.NodeRequest {
        // Use node's name as the NodeID, this is because when bind pod to node,
        // name of node is required but uid is optional.
        nodeInfo := &si.NodeInfo{
@@ -183,7 +183,6 @@ func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string,
                Attributes: map[string]string{
                        constants.DefaultNodeAttributeHostNameKey: nodeID,
                        constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
-                       common.NodeReadyAttribute:                 strconv.FormatBool(ready),
                },
                ExistingAllocations: existingAllocations,
                Action:              si.NodeInfo_CREATE,
@@ -205,14 +204,11 @@ func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string,
        }
 }
 
-// CreateUpdateRequestForUpdatedNode builds a NodeRequest for any node updates like capacity,
-// ready status flag etc
-func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource, ready bool) *si.NodeRequest {
+// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates
+func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest {
        nodeInfo := &si.NodeInfo{
-               NodeID: nodeID,
-               Attributes: map[string]string{
-                       common.NodeReadyAttribute: strconv.FormatBool(ready),
-               },
+               NodeID:              nodeID,
+               Attributes:          map[string]string{},
                SchedulableResource: capacity,
                OccupiedResource:    occupied,
                Action:              si.NodeInfo_UPDATE,
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index f3a5fcf6..1f63373a 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -18,7 +18,6 @@ limitations under the License.
 package common
 
 import (
-       "strconv"
        "testing"
 
        "gotest.tools/v3/assert"
@@ -219,21 +218,19 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
        capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
        occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
        var existingAllocations []*si.Allocation
-       ready := true
        nodeLabels := map[string]string{
                "label1":                           "key1",
                "label2":                           "key2",
                "node.kubernetes.io/instance-type": "HighMem",
        }
-       request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations, ready)
+       request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations)
        assert.Equal(t, len(request.Nodes), 1)
        assert.Equal(t, request.Nodes[0].NodeID, nodeID)
        assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
        assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
-       assert.Equal(t, len(request.Nodes[0].Attributes), 7)
+       assert.Equal(t, len(request.Nodes[0].Attributes), 6)
        assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeHostNameKey], nodeID)
        assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeRackNameKey], constants.DefaultRackName)
-       assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute], strconv.FormatBool(ready))
 
        // Make sure include nodeLabel
        assert.Equal(t, request.Nodes[0].Attributes["label1"], "key1")
@@ -247,14 +244,12 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
 func TestCreateUpdateRequestForUpdatedNode(t *testing.T) {
        capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
        occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
-       ready := true
-       request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied, ready)
+       request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied)
        assert.Equal(t, len(request.Nodes), 1)
        assert.Equal(t, request.Nodes[0].NodeID, nodeID)
        assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
        assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
-       assert.Equal(t, len(request.Nodes[0].Attributes), 1)
-       assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute], strconv.FormatBool(ready))
+       assert.Equal(t, len(request.Nodes[0].Attributes), 0)
 }
 
 func TestCreateUpdateRequestForDeleteNode(t *testing.T) {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 9b13ee7c..6b6b9af6 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -124,7 +124,7 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
                AddResource(siCommon.CPU, cpu).
                AddResource("pods", pods).
                Build()
-       request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil, true)
+       request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil)
        fmt.Printf("report new nodes to scheduler, request: %s", request.String())
        return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to