This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new b02e15ea [YUNIKORN-2976] Handle multiple require node allocations per
node (#1001)
b02e15ea is described below
commit b02e15ea98429f34a3eb623f28f2911ea5a8e98d
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Fri Dec 13 11:22:18 2024 -0600
[YUNIKORN-2976] Handle multiple require node allocations per node (#1001)
If an allocation requires a specific node the scheduler should not
consider any other node. We should allow multiple allocations that
require the same node to reserve the node at the same time. A required
node allocation must be placed on the node before anything else.
If reservations that do not require the node have already been made on
it, remove those existing reservations to make room for the allocations
that do require the node. Make sure that the releases are tracked
correctly in the partition.
After the repeat count removal, reservations can be simplified:
- track reservations using the allocation key
- remove the composite key setup
- remove the collection listener call on reserve or unreserve of a node
Closes: #1001
Signed-off-by: Craig Condit <[email protected]>
---
pkg/common/errors.go | 27 +-
pkg/scheduler/objects/allocation.go | 20 +-
pkg/scheduler/objects/allocation_result.go | 9 +-
pkg/scheduler/objects/allocation_test.go | 1 -
pkg/scheduler/objects/application.go | 452 +++++++++++++-------------
pkg/scheduler/objects/application_test.go | 356 ++++++++++++++++----
pkg/scheduler/objects/node.go | 123 +++----
pkg/scheduler/objects/node_collection_test.go | 163 ++++++----
pkg/scheduler/objects/node_test.go | 163 ++++++----
pkg/scheduler/objects/preemption.go | 2 +-
pkg/scheduler/objects/reservation.go | 71 ++--
pkg/scheduler/objects/reservation_test.go | 91 +++---
pkg/scheduler/objects/utilities_test.go | 1 +
pkg/scheduler/partition.go | 32 +-
pkg/scheduler/partition_test.go | 92 ++++--
15 files changed, 929 insertions(+), 674 deletions(-)
diff --git a/pkg/common/errors.go b/pkg/common/errors.go
index f73c84b4..7e28426b 100644
--- a/pkg/common/errors.go
+++ b/pkg/common/errors.go
@@ -20,11 +20,24 @@ package common
import "errors"
-// InvalidQueueName returned when queue name is invalid
-var InvalidQueueName = errors.New("invalid queue name, max 64 characters
consisting of alphanumeric characters and '-', '_', '#', '@', '/', ':' allowed")
+var (
+ // InvalidQueueName returned when queue name is invalid
+ InvalidQueueName = errors.New("invalid queue name, max 64 characters
consisting of alphanumeric characters and '-', '_', '#', '@', '/', ':' allowed")
+ // ErrorReservingAlloc returned when an ask that is allocated tries to
reserve a node.
+ ErrorReservingAlloc = errors.New("ask already allocated, no reservation
allowed")
+ // ErrorDuplicateReserve returned when the same reservation already
exists on the application
+ ErrorDuplicateReserve = errors.New("reservation already exists")
+ // ErrorNodeAlreadyReserved returned when the node is already reserved,
failing the reservation
+ ErrorNodeAlreadyReserved = errors.New("node is already reserved")
+ // ErrorNodeNotFitReserve returned when the allocation does not fit on
an empty node, failing the reservation
+ ErrorNodeNotFitReserve = errors.New("reservation does not fit on node")
+)
-const PreemptionPreconditionsFailed = "Preemption preconditions failed"
-const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed"
-const PreemptionShortfall = "Preemption helped but short of resources"
-const PreemptionDoesNotHelp = "Preemption does not help"
-const NoVictimForRequiredNode = "No fit on required node, preemption does not
help"
+// Constant messages for AllocationLog entries
+const (
+ PreemptionPreconditionsFailed = "Preemption preconditions failed"
+ PreemptionDoesNotGuarantee = "Preemption queue guarantees check
failed"
+ PreemptionShortfall = "Preemption helped but short of
resources"
+ PreemptionDoesNotHelp = "Preemption does not help"
+ NoVictimForRequiredNode = "No fit on required node, preemption
does not help"
+)
diff --git a/pkg/scheduler/objects/allocation.go
b/pkg/scheduler/objects/allocation.go
index a4701cd6..0ef05222 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -48,7 +48,6 @@ type Allocation struct {
allowPreemptOther bool
originator bool
tags map[string]string
- resKeyWithoutNode string // the reservation key without node
foreign bool
preemptable bool
@@ -57,9 +56,8 @@ type Allocation struct {
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
- schedulingAttempted bool // whether scheduler core has
tried to schedule this allocation
- scaleUpTriggered bool // whether this aloocation has
triggered autoscaling or not
- resKeyPerNode map[string]string // reservation key for a given
node
+ schedulingAttempted bool // whether scheduler core has tried to
schedule this allocation
+ scaleUpTriggered bool // whether this allocation has triggered
autoscaling or not
allocatedResource *resources.Resource
askEvents *schedEvt.AskEvents
userQuotaCheckFailed bool
@@ -145,8 +143,6 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
allowPreemptOther:
alloc.PreemptionPolicy.GetAllowPreemptOther(),
originator: alloc.Originator,
allocLog: make(map[string]*AllocationLogEntry),
- resKeyPerNode: make(map[string]string),
- resKeyWithoutNode:
reservationKeyWithoutNode(alloc.ApplicationID, alloc.AllocationKey),
askEvents:
schedEvt.NewAskEvents(events.GetEventSystem()),
allocated: allocated,
nodeID: nodeID,
@@ -554,18 +550,6 @@ func (a *Allocation) HasTriggeredScaleUp() bool {
return a.scaleUpTriggered
}
-func (a *Allocation) setReservationKeyForNode(node, resKey string) {
- a.Lock()
- defer a.Unlock()
- a.resKeyPerNode[node] = resKey
-}
-
-func (a *Allocation) getReservationKeyForNode(node string) string {
- a.RLock()
- defer a.RUnlock()
- return a.resKeyPerNode[node]
-}
-
func (a *Allocation) setHeadroomCheckFailed(headroom *resources.Resource,
queue string) {
a.Lock()
defer a.Unlock()
diff --git a/pkg/scheduler/objects/allocation_result.go
b/pkg/scheduler/objects/allocation_result.go
index bcd56a86..99af95b7 100644
--- a/pkg/scheduler/objects/allocation_result.go
+++ b/pkg/scheduler/objects/allocation_result.go
@@ -36,10 +36,11 @@ func (art AllocationResultType) String() string {
}
type AllocationResult struct {
- ResultType AllocationResultType
- Request *Allocation
- NodeID string
- ReservedNodeID string
+ ResultType AllocationResultType
+ Request *Allocation
+ NodeID string
+ ReservedNodeID string
+ CancelledReservations int
}
func (ar *AllocationResult) String() string {
diff --git a/pkg/scheduler/objects/allocation_test.go
b/pkg/scheduler/objects/allocation_test.go
index 62bf316a..6917acac 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -51,7 +51,6 @@ func TestNewAsk(t *testing.T) {
askStr := ask.String()
expected := "allocationKey ask-1, applicationID app-1, Resource
map[first:10], Allocated false"
assert.Equal(t, askStr, expected, "Strings should have been equal")
- assert.Equal(t, "app-1|ask-1", ask.resKeyWithoutNode)
//nolint:staticcheck
}
func TestAskAllocateDeallocate(t *testing.T) {
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 60ad55ca..adff3817 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -452,41 +452,44 @@ func (sa *Application) timeoutPlaceholderProcessing() {
func (sa *Application) GetReservations() []string {
sa.RLock()
defer sa.RUnlock()
- keys := make([]string, 0)
+ keys := make([]string, len(sa.reservations))
+ var i int
for key := range sa.reservations {
- keys = append(keys, key)
+ keys[i] = key
+ i++
}
return keys
}
-// Return the allocation ask for the key, nil if not found
+// GetAllocationAsk returns the allocation alloc for the key, nil if not found
func (sa *Application) GetAllocationAsk(allocationKey string) *Allocation {
sa.RLock()
defer sa.RUnlock()
return sa.requests[allocationKey]
}
-// Return the allocated resources for this application
+// GetAllocatedResource returns the currently allocated resources for this
application
func (sa *Application) GetAllocatedResource() *resources.Resource {
sa.RLock()
defer sa.RUnlock()
return sa.allocatedResource.Clone()
}
+// GetMaxAllocatedResource returns the peak of the allocated resources for
this application
func (sa *Application) GetMaxAllocatedResource() *resources.Resource {
sa.RLock()
defer sa.RUnlock()
return sa.maxAllocatedResource.Clone()
}
-// Return the allocated placeholder resources for this application
+// GetPlaceholderResource returns the currently allocated placeholder
resources for this application
func (sa *Application) GetPlaceholderResource() *resources.Resource {
sa.RLock()
defer sa.RUnlock()
return sa.allocatedPlaceholder.Clone()
}
-// Return the total placeholder ask for this application
+// GetPlaceholderAsk returns the total placeholder resource request for this
application
// Is only set on app creation and used when app is added to a queue
func (sa *Application) GetPlaceholderAsk() *resources.Resource {
sa.RLock()
@@ -501,8 +504,8 @@ func (sa *Application) GetPendingResource()
*resources.Resource {
return sa.pending
}
-// Remove one or more allocation asks from this application.
-// This also removes any reservations that are linked to the ask.
+// RemoveAllocationAsk removes one or more allocation asks from this
application.
+// This also removes any reservations that are linked to the allocations.
// The return value is the number of reservations released
func (sa *Application) RemoveAllocationAsk(allocKey string) int {
sa.Lock()
@@ -517,23 +520,16 @@ func (sa *Application) removeAsksInternal(allocKey
string, detail si.EventRecord
return 0
}
var deltaPendingResource *resources.Resource = nil
- // when allocation key not specified, cleanup all allocation ask
+ // when allocation key is not specified, cleanup all allocations
var toRelease int
if allocKey == "" {
// cleanup all reservations
- for key, reserve := range sa.reservations {
- releases, err := sa.unReserveInternal(reserve.node,
reserve.ask)
- if err != nil {
- log.Log(log.SchedApplication).Warn("Removal of
reservation failed while removing all allocation asks",
- zap.String("appID", sa.ApplicationID),
- zap.String("reservationKey", key),
- zap.Error(err))
- continue
- }
- // clean up the queue reservation (one at a time)
- sa.queue.UnReserve(sa.ApplicationID, releases)
+ for _, reserve := range sa.reservations {
+ releases := sa.unReserveInternal(reserve)
toRelease += releases
}
+ // clean up the queue reservation
+ sa.queue.UnReserve(sa.ApplicationID, toRelease)
// Cleanup total pending resource
deltaPendingResource = sa.pending
sa.pending = resources.NewResource()
@@ -546,16 +542,8 @@ func (sa *Application) removeAsksInternal(allocKey string,
detail si.EventRecord
sa.queue.UpdateApplicationPriority(sa.ApplicationID,
sa.askMaxPriority)
} else {
// cleanup the reservation for this allocation
- for _, key := range sa.GetAskReservations(allocKey) {
- reserve := sa.reservations[key]
- releases, err := sa.unReserveInternal(reserve.node,
reserve.ask)
- if err != nil {
- log.Log(log.SchedApplication).Warn("Removal of
reservation failed while removing allocation ask",
- zap.String("appID", sa.ApplicationID),
- zap.String("reservationKey", key),
- zap.Error(err))
- continue
- }
+ if reserve, ok := sa.reservations[allocKey]; ok {
+ releases := sa.unReserveInternal(reserve)
// clean up the queue reservation
sa.queue.UnReserve(sa.ApplicationID, releases)
toRelease += releases
@@ -576,7 +564,7 @@ func (sa *Application) removeAsksInternal(allocKey string,
detail si.EventRecord
}
// clean up the queue pending resources
sa.queue.decPendingResource(deltaPendingResource)
- // Check if we need to change state based on the ask removal:
+ // Check if we need to change state based on the removal:
// 1) if pending is zero (no more asks left)
// 2) if confirmed allocations is zero (no real tasks running)
// Change the state to completing.
@@ -808,152 +796,145 @@ func (sa *Application) HasReserved() bool {
return len(sa.reservations) > 0
}
-// IsReservedOnNode returns true if and only if the node has been reserved by
the application
-// An empty nodeID is never reserved.
-func (sa *Application) IsReservedOnNode(nodeID string) bool {
- if nodeID == "" {
- return false
- }
+// NodeReservedForAsk returns the nodeID that has been reserved by the
application for the ask
+// An empty nodeID means the ask is not reserved. An empty askKey is never
reserved.
+func (sa *Application) NodeReservedForAsk(askKey string) string {
sa.RLock()
defer sa.RUnlock()
- // make sure matches only for the whole nodeID
- separator := nodeID + "|"
- for key := range sa.reservations {
- if strings.HasPrefix(key, separator) {
- return true
- }
+ if reserved, ok := sa.reservations[askKey]; ok {
+ return reserved.nodeID
}
- return false
+ return ""
}
-// Reserve the application for this node and ask combination.
+// Reserve the application for this node and alloc combination.
// If the reservation fails the function returns false, if the reservation is
made it returns true.
-// If the node and ask combination was already reserved for the application
this is a noop and returns true.
+// If the node and alloc combination was already reserved for the application
this is a noop and returns true.
func (sa *Application) Reserve(node *Node, ask *Allocation) error {
+ if node == nil || ask == nil {
+ return fmt.Errorf("reservation creation failed node or alloc
are nil on appID %s", sa.ApplicationID)
+ }
sa.Lock()
defer sa.Unlock()
return sa.reserveInternal(node, ask)
}
-// Unlocked version for Reserve that really does the work.
+// reserveInternal is the unlocked version for Reserve that really does the
work.
// Must only be called while holding the application lock.
func (sa *Application) reserveInternal(node *Node, ask *Allocation) error {
+ allocKey := ask.GetAllocationKey()
+ if sa.requests[allocKey] == nil {
+ log.Log(log.SchedApplication).Debug("alloc is not registered to
this app",
+ zap.String("app", sa.ApplicationID),
+ zap.String("allocKey", allocKey))
+ return fmt.Errorf("reservation creation failed alloc %s not
found on appID %s", allocKey, sa.ApplicationID)
+ }
// create the reservation (includes nil checks)
nodeReservation := newReservation(node, sa, ask, true)
if nodeReservation == nil {
log.Log(log.SchedApplication).Debug("reservation creation
failed unexpectedly",
zap.String("app", sa.ApplicationID),
- zap.Any("node", node),
- zap.Any("ask", ask))
- return fmt.Errorf("reservation creation failed node or ask are
nil on appID %s", sa.ApplicationID)
- }
- allocKey := ask.GetAllocationKey()
- if sa.requests[allocKey] == nil {
- log.Log(log.SchedApplication).Debug("ask is not registered to
this app",
- zap.String("app", sa.ApplicationID),
- zap.String("allocKey", allocKey))
- return fmt.Errorf("reservation creation failed ask %s not found
on appID %s", allocKey, sa.ApplicationID)
+ zap.Stringer("node", node),
+ zap.Stringer("alloc", ask))
+ return fmt.Errorf("reservation creation failed node or alloc
are nil on appID %s", sa.ApplicationID)
}
- if !sa.canAskReserve(ask) {
- if ask.IsAllocated() {
- return fmt.Errorf("ask is already allocated")
- } else {
- return fmt.Errorf("ask is already reserved")
- }
+ // the alloc should not have reserved a node yet: do not allow multiple
nodes to be reserved
+ if err := sa.canAllocationReserve(ask); err != nil {
+ return err
}
// check if we can reserve the node before reserving on the app
if err := node.Reserve(sa, ask); err != nil {
return err
}
- sa.reservations[nodeReservation.getKey()] = nodeReservation
+ sa.reservations[allocKey] = nodeReservation
log.Log(log.SchedApplication).Info("reservation added successfully",
zap.String("app", sa.ApplicationID),
zap.String("node", node.NodeID),
- zap.String("ask", allocKey))
+ zap.String("alloc", allocKey))
return nil
}
-// UnReserve the application for this node and ask combination.
-// This first removes the reservation from the node.
+// UnReserve the application for this node and alloc combination.
// If the reservation does not exist it returns 0 for reservations removed, if
the reservation is removed it returns 1.
// The error is set if the reservation key cannot be removed from the app or
node.
-func (sa *Application) UnReserve(node *Node, ask *Allocation) (int, error) {
+func (sa *Application) UnReserve(node *Node, ask *Allocation) int {
+ log.Log(log.SchedApplication).Info("unreserving allocation from
application",
+ zap.String("appID", sa.ApplicationID),
+ zap.Stringer("node", node),
+ zap.Stringer("alloc", ask))
+ if node == nil || ask == nil {
+ return 0
+ }
sa.Lock()
defer sa.Unlock()
- return sa.unReserveInternal(node, ask)
+ reserve, ok := sa.reservations[ask.allocationKey]
+ if !ok {
+ log.Log(log.SchedApplication).Debug("reservation not found on
application",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("allocationKey", ask.allocationKey))
+ return 0
+ }
+ if reserve.nodeID != node.NodeID {
+ log.Log(log.SchedApplication).Warn("UnReserve: provided info
not consistent with reservation",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("node", reserve.nodeID),
+ zap.String("alloc", reserve.allocKey))
+ }
+ return sa.unReserveInternal(reserve)
}
// Unlocked version for UnReserve that really does the work.
+// This is idempotent and will not fail
// Must only be called while holding the application lock.
-func (sa *Application) unReserveInternal(node *Node, ask *Allocation) (int,
error) {
- resKey := reservationKey(node, nil, ask)
- if resKey == "" {
- log.Log(log.SchedApplication).Debug("unreserve reservation key
create failed unexpectedly",
- zap.String("appID", sa.ApplicationID),
- zap.Stringer("node", node),
- zap.Stringer("ask", ask))
- return 0, fmt.Errorf("reservation key failed node or ask are
nil for appID %s", sa.ApplicationID)
+func (sa *Application) unReserveInternal(reserve *reservation) int {
+ // this should not happen
+ if reserve == nil {
+ return 0
}
// unReserve the node before removing from the app
- var num int
- var err error
- if num, err = node.unReserve(sa, ask); err != nil {
- return 0, err
- }
+ num := reserve.node.unReserve(reserve.alloc)
// if the unreserve worked on the node check the app
- if _, found := sa.reservations[resKey]; found {
+ if _, found := sa.reservations[reserve.allocKey]; found {
// worked on the node means either found or not but no error,
log difference here
if num == 0 {
log.Log(log.SchedApplication).Info("reservation not
found while removing from node, app has reservation",
zap.String("appID", sa.ApplicationID),
- zap.String("nodeID", node.NodeID),
- zap.String("ask", ask.GetAllocationKey()))
+ zap.String("nodeID", reserve.nodeID),
+ zap.String("alloc", reserve.allocKey))
}
- delete(sa.reservations, resKey)
- log.Log(log.SchedApplication).Info("reservation removed
successfully", zap.String("node", node.NodeID),
- zap.String("app", ask.GetApplicationID()),
zap.String("ask", ask.GetAllocationKey()))
- return 1, nil
+ delete(sa.reservations, reserve.allocKey)
+ log.Log(log.SchedApplication).Info("reservation removed
successfully",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("node", reserve.nodeID),
+ zap.String("alloc", reserve.allocKey))
+ return 1
}
// reservation was not found
log.Log(log.SchedApplication).Info("reservation not found while
removing from app",
zap.String("appID", sa.ApplicationID),
- zap.String("nodeID", node.NodeID),
- zap.String("ask", ask.GetAllocationKey()),
+ zap.String("node", reserve.nodeID),
+ zap.String("alloc", reserve.allocKey),
zap.Int("nodeReservationsRemoved", num))
- return 0, nil
+ return 0
}
-// Return the allocation reservations on any node.
-// The returned array is 0 or more keys into the reservations map.
+// canAllocationReserve Check if the allocation has already been reserved. An
alloc can never reserve more than one node.
// No locking must be called while holding the lock
-func (sa *Application) GetAskReservations(allocKey string) []string {
- reservationKeys := make([]string, 0)
- if allocKey == "" {
- return reservationKeys
- }
- for key := range sa.reservations {
- if strings.HasSuffix(key, allocKey) {
- reservationKeys = append(reservationKeys, key)
- }
- }
- return reservationKeys
-}
-
-// Check if the allocation has already been reserved. An ask can never reserve
more than one node.
-// No locking must be called while holding the lock
-func (sa *Application) canAskReserve(ask *Allocation) bool {
- allocKey := ask.GetAllocationKey()
- if ask.IsAllocated() {
- log.Log(log.SchedApplication).Debug("ask already allocated, no
reservation allowed",
- zap.String("askKey", allocKey))
- return false
- }
- if len(sa.GetAskReservations(allocKey)) > 0 {
+func (sa *Application) canAllocationReserve(alloc *Allocation) error {
+ allocKey := alloc.GetAllocationKey()
+ if alloc.IsAllocated() {
+ log.Log(log.SchedApplication).Debug("allocation is marked as
allocated, no reservation allowed",
+ zap.String("allocationKey", allocKey))
+ return common.ErrorReservingAlloc
+ }
+ reserved := sa.reservations[allocKey]
+ if reserved != nil {
log.Log(log.SchedApplication).Debug("reservation already
exists",
- zap.String("askKey", allocKey))
- return false
+ zap.String("allocKey", allocKey),
+ zap.String("nodeID", reserved.nodeID))
+ return common.ErrorDuplicateReserve
}
- return true
+ return nil
}
func (sa *Application) getOutstandingRequests(headRoom *resources.Resource,
userHeadRoom *resources.Resource, total *[]*Allocation) {
@@ -1045,42 +1026,13 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
requiredNode := request.GetRequiredNode()
// does request have any constraint to run on specific node?
if requiredNode != "" {
- // the iterator might not have the node we need as it
could be reserved, or we have not added it yet
- node := getNodeFn(requiredNode)
- if node == nil {
- getRateLimitedAppLog().Info("required node is
not found (could be transient)",
- zap.String("application ID",
sa.ApplicationID),
- zap.String("allocationKey",
request.GetAllocationKey()),
- zap.String("required node",
requiredNode))
- return nil
- }
- // Are there any non daemon set reservations on
specific required node?
- // Cancel those reservations to run daemon set pods
- reservations := node.GetReservations()
- if len(reservations) > 0 {
- if !sa.cancelReservations(reservations) {
- return nil
- }
- }
- // we don't care about predicate error messages here
- result, _ := sa.tryNode(node, request) //nolint:errcheck
+ result := sa.tryRequiredNode(request, getNodeFn)
if result != nil {
- // check if the node was reserved and we
allocated after a release
- if _, ok :=
sa.reservations[reservationKey(node, nil, request)]; ok {
-
log.Log(log.SchedApplication).Debug("allocation on required node after release",
- zap.String("appID",
sa.ApplicationID),
- zap.String("nodeID",
requiredNode),
- zap.String("allocationKey",
request.GetAllocationKey()))
- result.ResultType = AllocatedReserved
- return result
- }
- log.Log(log.SchedApplication).Debug("allocation
on required node is completed",
- zap.String("nodeID", node.NodeID),
- zap.String("allocationKey",
request.GetAllocationKey()),
- zap.Stringer("resultType",
result.ResultType))
return result
}
- return newReservedAllocationResult(node.NodeID, request)
+ // it did not allocate or reserve: should only happen
if the node is not registered yet
+ // just continue with the next request
+ continue
}
iterator := nodeIterator()
@@ -1108,50 +1060,86 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
return nil
}
-func (sa *Application) cancelReservations(reservations []*reservation) bool {
- for _, res := range reservations {
- // skip the node
- if res.ask.GetRequiredNode() != "" {
- log.Log(log.SchedApplication).Warn("reservation for ask
with required node already exists on the node",
- zap.String("required node", res.node.NodeID),
- zap.String("existing ask reservation key",
res.getKey()))
- return false
- }
+// tryRequiredNode tries to place the allocation in the specific node that is
set as the required node in the allocation.
+// The first time the allocation is seen it will try to make the allocation on
the node. If that does not work it will
+// always trigger the reservation of the node.
+func (sa *Application) tryRequiredNode(request *Allocation, getNodeFn
func(string) *Node) *AllocationResult {
+ requiredNode := request.GetRequiredNode()
+ allocationKey := request.GetAllocationKey()
+ // the iterator might not have the node we need as it could be
reserved, or we have not added it yet
+ node := getNodeFn(requiredNode)
+ if node == nil {
+ getRateLimitedAppLog().Info("required node is not found (could
be transient)",
+ zap.String("application ID", sa.ApplicationID),
+ zap.String("allocationKey", allocationKey),
+ zap.String("required node", requiredNode))
+ return nil
}
- var err error
+ // Are there any reservations on this node that does not specifically
require this node?
+ // Cancel any reservations to make room for the allocations that
require the node
var num int
+ reservations := node.GetReservations()
+ if len(reservations) > 0 {
+ num = sa.cancelReservations(reservations)
+ }
+ _, thisReserved := sa.reservations[allocationKey]
+ // now try the request, we don't care about predicate error messages
here
+ result, _ := sa.tryNode(node, request) //nolint:errcheck
+ if result != nil {
+ result.CancelledReservations = num
+ // check if the node was reserved and we allocated after a
release
+ if thisReserved {
+ log.Log(log.SchedApplication).Debug("allocation on
required node after release",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("nodeID", requiredNode),
+ zap.String("allocationKey", allocationKey))
+ result.ResultType = AllocatedReserved
+ return result
+ }
+ log.Log(log.SchedApplication).Debug("allocation on required
node is completed",
+ zap.String("nodeID", node.NodeID),
+ zap.String("allocationKey", allocationKey),
+ zap.Stringer("resultType", result.ResultType))
+ return result
+ }
+ // if this ask was already reserved we should not have deleted any
reservations
+ // we also do not need to send back a reservation result and just
return nil to check the next ask
+ if thisReserved {
+ return nil
+ }
+ result = newReservedAllocationResult(node.NodeID, request)
+ result.CancelledReservations = num
+ return result
+}
+
+// cancelReservations will cancel all non required node reservations for a
node. The list of reservations passed in is
+// a copy of all reservations of a single node. This is called during the
required node allocation cycle only.
+// The returned int value is used to update the partition counter of active
reservations.
+func (sa *Application) cancelReservations(reservations []*reservation) int {
+ var released, num int
// un reserve all the apps that were reserved on the node
for _, res := range reservations {
- thisApp := res.app.ApplicationID == sa.ApplicationID
- if thisApp {
- num, err = sa.unReserveInternal(res.node, res.ask)
- } else {
- num, err = res.app.UnReserve(res.node, res.ask)
- }
- if err != nil {
- log.Log(log.SchedApplication).Warn("Unable to cancel
reservations on node",
- zap.String("victim application ID",
res.app.ApplicationID),
- zap.String("victim allocationKey",
res.getKey()),
- zap.String("required node", res.node.NodeID),
- zap.Int("reservations count", num),
- zap.String("application ID", sa.ApplicationID))
- return false
- } else {
- log.Log(log.SchedApplication).Info("Cancelled
reservation on required node",
- zap.String("affected application ID",
res.app.ApplicationID),
- zap.String("affected allocationKey",
res.getKey()),
- zap.String("required node", res.node.NodeID),
- zap.Int("reservations count", num),
- zap.String("application ID", sa.ApplicationID))
+ // cleanup if the reservation does not have this node as a
requirement
+ if res.alloc.requiredNode != "" {
+ continue
}
- // remove the reservation of the queue
+ thisApp := res.app.ApplicationID == sa.ApplicationID
if thisApp {
+ num = sa.unReserveInternal(res)
sa.queue.UnReserve(sa.ApplicationID, num)
} else {
+ num = res.app.UnReserve(res.node, res.alloc)
res.app.GetQueue().UnReserve(res.app.ApplicationID, num)
}
+ log.Log(log.SchedApplication).Info("Cancelled reservation for
required node allocation",
+ zap.String("triggered by appID", sa.ApplicationID),
+ zap.String("affected application ID", res.appID),
+ zap.String("affected allocationKey", res.allocKey),
+ zap.String("required node", res.nodeID),
+ zap.Int("reservations count", num))
+ released += num
}
- return true
+ return released
}
// tryPlaceholderAllocate tries to replace a placeholder that is allocated
with a real allocation
@@ -1238,14 +1226,15 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// pick the first fit and try all nodes if that fails give up
var allocResult *AllocationResult
if phFit != nil && reqFit != nil {
+ resKey := reqFit.GetAllocationKey()
iterator.ForEachNode(func(node *Node) bool {
if !node.IsSchedulable() {
- log.Log(log.SchedApplication).Debug("skipping
node for placeholder ask as state is unschedulable",
- zap.String("allocationKey",
reqFit.GetAllocationKey()),
+ log.Log(log.SchedApplication).Debug("skipping
node for placeholder alloc as state is unschedulable",
+ zap.String("allocationKey", resKey),
zap.String("node", node.NodeID))
return true
}
- if
!node.preAllocateCheck(reqFit.GetAllocatedResource(), reservationKey(nil, sa,
reqFit)) {
+ if
!node.preAllocateCheck(reqFit.GetAllocatedResource(), resKey) {
return true
}
// skip the node if conditions can not be satisfied
@@ -1257,7 +1246,7 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
if !node.TryAddAllocation(reqFit) {
log.Log(log.SchedApplication).Debug("Node
update failed unexpectedly",
zap.String("applicationID",
sa.ApplicationID),
- zap.Stringer("ask", reqFit),
+ zap.Stringer("alloc", reqFit),
zap.Stringer("placeholder", phFit))
return false
}
@@ -1266,7 +1255,7 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
log.Log(log.SchedApplication).Warn("allocation
of ask failed unexpectedly",
zap.Error(err))
// unwind node allocation
- _ =
node.RemoveAllocation(reqFit.GetAllocationKey())
+ _ = node.RemoveAllocation(resKey)
return false
}
// allocation worked: on a non placeholder node update
resultType and return
@@ -1297,7 +1286,7 @@ func (sa *Application) checkHeadRooms(ask *Allocation,
userHeadroom *resources.R
return userHeadroom.FitInMaxUndef(ask.GetAllocatedResource()) &&
headRoom.FitInMaxUndef(ask.GetAllocatedResource())
}
-// Try a reserved allocation of an outstanding reservation
+// tryReservedAllocate tries allocating an outstanding reservation
func (sa *Application) tryReservedAllocate(headRoom *resources.Resource,
nodeIterator func() NodeIterator) *AllocationResult {
sa.Lock()
defer sa.Unlock()
@@ -1306,14 +1295,14 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
// process all outstanding reservations and pick the first one that fits
for _, reserve := range sa.reservations {
- ask := sa.requests[reserve.askKey]
+ ask := sa.requests[reserve.allocKey]
// sanity check and cleanup if needed
if ask == nil || ask.IsAllocated() {
var unreserveAsk *Allocation
// if the ask was not found we need to construct one to
unreserve
if ask == nil {
unreserveAsk = &Allocation{
- allocationKey: reserve.askKey,
+ allocationKey: reserve.allocKey,
applicationID: sa.ApplicationID,
allocLog:
make(map[string]*AllocationLogEntry),
}
@@ -1348,17 +1337,17 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
// try this on all other nodes
for _, reserve := range sa.reservations {
- // Other nodes cannot be tried if the ask has a required node
- ask := reserve.ask
- if ask.GetRequiredNode() != "" {
+ // Other nodes cannot be tried if a required node is requested
+ alloc := reserve.alloc
+ if alloc.GetRequiredNode() != "" {
continue
}
iterator := nodeIterator()
if iterator != nil {
- if !sa.checkHeadRooms(ask, userHeadroom, headRoom) {
+ if !sa.checkHeadRooms(alloc, userHeadroom, headRoom) {
continue
}
- result := sa.tryNodesNoReserve(ask, iterator,
reserve.nodeID)
+ result := sa.tryNodesNoReserve(alloc, iterator,
reserve.nodeID)
// have a candidate return it, including the node that
was reserved
if result != nil {
return result
@@ -1413,8 +1402,8 @@ func (sa *Application) tryRequiredNodePreemption(reserve
*reservation, ask *Allo
return false
}
-// Try all the nodes for a reserved request that have not been tried yet.
-// This should never result in a reservation as the ask is already reserved
+// tryNodesNoReserve tries all the nodes for a reserved request that have not
been tried yet.
+// This should never result in a reservation as the allocation is already
reserved
func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator
NodeIterator, reservedNode string) *AllocationResult {
var allocResult *AllocationResult
iterator.ForEachNode(func(node *Node) bool {
@@ -1449,14 +1438,13 @@ func (sa *Application) tryNodesNoReserve(ask
*Allocation, iterator NodeIterator,
func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator)
*AllocationResult {
var nodeToReserve *Node
scoreReserved := math.Inf(1)
- // check if the ask is reserved or not
+ // check if the alloc is reserved or not
allocKey := ask.GetAllocationKey()
- reservedAsks := sa.GetAskReservations(allocKey)
- allowReserve := !ask.IsAllocated() && len(reservedAsks) == 0
+ reserved := sa.reservations[allocKey]
var allocResult *AllocationResult
var predicateErrors map[string]int
iterator.ForEachNode(func(node *Node) bool {
- // skip the node if the node is not valid for the ask
+ // skip the node if the node is not schedulable
if !node.IsSchedulable() {
log.Log(log.SchedApplication).Debug("skipping node for
ask as state is unschedulable",
zap.String("allocationKey", allocKey),
@@ -1478,28 +1466,23 @@ func (sa *Application) tryNodes(ask *Allocation,
iterator NodeIterator) *Allocat
// allocation worked so return
if result != nil {
metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart)
- // check if the node was reserved for this ask: if it
is set the resultType and return
- // NOTE: this is a safeguard as reserved nodes should
never be part of the iterator
- // but we have no locking
- if _, ok := sa.reservations[reservationKey(node, nil,
ask)]; ok {
- log.Log(log.SchedApplication).Debug("allocate
found reserved ask during non reserved allocate",
- zap.String("appID", sa.ApplicationID),
- zap.String("nodeID", node.NodeID),
- zap.String("allocationKey", allocKey))
- result.ResultType = AllocatedReserved
- allocResult = result
- return false
- }
- // we could also have a different node reserved for
this ask if it has pick one of
- // the reserved nodes to unreserve (first one in the
list)
- if len(reservedAsks) > 0 {
- nodeID := strings.TrimSuffix(reservedAsks[0],
"|"+allocKey)
- log.Log(log.SchedApplication).Debug("allocate
picking reserved ask during non reserved allocate",
- zap.String("appID", sa.ApplicationID),
- zap.String("nodeID", nodeID),
- zap.String("allocationKey", allocKey))
+ // check if the alloc had a reservation: if it did, set
the resultType and return
+ if reserved != nil {
+ if reserved.nodeID != node.NodeID {
+ // we have a different node reserved
for this alloc
+
log.Log(log.SchedApplication).Debug("allocate picking reserved alloc during non
reserved allocate",
+ zap.String("appID",
sa.ApplicationID),
+ zap.String("reserved nodeID",
reserved.nodeID),
+ zap.String("allocationKey",
allocKey))
+ result.ReservedNodeID = reserved.nodeID
+ } else {
+ // NOTE: this is a safeguard as
reserved nodes should never be part of the iterator
+
log.Log(log.SchedApplication).Debug("allocate found reserved alloc during non
reserved allocate",
+ zap.String("appID",
sa.ApplicationID),
+ zap.String("nodeID",
node.NodeID),
+ zap.String("allocationKey",
allocKey))
+ }
result.ResultType = AllocatedReserved
- result.ReservedNodeID = nodeID
allocResult = result
return false
}
@@ -1509,14 +1492,14 @@ func (sa *Application) tryNodes(ask *Allocation,
iterator NodeIterator) *Allocat
}
// nothing allocated should we look at a reservation?
askAge := time.Since(ask.GetCreateTime())
- if allowReserve && askAge > reservationDelay {
+ if reserved == nil && askAge > reservationDelay {
log.Log(log.SchedApplication).Debug("app reservation
check",
zap.String("allocationKey", allocKey),
zap.Time("createTime", ask.GetCreateTime()),
zap.Duration("askAge", askAge),
zap.Duration("reservationDelay",
reservationDelay))
score :=
node.GetFitInScoreForAvailableResource(ask.GetAllocatedResource())
- // Record the so-far best node to reserve
+ // Record the best node so-far to reserve
if score < scoreReserved {
scoreReserved = score
nodeToReserve = node
@@ -1540,7 +1523,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator
NodeIterator) *Allocat
zap.String("appID", sa.ApplicationID),
zap.String("nodeID", nodeToReserve.NodeID),
zap.String("allocationKey", allocKey),
- zap.Int("reservations", len(reservedAsks)))
+ zap.Int("reservations", len(sa.reservations)))
// skip the node if conditions can not be satisfied
if nodeToReserve.preReserveConditions(ask) != nil {
return nil
@@ -1552,11 +1535,12 @@ func (sa *Application) tryNodes(ask *Allocation,
iterator NodeIterator) *Allocat
return nil
}
-// Try allocating on one specific node
+// tryNode tries allocating on one specific node
func (sa *Application) tryNode(node *Node, ask *Allocation)
(*AllocationResult, error) {
toAllocate := ask.GetAllocatedResource()
+ allocationKey := ask.GetAllocationKey()
// create the key for the reservation
- if !node.preAllocateCheck(toAllocate, reservationKey(nil, sa, ask)) {
+ if !node.preAllocateCheck(toAllocate, allocationKey) {
// skip schedule onto node
return nil, nil
}
@@ -1571,13 +1555,13 @@ func (sa *Application) tryNode(node *Node, ask
*Allocation) (*AllocationResult,
log.Log(log.SchedApplication).DPanic("queue update
failed unexpectedly",
zap.Error(err))
// revert the node update
- node.RemoveAllocation(ask.GetAllocationKey())
+ node.RemoveAllocation(allocationKey)
return nil, nil
}
- // mark this ask as allocated
+ // mark this alloc as allocated
_, err := sa.allocateAsk(ask)
if err != nil {
- log.Log(log.SchedApplication).Warn("allocation of ask
failed unexpectedly",
+ log.Log(log.SchedApplication).Warn("allocation of alloc
failed unexpectedly",
zap.Error(err))
}
// all is OK, last update for the app
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 5b7eed87..c6cfca9c 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -234,11 +234,8 @@ func TestAppReservation(t *testing.T) {
if app.HasReserved() {
t.Fatal("new app should not have reservations")
}
- if app.IsReservedOnNode("") {
- t.Error("app should not have reservations for empty node ID")
- }
- if app.IsReservedOnNode("unknown") {
- t.Error("new app should not have reservations for unknown node")
+ if app.NodeReservedForAsk("") != "" {
+ t.Error("app should not have reservations for empty ask")
}
queue, err := createRootQueue(nil)
@@ -270,33 +267,21 @@ func TestAppReservation(t *testing.T) {
// reserve that works
err = app.Reserve(node, ask)
assert.NilError(t, err, "reservation should not have failed")
- if app.IsReservedOnNode("") {
- t.Errorf("app should not have reservations for empty node ID")
- }
- if app.IsReservedOnNode("unknown") {
- t.Error("app should not have reservations for unknown node")
+ if app.NodeReservedForAsk("") != "" {
+ t.Error("app should not have reservations for empty ask")
}
- if app.HasReserved() && !app.IsReservedOnNode(nodeID1) {
+ if app.HasReserved() && app.NodeReservedForAsk(aKey) != nodeID1 {
t.Errorf("app should have reservations for node %s", nodeID1)
}
- // node name similarity check: chop of the last char to make sure we
check the full name
- similar := nodeID1[:len(nodeID1)-1]
- if app.HasReserved() && app.IsReservedOnNode(similar) {
- t.Errorf("similar app should not have reservations for node
%s", similar)
- }
-
// reserve the same reservation
err = app.Reserve(node, ask)
if err == nil {
t.Errorf("reservation should have failed")
}
- // unreserve unknown node/ask
- _, err = app.UnReserve(nil, nil)
- if err == nil {
- t.Errorf("illegal reservation release but did not fail")
- }
+ // unreserve unknown node/alloc
+ assert.Equal(t, app.UnReserve(nil, nil), 0, "illegal reservation
release should have returned 0")
// 2nd reservation for app
ask2 := newAllocationAsk("alloc-2", appID1, res)
@@ -309,20 +294,16 @@ func TestAppReservation(t *testing.T) {
}
err = app.Reserve(node2, ask2)
assert.NilError(t, err, "reservation of 2nd node should not have
failed")
- _, err = app.UnReserve(node2, ask2)
- assert.NilError(t, err, "remove of reservation of 2nd node should not
have failed")
+ assert.Equal(t, app.UnReserve(node2, ask2), 1, "remove of reservation
of 2nd node should not have failed")
// unreserve the same should fail
- _, err = app.UnReserve(node2, ask2)
- assert.NilError(t, err, "remove twice of reservation of 2nd node should
have failed")
+ assert.Equal(t, app.UnReserve(node2, ask2), 0, "remove twice of
reservation of 2nd node should return 0")
// failure case: remove reservation from node, app still needs cleanup
var num int
- num, err = node.unReserve(app, ask)
- assert.NilError(t, err, "un-reserve on node should not have failed")
+ num = node.unReserve(ask)
assert.Equal(t, num, 1, "un-reserve on node should have removed
reservation")
- num, err = app.UnReserve(node, ask)
- assert.NilError(t, err, "app has reservation should not have failed")
+ num = app.UnReserve(node, ask)
assert.Equal(t, num, 1, "un-reserve on app should have removed
reservation from app")
}
@@ -338,12 +319,9 @@ func TestAppAllocReservation(t *testing.T) {
if app == nil || app.ApplicationID != appID1 {
t.Fatalf("app create failed which should not have %v", app)
}
- if app.HasReserved() {
- t.Fatal("new app should not have reservations")
- }
- if len(app.GetAskReservations("")) != 0 {
- t.Fatal("new app should not have reservation for empty
allocKey")
- }
+ assert.Assert(t, !app.HasReserved(), "new app should not have
reservations")
+ assert.Equal(t, len(app.GetReservations()), 0, "new app should not have
reservation for empty allocKey")
+
queue, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")
app.queue = queue
@@ -360,46 +338,44 @@ func TestAppAllocReservation(t *testing.T) {
assert.NilError(t, err, "ask2 should have been added to app")
err = app.Reserve(node1, ask)
assert.NilError(t, err, "reservation should not have failed")
- if len(app.GetAskReservations("")) != 0 {
+ if app.reservations[""] != nil {
t.Fatal("app should not have reservation for empty allocKey")
}
- nodeKey1 := nodeID1 + "|" + aKey
- askReserved := app.GetAskReservations(aKey)
- if len(askReserved) != 1 || askReserved[0] != nodeKey1 {
- t.Errorf("app should have reservations for %s on %s and has
not", aKey, nodeID1)
+ allocReserved := app.reservations[aKey]
+ if allocReserved == nil || allocReserved.nodeID != nodeID1 {
+ t.Fatalf("app should have reservations for %s on %s and has
not", aKey, nodeID1)
}
+ assert.Equal(t, len(app.GetReservations()), 1, "app should have 1
reservation")
- nodeID2 := "node-2"
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10})
err = app.Reserve(node2, ask2)
assert.NilError(t, err, "reservation should not have failed: error %v",
err)
- nodeKey2 := nodeID2 + "|" + aKey2
- askReserved = app.GetAskReservations(aKey2)
- if len(askReserved) != 1 && askReserved[0] != nodeKey2 {
- t.Errorf("app should have reservations for %s on %s and has
not", aKey, nodeID2)
+ allocReserved = app.reservations[aKey2]
+ if allocReserved == nil || allocReserved.nodeID != nodeID2 {
+ t.Fatalf("app should have reservations for %s on %s and has
not", aKey, nodeID2)
}
+ assert.Equal(t, len(app.GetReservations()), 2, "app should have 2
reservation")
// check duplicate reserve: nothing should change
- if app.canAskReserve(ask) {
- t.Error("ask has already reserved, reserve check should have
failed")
- }
+ assert.Assert(t, app.canAllocationReserve(ask) != nil, "alloc has
already reserved, reserve check should have failed")
node3 := newNode("node-3", map[string]resources.Quantity{"first": 10})
err = app.Reserve(node3, ask)
if err == nil {
- t.Errorf("reservation should have failed")
+ t.Fatal("reservation should have failed")
}
- askReserved = app.GetAskReservations(aKey)
- if len(askReserved) != 1 && askReserved[0] != nodeKey1 {
- t.Errorf("app should have reservations for node %s and has not:
%v", nodeID1, askReserved)
+ allocReserved = app.reservations[aKey]
+ if allocReserved == nil || allocReserved.nodeID != nodeID1 {
+ t.Fatalf("app should have reservations for node %s and has not:
%v", nodeID1, allocReserved)
}
- askReserved = app.GetAskReservations(aKey2)
- if len(askReserved) != 1 && askReserved[0] != nodeKey2 {
- t.Errorf("app should have reservations for node %s and has not:
%v", nodeID2, askReserved)
+ allocReserved = app.reservations[aKey2]
+ if allocReserved == nil || allocReserved.nodeID != nodeID2 {
+ t.Fatalf("app should have reservations for node %s and has not:
%v", nodeID2, allocReserved)
}
+ assert.Equal(t, len(app.GetReservations()), 2, "app should have 2
reservation")
// clean up all asks and reservations
- reservedAsks := app.RemoveAllocationAsk("")
- if app.HasReserved() || node1.IsReserved() || node2.IsReserved() ||
reservedAsks != 2 {
- t.Errorf("ask removal did not clean up all reservations,
reserved released = %d", reservedAsks)
+ reservedRelease := app.RemoveAllocationAsk("")
+ if app.HasReserved() || node1.IsReserved() || node2.IsReserved() ||
reservedRelease != 2 {
+ t.Fatalf("ask removal did not clean up all reservations,
reserved released = %d", reservedRelease)
}
}
@@ -495,7 +471,6 @@ func TestAddAllocAsk(t *testing.T) {
// test add alloc ask event
noEvents := uint64(0)
err = common.WaitForCondition(10*time.Millisecond, time.Second, func()
bool {
- fmt.Printf("checking event length: %d\n",
eventSystem.Store.CountStoredEvents())
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 2
})
@@ -652,7 +627,7 @@ func TestRemoveReservedAllocAsk(t *testing.T) {
node := newNode(nodeID1, map[string]resources.Quantity{"first": 10})
err = app.Reserve(node, ask2)
assert.NilError(t, err, "reservation should not have failed")
- if len(app.GetAskReservations(allocKey)) != 1 || !node.IsReserved() {
+ if app.reservations[allocKey] == nil || !node.IsReserved() {
t.Fatalf("app should have reservation for %v on node", allocKey)
}
before := app.GetPendingResource().Clone()
@@ -671,11 +646,10 @@ func TestRemoveReservedAllocAsk(t *testing.T) {
err = app.Reserve(node, ask2)
assert.NilError(t, err, "reservation should not have failed: error %v",
err)
- if len(app.GetAskReservations(allocKey)) != 1 || !node.IsReserved() {
+ if app.reservations[allocKey] == nil || !node.IsReserved() {
t.Fatalf("app should have reservation for %v on node", allocKey)
}
- var num int
- num, err = node.unReserve(app, ask2)
+ num := node.unReserve(ask2)
assert.NilError(t, err, "un-reserve on node should not have failed")
assert.Equal(t, num, 1, "un-reserve on node should have removed
reservation")
@@ -1954,6 +1928,235 @@ func TestCanReplace(t *testing.T) {
}
}
+func TestTryRequiredNode(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{nodeID1: node}
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ resMap := map[string]string{"first": "5"}
+ rootQ, err := createRootQueue(resMap)
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ var childQ *Queue
+ childQ, err = createManagedQueue(rootQ, "child", false, resMap)
+ assert.NilError(t, err, "unexpected error when creating child queue")
+
+ app := newApplication(appID1, "default", "root.child")
+ app.SetQueue(childQ)
+ childQ.applications[appID1] = app
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})
+ alloc := newAllocation(aKey, nodeID1, allocRes)
+ app.AddAllocation(alloc)
+ node.AddAllocation(alloc)
+
+ ask := newAllocationAsk(aKey2, appID1, allocRes)
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "unexpected error when adding an ask")
+ err = app.Reserve(node, ask)
+ assert.NilError(t, err, "unexpected error when reserving ask")
+
+ // get a small enough allocation that fits after cancelling the
reservation
+ allocRes =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+ ask = newAllocationAsk(aKey3, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "unexpected error when adding an ask")
+ result := app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result != nil, "alloc expected")
+ assert.Assert(t, result.Request == ask, "alloc expected for the ask")
+ assert.Equal(t, result.ResultType, Allocated, "expected allocated
result")
+ assert.Equal(t, result.CancelledReservations, 1, "expected 1 cancelled
reservation")
+ assert.Equal(t, nodeID1, result.NodeID, "wrong node")
+ // the reservation that does not require the node should be cancelled
+ assert.Assert(t, !node.isReservedForAllocation(aKey2), "expecting no
reservation for alloc-2 on node")
+ assert.Equal(t, app.NodeReservedForAsk(aKey2), "", "expecting no
reservation for alloc-2 on node-1")
+}
+
+func TestTryRequiredNodeReserved(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{nodeID1: node}
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ resMap := map[string]string{"first": "5"}
+ rootQ, err := createRootQueue(resMap)
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ var childQ *Queue
+ childQ, err = createManagedQueue(rootQ, "child", false, resMap)
+ assert.NilError(t, err, "unexpected error when creating child queue")
+
+ app := newApplication(appID1, "default", "root.child")
+ app.SetQueue(childQ)
+ childQ.applications[appID1] = app
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
+ ask := newAllocationAsk(aKey2, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "unexpected error when adding an ask")
+ err = app.Reserve(node, ask)
+ assert.NilError(t, err, "unexpected error when reserving ask")
+
+ result := app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result != nil, "alloc expected")
+ assert.Assert(t, result.Request == ask, "alloc expected for the ask")
+ assert.Equal(t, result.ResultType, AllocatedReserved, "expected
allocated reserved result")
+ assert.Equal(t, result.CancelledReservations, 0, "expected no cancelled
reservation")
+ assert.Equal(t, nodeID1, result.NodeID, "wrong node")
+}
+
+func TestTryRequiredNodeReserve(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{nodeID1: node}
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ resMap := map[string]string{"first": "5"}
+ rootQ, err := createRootQueue(resMap)
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ var childQ *Queue
+ childQ, err = createManagedQueue(rootQ, "child", false, resMap)
+ assert.NilError(t, err, "unexpected error when creating child queue")
+
+ app := newApplication(appID1, "default", "root.child")
+ app.SetQueue(childQ)
+ childQ.applications[appID1] = app
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})
+ alloc := newAllocation(aKey, nodeID1, allocRes)
+ app.AddAllocation(alloc)
+ node.AddAllocation(alloc)
+
+ ask := newAllocationAsk(aKey2, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "unexpected error when adding an ask")
+
+ result := app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result != nil, "alloc expected")
+ assert.Assert(t, result.Request == ask, "alloc expected for the ask")
+ assert.Equal(t, result.ResultType, Reserved, "expected reserved result")
+ assert.Equal(t, nodeID1, result.NodeID, "wrong node")
+}
+
+func TestTryRequiredNodeCancel(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{nodeID1: node}
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ resMap := map[string]string{"first": "5"}
+ rootQ, err := createRootQueue(resMap)
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ var childQ *Queue
+ childQ, err = createManagedQueue(rootQ, "child", false, resMap)
+ assert.NilError(t, err, "unexpected error when creating child queue")
+
+ app := newApplication(appID1, "default", "root.child")
+ app.SetQueue(childQ)
+ childQ.applications[appID1] = app
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})
+ alloc := newAllocation(aKey, nodeID1, allocRes)
+ app.AddAllocation(alloc)
+ node.AddAllocation(alloc)
+
+ ask := newAllocationAsk(aKey2, appID1, allocRes)
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "adding new allocation to app failed
unexpected")
+ err = app.reserveInternal(node, ask)
+ assert.NilError(t, err, "reserving new allocation on app/node failed
unexpected")
+ assert.Assert(t, node.isReservedForAllocation(aKey2), "expecting alloc
reservation on node")
+ assert.Equal(t, app.NodeReservedForAsk(aKey2), nodeID1, "expecting app
reservation on node")
+
+ ask = newAllocationAsk(aKey3, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "adding new allocation to app failed
unexpected")
+ result := app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result != nil, "alloc expected")
+ assert.Assert(t, result.Request == ask, "alloc expected for the ask")
+ assert.Equal(t, result.ResultType, Reserved, "expected allocated
result")
+ assert.Equal(t, result.CancelledReservations, 1, "expected 1 cancelled
reservation")
+ assert.Equal(t, nodeID1, result.NodeID, "wrong node")
+ assert.Assert(t, !node.isReservedForAllocation(aKey2), "expecting no
reservation for alloc-2 on node")
+ assert.Equal(t, app.NodeReservedForAsk(aKey2), "", "expecting no
reservation for alloc-2 on app")
+}
+
+func TestTryRequiredNodeAdd(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{nodeID1: node}
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ rootQ, err := createRootQueue(map[string]string{"first": "5"})
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ var childQ *Queue
+ childQ, err = createManagedQueue(rootQ, "child", false,
map[string]string{"first": "5"})
+ assert.NilError(t, err, "unexpected error when creating child queue")
+
+ app := newApplication(appID1, "default", "root.child")
+ app.SetQueue(childQ)
+ childQ.applications[appID1] = app
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})
+ alloc := newAllocation(aKey, nodeID1, allocRes)
+ app.AddAllocation(alloc)
+ node.AddAllocation(alloc)
+
+ ask := newAllocationAsk(aKey2, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "adding new allocation to app failed
unexpected")
+
+ result := app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result != nil, "alloc expected")
+ assert.Assert(t, result.Request == ask, "alloc expected for the ask")
+ assert.Equal(t, result.ResultType, Reserved, "expected reserved result")
+ assert.Equal(t, nodeID1, result.NodeID, "wrong node")
+
+ // finish processing: do what the context would do
+ err = app.reserveInternal(node, ask)
+ assert.NilError(t, err, "reservation processing failed unexpected")
+
+ ask = newAllocationAsk(aKey3, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "adding new allocation to app failed
unexpected")
+ result = app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result != nil, "alloc expected")
+ assert.Assert(t, result.Request == ask, "alloc expected for the ask")
+ assert.Equal(t, result.ResultType, Reserved, "expected allocated
result")
+ assert.Equal(t, result.CancelledReservations, 0, "expected no cancelled
reservation")
+ assert.Equal(t, nodeID1, result.NodeID, "wrong node")
+ assert.Assert(t, node.isReservedForAllocation(aKey2), "expecting
reservation for alloc-2 on node")
+ assert.Equal(t, app.NodeReservedForAsk(aKey2), nodeID1, "expecting
reservation for alloc-2 on app")
+ assert.Assert(t, !node.isReservedForAllocation(aKey3), "expecting no
reservation for alloc-3 on node")
+ assert.Equal(t, app.NodeReservedForAsk(aKey3), "", "expecting no
reservation for alloc-3 on app")
+}
+
+func TestTryRequiredNodeExists(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{nodeID1: node}
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ rootQ, err := createRootQueue(map[string]string{"first": "5"})
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ var childQ *Queue
+ childQ, err = createManagedQueue(rootQ, "child", false,
map[string]string{"first": "5"})
+ assert.NilError(t, err, "unexpected error when creating child queue")
+
+ app := newApplication(appID1, "default", "root.child")
+ app.SetQueue(childQ)
+ childQ.applications[appID1] = app
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})
+
+ ask := newAllocationAsk(aKey2, appID1, allocRes)
+ ask.requiredNode = nodeID2
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "adding new allocation to app failed
unexpected")
+
+ result := app.tryRequiredNode(ask, getNode)
+ assert.Assert(t, result == nil, "alloc not expected")
+}
+
func TestTryAllocateNoRequests(t *testing.T) {
node := newNode("node1", map[string]resources.Quantity{"first": 5})
nodeMap := map[string]*Node{"node1": node}
@@ -3210,3 +3413,28 @@ type mockAppEventHandler struct {
func (m mockAppEventHandler) HandleEvent(ev interface{}) {
m.callback(ev)
}
+
+func TestApplication_canAllocationReserve(t *testing.T) {
+ res := resources.NewResource()
+ tests := []struct {
+ name string
+ alloc *Allocation
+ wantErr bool
+ }{
+ {"new", newAllocationWithKey(aKey, appID1, "", res), false},
+ {"allocated", newAllocationWithKey(aKey2, appID1, nodeID1,
res), true},
+ {"duplicate", newAllocationWithKey(aKey3, appID1, "", res),
true},
+ }
+ app := newApplication(appID0, "default", "root.unknown")
+ app.reservations[aKey3] = &reservation{
+ nodeID: nodeID1,
+ allocKey: aKey,
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := app.canAllocationReserve(tt.alloc); (err !=
nil) != tt.wantErr {
+ t.Errorf("canAllocationReserve() error = %v,
wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index ae5971f4..2047ff44 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -20,17 +20,17 @@ package objects
import (
"fmt"
- "strings"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -110,9 +110,9 @@ func (sn *Node) initializeAttribute(newAttributes
map[string]string) {
sn.attributes = map[string]string{}
}
- sn.Hostname = sn.attributes[common.HostName]
- sn.Rackname = sn.attributes[common.RackName]
- sn.Partition = sn.attributes[common.NodePartition]
+ sn.Hostname = sn.attributes[siCommon.HostName]
+ sn.Rackname = sn.attributes[siCommon.RackName]
+ sn.Partition = sn.attributes[siCommon.NodePartition]
}
// Get an attribute by name. The most used attributes can be directly accessed
via the
@@ -129,7 +129,7 @@ func (sn *Node) GetAttributes() map[string]string {
// Get InstanceType of this node.
// This is a lock free call because all attributes are considered read only
func (sn *Node) GetInstanceType() string {
- itype := sn.GetAttribute(common.InstanceType)
+ itype := sn.GetAttribute(siCommon.InstanceType)
if itype != "" {
return itype
}
@@ -521,7 +521,7 @@ func (sn *Node) preConditions(ask *Allocation, allocate
bool) error {
// preAllocateCheck checks if the node should be considered as a possible node
to allocate on.
// No updates are made this only performs a pre allocate checks
-func (sn *Node) preAllocateCheck(res *resources.Resource, resKey string) bool {
+func (sn *Node) preAllocateCheck(res *resources.Resource, allocationKey
string) bool {
// cannot allocate zero or negative resource
if !resources.StrictlyGreaterThanZero(res) {
log.Log(log.SchedNode).Debug("pre alloc check: requested
resource is zero",
@@ -530,10 +530,10 @@ func (sn *Node) preAllocateCheck(res *resources.Resource,
resKey string) bool {
}
// check if the node is reserved for this app/alloc
if sn.IsReserved() {
- if !sn.isReservedForApp(resKey) {
- log.Log(log.SchedNode).Debug("pre alloc check: node
reserved for different app or ask",
+ if !sn.isReservedForAllocation(allocationKey) {
+ log.Log(log.SchedNode).Debug("pre alloc check: node
reserved for different alloc",
zap.String("nodeID", sn.NodeID),
- zap.String("resKey", resKey))
+ zap.String("allocationKey", allocationKey))
return false
}
}
@@ -544,95 +544,98 @@ func (sn *Node) preAllocateCheck(res *resources.Resource,
resKey string) bool {
return sn.availableResource.FitIn(res)
}
-// Return if the node has been reserved by any application
+// IsReserved returns true if the node has been reserved for an allocation
func (sn *Node) IsReserved() bool {
sn.RLock()
defer sn.RUnlock()
return len(sn.reservations) > 0
}
-// isReservedForApp returns true if and only if the node has been reserved by
the application
-// NOTE: a return value of false does not mean the node is not reserved by a
different app
-func (sn *Node) isReservedForApp(key string) bool {
+// isReservedForAllocation returns true if and only if the node has been
reserved by this allocation
+// NOTE: a return value of false does not mean the node is not reserved by a
different allocation, use IsReserved
+// to test if the node has any reservation.
+func (sn *Node) isReservedForAllocation(key string) bool {
if key == "" {
return false
}
sn.RLock()
defer sn.RUnlock()
- if strings.Contains(key, "|") {
- return sn.reservations[key] != nil
- }
- // make sure matches only for the whole appID
- separator := key + "|"
- for resKey := range sn.reservations {
- if strings.HasPrefix(resKey, separator) {
- return true
- }
- }
- return false
+ return sn.reservations[key] != nil
}
-// Reserve the node for this application and ask combination, if not reserved
yet.
+// Reserve the node for this application and alloc combination.
// The reservation is checked against the node resources.
-// If the reservation fails the function returns false, if the reservation is
made it returns true.
+// If the reservation fails the function returns an error, if the reservation
is made it returns nil.
func (sn *Node) Reserve(app *Application, ask *Allocation) error {
- defer sn.notifyListeners()
sn.Lock()
defer sn.Unlock()
- if len(sn.reservations) > 0 {
- return fmt.Errorf("node is already reserved, nodeID %s",
sn.NodeID)
- }
appReservation := newReservation(sn, app, ask, false)
- // this should really not happen just guard against panic
- // either app or ask are nil
+ // this should really not happen just guard against panic either app or
alloc are nil
if appReservation == nil {
log.Log(log.SchedNode).Debug("reservation creation failed
unexpectedly",
zap.String("nodeID", sn.NodeID),
- zap.Any("app", app),
- zap.Any("ask", ask))
- return fmt.Errorf("reservation creation failed app or ask are
nil on nodeID %s", sn.NodeID)
+ zap.Stringer("app", app),
+ zap.Stringer("alloc", ask))
+ return fmt.Errorf("reservation creation failed either app or
alloc are nil on nodeID %s", sn.NodeID)
+ }
+ reqNode := ask.requiredNode != ""
+ if !reqNode && len(sn.reservations) > 0 {
+ log.Log(log.SchedNode).Warn("normal reservation on already
reserved node",
+ zap.String("nodeID", sn.NodeID),
+ zap.String("new app", appReservation.appID),
+ zap.String("new alloc", appReservation.allocKey))
+ return common.ErrorNodeAlreadyReserved
+ }
+ // allow multiple required node reservations on the same node
+ if reqNode {
+ // make sure all other reservations are for required nodes
+ for _, reserved := range sn.reservations {
+ if reserved.alloc.requiredNode == "" {
+ log.Log(log.SchedNode).Warn("trying to add
normal reservation to node with required node reservation",
+ zap.String("nodeID", sn.NodeID),
+ zap.String("existing app",
reserved.appID),
+ zap.String("existing alloc",
reserved.allocKey),
+ zap.String("new app",
appReservation.appID),
+ zap.String("new alloc",
appReservation.allocKey))
+ return fmt.Errorf("normal reservation: required
node reservation present, nodeID %s", sn.NodeID)
+ }
+ }
}
// reservation must fit on the empty node
if !sn.totalResource.FitIn(ask.GetAllocatedResource()) {
log.Log(log.SchedNode).Debug("reservation does not fit on the
node",
zap.String("nodeID", sn.NodeID),
zap.String("appID", app.ApplicationID),
- zap.String("ask", ask.GetAllocationKey()),
- zap.Stringer("allocationAsk",
ask.GetAllocatedResource()))
- return fmt.Errorf("reservation does not fit on node %s, appID
%s, ask %s", sn.NodeID, app.ApplicationID, ask.GetAllocatedResource().String())
+ zap.String("alloc", ask.GetAllocationKey()),
+ zap.Stringer("requested resources",
ask.GetAllocatedResource()))
+ return common.ErrorNodeNotFitReserve
}
- sn.reservations[appReservation.getKey()] = appReservation
+ sn.reservations[ask.allocationKey] = appReservation
sn.nodeEvents.SendReservedEvent(sn.NodeID, ask.GetAllocatedResource(),
ask.GetAllocationKey())
// reservation added successfully
return nil
}
-// unReserve the node for this application and ask combination
-// If the reservation does not exist it returns 0 for reservations removed, if
the reservation is removed it returns 1.
-// The error is set if the reservation key cannot be generated.
-func (sn *Node) unReserve(app *Application, ask *Allocation) (int, error) {
- defer sn.notifyListeners()
+// unReserve removes the reservation for this alloc from the node.
+// If the reservation does not exist or alloc is nil it returns 0 for
reservations removed,
+// if the reservation is removed it returns 1.
+func (sn *Node) unReserve(alloc *Allocation) int {
+ if alloc == nil {
+ return 0
+ }
sn.Lock()
defer sn.Unlock()
- resKey := reservationKey(nil, app, ask)
- if resKey == "" {
- log.Log(log.SchedNode).Debug("unreserve reservation key create
failed unexpectedly",
- zap.String("nodeID", sn.NodeID),
- zap.Any("app", app),
- zap.Any("ask", ask))
- return 0, fmt.Errorf("reservation key failed app or ask are nil
on nodeID %s", sn.NodeID)
- }
- if _, ok := sn.reservations[resKey]; ok {
- delete(sn.reservations, resKey)
- sn.nodeEvents.SendUnreservedEvent(sn.NodeID,
ask.GetAllocatedResource(), ask.GetAllocationKey())
- return 1, nil
+ if _, ok := sn.reservations[alloc.allocationKey]; ok {
+ delete(sn.reservations, alloc.allocationKey)
+ sn.nodeEvents.SendUnreservedEvent(sn.NodeID,
alloc.GetAllocatedResource(), alloc.GetAllocationKey())
+ return 1
}
// reservation was not found
log.Log(log.SchedNode).Debug("reservation not found while removing from
node",
zap.String("nodeID", sn.NodeID),
- zap.String("appID", app.ApplicationID),
- zap.String("ask", ask.GetAllocationKey()))
- return 0, nil
+ zap.String("alloc", alloc.GetAllocationKey()),
+ zap.String("appID", alloc.GetApplicationID()))
+ return 0
}
// GetReservations returns all reservation made on this node
diff --git a/pkg/scheduler/objects/node_collection_test.go
b/pkg/scheduler/objects/node_collection_test.go
index 30d73e67..a43c49cb 100644
--- a/pkg/scheduler/objects/node_collection_test.go
+++ b/pkg/scheduler/objects/node_collection_test.go
@@ -163,13 +163,8 @@ func TestSetNodeSortingPolicy(t *testing.T) {
node := newNode(nodesInfo[id].nodeID,
map[string]resources.Quantity{"vcore": resources.Quantity(defaultCapicity[0]),
"memory": resources.Quantity(defaultCapicity[1])})
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore":
resources.Quantity(nodesInfo[id].allocatedVcore), "memory":
resources.Quantity(nodesInfo[id].allocatedMem)})
alloc :=
newAllocation(fmt.Sprintf("test-app-%d", id+1), fmt.Sprintf("test-%d", id+1),
res)
- if ok := node.TryAddAllocation(alloc); !ok {
- t.Error("Allocation error happen in
node.")
- }
-
- if err := nc.AddNode(node); err != nil {
- t.Errorf("AddNode error:%s",
err.Error())
- }
+ assert.Assert(t, node.TryAddAllocation(alloc),
"Allocation error happened on node")
+ assert.NilError(t, nc.AddNode(node), "Adding
node to collection failed")
}
conf := configs.PartitionConfig{
@@ -196,9 +191,8 @@ func TestSetNodeSortingPolicy(t *testing.T) {
}
nc.SetNodeSortingPolicy(NewNodeSortingPolicy(conf.NodeSortPolicy.Type,
conf.NodeSortPolicy.ResourceWeights))
- iter := nc.GetNodeIterator()
id := 0
- iter.ForEachNode(func(node *Node) bool {
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
assert.Equal(t, node.NodeID, tt.nodesOrder[id],
"%s: NodeID wanted %s, but it got %s.",
nc.GetNodeSortingPolicy().PolicyType().String(), tt.nodesOrder[id], node.NodeID)
id++
return true
@@ -231,9 +225,7 @@ func TestGetNodeSortingPolicy(t *testing.T) {
alloc :=
newAllocation(fmt.Sprintf("test-app-%d", id+1), fmt.Sprintf("test-%d", id), res)
node.AddAllocation(alloc)
- if err := nc.AddNode(node); err != nil {
- t.Errorf("AddNode error:%s",
err.Error())
- }
+ assert.NilError(t, nc.AddNode(node), "Adding
node to collection failed")
}
conf := configs.PartitionConfig{
@@ -260,21 +252,16 @@ func TestGetNodeSortingPolicy(t *testing.T) {
}
nc.SetNodeSortingPolicy(NewNodeSortingPolicy(conf.NodeSortPolicy.Type,
conf.NodeSortPolicy.ResourceWeights))
- if ans :=
nc.GetNodeSortingPolicy().PolicyType().String(); ans != tt.want {
- t.Errorf("got %s, want %s", ans, tt.want)
- }
+ assert.Equal(t,
nc.GetNodeSortingPolicy().PolicyType().String(), tt.want, "expected sort policy
not set")
- // Checking thes nodes order in iterator is after
setting node policy with Default weight{vcore:1, memory:1}.
- iter := nc.GetNodeIterator()
+ // Checking the nodes order in iterator is after
setting node policy with Default weight{vcore:1, memory:1}.
index := 0
- iter.ForEachNode(func(node *Node) bool {
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
if index >= len(tt.exceptNodeOrder) {
t.Error("Wrong length of nodes in node
iterator.")
}
- if node.NodeID != tt.exceptNodeOrder[index] {
- t.Errorf("Policy: %s, got %s, want %s",
nc.GetNodeSortingPolicy().PolicyType().String(), node.NodeID,
tt.exceptNodeOrder[index])
- }
+ assert.Equal(t, node.NodeID,
tt.exceptNodeOrder[index], "Policy: %s, node order wrong",
nc.GetNodeSortingPolicy().PolicyType().String())
index++
return true
})
@@ -292,21 +279,15 @@ func TestGetFullNodeIterator(t *testing.T) {
allocName := fmt.Sprintf("alloc-%02d", i)
app := newApplication(appName, "default", "root.test")
ask := newAllocationAsk(allocName, appName,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore":
resources.Quantity(i)}))
- if err := node.Reserve(app, ask); err != nil {
- t.Error("Reserving failed.")
- }
+ assert.NilError(t, node.Reserve(app, ask), "Reserving
failed.")
} else {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore":
resources.Quantity(i)})
alloc := newAllocation(fmt.Sprintf("test-app-%d", i),
fmt.Sprintf("test-%d", i), res)
- if ok := node.TryAddAllocation(alloc); !ok {
- t.Error("Allocation error in node.")
- }
- }
- if err := nc.AddNode(node); err != nil {
- t.Error("Adding another node into BC failed.")
+ assert.Assert(t, node.TryAddAllocation(alloc), "Adding
allocation to node failed unexpectedly")
}
+ assert.NilError(t, nc.AddNode(node), "Adding another node into
BC failed.")
}
- nodes := make([]*Node, 0)
+ var nodes []*Node
nc.GetFullNodeIterator().ForEachNode(func(node *Node) bool {
nodes = append(nodes, node)
return true
@@ -328,6 +309,9 @@ func TestGetNodeIterator(t *testing.T) {
{"Some nodes are reserved", []bool{false, true, false, true},
[]int{1, 3}},
}
+ // Check order of available nodes
+ nsp := []string{policies.FairnessPolicy.String(),
policies.BinPackingPolicy.String()}
+
for _, tt := range tests {
t.Run("There are reserved nodes in an instance of node
collection.", func(t *testing.T) {
nc := NewNodeCollection("test")
@@ -338,47 +322,29 @@ func TestGetNodeIterator(t *testing.T) {
node := newNode(nodeName,
map[string]resources.Quantity{"vcore": resources.Quantity(10)})
if tt.reserved[i-1] {
appName := fmt.Sprintf("app-%02d", i)
- allocName := fmt.Sprintf("alloc-%02d",
i)
app := newApplication(appName,
"default", "root.test")
- ask := newAllocationAsk(allocName,
appName, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore":
resources.Quantity(i)}))
- if err := node.Reserve(app, ask); err
!= nil {
- t.Error("Reserving failed.")
- }
+ ask :=
newAllocationAsk(fmt.Sprintf("alloc-%02d", i), appName,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore":
resources.Quantity(i)}))
+ assert.NilError(t, node.Reserve(app,
ask), "Reserving failed.")
} else {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore":
resources.Quantity(i)})
alloc :=
newAllocation(fmt.Sprintf("test-app-%d", i), fmt.Sprintf("test-%d", i), res)
- if ok := node.TryAddAllocation(alloc);
!ok {
- t.Error("Allocation error
happen in node.")
- }
- }
-
- if err := nc.AddNode(node); err != nil {
- t.Error("Adding another node into BC
failed.")
+ assert.Assert(t,
node.TryAddAllocation(alloc), "Adding allocation to node failed unexpectedly")
}
+ assert.NilError(t, nc.AddNode(node), "Adding
another node into BC failed.")
}
- // Check order of avialble nodes
- NodeSortingPolicy :=
[]string{policies.FairnessPolicy.String(), policies.BinPackingPolicy.String()}
-
// Fair policy
-
nc.SetNodeSortingPolicy(NewNodeSortingPolicy(NodeSortingPolicy[0], nil))
- iter := nc.GetNodeIterator()
- if ans :=
nc.GetNodeSortingPolicy().PolicyType().String(); ans != NodeSortingPolicy[0] {
- t.Errorf("got %s, want %s", ans,
NodeSortingPolicy[0])
- }
- index := 0
- iter.ForEachNode(func(node *Node) bool {
- fmt.Println(node.NodeID)
- return true
- })
+ nc.SetNodeSortingPolicy(NewNodeSortingPolicy(nsp[0],
nil))
+ assert.Equal(t,
nc.GetNodeSortingPolicy().PolicyType().String(), nsp[0], "expected sort policy
not set")
- iter.ForEachNode(func(node *Node) bool {
+ index := 0
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
if index >= len(tt.wantWithFair) {
t.Errorf("Want length of nodes: %d, Get
length of nodes: %d", index, len(tt.wantWithFair))
}
if want := fmt.Sprintf("node-%d",
tt.wantWithFair[index]); node.NodeID != want {
- t.Errorf("%s with %s, Want %s, got
%s.", tt.name, NodeSortingPolicy[0], want, node.NodeID)
+ t.Errorf("%s with %s, Want %s, got
%s.", tt.name, nsp[0], want, node.NodeID)
}
index++
@@ -386,26 +352,87 @@ func TestGetNodeIterator(t *testing.T) {
})
// Binpacking policy
-
nc.SetNodeSortingPolicy(NewNodeSortingPolicy(NodeSortingPolicy[1], nil))
- if ans :=
nc.GetNodeSortingPolicy().PolicyType().String(); ans != NodeSortingPolicy[1] {
- t.Errorf("got %s, want %s", ans,
NodeSortingPolicy[1])
- }
+ nc.SetNodeSortingPolicy(NewNodeSortingPolicy(nsp[1],
nil))
+ assert.Equal(t,
nc.GetNodeSortingPolicy().PolicyType().String(), nsp[1], "expected sort policy
not set")
- iter = nc.GetNodeIterator()
- DescreasingIndex := len(tt.wantWithFair) - 1
+ decIndex := len(tt.wantWithFair) - 1
index = 0
- iter.ForEachNode(func(node *Node) bool {
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
if index >= len(tt.wantWithFair) {
t.Errorf("Want length of nodes: %d, Get
length of nodes: %d", index, len(tt.wantWithFair))
}
- if want := fmt.Sprintf("node-%d",
tt.wantWithFair[DescreasingIndex]); node.NodeID != want {
- t.Errorf("%s with %s, want %s, got
%s.", tt.name, NodeSortingPolicy[1], want, node.NodeID)
+ if want := fmt.Sprintf("node-%d",
tt.wantWithFair[decIndex]); node.NodeID != want {
+ t.Errorf("%s with %s, want %s, got
%s.", tt.name, nsp[1], want, node.NodeID)
}
index++
- DescreasingIndex--
+ decIndex--
return true
})
})
}
}
+
+// TestNodeIteratorReserveUpdate checks that adding or removing a reservation
works without a node collection update.
+// YUNIKORN-2976 removed the listener notify in Reserve and unReserve
+func TestNodeIteratorReserveUpdate(t *testing.T) {
+ nc := NewNodeCollection("test")
+ count := 3
+ for i := 0; i < count; i++ {
+ node := newNode(fmt.Sprintf("node-%d", i),
map[string]resources.Quantity{"some": resources.Quantity(10)})
+ assert.NilError(t, nc.AddNode(node), "Adding another node into
BC failed.")
+ }
+ // first check: both iterators return all nodes
+ allNodes := make([]*Node, 0)
+ nc.GetFullNodeIterator().ForEachNode(func(node *Node) bool {
+ allNodes = append(allNodes, node)
+ return true
+ })
+ assert.Equal(t, len(allNodes), count, "wrong length")
+
+ var itNodes []*Node
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
+ itNodes = append(itNodes, node)
+ return true
+ })
+ assert.Equal(t, len(itNodes), count, "wrong length")
+
+ // add reservation to all nodes
+ app := newApplication(appID0, "default", "root.test")
+ for i, node := range allNodes {
+ ask := newAllocationAsk(fmt.Sprintf("ask-%d", i), appID0,
resources.NewResourceFromMap(map[string]resources.Quantity{"some":
resources.Quantity(5)}))
+ app.AddAllocation(ask)
+ assert.NilError(t, node.Reserve(app, ask), "Reserving failed.")
+ }
+
+ // full iterator returns all nodes
+ itNodes = nil
+ nc.GetFullNodeIterator().ForEachNode(func(node *Node) bool {
+ itNodes = append(itNodes, node)
+ return true
+ })
+ assert.Equal(t, len(itNodes), count, "wrong length")
+
+ // filtered iterator returns NO nodes
+ itNodes = nil
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
+ itNodes = append(itNodes, node)
+ return true
+ })
+ assert.Equal(t, len(itNodes), 0, "wrong length")
+
+ // run over initial list of nodes and remove reservations.
+ // only one reservation so just pick that one
+ for _, node := range allNodes {
+ alloc := node.GetReservations()[0].alloc
+ assert.Equal(t, node.unReserve(alloc), 1, "unReserve should
have returned a single removal")
+ assert.Assert(t, !node.IsReserved(), "node should not have been
reserved")
+ }
+ // filtered iterator returns all nodes again
+ itNodes = nil
+ nc.GetNodeIterator().ForEachNode(func(node *Node) bool {
+ itNodes = append(itNodes, node)
+ return true
+ })
+ assert.Equal(t, len(itNodes), count, "wrong length")
+}
diff --git a/pkg/scheduler/objects/node_test.go
b/pkg/scheduler/objects/node_test.go
index 27ad614d..6a725ac8 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -105,10 +105,9 @@ func TestCheckConditions(t *testing.T) {
}
func TestPreAllocateCheck(t *testing.T) {
- nodeID := nodeID1
resNode :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10,
"second": 1})
- node := newNode(nodeID, resNode.Resources)
- if node == nil || node.NodeID != nodeID {
+ node := newNode(nodeID1, resNode.Resources)
+ if node == nil || node.NodeID != nodeID1 {
t.Fatalf("node create failed which should not have %v", node)
}
@@ -132,24 +131,20 @@ func TestPreAllocateCheck(t *testing.T) {
assert.Assert(t, !node.preAllocateCheck(resOther, ""), "unknown
resource type should not have fitted on node")
// set allocated resource
- alloc := newAllocation(appID1, nodeID, resSmall)
+ alloc := newAllocation(appID1, nodeID1, resSmall)
node.AddAllocation(alloc)
assert.Assert(t, node.preAllocateCheck(resSmall, ""), "small resource
should have fitted in available allocation")
assert.Assert(t, !node.preAllocateCheck(resNode, ""), "node resource
should not have fitted in available allocation")
// check if we can allocate on a reserved node
- q := map[string]resources.Quantity{"first": 0}
- res := resources.NewResourceFromMap(q)
- ask := newAllocationAsk(aKey, appID1, res)
+ ask := newAllocationAsk(aKey, appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
app := newApplication(appID1, "default", "root.unknown")
// standalone reservation unreserve returns false as app is not reserved
reserve := newReservation(node, app, ask, false)
- node.reservations[reserve.getKey()] = reserve
- assert.Assert(t, !node.preAllocateCheck(resSmall, "app-2"), "node was
reserved for different app but check passed")
- assert.Assert(t, !node.preAllocateCheck(resSmall, "app-1|alloc-2"),
"node was reserved for this app but not the alloc and check passed")
- assert.Assert(t, node.preAllocateCheck(resSmall, appID1), "node was
reserved for this app but check did not pass check")
- assert.Assert(t, node.preAllocateCheck(resSmall, "app-1|alloc-1"),
"node was reserved for this app/alloc but check did not pass check")
+ node.reservations[reserve.allocKey] = reserve
+ assert.Assert(t, !node.preAllocateCheck(resSmall, aKey2), "node was
reserved for different app but check passed")
+ assert.Assert(t, node.preAllocateCheck(resSmall, aKey), "node was
reserved for this app/alloc but check did not pass check")
// Check if we can allocate on non scheduling node
node.SetSchedulable(false)
@@ -190,20 +185,14 @@ func TestNodeReservation(t *testing.T) {
if node == nil || node.NodeID != nodeID1 {
t.Fatalf("node create failed which should not have %v", node)
}
- if node.IsReserved() {
- t.Fatal("new node should not have reservations")
- }
- if node.isReservedForApp("") {
- t.Error("new node should not have reservations for empty key")
- }
- if node.isReservedForApp("unknown") {
- t.Error("new node should not have reservations for unknown key")
- }
+ assert.Assert(t, !node.IsReserved(), "new node should not have
reservations")
+ assert.Assert(t, !node.isReservedForAllocation(""), "new node should
not have reservations for empty key")
+ assert.Assert(t, !node.isReservedForAllocation("unknown"), "new node
should not have reservations for unknown key")
// reserve illegal request
err := node.Reserve(nil, nil)
if err == nil {
- t.Errorf("illegal reservation requested but did not fail: error
%v", err)
+ t.Fatal("illegal reservation requested but did not fail")
}
// too large for node
@@ -212,7 +201,7 @@ func TestNodeReservation(t *testing.T) {
app := newApplication(appID1, "default", "root.unknown")
err = node.Reserve(app, ask)
if err == nil {
- t.Errorf("requested reservation does not fit in node resource
but did not fail: error %v", err)
+ t.Fatal("requested reservation does not fit in node resource
but did not fail")
}
// resource type not available on node
@@ -221,7 +210,7 @@ func TestNodeReservation(t *testing.T) {
app = newApplication(appID1, "default", "root.unknown")
err = node.Reserve(app, ask)
if err == nil {
- t.Errorf("requested reservation does not match node resource
types but did not fail: error %v", err)
+ t.Fatal("requested reservation does not match node resource
types but did not fail")
}
res =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
@@ -230,39 +219,29 @@ func TestNodeReservation(t *testing.T) {
// reserve that works
err = node.Reserve(app, ask)
assert.NilError(t, err, "reservation should not have failed")
- if node.isReservedForApp("") {
- t.Error("node should not have reservations for empty key")
- }
- if node.isReservedForApp("unknown") {
- t.Errorf("node should not have reservations for unknown key")
- }
- if node.IsReserved() && !node.isReservedForApp(appID1) {
- t.Errorf("node should have reservations for app-1")
- }
+ assert.Assert(t, !node.isReservedForAllocation(""), "node should not
have reservations for empty key")
+ assert.Assert(t, !node.isReservedForAllocation("unknown"), "node should
not have reservations for unknown key")
+ assert.Assert(t, node.IsReserved(), "node should have been reserved")
+ assert.Assert(t, node.isReservedForAllocation(aKey), "node should have
reservations for alloc-1")
// 2nd reservation on node
- err = node.Reserve(nil, nil)
+ app2 := newApplication(appID2, "default", "root.unknown")
+ ask2 := newAllocationAsk("alloc-2", appID2, res)
+ err = node.Reserve(app2, ask2)
if err == nil {
- t.Errorf("reservation requested on already reserved node: error
%v", err)
+ t.Fatal("reservation requested on already reserved node")
}
- // unreserve different app
- _, err = node.unReserve(nil, nil)
- if err == nil {
- t.Errorf("illegal reservation release but did not fail: error
%v", err)
- }
- ask2 := newAllocationAsk("alloc-2", appID2, res)
- app2 := newApplication(appID2, "default", "root.unknown")
- var num int
- num, err = node.unReserve(app2, ask2)
- assert.NilError(t, err, "un-reserve different app should have failed
without error")
- assert.Equal(t, num, 0, "un-reserve different app should have failed
without releases")
- num, err = node.unReserve(app, ask)
- assert.NilError(t, err, "un-reserve should not have failed")
+ // unreserve different alloc
+ num := node.unReserve(nil)
+ assert.Equal(t, num, 0, "un-reserve different alloc should have failed
without releases")
+ num = node.unReserve(ask2)
+ assert.Equal(t, num, 0, "un-reserve different alloc should have failed
without releases")
+ num = node.unReserve(ask)
assert.Equal(t, num, 1, "un-reserve app should have released ")
}
-func TestIsReservedForApp(t *testing.T) {
+func TestRequiredNodeAfterReservation(t *testing.T) {
node := newNode(nodeID1, map[string]resources.Quantity{"first": 10})
if node == nil || node.NodeID != nodeID1 {
t.Fatalf("node create failed which should not have %v", node)
@@ -270,33 +249,76 @@ func TestIsReservedForApp(t *testing.T) {
if node.IsReserved() {
t.Fatal("new node should not have reservations")
}
-
- // check if we can allocate on a reserved node
- q := map[string]resources.Quantity{"first": 0}
- res := resources.NewResourceFromMap(q)
- ask := newAllocationAsk(aKey, appID1, res)
+ // normal node reservation
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+ ask := newAllocationAsk(aKey, appID1, allocRes)
app := newApplication(appID1, "default", "root.unknown")
+ err := node.Reserve(app, ask)
+ assert.NilError(t, err, "normal node reservation should not have
failed")
+ assert.Assert(t, node.IsReserved(), "node should have been reserved")
+ assert.Assert(t, node.isReservedForAllocation(aKey), "node should have
reservations for alloc-1")
- // standalone reservation unreserve returns false as app is not reserved
- reserve := newReservation(node, app, ask, false)
- node.reservations[reserve.getKey()] = reserve
- if node.isReservedForApp("app-2") {
- t.Error("node was reserved for different app but check passed ")
+ ask2 := newAllocationAsk(aKey2, appID1, allocRes)
+ ask2.requiredNode = nodeID1
+ app = newApplication(appID1, "default", "root.unknown")
+ // required node as 2nd reservation
+ err = node.Reserve(app, ask2)
+ if err == nil {
+ t.Fatalf("adding required node reservation should have failed")
}
- if node.isReservedForApp("app-1|alloc-2") {
- t.Error("node was reserved for this app but not the alloc and
check passed ")
+}
+
+func TestMultiRequiredNodeReservation(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 10})
+ if node == nil || node.NodeID != nodeID1 {
+ t.Fatalf("node create failed which should not have %v", node)
}
- if !node.isReservedForApp(appID1) {
- t.Error("node was reserved for this app but check did not
passed ")
+ if node.IsReserved() {
+ t.Fatal("new node should not have reservations")
}
- if !node.isReservedForApp("app-1|alloc-1") {
- t.Error("node was reserved for this app/alloc but check did not
passed ")
+
+ // required node reservation
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+ ask := newAllocationAsk(aKey, appID1, allocRes)
+ ask.requiredNode = nodeID1
+ app := newApplication(appID1, "default", "root.unknown")
+ err := node.Reserve(app, ask)
+ assert.NilError(t, err, "required node reservation should not have
failed")
+ assert.Assert(t, node.IsReserved(), "node should have been reserved")
+ assert.Assert(t, node.isReservedForAllocation(aKey), "node should have
reservations for alloc-1")
+
+ ask2 := newAllocationAsk(aKey2, appID1, allocRes)
+ app = newApplication(appID1, "default", "root.unknown")
+ // non required node as 2nd reservation
+ err = node.Reserve(app, ask2)
+ if err == nil {
+ t.Fatalf("adding to required node reservation should have
failed")
}
- // app name similarity check: chop of the last char to make sure we
check the full name
- similar := appID1[:len(appID1)-1]
- if node.isReservedForApp(similar) {
- t.Errorf("similar app should not have reservations on node %s",
similar)
+
+ // required node as 2nd reservation on node
+ ask2.requiredNode = nodeID1
+ err = node.Reserve(app, ask2)
+ assert.NilError(t, err, "required node reservation should not have
failed")
+ assert.Assert(t, node.IsReserved(), "node should have been reserved")
+ assert.Assert(t, node.isReservedForAllocation(aKey2), "node should have
reservations for alloc-2")
+}
+
+func TestIsReservedForAllocation(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 10})
+ if node == nil || node.NodeID != nodeID1 {
+ t.Fatalf("node create failed which should not have %v", node)
}
+ assert.Assert(t, !node.IsReserved(), "new node should not have
reservations")
+
+ // check if we can allocate on a reserved node
+ ask := newAllocationAsk(aKey, appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+ app := newApplication(appID1, "default", "root.unknown")
+
+ // standalone reservation unreserve returns false as app is not reserved
+ reserve := newReservation(node, app, ask, false)
+ node.reservations[reserve.allocKey] = reserve
+ assert.Assert(t, !node.isReservedForAllocation(aKey2), "node was
reserved for different alloc but check passed ")
+ assert.Assert(t, node.isReservedForAllocation(aKey), "node was reserved
for this alloc but check did not passed ")
}
func TestAttributes(t *testing.T) {
@@ -858,8 +880,7 @@ func TestNodeEvents(t *testing.T) {
assert.Equal(t, si.EventRecord_NODE_RESERVATION,
event.EventChangeDetail)
mockEvents.Reset()
- _, err = node.unReserve(app, ask)
- assert.NilError(t, err, "could not unreserve")
+ assert.Equal(t, node.unReserve(ask), 1, "expected the reservation to be
removed")
event = mockEvents.Events[0]
assert.Equal(t, si.EventRecord_NODE, event.Type)
assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
diff --git a/pkg/scheduler/objects/preemption.go
b/pkg/scheduler/objects/preemption.go
index 10cacea2..c70aee4b 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -162,7 +162,7 @@ func (p *Preemptor) initWorkingState() {
// walk node iterator and track available resources per node
p.iterator.ForEachNode(func(node *Node) bool {
- if !node.IsSchedulable() || (node.IsReserved() &&
!node.isReservedForApp(reservationKey(nil, p.application, p.ask))) ||
!node.FitInNode(p.ask.GetAllocatedResource()) {
+ if !node.IsSchedulable() || (node.IsReserved() &&
!node.isReservedForAllocation(p.ask.GetAllocationKey())) ||
!node.FitInNode(p.ask.GetAllocatedResource()) {
// node is not available, remove any potential victims
from consideration
delete(allocationsByNode, node.NodeID)
} else {
diff --git a/pkg/scheduler/objects/reservation.go
b/pkg/scheduler/objects/reservation.go
index 5c7f672b..3001372d 100644
--- a/pkg/scheduler/objects/reservation.go
+++ b/pkg/scheduler/objects/reservation.go
@@ -21,37 +21,36 @@ package objects
import (
"go.uber.org/zap"
- "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/log"
)
type reservation struct {
- nodeID string
- appID string
- askKey string
- // these references must ONLY be used for ask, node and application
removal otherwise
+ appID string
+ nodeID string
+ allocKey string
+ // these references must ONLY be used for alloc, node and application
removal otherwise
// the reservations cannot be removed and scheduling might be impacted.
- app *Application
- node *Node
- ask *Allocation
+ app *Application
+ node *Node
+ alloc *Allocation
}
// The reservation inside the scheduler. A reservation object is never mutated
and does not use locking.
// The key depends on where the reservation was made (node or app).
// appBased must be true for a reservation for an app and false for a
reservation on a node
-func newReservation(node *Node, app *Application, ask *Allocation, appBased
bool) *reservation {
- if ask == nil || app == nil || node == nil {
+func newReservation(node *Node, app *Application, alloc *Allocation, appBased
bool) *reservation {
+ if alloc == nil || app == nil || node == nil {
log.Log(log.SchedReservation).Warn("Illegal reservation
requested: one input is nil",
zap.Stringer("node", node),
zap.Stringer("app", app),
- zap.Stringer("ask", ask))
+ zap.Stringer("alloc", alloc))
return nil
}
res := &reservation{
- askKey: ask.GetAllocationKey(),
- ask: ask,
- app: app,
- node: node,
+ allocKey: alloc.GetAllocationKey(),
+ alloc: alloc,
+ app: app,
+ node: node,
}
if appBased {
res.nodeID = node.NodeID
@@ -61,51 +60,21 @@ func newReservation(node *Node, app *Application, ask
*Allocation, appBased bool
return res
}
-func reservationKey(node *Node, app *Application, ask *Allocation) string {
- if ask == nil || (app == nil && node == nil) || (app != nil && node !=
nil) {
- log.Log(log.SchedReservation).Warn("Illegal reservation key
requested",
- zap.Any("node", node),
- zap.Any("app", app),
- zap.Any("ask", ask))
- return ""
- }
- if node == nil {
- return ask.resKeyWithoutNode
- }
- key := ask.getReservationKeyForNode(node.NodeID)
- if key != common.Empty {
- return key
- }
-
- key = node.NodeID + "|" + ask.GetAllocationKey()
- ask.setReservationKeyForNode(node.NodeID, key)
- return key
-}
-
-func reservationKeyWithoutNode(appID, allocKey string) string {
- return appID + "|" + allocKey
-}
-
-// Return the reservation key
-func (r *reservation) getKey() string {
- if r.nodeID == "" {
- return r.appID + "|" + r.askKey
- }
- return r.nodeID + "|" + r.askKey
-}
-
func (r *reservation) String() string {
+ if r == nil {
+ return "nil reservation"
+ }
if r.nodeID == "" {
- return r.node.NodeID + " -> " + r.appID + "|" + r.askKey
+ return r.node.NodeID + " -> " + r.appID + "|" + r.allocKey
}
- return r.app.ApplicationID + " -> " + r.nodeID + "|" + r.askKey
+ return r.app.ApplicationID + " -> " + r.nodeID + "|" + r.allocKey
}
// GetObjects returns the objects that created the reservation.
// None of the returned values will be nil unless the reservation itself is nil
func (r *reservation) GetObjects() (*Node, *Application, *Allocation) {
if r != nil {
- return r.node, r.app, r.ask
+ return r.node, r.app, r.alloc
}
return nil, nil, nil
}
diff --git a/pkg/scheduler/objects/reservation_test.go
b/pkg/scheduler/objects/reservation_test.go
index 617ee55a..69716332 100644
--- a/pkg/scheduler/objects/reservation_test.go
+++ b/pkg/scheduler/objects/reservation_test.go
@@ -33,62 +33,49 @@ func TestNewReservation(t *testing.T) {
app := newApplication("app-1", "default", "root.unknown")
node := newNodeRes("node-1", res)
- // check the basics (failures)
- reserve := newReservation(nil, nil, nil, true)
- if reserve != nil {
- t.Errorf("reservation with nil objects should have returned
nil: %v", reserve)
+ tests := []struct {
+ name string
+ node *Node
+ app *Application
+ ask *Allocation
+ appBased bool
+ expected *reservation
+ }{
+ {"nil input", nil, nil, nil, true, nil},
+ {"nil alloc", node, app, nil, true, nil},
+ {"nil app", node, nil, ask, true, nil},
+ {"nil node", nil, app, ask, true, nil},
+ {"node based", node, app, ask, false, &reservation{"app-1", "",
"alloc-1", app, node, ask}},
+ {"app based", node, app, ask, true, &reservation{"", "node-1",
"alloc-1", app, node, ask}},
}
- reserve = newReservation(node, app, nil, true)
- if reserve != nil {
- t.Errorf("reservation with nil ask set should have returned
nil: '%v'", reserve)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ reserve := newReservation(tt.node, tt.app, tt.ask,
tt.appBased)
+ if tt.expected == nil {
+ assert.Equal(t, tt.expected, reserve,
"unexpected reservation")
+ } else {
+ assert.Equal(t, reserve.appID,
tt.expected.appID, "incorrect appID")
+ assert.Equal(t, reserve.nodeID,
tt.expected.nodeID, "incorrect node ID")
+ assert.Equal(t, reserve.allocKey,
tt.expected.allocKey, "incorrect alloc key")
+ if tt.appBased {
+ assert.Equal(t, reserve.String(),
"app-1 -> node-1|alloc-1", "incorrect string form")
+ } else {
+ assert.Equal(t, reserve.String(),
"node-1 -> app-1|alloc-1", "incorrect string form")
+ }
+ }
+ })
}
- reserve = newReservation(node, nil, ask, true)
- if reserve != nil {
- t.Errorf("reservation with nil app set should have returned
nil: '%v'", reserve)
- }
- reserve = newReservation(nil, app, ask, true)
- if reserve != nil {
- t.Errorf("reservation with nil node set should have returned
nil: '%v'", reserve)
- }
-
- // working cases
- reserve = newReservation(node, app, ask, true)
- if reserve == nil {
- t.Fatalf("reservation with all objects set should have returned
nil: %v", reserve)
- }
- assert.Equal(t, reserve.getKey(), "node-1|alloc-1", "incorrect node
reservation key")
- assert.Equal(t, reserve.String(), "app-1 -> node-1|alloc-1", "incorrect
string form")
-
- reserve = newReservation(node, app, ask, false)
- if reserve == nil {
- t.Fatalf("reservation with all objects set should have returned
nil: %v", reserve)
- }
- assert.Equal(t, reserve.getKey(), "app-1|alloc-1", "incorrect app
reservation key")
- assert.Equal(t, reserve.String(), "node-1 -> app-1|alloc-1", "incorrect
string form")
}
-func TestReservationKey(t *testing.T) {
- // create the input objects
- res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- ask := newAllocationAsk("alloc-1", "app-1", res)
- app := newApplication("app-1", "default", "root.unknown")
- node := newNodeRes("node-1", res)
-
- // check the basics
- reserve := reservationKey(nil, nil, nil)
- assert.Equal(t, reserve, "", "reservation with nil objects should have
empty key")
- reserve = reservationKey(node, app, nil)
- assert.Equal(t, reserve, "", "reservation with nil ask set should have
empty key")
- reserve = reservationKey(node, app, ask)
- assert.Equal(t, reserve, "", "reservation with all objects set should
have empty key")
-
- // other cases
- reserve = reservationKey(node, nil, ask)
- assert.Equal(t, reserve, "node-1|alloc-1", "incorrect node reservation
key")
- assert.Equal(t, "node-1|alloc-1", ask.resKeyPerNode["node-1"])
- assert.Equal(t, 1, len(ask.resKeyPerNode))
- reserve = reservationKey(nil, app, ask)
- assert.Equal(t, reserve, "app-1|alloc-1", "incorrect app reservation
key")
+func TestReservationString(t *testing.T) {
+ var nilReserve *reservation
+ defer func() {
+ if r := recover(); r != nil {
+ t.Fatal("panic on nil reservation in object test")
+ }
+ }()
+ str := nilReserve.String()
+ assert.Equal(t, "nil reservation", str, "nil reservation did not return
correct string")
}
func TestGetObjects(t *testing.T) {
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index 97be13c2..20015a87 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -45,6 +45,7 @@ const (
appID3 = "app-3"
aKey = "alloc-1"
aKey2 = "alloc-2"
+ aKey3 = "alloc-3"
nodeID1 = "node-1"
nodeID2 = "node-2"
instType1 = "itype-1"
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 7db7e4fe..1375d185 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -891,6 +891,9 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
return nil
}
+ // reservations were cancelled during the processing
+ pc.decReservationCount(result.CancelledReservations)
+
// reservation
if result.ResultType == objects.Reserved {
pc.reserve(app, targetNode, result.Request)
@@ -949,12 +952,23 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
// Lock free call this must be called holding the context lock
func (pc *PartitionContext) reserve(app *objects.Application, node
*objects.Node, ask *objects.Allocation) {
appID := app.ApplicationID
- // app has node already reserved cannot reserve again
- if app.IsReservedOnNode(node.NodeID) {
- log.Log(log.SchedPartition).Info("Application is already
reserved on node",
+ // an ask cannot have multiple reservations: check if it is already reserved
+ nodeID := app.NodeReservedForAsk(ask.GetAllocationKey())
+ // We should not see a reservation for this ask yet.
+ // Sanity check the reserved node: same node, nothing left to do;
+ // different node, fix it: unreserve the old node and reserve the new one.
+ // This only safeguards the system, it should never happen.
+ if nodeID != "" {
+ // same nodeID we do not need to do anything
+ if nodeID == node.NodeID {
+ return
+ }
+ log.Log(log.SchedPartition).Warn("ask is already reserved on
different node, fixing reservations",
zap.String("appID", appID),
- zap.String("nodeID", node.NodeID))
- return
+ zap.String("allocationKey", ask.GetAllocationKey()),
+ zap.String("reserved nodeID", nodeID),
+ zap.String("new nodeID", node.NodeID))
+ pc.unReserve(app, pc.nodes.GetNode(nodeID), ask)
}
// all ok, add the reservation to the app, this will also reserve the
node
if err := app.Reserve(node, ask); err != nil {
@@ -978,13 +992,7 @@ func (pc *PartitionContext) reserve(app
*objects.Application, node *objects.Node
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
func (pc *PartitionContext) unReserve(app *objects.Application, node
*objects.Node, ask *objects.Allocation) {
// remove the reservation of the app, this will also unReserve the node
- var err error
- var num int
- if num, err = app.UnReserve(node, ask); err != nil {
- log.Log(log.SchedPartition).Info("Failed to unreserve, error
during allocate on the app",
- zap.Error(err))
- return
- }
+ num := app.UnReserve(node, ask)
// remove the reservation of the queue
appID := app.ApplicationID
app.GetQueue().UnReserve(appID, num)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 75e529eb..673c3948 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -484,7 +484,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t
*testing.T) {
}
// check if updated (must be after allocate call)
assert.Equal(t, 1, len(app2.GetReservations()), "app reservation should
have been updated")
- assert.Equal(t, 1, len(app2.GetAskReservations(allocKey2)), "ask should
have been reserved")
+ assert.Equal(t, app2.NodeReservedForAsk(allocKey2), nodeID2, "ask
should have been reserved")
// try through reserved scheduling cycle this should trigger preemption
result = partition.tryReservedAllocate()
@@ -1626,6 +1626,10 @@ func TestRequiredNodeReservation(t *testing.T) {
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
+ node := partition.nodes.GetNode(nodeID1)
+ if node == nil {
+ t.Fatal("node-1 should have been created")
+ }
app := newApplication(appID1, "default", "root.parent.sub-leaf")
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"8"})
@@ -1661,7 +1665,12 @@ func TestRequiredNodeReservation(t *testing.T) {
}
// check if updated (must be after allocate call)
assert.Equal(t, 1, len(app.GetReservations()), "app should have one
reserved ask")
- assert.Equal(t, 1, len(app.GetAskReservations(allocKey2)), "ask should
have been reserved")
+ assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "ask should
have been reserved on node-1")
+ assert.Assert(t, node.IsReserved(), "node should have been reserved")
+ reservations := node.GetReservations()
+ assert.Equal(t, len(reservations), 1, "node should have two
reservations")
+ _, _, resAsk := reservations[0].GetObjects()
+ assert.Equal(t, resAsk.GetAllocationKey(), allocKey2, "alloc-2 should
have been reserved on the node")
assertLimits(t, getTestUserGroup(), res)
// allocation that fits on the node should not be allocated
@@ -1669,7 +1678,7 @@ func TestRequiredNodeReservation(t *testing.T) {
res2, err = resources.NewResourceFromConf(map[string]string{"vcore":
"1"})
assert.NilError(t, err, "failed to create resource")
- ask3 := newAllocationAsk("alloc-3", appID1, res2)
+ ask3 := newAllocationAsk(allocKey3, appID1, res2)
ask3.SetRequiredNode(nodeID1)
err = app.AddAllocationAsk(ask3)
assert.NilError(t, err, "failed to add ask alloc-3 to app-1")
@@ -1678,13 +1687,24 @@ func TestRequiredNodeReservation(t *testing.T) {
t.Fatal("allocation attempt should not have returned an
allocation")
}
- // reservation count remains same as last try allocate should have
failed to find a reservation
- assert.Equal(t, 1, len(app.GetReservations()), "ask should not have
been reserved, count changed")
+ // reservation count should be updated as tryAllocate should have added
a reservation for the 3rd ask
+ assert.Equal(t, 2, len(app.GetReservations()), "ask should not have
been reserved, count changed")
+ assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "alloc-2
should have been reserved on node-1")
+ assert.Equal(t, app.NodeReservedForAsk(allocKey3), nodeID1, "alloc-3
should have been reserved on node-1")
+ assert.Assert(t, node.IsReserved(), "node should have been reserved")
+ reservations = node.GetReservations()
+ assert.Equal(t, len(reservations), 2, "node should have two
reservations")
+ _, _, resAsk = reservations[0].GetObjects()
+ _, _, resAsk2 := reservations[1].GetObjects()
+ if !((resAsk.GetAllocationKey() == allocKey3 &&
resAsk2.GetAllocationKey() == allocKey2) ||
+ (resAsk2.GetAllocationKey() == allocKey3 &&
resAsk.GetAllocationKey() == allocKey2)) {
+ t.Fatal("missing reservation on the node")
+ }
assertLimits(t, getTestUserGroup(), res)
}
// allocate ask request with required node having non daemon set reservations
-func TestRequiredNodeCancelNonDSReservations(t *testing.T) {
+func TestRequiredNodeCancelOtherReservations(t *testing.T) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
@@ -1733,8 +1753,9 @@ func TestRequiredNodeCancelNonDSReservations(t
*testing.T) {
t.Fatal("2nd allocation did not return the correct allocation")
}
// check if updated (must be after allocate call)
- assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
+ assert.Equal(t, 1, len(app.GetReservations()), "allocation should have
been reserved")
assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue
reserved apps should be 1")
+ assert.Equal(t, 1, partition.reservations, "partition reservations
should be 1")
res1, err := resources.NewResourceFromConf(map[string]string{"vcore":
"1"})
assert.NilError(t, err, "failed to create resource")
@@ -1757,8 +1778,9 @@ func TestRequiredNodeCancelNonDSReservations(t
*testing.T) {
assert.Equal(t, objects.Allocated, result.ResultType, "allocation
result type should have been allocated")
// earlier app (app1) reservation count should be zero
- assert.Equal(t, 0, len(app.GetReservations()), "ask should have been
reserved")
+ assert.Equal(t, 0, len(app.GetReservations()), "allocation should not
have been reserved")
assert.Equal(t, 0, len(app.GetQueue().GetReservedApps()), "queue
reserved apps should be 0")
+ assert.Equal(t, 0, partition.reservations, "partition reservations
should be 0")
}
// allocate ask request with required node having daemon set reservations
@@ -1787,11 +1809,11 @@ func TestRequiredNodeCancelDSReservations(t *testing.T)
{
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
- ask := newAllocationAsk("alloc-1", appID1, res)
+ ask := newAllocationAsk(allocKey, appID1, res)
ask.SetRequiredNode(nodeID1)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask 1 to app")
- ask = newAllocationAsk("alloc-2", appID1, res)
+ ask = newAllocationAsk(allocKey2, appID1, res)
ask.SetRequiredNode(nodeID1)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask 2 to app")
@@ -1815,6 +1837,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
// check if updated (must be after allocate call)
assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue
reserved apps should be 1")
+ assert.Equal(t, 1, partition.reservations, "partition reservations
should be 1")
res1, err := resources.NewResourceFromConf(map[string]string{"vcore":
"1"})
assert.NilError(t, err, "failed to create resource")
@@ -1825,18 +1848,24 @@ func TestRequiredNodeCancelDSReservations(t *testing.T)
{
assert.NilError(t, err, "failed to add app-2 to partition")
// required node set on ask
- ask2 := newAllocationAsk("alloc-2", appID2, res1)
+ ask2 := newAllocationAsk(allocKey3, appID2, res1)
ask2.SetRequiredNode(nodeID1)
err = app1.AddAllocationAsk(ask2)
- assert.NilError(t, err, "failed to add ask alloc-2 to app-1")
+ assert.NilError(t, err, "failed to add ask alloc-3 to app-1")
+ // the ask is reserved rather than allocated, so tryAllocate returns nil
result = partition.tryAllocate()
if result != nil {
t.Fatal("3rd allocation should not return allocation")
}
- // still reservation count is 1
- assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
- assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue
reserved apps should be 1")
+ // check the reservation count etc for each app
+ assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "alloc-2
should be reserved on node-1")
+ assert.Equal(t, 1, len(app.GetReservations()), "only one alloc on app-1
should have been reserved")
+ assert.Equal(t, app1.NodeReservedForAsk(allocKey3), nodeID1, "alloc-3
should be reserved on node-1")
+ assert.Equal(t, 1, len(app1.GetReservations()), "alloc-3 on app-2
should have been reserved")
+ // both apps run in the same queue so just pick one app to get the queue
+ assert.Equal(t, 2, len(app.GetQueue().GetReservedApps()), "queue
reserved apps should be 2")
+ assert.Equal(t, 2, partition.reservations, "partition reservations
should be 2")
}
func TestRequiredNodeNotExist(t *testing.T) {
@@ -2015,15 +2044,16 @@ func TestPreemption(t *testing.T) {
// Preemption followed by a normal allocation
func TestPreemptionForRequiredNodeNormalAlloc(t *testing.T) {
setupUGM()
- // setup the partition so we can try the real allocation
+ // set up the partition so we can try the real allocation
partition, app := setupPreemptionForRequiredNode(t)
// now try the allocation again: the normal path
result := partition.tryAllocate()
- if result != nil {
- t.Fatal("allocations should not have returned a result")
+ if result == nil || result.Request == nil {
+ t.Fatal("allocation should have returned a result")
}
// check if updated (must be after allocate call)
- assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
+ assert.Equal(t, 0, len(app.GetReservations()), "no allocations on app
should have been reserved")
+ assert.Equal(t, 0, partition.reservations, "no allocations on partition
should have been reserved")
}
// Preemption followed by a reserved allocation
@@ -2234,8 +2264,8 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
t.Fatal("allocation attempt should not have returned an
allocation")
}
// check if updated (must be after allocate call)
- assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
- assert.Equal(t, 1, len(app.GetAskReservations(allocKey2)), "ask should
have been reserved")
+ assert.Equal(t, 1, len(app.GetReservations()), "application should have
been reserved")
+ assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "allocation
should have been reserved on node-1")
assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}),
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
// try through reserved scheduling cycle this should trigger preemption
@@ -2395,7 +2425,7 @@ func TestTryAllocateReserve(t *testing.T) {
err = app.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app")
- ask := newAllocationAsk("alloc-2", appID1, res)
+ ask := newAllocationAsk(allocKey2, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask alloc-2 to app")
node2 := partition.GetNode(nodeID2)
@@ -2403,8 +2433,8 @@ func TestTryAllocateReserve(t *testing.T) {
t.Fatal("expected node-2 to be returned got nil")
}
partition.reserve(app, node2, ask)
- if !app.IsReservedOnNode(node2.NodeID) ||
len(app.GetAskReservations("alloc-2")) == 0 {
- t.Fatalf("reservation failure for ask2 and node2")
+ if app.NodeReservedForAsk(allocKey2) != nodeID2 {
+ t.Fatalf("reservation failure for alloc-2 and node-2")
}
// first allocation should be app-1 and alloc-2
@@ -2416,11 +2446,11 @@ func TestTryAllocateReserve(t *testing.T) {
assert.Equal(t, result.ReservedNodeID, "", "node should not be set for
allocated from reserved")
assert.Check(t, !result.Request.HasRelease(), "released allocation
should not be present")
assert.Equal(t, result.Request.GetApplicationID(), appID1, "expected
application app-1 to be allocated")
- assert.Equal(t, result.Request.GetAllocationKey(), "alloc-2", "expected
ask alloc-2 to be allocated")
+ assert.Equal(t, result.Request.GetAllocationKey(), allocKey2, "expected
ask alloc-2 to be allocated")
assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}))
// reservations should have been removed: it is in progress
- if app.IsReservedOnNode(node2.NodeID) ||
len(app.GetAskReservations("alloc-2")) != 0 {
+ if app.NodeReservedForAsk(allocKey2) != "" {
t.Fatalf("reservation removal failure for ask2 and node2")
}
@@ -2462,11 +2492,11 @@ func TestTryAllocateWithReserved(t *testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
- ask := newAllocationAsk("alloc-1", appID1, res)
+ ask := newAllocationAsk(allocKey, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask alloc-1 to app")
- ask2 := newAllocationAsk("alloc-2", appID1, res)
+ ask2 := newAllocationAsk(allocKey2, appID1, res)
err = app.AddAllocationAsk(ask2)
assert.NilError(t, err, "failed to add ask alloc-2 to app")
@@ -2476,8 +2506,8 @@ func TestTryAllocateWithReserved(t *testing.T) {
t.Fatal("expected node-2 to be returned got nil")
}
partition.reserve(app, node2, ask)
- if !app.IsReservedOnNode(node2.NodeID) ||
len(app.GetAskReservations("alloc-1")) == 0 {
- t.Fatal("reservation failure for ask and node2")
+ if app.NodeReservedForAsk(allocKey) != nodeID2 {
+ t.Fatal("reservation failure for alloc-1 and node-2")
}
result := partition.tryAllocate()
if result == nil || result.Request == nil {
@@ -2486,7 +2516,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
assert.Equal(t, objects.AllocatedReserved, result.ResultType, "expected
reserved allocation to be returned")
assert.Equal(t, "", result.ReservedNodeID, "reserved node should be
reset after processing")
assert.Equal(t, 0, len(node2.GetReservationKeys()), "reservation should
have been removed from node")
- assert.Equal(t, false, app.IsReservedOnNode(node2.NodeID), "reservation
cleanup for ask on app failed")
+ assert.Equal(t, "", app.NodeReservedForAsk(allocKey), "reservation
cleanup for ask on app failed")
assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000}))
// node2 is unreserved now so the next one should allocate on the 2nd
node (fair sharing)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]