This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new bdf109b5 [YUNIKORN-2530] Remove readiness logic from core (#832)
bdf109b5 is described below
commit bdf109b5432c3339d14601c666720b21251d148b
Author: Craig Condit <[email protected]>
AuthorDate: Tue Apr 2 16:22:27 2024 -0500
[YUNIKORN-2530] Remove readiness logic from core (#832)
Closes: #832
Signed-off-by: Craig Condit <[email protected]>
---
go.mod | 2 +-
go.sum | 4 +--
pkg/metrics/scheduler.go | 9 ------
pkg/metrics/scheduler_test.go | 11 -------
pkg/scheduler/context.go | 25 --------------
pkg/scheduler/context_test.go | 54 ++++++-------------------------
pkg/scheduler/objects/node.go | 26 ++-------------
pkg/scheduler/objects/node_events.go | 16 ---------
pkg/scheduler/objects/node_events_test.go | 23 -------------
pkg/scheduler/objects/node_test.go | 39 ----------------------
pkg/scheduler/utilities_test.go | 7 ++--
11 files changed, 16 insertions(+), 200 deletions(-)
diff --git a/go.mod b/go.mod
index 8c031ae4..df8d2854 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21
require (
- github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191
+ github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index d0acd76a..f38221b5 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191 h1:EfDQhLaxdM6LxPK6BTKG+fAzj67sLMi576DWnDjnNgU=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go
index ad31c755..18be4ecc 100644
--- a/pkg/metrics/scheduler.go
+++ b/pkg/metrics/scheduler.go
@@ -39,7 +39,6 @@ const (
NodeActive = "active"
NodeDraining = "draining"
NodeDecommissioned = "decommissioned"
- NodeUnhealthy = "unhealthy"
)
var resourceUsageRangeBuckets = []string{
@@ -377,11 +376,3 @@ func (m *SchedulerMetrics) GetDrainingNodes() (int, error) {
func (m *SchedulerMetrics) IncTotalDecommissionedNodes() {
m.node.WithLabelValues(NodeDecommissioned).Inc()
}
-
-func (m *SchedulerMetrics) IncUnhealthyNodes() {
- m.node.WithLabelValues(NodeUnhealthy).Inc()
-}
-
-func (m *SchedulerMetrics) DecUnhealthyNodes() {
- m.node.WithLabelValues(NodeUnhealthy).Dec()
-}
diff --git a/pkg/metrics/scheduler_test.go b/pkg/metrics/scheduler_test.go
index 5384b31b..b63e4831 100644
--- a/pkg/metrics/scheduler_test.go
+++ b/pkg/metrics/scheduler_test.go
@@ -51,17 +51,6 @@ func TestTotalDecommissionedNodes(t *testing.T) {
verifyMetric(t, 1, "decommissioned")
}
-func TestUnhealthyNodes(t *testing.T) {
- sm = getSchedulerMetrics(t)
- defer unregisterMetrics()
-
- sm.IncUnhealthyNodes()
- verifyMetric(t, 1, "unhealthy")
-
- sm.DecUnhealthyNodes()
- verifyMetric(t, 0, "unhealthy")
-}
-
func TestTryPreemptionLatency(t *testing.T) {
sm = getSchedulerMetrics(t)
defer unregisterMetrics()
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index 1a71b600..4bc21bb6 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -21,7 +21,6 @@ package scheduler
import (
"fmt"
"math"
- "strconv"
"sync"
"time"
@@ -620,9 +619,6 @@ func (cc *ClusterContext) addNode(nodeInfo *si.NodeInfo, schedulable bool) error
return wrapped
}
- if !sn.IsReady() {
- metrics.GetSchedulerMetrics().IncUnhealthyNodes()
- }
if !sn.IsSchedulable() {
metrics.GetSchedulerMetrics().IncDrainingNodes()
}
@@ -665,24 +661,6 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
switch nodeInfo.Action {
case si.NodeInfo_UPDATE:
- var newReadyStatus bool
- var err error
- if newReadyStatus, err = strconv.ParseBool(nodeInfo.Attributes[siCommon.NodeReadyAttribute]); err != nil {
- log.Log(log.SchedContext).Error("Could not parse ready attribute, assuming true", zap.Any("attributes", nodeInfo.Attributes))
- newReadyStatus = true
- }
-
- if node.IsReady() && !newReadyStatus {
- log.Log(log.SchedContext).Info("Node has become unhealthy", zap.String("Node ID", node.NodeID))
- metrics.GetSchedulerMetrics().IncUnhealthyNodes()
- node.SetReady(newReadyStatus)
- }
- if !node.IsReady() && newReadyStatus {
- log.Log(log.SchedContext).Info("Node has become healthy", zap.String("Node ID", node.NodeID))
- metrics.GetSchedulerMetrics().DecUnhealthyNodes()
- node.SetReady(newReadyStatus)
- }
-
if sr := nodeInfo.SchedulableResource; sr != nil {
partition.updatePartitionResource(node.SetCapacity(resources.NewResourceFromProto(sr)))
}
@@ -705,9 +683,6 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
if !node.IsSchedulable() {
metrics.GetSchedulerMetrics().DecDrainingNodes()
}
- if !node.IsReady() {
- metrics.GetSchedulerMetrics().DecUnhealthyNodes()
- }
metrics.GetSchedulerMetrics().IncTotalDecommissionedNodes()
// set the state to not schedulable then tell the partition to clean up
node.SetSchedulable(false)
diff --git a/pkg/scheduler/context_test.go b/pkg/scheduler/context_test.go
index 9023525b..075ab563 100644
--- a/pkg/scheduler/context_test.go
+++ b/pkg/scheduler/context_test.go
@@ -19,7 +19,6 @@
package scheduler
import (
- "strconv"
"strings"
"testing"
@@ -268,48 +267,15 @@ func TestContext_ProcessNode(t *testing.T) {
}
}
-func TestContextUpdateNodeMetrics(t *testing.T) {
- metrics.GetSchedulerMetrics().Reset()
- context := createTestContext(t, pName)
-
- n := getNodeInfoForAddingNode(true)
-
- err := context.addNode(n, true)
- assert.NilError(t, err, "unexpected error returned from addNode")
- verifyMetrics(t, 1, "active")
-
- // Update: node became unhealthy
- n = getNodeInfoForUpdatingNode(si.NodeInfo_UPDATE, false)
- context.updateNode(n)
- verifyMetrics(t, 1, "unhealthy")
-
- // Update: node became healthy
- n = getNodeInfoForUpdatingNode(si.NodeInfo_UPDATE, true)
- context.updateNode(n)
- verifyMetrics(t, 0, "unhealthy")
-}
-
-func TestContextAddUnhealthyNodeMetrics(t *testing.T) {
- metrics.GetSchedulerMetrics().Reset()
- context := createTestContext(t, pName)
-
- n := getNodeInfoForAddingNode(false)
-
- err := context.addNode(n, true)
- assert.NilError(t, err, "unexpected error returned from addNode")
- verifyMetrics(t, 1, "active")
- verifyMetrics(t, 1, "unhealthy")
-}
-
func TestContextDrainingNodeMetrics(t *testing.T) {
metrics.GetSchedulerMetrics().Reset()
context := createTestContext(t, pName)
- n := getNodeInfoForAddingNode(true)
+ n := getNodeInfoForAddingNode()
err := context.addNode(n, true)
assert.NilError(t, err, "unexpected error returned from addNode")
- n = getNodeInfoForUpdatingNode(si.NodeInfo_DRAIN_NODE, true)
+ n = getNodeInfoForUpdatingNode(si.NodeInfo_DRAIN_NODE)
context.updateNode(n)
verifyMetrics(t, 1, "draining")
}
@@ -318,39 +284,37 @@ func TestContextDrainingNodeBackToSchedulableMetrics(t *testing.T) {
metrics.GetSchedulerMetrics().Reset()
context := createTestContext(t, pName)
- n := getNodeInfoForAddingNode(true)
+ n := getNodeInfoForAddingNode()
err := context.addNode(n, true)
assert.NilError(t, err, "unexpected error returned from addNode")
- n = getNodeInfoForUpdatingNode(si.NodeInfo_DRAIN_NODE, true)
+ n = getNodeInfoForUpdatingNode(si.NodeInfo_DRAIN_NODE)
context.updateNode(n)
- n = getNodeInfoForUpdatingNode(si.NodeInfo_DRAIN_TO_SCHEDULABLE, true)
+ n = getNodeInfoForUpdatingNode(si.NodeInfo_DRAIN_TO_SCHEDULABLE)
context.updateNode(n)
verifyMetrics(t, 0, "draining")
}
-func getNodeInfoForAddingNode(ready bool) *si.NodeInfo {
+func getNodeInfoForAddingNode() *si.NodeInfo {
n := &si.NodeInfo{
NodeID: "test-1",
Action: si.NodeInfo_UNKNOWN_ACTION_FROM_RM,
SchedulableResource: &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 10}}},
Attributes: map[string]string{
- siCommon.NodePartition: pName,
- siCommon.NodeReadyAttribute: strconv.FormatBool(ready),
+ siCommon.NodePartition: pName,
},
}
return n
}
-func getNodeInfoForUpdatingNode(action si.NodeInfo_ActionFromRM, ready bool) *si.NodeInfo {
+func getNodeInfoForUpdatingNode(action si.NodeInfo_ActionFromRM) *si.NodeInfo {
n := &si.NodeInfo{
NodeID: "test-1",
Action: action,
Attributes: map[string]string{
- siCommon.NodePartition: pName,
- siCommon.NodeReadyAttribute: strconv.FormatBool(ready),
+ siCommon.NodePartition: pName,
},
}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index edce1281..b295d3df 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -20,7 +20,6 @@ package objects
import (
"fmt"
- "strconv"
"strings"
"sync"
@@ -54,7 +53,6 @@ type Node struct {
availableResource *resources.Resource
allocations map[string]*Allocation
schedulable bool
- ready bool
reservations map[string]*reservation // a map of reservations
listeners []NodeListener // a list of node listeners
@@ -64,18 +62,11 @@ type Node struct {
}
func NewNode(proto *si.NodeInfo) *Node {
- // safe guard against panic
+ // safeguard against panic
if proto == nil {
return nil
}
- var ready bool
- var err error
- if ready, err = strconv.ParseBool(proto.Attributes[common.NodeReadyAttribute]); err != nil {
- log.Log(log.SchedNode).Debug("Could not parse ready flag, assuming true",
- zap.String("nodeID", proto.NodeID))
- ready = true
- }
sn := &Node{
NodeID: proto.NodeID,
reservations: make(map[string]*reservation),
@@ -85,10 +76,10 @@ func NewNode(proto *si.NodeInfo) *Node {
allocations: make(map[string]*Allocation),
schedulable: true,
listeners: make([]NodeListener, 0),
- ready: ready,
}
sn.nodeEvents = newNodeEvents(sn, events.GetEventSystem())
// initialise available resources
+ var err error
sn.availableResource, err = resources.SubErrorNegative(sn.totalResource, sn.occupiedResource)
if err != nil {
log.Log(log.SchedNode).Error("New node created with no available resources",
@@ -595,19 +586,6 @@ func (sn *Node) getListeners() []NodeListener {
return list
}
-func (sn *Node) IsReady() bool {
- sn.RLock()
- defer sn.RUnlock()
- return sn.ready
-}
-
-func (sn *Node) SetReady(ready bool) {
- sn.Lock()
- defer sn.Unlock()
- sn.ready = ready
- sn.nodeEvents.sendNodeReadyChangedEvent(sn.ready)
-}
-
func (sn *Node) SendNodeAddedEvent() {
sn.nodeEvents.sendNodeAddedEvent()
}
diff --git a/pkg/scheduler/objects/node_events.go b/pkg/scheduler/objects/node_events.go
index 7e942609..26b040c6 100644
--- a/pkg/scheduler/objects/node_events.go
+++ b/pkg/scheduler/objects/node_events.go
@@ -66,22 +66,6 @@ func (n *nodeEvents) sendAllocationRemovedEvent(allocID string, res *resources.R
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendNodeReadyChangedEvent(ready bool) {
- if !n.eventSystem.IsEventTrackingEnabled() {
- return
- }
- reason := ""
- if ready {
- reason = "ready: true"
- } else {
- reason = "ready: false"
- }
-
- event := events.CreateNodeEventRecord(n.node.NodeID, reason, common.Empty, si.EventRecord_SET,
- si.EventRecord_NODE_READY, nil)
- n.eventSystem.AddEvent(event)
-}
-
func (n *nodeEvents) sendNodeSchedulableChangedEvent(ready bool) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
diff --git a/pkg/scheduler/objects/node_events_test.go b/pkg/scheduler/objects/node_events_test.go
index aadd8375..07f1d6a3 100644
--- a/pkg/scheduler/objects/node_events_test.go
+++ b/pkg/scheduler/objects/node_events_test.go
@@ -124,29 +124,6 @@ func TestSendAllocationRemovedEvent(t *testing.T) {
assert.DeepEqual(t, protoRes, resource)
}
-func TestSendNodeReadyChangedEvent(t *testing.T) {
- node := &Node{
- NodeID: nodeID1,
- }
- eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(node, eventSystem)
- ne.sendNodeReadyChangedEvent(true)
- assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
-
- eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(node, eventSystem)
- ne.sendNodeReadyChangedEvent(true)
- assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
- assert.Equal(t, "ready: true", eventSystem.Events[0].Message)
- assert.Equal(t, nodeID1, eventSystem.Events[0].ObjectID)
-
- eventSystem.Reset()
- ne.sendNodeReadyChangedEvent(false)
- assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
- assert.Equal(t, "ready: false", eventSystem.Events[0].Message)
- assert.Equal(t, nodeID1, eventSystem.Events[0].ObjectID)
-}
-
func TestSendOccupiedResourceChangedEvent(t *testing.T) {
resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
node := &Node{
diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go
index f7f2dd67..097cfbe0 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -656,37 +656,6 @@ func TestAddRemoveListener(t *testing.T) {
assert.Equal(t, 1, tl.updateCount, "listener should not have fired again")
}
-func TestReadyAttribute(t *testing.T) {
- // missing
- proto := newProto(testNode, nil, nil, nil)
- node := NewNode(proto)
- assert.Equal(t, true, node.ready, "Node should be in ready state")
-
- // exists, but faulty
- attr := map[string]string{
- "readyX": "true",
- }
- proto = newProto(testNode, nil, nil, attr)
- node = NewNode(proto)
- assert.Equal(t, true, node.ready, "Node should be in ready state")
-
- // exists, true
- attr = map[string]string{
- "ready": "true",
- }
- proto = newProto(testNode, nil, nil, attr)
- node = NewNode(proto)
- assert.Equal(t, true, node.ready, "Node should be in ready state")
-
- // exists, false
- attr = map[string]string{
- "ready": "false",
- }
- proto = newProto(testNode, nil, nil, attr)
- node = NewNode(proto)
- assert.Equal(t, false, node.ready, "Node should not be in ready state")
-}
-
func TestNodeEvents(t *testing.T) {
mockEvents := evtMock.NewEventSystem()
total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100})
@@ -710,14 +679,6 @@ func TestNodeEvents(t *testing.T) {
assert.Equal(t, si.EventRecord_NODE, event.Type)
assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
- mockEvents.Reset()
- node.SetReady(false)
- assert.Equal(t, 1, len(mockEvents.Events))
- event = mockEvents.Events[0]
- assert.Equal(t, si.EventRecord_NODE, event.Type)
- assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
- assert.Equal(t, si.EventRecord_NODE_READY, event.EventChangeDetail)
-
mockEvents.Reset()
node.AddAllocation(&Allocation{
allocatedResource: resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 1, "memory": 1}),
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index e7f15359..e37a9380 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -31,7 +31,6 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -544,10 +543,8 @@ func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources.
}
func newNodeWithResources(nodeID string, max, occupied *resources.Resource) *objects.Node {
proto := &si.NodeInfo{
- NodeID: nodeID,
- Attributes: map[string]string{
- common.NodeReadyAttribute: "true",
- },
+ NodeID: nodeID,
+ Attributes: map[string]string{},
SchedulableResource: max.ToProto(),
OccupiedResource: occupied.ToProto(),
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]