This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 57ab4d59 [YUNIKORN-2530] Remove node readiness logic from shim (#812)
57ab4d59 is described below
commit 57ab4d59d43ab357e4a677b8973c70d5b151c7e4
Author: Craig Condit <[email protected]>
AuthorDate: Tue Apr 2 18:00:10 2024 -0500
[YUNIKORN-2530] Remove node readiness logic from shim (#812)
Closes: #812
Signed-off-by: Craig Condit <[email protected]>
---
go.mod | 4 ++--
go.sum | 8 ++++----
pkg/cache/context.go | 39 +++++++--------------------------------
pkg/cache/context_test.go | 17 +++++------------
pkg/common/si_helper.go | 14 +++++---------
pkg/common/si_helper_test.go | 13 ++++---------
pkg/shim/scheduler_mock_test.go | 2 +-
7 files changed, 28 insertions(+), 69 deletions(-)
diff --git a/go.mod b/go.mod
index 040e248e..9498a248 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21
require (
- github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c
- github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191
+ github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
+ github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index ae34d7e7..6c2622fb 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df
h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4
v4.0.0-20230305170008-8188dc5388df/go.mod
h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c
h1:xabRyfVmPle5gAnppiY3jeoS5t9+lVgbfoOlS24E5e4=
-github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c/go.mod
h1:eLQ4wT62D3L05Fu+0OHh7hXMga6EXU1aMb4aOBwzEEM=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240325175743-cc087bb5a191
h1:EfDQhLaxdM6LxPK6BTKG+fAzj67sLMi576DWnDjnNgU=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240325175743-cc087bb5a191/go.mod
h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
+github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
h1:WoO71GKblZEKBOuWviJMD5f1W6tdbJp5Pv/utd4zYqw=
+github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c/go.mod
h1:RZCBSMe6UZ04b45ZzwvuhhkY2f7f8ZW7ERvVMUM6dy4=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240402211642-e7421a4261fd
h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240402211642-e7421a4261fd/go.mod
h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod
h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 46a23001..1ad4adf3 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -189,27 +189,16 @@ func (ctx *Context) updateNodeInternal(node *v1.Node,
register bool) {
// existing node
prevCapacity := common.GetNodeResource(&prevNode.Status)
newCapacity := common.GetNodeResource(&node.Status)
- prevReady := hasReadyCondition(prevNode)
- newReady := hasReadyCondition(node)
if !common.Equals(prevCapacity, newCapacity) {
// update capacity
if capacity, occupied, ok :=
ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
- if err := ctx.updateNodeResources(node,
capacity, occupied, newReady); err != nil {
+ if err := ctx.updateNodeResources(node,
capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("Failed
to update node capacity", zap.Error(err))
}
} else {
log.Log(log.ShimContext).Warn("Failed to update
cached node capacity", zap.String("nodeName", node.Name))
}
- } else if newReady != prevReady {
- // update readiness
- if capacity, occupied, ok :=
ctx.schedulerCache.SnapshotResources(node.Name); ok {
- if err := ctx.updateNodeResources(node,
capacity, occupied, newReady); err != nil {
- log.Log(log.ShimContext).Warn("Failed
to update node readiness", zap.Error(err))
- }
- } else {
- log.Log(log.ShimContext).Warn("Failed to
snapshot cached node capacity", zap.String("nodeName", node.Name))
- }
}
}
}
@@ -481,7 +470,7 @@ func (ctx *Context) updateNodeOccupiedResources(nodeName
string, namespace strin
return
}
if node, capacity, occupied, ok :=
ctx.schedulerCache.UpdateOccupiedResource(nodeName, namespace, podName,
resource, opt); ok {
- if err := ctx.updateNodeResources(node, capacity, occupied,
hasReadyCondition(node)); err != nil {
+ if err := ctx.updateNodeResources(node, capacity, occupied);
err != nil {
log.Log(log.ShimContext).Warn("scheduler rejected
update to node occupied resources", zap.Error(err))
}
} else {
@@ -1527,7 +1516,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node)
([]*v1.Node, error) {
Attributes: map[string]string{
constants.DefaultNodeAttributeHostNameKey:
node.Name,
constants.DefaultNodeAttributeRackNameKey:
constants.DefaultRackName,
- siCommon.NodeReadyAttribute:
strconv.FormatBool(hasReadyCondition(node)),
},
SchedulableResource:
common.GetNodeResource(&nodeStatus),
OccupiedResource:
common.NewResourceBuilder().Build(),
@@ -1598,8 +1586,8 @@ func (ctx *Context) decommissionNode(node *v1.Node) error
{
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
-func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource,
occupied *si.Resource, ready bool) error {
- request := common.CreateUpdateRequestForUpdatedNode(node.Name,
capacity, occupied, ready)
+func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource,
occupied *si.Resource) error {
+ request := common.CreateUpdateRequestForUpdatedNode(node.Name,
capacity, occupied)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
@@ -1614,11 +1602,9 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
for _, node := range nodes {
log.Log(log.ShimContext).Info("Enabling node",
zap.String("name", node.Name))
nodesToEnable = append(nodesToEnable, &si.NodeInfo{
- NodeID: node.Name,
- Action: si.NodeInfo_DRAIN_TO_SCHEDULABLE,
- Attributes: map[string]string{
- siCommon.NodeReadyAttribute:
strconv.FormatBool(hasReadyCondition(node)),
- },
+ NodeID: node.Name,
+ Action: si.NodeInfo_DRAIN_TO_SCHEDULABLE,
+ Attributes: map[string]string{},
})
}
@@ -1757,14 +1743,3 @@ func convertToNode(obj interface{}) (*v1.Node, error) {
}
return nil, fmt.Errorf("cannot convert to *v1.Node: %v", obj)
}
-
-func hasReadyCondition(node *v1.Node) bool {
- if node != nil {
- for _, condition := range node.Status.Conditions {
- if condition.Type == v1.NodeReady && condition.Status
== v1.ConditionTrue {
- return true
- }
- }
- }
- return false
-}
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 4c279e36..1a081765 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1325,7 +1325,7 @@ func TestFilteredEventsNotPublished(t *testing.T) {
err := waitForNodeAcceptedEvent(recorder)
assert.NilError(t, err, "node accepted event was not sent")
- eventRecords := make([]*si.EventRecord, 7)
+ eventRecords := make([]*si.EventRecord, 6)
eventRecords[0] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
@@ -1334,41 +1334,34 @@ func TestFilteredEventsNotPublished(t *testing.T) {
Message: "",
}
eventRecords[1] = &si.EventRecord{
- Type: si.EventRecord_NODE,
- EventChangeType: si.EventRecord_SET,
- EventChangeDetail: si.EventRecord_NODE_READY,
- ObjectID: "host0001",
- Message: "",
- }
- eventRecords[2] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
ObjectID: "host0001",
Message: "",
}
- eventRecords[3] = &si.EventRecord{
+ eventRecords[2] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
EventChangeDetail: si.EventRecord_NODE_CAPACITY,
ObjectID: "host0001",
Message: "",
}
- eventRecords[4] = &si.EventRecord{
+ eventRecords[3] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_ADD,
EventChangeDetail: si.EventRecord_NODE_ALLOC,
ObjectID: "host0001",
Message: "",
}
- eventRecords[5] = &si.EventRecord{
+ eventRecords[4] = &si.EventRecord{
Type: si.EventRecord_APP,
EventChangeType: si.EventRecord_ADD,
EventChangeDetail: si.EventRecord_APP_RUNNING,
ObjectID: "app-1",
Message: "",
}
- eventRecords[6] = &si.EventRecord{
+ eventRecords[5] = &si.EventRecord{
Type: si.EventRecord_QUEUE,
EventChangeType: si.EventRecord_ADD,
EventChangeDetail: si.EventRecord_DETAILS_NONE,
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index 4434a82b..336002df 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -173,7 +173,7 @@ func CreateReleaseAllocationRequestForTask(appID, taskID,
allocationID, partitio
// CreateUpdateRequestForNewNode builds a NodeRequest for new node addition and restoring existing node
func CreateUpdateRequestForNewNode(nodeID string, nodeLabels
map[string]string, capacity *si.Resource, occupied *si.Resource,
- existingAllocations []*si.Allocation, ready bool) *si.NodeRequest {
+ existingAllocations []*si.Allocation) *si.NodeRequest {
// Use node's name as the NodeID, this is because when bind pod to node,
// name of node is required but uid is optional.
nodeInfo := &si.NodeInfo{
@@ -183,7 +183,6 @@ func CreateUpdateRequestForNewNode(nodeID string,
nodeLabels map[string]string,
Attributes: map[string]string{
constants.DefaultNodeAttributeHostNameKey: nodeID,
constants.DefaultNodeAttributeRackNameKey:
constants.DefaultRackName,
- common.NodeReadyAttribute:
strconv.FormatBool(ready),
},
ExistingAllocations: existingAllocations,
Action: si.NodeInfo_CREATE,
@@ -205,14 +204,11 @@ func CreateUpdateRequestForNewNode(nodeID string,
nodeLabels map[string]string,
}
}
-// CreateUpdateRequestForUpdatedNode builds a NodeRequest for any node updates like capacity,
-// ready status flag etc
-func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource,
occupied *si.Resource, ready bool) *si.NodeRequest {
+// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates
+func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource,
occupied *si.Resource) *si.NodeRequest {
nodeInfo := &si.NodeInfo{
- NodeID: nodeID,
- Attributes: map[string]string{
- common.NodeReadyAttribute: strconv.FormatBool(ready),
- },
+ NodeID: nodeID,
+ Attributes: map[string]string{},
SchedulableResource: capacity,
OccupiedResource: occupied,
Action: si.NodeInfo_UPDATE,
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index f3a5fcf6..1f63373a 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -18,7 +18,6 @@ limitations under the License.
package common
import (
- "strconv"
"testing"
"gotest.tools/v3/assert"
@@ -219,21 +218,19 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
capacity := NewResourceBuilder().AddResource(common.Memory,
200).AddResource(common.CPU, 2).Build()
occupied := NewResourceBuilder().AddResource(common.Memory,
50).AddResource(common.CPU, 1).Build()
var existingAllocations []*si.Allocation
- ready := true
nodeLabels := map[string]string{
"label1": "key1",
"label2": "key2",
"node.kubernetes.io/instance-type": "HighMem",
}
- request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity,
occupied, existingAllocations, ready)
+ request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity,
occupied, existingAllocations)
assert.Equal(t, len(request.Nodes), 1)
assert.Equal(t, request.Nodes[0].NodeID, nodeID)
assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
- assert.Equal(t, len(request.Nodes[0].Attributes), 7)
+ assert.Equal(t, len(request.Nodes[0].Attributes), 6)
assert.Equal(t,
request.Nodes[0].Attributes[constants.DefaultNodeAttributeHostNameKey], nodeID)
assert.Equal(t,
request.Nodes[0].Attributes[constants.DefaultNodeAttributeRackNameKey],
constants.DefaultRackName)
- assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute],
strconv.FormatBool(ready))
// Make sure include nodeLabel
assert.Equal(t, request.Nodes[0].Attributes["label1"], "key1")
@@ -247,14 +244,12 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
func TestCreateUpdateRequestForUpdatedNode(t *testing.T) {
capacity := NewResourceBuilder().AddResource(common.Memory,
200).AddResource(common.CPU, 2).Build()
occupied := NewResourceBuilder().AddResource(common.Memory,
50).AddResource(common.CPU, 1).Build()
- ready := true
- request := CreateUpdateRequestForUpdatedNode(nodeID, capacity,
occupied, ready)
+ request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied)
assert.Equal(t, len(request.Nodes), 1)
assert.Equal(t, request.Nodes[0].NodeID, nodeID)
assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
- assert.Equal(t, len(request.Nodes[0].Attributes), 1)
- assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute],
strconv.FormatBool(ready))
+ assert.Equal(t, len(request.Nodes[0].Attributes), 0)
}
func TestCreateUpdateRequestForDeleteNode(t *testing.T) {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 9b13ee7c..6b6b9af6 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -124,7 +124,7 @@ func (fc *MockScheduler) addNode(nodeName string,
nodeLabels map[string]string,
AddResource(siCommon.CPU, cpu).
AddResource("pods", pods).
Build()
- request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels,
nodeResource, nil, nil, true)
+ request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels,
nodeResource, nil, nil)
fmt.Printf("report new nodes to scheduler, request: %s",
request.String())
return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]