This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 40271160 [YUNIKORN-3190] Fix race condition occurring between released and preempted allocations (#1058)
40271160 is described below
commit 40271160da1b93ff8363521cbbb711efb42f747f
Author: mani <[email protected]>
AuthorDate: Tue Jan 13 12:39:26 2026 +0530
[YUNIKORN-3190] Fix race condition occurring between released and preempted allocations (#1058)
Closes: #1058
Signed-off-by: mani <[email protected]>
---
pkg/common/errors.go | 1 +
pkg/log/logger.go | 61 ++++----
pkg/scheduler/objects/allocation.go | 22 ++-
pkg/scheduler/objects/application.go | 168 +++++++++++++--------
pkg/scheduler/objects/application_test.go | 142 ++++++++++++++++-
pkg/scheduler/objects/preemption.go | 25 ++-
pkg/scheduler/objects/preemption_test.go | 77 ++++++++++
pkg/scheduler/objects/queue_test.go | 3 +-
pkg/scheduler/objects/quota_change_preemptor.go | 8 +-
.../objects/quota_change_preemptor_test.go | 12 +-
pkg/scheduler/objects/required_node_preemptor.go | 65 +++++++-
.../objects/required_node_preemptor_test.go | 54 +++++--
pkg/scheduler/partition_test.go | 3 +-
13 files changed, 528 insertions(+), 113 deletions(-)
diff --git a/pkg/common/errors.go b/pkg/common/errors.go
index 4987ec90..210c4089 100644
--- a/pkg/common/errors.go
+++ b/pkg/common/errors.go
@@ -41,4 +41,5 @@ const (
PreemptionDoesNotHelp = "Preemption does not help"
NoVictimForRequiredNode = "No fit on required node, preemption does not help"
PreemptionMaxAttemptsExhausted = "Preemption max attempts exhausted"
+ PreemptionVictimsReleased = "Victims picked earlier were released at the final stage"
)
diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index bbfe3ba3..0c1f5388 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -54,36 +54,37 @@ const (
// Defined loggers: when adding new loggers, ids must be sequential, and all must be added to the loggers slice in the same order
var (
- Core = &LoggerHandle{id: 0, name: "core"}
- Test = &LoggerHandle{id: 1, name: "test"}
- Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
- Config = &LoggerHandle{id: 3, name: "core.config"}
- Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
- Events = &LoggerHandle{id: 5, name: "core.events"}
- OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
- Resources = &LoggerHandle{id: 7, name: "core.resources"}
- REST = &LoggerHandle{id: 8, name: "core.rest"}
- RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
- RPC = &LoggerHandle{id: 10, name: "core.rpc"}
- Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
- Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
- SchedAllocation = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
- SchedApplication = &LoggerHandle{id: 14, name: "core.scheduler.application"}
- SchedAppUsage = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
- SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
- SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
- SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
- SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
- SchedPartition = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
- SchedPreemption = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
- SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
- SchedReservation = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
- SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
- SchedNodesUsage = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
- Security = &LoggerHandle{id: 26, name: "core.security"}
- Utils = &LoggerHandle{id: 27, name: "core.utils"}
- Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
- SchedQuotaChangePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
+ Core = &LoggerHandle{id: 0, name: "core"}
+ Test = &LoggerHandle{id: 1, name: "test"}
+ Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
+ Config = &LoggerHandle{id: 3, name: "core.config"}
+ Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
+ Events = &LoggerHandle{id: 5, name: "core.events"}
+ OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
+ Resources = &LoggerHandle{id: 7, name: "core.resources"}
+ REST = &LoggerHandle{id: 8, name: "core.rest"}
+ RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
+ RPC = &LoggerHandle{id: 10, name: "core.rpc"}
+ Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
+ Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
+ SchedAllocation = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
+ SchedApplication = &LoggerHandle{id: 14, name: "core.scheduler.application"}
+ SchedAppUsage = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
+ SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
+ SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
+ SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
+ SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
+ SchedPartition = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
+ SchedPreemption = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
+ SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
+ SchedReservation = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
+ SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
+ SchedNodesUsage = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
+ Security = &LoggerHandle{id: 26, name: "core.security"}
+ Utils = &LoggerHandle{id: 27, name: "core.utils"}
+ Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
+ SchedQuotaChangePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
+ SchedRequiredNodePreemption = &LoggerHandle{id: 30, name: "core.scheduler.preemption.requirednode"}
)
// this tracks all the known logger handles, used to preallocate the real logger instances when configuration changes
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 3b60c80b..92d2b900 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -19,6 +19,7 @@
package objects
import (
+ "errors"
"fmt"
"strconv"
"time"
@@ -294,10 +295,16 @@ func (a *Allocation) IsReleased() bool {
}
// SetReleased updates the release status of the allocation.
-func (a *Allocation) SetReleased(released bool) {
+func (a *Allocation) SetReleased(released bool) error {
a.Lock()
defer a.Unlock()
+ if released {
+ if a.preempted {
+ return errors.New("allocation is already preempted")
+ }
+ }
a.released = released
+ return nil
}
// GetTagsClone returns the copy of the tags for this allocation.
@@ -348,10 +355,21 @@ func (a *Allocation) SetAllocatedResource(allocatedResource *resources.Resource)
}
// MarkPreempted marks the allocation as preempted.
-func (a *Allocation) MarkPreempted() {
+func (a *Allocation) MarkPreempted() error {
a.Lock()
defer a.Unlock()
+ if a.released {
+ return errors.New("allocation is already released")
+ }
a.preempted = true
+ return nil
+}
+
+// MarkUnPreempted unmarks the allocation as preempted.
+func (a *Allocation) MarkUnPreempted() {
+ a.Lock()
+ defer a.Unlock()
+ a.preempted = false
}
// IsPreempted returns whether the allocation has been marked for preemption or not.
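Note on the change above: SetReleased and MarkPreempted now guard each other under the allocation's lock, so an allocation can end up released or preempted, but never both. A minimal standalone sketch of that mutual exclusion (toy type and names, not the real objects.Allocation API):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// allocFlags is a toy stand-in for the released/preempted flags on an allocation.
type allocFlags struct {
	sync.Mutex
	released  bool
	preempted bool
}

// setReleased fails if preemption already claimed the allocation.
func (a *allocFlags) setReleased() error {
	a.Lock()
	defer a.Unlock()
	if a.preempted {
		return errors.New("allocation is already preempted")
	}
	a.released = true
	return nil
}

// markPreempted fails if the allocation is already being released.
func (a *allocFlags) markPreempted() error {
	a.Lock()
	defer a.Unlock()
	if a.released {
		return errors.New("allocation is already released")
	}
	a.preempted = true
	return nil
}

func main() {
	a := &allocFlags{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); fmt.Println("release:", a.setReleased()) }()
	go func() { defer wg.Done(); fmt.Println("preempt:", a.markPreempted()) }()
	wg.Wait()
	// Whichever call takes the lock first wins; the other returns an error,
	// so the two code paths can no longer act on the same allocation.
}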
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 3c065764..986f1bb4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -330,15 +330,30 @@ func (sa *Application) timeoutStateTimer(expectedState string, event application
zap.String("state", sa.stateMachine.Current()))
// if the app is completing, but there are placeholders left, first do the cleanup
if sa.IsCompleting() && !resources.IsZero(sa.allocatedPlaceholder) {
+ replacing := 0
+ preempted := 0
var toRelease []*Allocation
for _, alloc := range sa.getPlaceholderAllocations() {
// skip over the allocations that are already marked for release
if alloc.IsReleased() {
+ replacing++
+ continue
+ }
+ err := alloc.SetReleased(true)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", alloc.GetAllocationKey()))
+ preempted++
continue
}
- alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
}
+ log.Log(log.SchedApplication).Info("application is getting timed out, releasing allocated placeholders",
+ zap.String("AppID", sa.ApplicationID),
+ zap.Int("replaced", replacing),
+ zap.Int("preempted", preempted),
+ zap.Int("releasing", len(toRelease)))
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
sa.clearStateTimer()
} else {
@@ -395,19 +410,28 @@ func (sa *Application) timeoutPlaceholderProcessing() {
// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
var toRelease []*Allocation
replacing := 0
+ preempted := 0
for _, alloc := range sa.getPlaceholderAllocations() {
// skip over the allocations that are already marked for release, they will be replaced soon
if alloc.IsReleased() {
replacing++
continue
}
- alloc.SetReleased(true)
+ err := alloc.SetReleased(true)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", alloc.GetAllocationKey()))
+ preempted++
+ continue
+ }
toRelease = append(toRelease, alloc)
}
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders",
zap.String("AppID", sa.ApplicationID),
- zap.Int("placeholders being replaced", replacing),
- zap.Int("releasing placeholders", len(toRelease)))
+ zap.Int("replaced", replacing),
+ zap.Int("preempted", preempted),
+ zap.Int("releasing", len(toRelease)))
// trigger the release of the placeholders: accounting updates when the release is done
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
} else {
@@ -425,22 +449,38 @@ func (sa *Application) timeoutPlaceholderProcessing() {
}
// all allocations are placeholders release them all
var toRelease, pendingRelease []*Allocation
+ preempted := 0
for _, alloc := range sa.allocations {
- alloc.SetReleased(true)
+ err := alloc.SetReleased(true)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", alloc.GetAllocationKey()))
+ preempted++
+ continue
+ }
toRelease = append(toRelease, alloc)
}
// get all open requests and remove them all filter out already allocated as they are already released
for _, alloc := range sa.requests {
if !alloc.IsAllocated() {
- alloc.SetReleased(true)
+ err := alloc.SetReleased(true)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", alloc.GetAllocationKey()))
+ preempted++
+ continue
+ }
pendingRelease = append(pendingRelease, alloc)
sa.placeholderData[alloc.taskGroupName].TimedOut++
}
}
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders",
zap.String("AppID", sa.ApplicationID),
- zap.Int("releasing placeholders", len(toRelease)),
- zap.Int("pending placeholders", len(pendingRelease)),
+ zap.Int("releasing", len(toRelease)),
+ zap.Int("pending", len(pendingRelease)),
+ zap.Int("preempted", preempted),
zap.String("gang scheduling style", sa.gangSchedulingStyle))
sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
// trigger the release of the allocated placeholders: accounting updates when the release is done
@@ -1202,9 +1242,15 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
zap.String("placeholderKey", ph.GetAllocationKey()),
zap.Stringer("placeholder resource", ph.GetAllocatedResource()))
// release the placeholder and tell the RM
- ph.SetReleased(true)
- sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
- sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
+ err := ph.SetReleased(true)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping placeholder cancellation",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", ph.GetAllocationKey()))
+ } else {
+ sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
+ sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
+ }
continue
}
// placeholder is the same or larger continue processing and difference is handled when the placeholder
@@ -1228,7 +1274,25 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// placeholder point to the real one in the releases list
ph.SetRelease(request)
// mark placeholder as released
- ph.SetReleased(true)
+ err = ph.SetReleased(true)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so not proceeding further and reverting to old state",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", ph.GetAllocationKey()))
+
+ // revert: allocate ask
+ _, err = sa.deallocateAsk(request)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("deallocation of ask failed unexpectedly",
+ zap.Error(err))
+ }
+ // revert: double link to make it easier to find
+ // alloc (the real one) releases points to the placeholder in the releases list
+ request.ClearRelease()
+ // revert: placeholder point to the real one in the releases list
+ ph.ClearRelease()
+ continue
+ }
// bind node here so it will be handled properly upon replacement
request.SetBindTime(time.Now())
request.SetNodeID(node.NodeID)
@@ -1285,7 +1349,34 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// placeholder point to the real one in the releases list
phFit.SetRelease(reqFit)
// mark placeholder as released
- phFit.SetReleased(true)
+ err = phFit.SetReleased(true)
+
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("allocation is already preempted, so not proceeding further and reverting to old state",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.String("allocationKey", phFit.GetAllocationKey()))
+
+ // revert: node allocation
+ if alloc := node.RemoveAllocation(reqFit.GetAllocationKey()); alloc != nil {
+ log.Log(log.SchedApplication).Debug("Reverting: Node removal failed unexpectedly",
+ zap.String("applicationID", sa.ApplicationID),
+ zap.Stringer("alloc", reqFit))
+ }
+
+ // revert: allocate ask
+ _, err = sa.deallocateAsk(reqFit)
+ if err != nil {
+ log.Log(log.SchedApplication).Warn("deallocation of ask failed unexpectedly",
+ zap.Error(err))
+ }
+ // revert: double link to make it easier to find
+ // alloc (the real one) releases points to the placeholder in the releases list
+ reqFit.ClearRelease()
+ // revert: placeholder point to the real one in the releases list
+ phFit.ClearRelease()
+ return false
+ }
+
// bind node here so it will be handled properly upon replacement
reqFit.SetBindTime(time.Now())
reqFit.SetNodeID(node.NodeID)
@@ -1340,7 +1431,9 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte
// Do we need a specific node?
if ask.GetRequiredNode() != "" {
if !reserve.node.CanAllocate(ask.GetAllocatedResource()) && !ask.HasTriggeredPreemption() {
- sa.tryRequiredNodePreemption(reserve, ask)
+ // try preemption and see if we can free up resource
+ preemptor := NewRequiredNodePreemptor(reserve.node, ask, sa)
+ preemptor.tryPreemption()
continue
}
}
@@ -1403,46 +1496,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
return preemptor.TryPreemption()
}
-func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
- // try preemption and see if we can free up resource
- preemptor := NewRequiredNodePreemptor(reserve.node, ask)
- result := preemptor.filterAllocations()
- preemptor.sortAllocations()
-
- // Are there any victims/asks to preempt?
- victims := preemptor.GetVictims()
- if len(victims) > 0 {
- log.Log(log.SchedApplication).Info("Found victims for required node preemption",
- zap.String("ds allocation key", ask.GetAllocationKey()),
- zap.String("allocation name", ask.GetAllocationName()),
- zap.Int("no.of victims", len(victims)))
- for _, victim := range victims {
- if victimQueue := sa.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
- victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
- }
- victim.MarkPreempted()
- victim.SendPreemptedBySchedulerEvent(ask.GetAllocationKey(), ask.GetApplicationID(), sa.ApplicationID)
- }
- ask.MarkTriggeredPreemption()
- sa.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
- "preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())
- return true
- }
- ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
- ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
- getRateLimitedReqNodeLog().Info("no victim found for required node preemption",
- zap.String("allocation key", ask.GetAllocationKey()),
- zap.String("allocation name", ask.GetAllocationName()),
- zap.String("node", reserve.node.NodeID),
- zap.Int("total allocations", result.totalAllocations),
- zap.Int("requiredNode allocations", result.requiredNodeAllocations),
- zap.Int("allocations already preempted", result.alreadyPreemptedAllocations),
- zap.Int("higher priority allocations", result.higherPriorityAllocations),
- zap.Int("allocations with non-matching resources", result.atLeastOneResNotMatched),
- zap.Int("released placeholder allocations", result.releasedPhAllocations))
- return false
-}
-
// tryNodesNoReserve tries all the nodes for a reserved request that have not been tried yet.
// This should never result in a reservation as the allocation is already reserved
func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator, reservedNode string) *AllocationResult {
@@ -2219,13 +2272,6 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
return rateLimitedAppLog
}
-func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
- initReqNodeLogOnce.Do(func() {
- rateLimitedReqNodeLog = log.NewRateLimitedLogger(log.SchedApplication, time.Minute)
- })
- return rateLimitedReqNodeLog
-}
-
func (sa *Application) updateRunnableStatus(runnableInQueue, runnableByUserLimit bool) {
sa.Lock()
defer sa.Unlock()
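The timeout paths in this file all follow the same shape: placeholders already marked released are counted as being replaced, placeholders that lost the race to preemption are counted and skipped, and only the remainder is sent to the RM. A small self-contained sketch of that loop, assuming a toy placeholder type (illustrative names, not the real Allocation API):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// placeholder is an illustrative record carrying only the two flags that matter here.
type placeholder struct {
	sync.Mutex
	key       string
	released  bool
	preempted bool
}

func (p *placeholder) isReleased() bool {
	p.Lock()
	defer p.Unlock()
	return p.released
}

// setReleased refuses to release a placeholder that preemption already claimed.
func (p *placeholder) setReleased() error {
	p.Lock()
	defer p.Unlock()
	if p.preempted {
		return errors.New("already preempted")
	}
	p.released = true
	return nil
}

func main() {
	phs := []*placeholder{
		{key: "ph-1"},                  // still held: should be released on timeout
		{key: "ph-2", preempted: true}, // preempted by the scheduler in the meantime
		{key: "ph-3", released: true},  // already marked for replacement
	}
	var toRelease []*placeholder
	replacing, preempted := 0, 0
	for _, ph := range phs {
		if ph.isReleased() {
			replacing++ // will be replaced shortly, nothing to do here
			continue
		}
		if err := ph.setReleased(); err != nil {
			preempted++ // preemption owns this one, skip the timeout release
			continue
		}
		toRelease = append(toRelease, ph)
	}
	fmt.Printf("replaced=%d preempted=%d releasing=%d\n", replacing, preempted, len(toRelease))
	// prints: replaced=1 preempted=1 releasing=1
}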
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 307bcf49..1be8c4ea 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1602,7 +1602,8 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
assert.NilError(t, err, "Unexpected error when creating resource from map")
// add the placeholders to the app: one released, one still available.
phReleased := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
- phReleased.SetReleased(true)
+ err = phReleased.SetReleased(true)
+ assert.NilError(t, err, "Unexpected error when releasing placeholder")
app.AddAllocation(phReleased)
app.addPlaceholderDataWithLocking(phReleased)
assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
@@ -1658,6 +1659,85 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
assertUserGroupResource(t, getTestUserGroup(), res)
}
+func TestTimeoutPlaceholderAllocPreempted(t *testing.T) {
+ setupUGM()
+
+ originalPhTimeout := defaultPlaceholderTimeout
+ defaultPlaceholderTimeout = 100 * time.Millisecond
+ defer func() { defaultPlaceholderTimeout = originalPhTimeout }()
+
+ app, testHandler := newApplicationWithHandler(appID1, "default", "root.a")
+ assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create")
+ app.SetState(Accepted.String())
+
+ resMap := map[string]string{"memory": "100", "vcores": "10"}
+ res, err := resources.NewResourceFromConf(resMap)
+ assert.NilError(t, err, "Unexpected error when creating resource from map")
+ // add the placeholders to the app: one released, one still available.
+ phReleased := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
+ err = phReleased.SetReleased(true)
+ assert.NilError(t, err, "Unexpected error when releasing placeholder")
+ app.AddAllocation(phReleased)
+ app.addPlaceholderDataWithLocking(phReleased)
+ assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+
+ assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
+ ph := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
+ app.AddAllocation(ph)
+ app.addPlaceholderDataWithLocking(ph)
+ assertPlaceholderData(t, app, tg1, 2, 0, 0, res)
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
+
+ alloc := newAllocation(appID1, nodeID1, res)
+ app.AddAllocation(alloc)
+ assert.Assert(t, app.IsRunning(), "App should be in running state after the first allocation")
+ err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
+ // Preempt the placeholder in the meantime
+ err = ph.MarkPreempted()
+ assert.NilError(t, err, "Unexpected error when marking placeholder preempted")
+ app.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ "preempting allocations to free up resources to run ask: "+ph.GetAllocationKey())
+ return app.getPlaceholderTimer() == nil
+ })
+ assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly")
+ assert.Assert(t, app.IsRunning(), "App should be in running state after the first allocation")
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3))
+ // two state updates and 1 release event
+ events := testHandler.GetEvents()
+ var found bool
+ for _, event := range events {
+ if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok {
+ assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released")
+ assert.Equal(t, allocRelease.ReleasedAllocations[0].GetTerminationType(), si.TerminationType_PREEMPTED_BY_SCHEDULER, "")
+ assert.Equal(t, allocRelease.ReleasedAllocations[0].AllocationKey, ph.allocationKey, "wrong placeholder allocation released on timeout")
+ found = true
+ }
+ }
+ assert.Assert(t, found, "release allocation event not found in list")
+ assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "Unexpected allocated resources for the app")
+ // a released placeholder still holds the resource until release confirmed by the RM
+ assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
+ // tracking data not updated until confirmed by the RM
+ assertPlaceholderData(t, app, tg1, 2, 0, 0, res)
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3))
+ // do what the RM does and respond to the release
+ removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT)
+ assert.Assert(t, removed != nil, "expected allocation got nil")
+ assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned")
+ assertPlaceholderData(t, app, tg1, 2, 1, 0, res)
+ assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder resources still accounted for on the app")
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
+
+ // process the replacement no real alloc linked account for that
+ removed = app.ReplaceAllocation(phReleased.allocationKey)
+ assert.Assert(t, removed != nil, "expected allocation got nil")
+ assert.Equal(t, phReleased.allocationKey, removed.allocationKey, "expected placeholder to be returned")
+ assertPlaceholderData(t, app, tg1, 2, 1, 1, res)
+ assert.Assert(t, resources.IsZero(app.GetPlaceholderResource()), "placeholder resources still accounted for on the app")
+ assertUserGroupResource(t, getTestUserGroup(), res)
+}
+
func TestTimeoutPlaceholderCompleting(t *testing.T) {
setupUGM()
phTimeout := common.ConvertSITimeout(5)
@@ -1666,6 +1746,59 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
assert.Equal(t, app.execTimeout, phTimeout, "placeholder timeout not initialised correctly")
app.SetState(Accepted.String())
+ resMap := map[string]string{"memory": "100", "vcores": "10"}
+ res, err := resources.NewResourceFromConf(resMap)
+ assert.NilError(t, err, "Unexpected error when creating resource from map")
+ // add the placeholder to the app
+ ph := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
+ app.AddAllocation(ph)
+ assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
+ app.addPlaceholderDataWithLocking(ph)
+ assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+ // add a real allocation as well
+ alloc := newAllocation(appID1, nodeID1, res)
+ app.AddAllocation(alloc)
+ // move on to running
+ app.SetState(Running.String())
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
+ // remove allocation to trigger state change
+ app.RemoveAllocation(alloc.GetAllocationKey(), si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ assert.Assert(t, app.IsCompleting(), "App should be in completing state all allocs have been removed")
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+ // make sure the placeholders time out
+ err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
+ return app.getPlaceholderTimer() == nil
+ })
+ assert.NilError(t, err, "Placeholder timer did not time out as expected")
+ events := testHandler.GetEvents()
+ var found bool
+ for _, event := range events {
+ if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok {
+ assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released")
+ found = true
+ }
+ }
+ assert.Assert(t, found, "release allocation event not found in list")
+ assert.Assert(t, app.IsCompleting(), "App should be in completing state")
+ assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+ // tracking data not updated until confirmed by the RM
+ assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
+ // do what the RM does and respond to the release
+ removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT)
+ assert.Assert(t, removed != nil, "expected allocation got nil")
+ assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned")
+ assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
+}
+
+func TestTimeoutPlaceholderCompletingWithPreemptedPh(t *testing.T) {
+ setupUGM()
+ phTimeout := common.ConvertSITimeout(5)
+ app, testHandler := newApplicationWithPlaceholderTimeout(appID1, "default", "root.a", 5)
+ assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create")
+ assert.Equal(t, app.execTimeout, phTimeout, "placeholder timeout not initialised correctly")
+ app.SetState(Accepted.String())
+
resMap := map[string]string{"memory": "100", "vcores": "10"}
res, err := resources.NewResourceFromConf(resMap)
assert.NilError(t, err, "Unexpected error when creating resource from map")
@@ -1689,6 +1822,11 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
// make sure the placeholders time out
err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
+ // Preempt the placeholder in the meantime
+ err = ph.MarkPreempted()
+ assert.NilError(t, err, "Unexpected error when marking placeholder preempted")
+ app.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ "preempting allocations to free up resources to run ask: "+ph.GetAllocationKey())
return app.getPlaceholderTimer() == nil
})
assert.NilError(t, err, "Placeholder timer did not time out as expected")
@@ -1697,6 +1835,8 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
for _, event := range events {
if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok {
assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released")
+ assert.Equal(t, allocRelease.ReleasedAllocations[0].GetTerminationType(), si.TerminationType_PREEMPTED_BY_SCHEDULER, "")
+ assert.Equal(t, allocRelease.ReleasedAllocations[0].AllocationKey, ph.allocationKey, "wrong placeholder allocation released on timeout")
found = true
}
}
diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go
index 555b9cea..f2aa91dc 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -647,11 +647,34 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
return nil, false
}
+ // Has any victim released?
+ // (Placeholder ?) Allocation chosen as victim earlier in the beginning of preemption cycle but released in the meantime also should be prevented from
+ // proceeding further to avoid choosing the same allocation which is already at the verge of replacement process for this current preemption cycle.
+ var preemptedVictims []*Allocation
+ for _, victim := range finalVictims {
+ err := victim.MarkPreempted()
+ if err != nil {
+ log.Log(log.SchedPreemption).Info("Victim is already released, so marking earlier allocations as un preempted and not moving forward further on this preemption",
+ zap.String("askApplicationID", p.ask.applicationID),
+ zap.String("askAllocationKey", p.ask.allocationKey),
+ zap.String("victimApplicationID", victim.GetApplicationID()),
+ zap.String("victimAllocationKey", victim.GetAllocationKey()),
+ zap.Error(err))
+ // revert preempted victims (if any) to old state
+ for _, v := range preemptedVictims {
+ v.MarkUnPreempted()
+ }
+ // victim already released, so preemption doesn't help
+ p.ask.LogAllocationFailure(common.PreemptionVictimsReleased, true)
+ return nil, false
+ }
+ preemptedVictims = append(preemptedVictims, victim)
+ }
+
// preempt the victims
for _, victim := range finalVictims {
if victimQueue := p.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
- victim.MarkPreempted()
log.Log(log.SchedPreemption).Info("Preempting task",
zap.String("askApplicationID", p.ask.applicationID),
zap.String("askAllocationKey", p.ask.allocationKey),
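The new block in TryPreemption claims every final victim before any release is sent, and unwinds the claims if one victim turns out to have been released mid-cycle. A minimal sketch of that claim-then-rollback step (toy victim type; the real code also updates queue accounting and emits events):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// victim carries only the two flags needed to show the rollback.
type victim struct {
	sync.Mutex
	key       string
	released  bool
	preempted bool
}

func (v *victim) markPreempted() error {
	v.Lock()
	defer v.Unlock()
	if v.released {
		return errors.New("allocation is already released")
	}
	v.preempted = true
	return nil
}

func (v *victim) markUnPreempted() {
	v.Lock()
	defer v.Unlock()
	v.preempted = false
}

// markAll claims every victim; on the first conflict it reverts the claims
// already taken and reports that this preemption attempt must be abandoned.
func markAll(victims []*victim) bool {
	var claimed []*victim
	for _, v := range victims {
		if err := v.markPreempted(); err != nil {
			for _, c := range claimed {
				c.markUnPreempted()
			}
			return false
		}
		claimed = append(claimed, v)
	}
	return true
}

func main() {
	vs := []*victim{{key: "a"}, {key: "b", released: true}, {key: "c"}}
	fmt.Println("all victims claimed:", markAll(vs)) // false: "b" was released mid-cycle
	fmt.Println("\"a\" rolled back:", !vs[0].preempted) // true: the earlier mark was undone
}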
diff --git a/pkg/scheduler/objects/preemption_test.go b/pkg/scheduler/objects/preemption_test.go
index c09884cb..017c9395 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -652,6 +652,83 @@ func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
}
+// TestTryPreemption_VictimReleased_InsufficientResource Test try preemption on queue with simple queue hierarchy. Since Node has enough resources to accommodate, preemption happens because of queue resource constraint.
+// Guaranteed and Max resource set on both victim queue path and preemptor queue paths. victim and preemptor queue are siblings.
+// Request (Preemptor) resource type matches with all resource types of the victim. Guaranteed set only on that specific resource type.
+// Setup:
+// Nodes are Node1 and Node2. Node has enough space to accommodate the new ask.
+// root.parent. Max set on parent, first: 18
+// root.parent.child1. Guaranteed not set. 2 Allocations (belongs to single app) are running on node1 and node2. Each Allocation usage is first:5. Total usage is first:10.
+// root.parent.child2. Guaranteed set 5. Request of first:5 is waiting for resources.
+// root.parent.child3. Guaranteed not set. 1 Allocation is running on node2. Total usage is first:5.
+// Preemption options are 1. 2 Alloc running on Node 2 but on child 1 and child 3 queues. 2. 2 Alloc running on Node 2 and child 1 queue. 3. All three 3 allocs.
+// option 1 >> option 2 >> option 3. In option 3, preempting third allocation is unnecessary, should avoid this option.
+// Either option 1 or option2 is fine, but not option 3.
+// Though victims are available in the beginning, some of those are "released" in the meantime. Hence, preemption process is halted and cannot proceed with the remaining victims
+func TestTryPreemption_VictimReleased_InsufficientResource(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
+ node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 30})
+ node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 30})
+ iterator := getNodeIteratorFn(node1, node2)
+ rootQ, err := createRootQueue(map[string]string{"first": "60"})
+ assert.NilError(t, err)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "18"}, nil, appQueueMapping)
+ assert.NilError(t, err)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, nil, appQueueMapping)
+ assert.NilError(t, err)
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "15"}, appQueueMapping)
+ assert.NilError(t, err)
+ childQ3, err := createManagedQueueGuaranteed(parentQ, "child3", false, nil, nil, appQueueMapping)
+ assert.NilError(t, err)
+
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2, map[string]resources.Quantity{"first": 5}, appQueueMapping)
+ assert.NilError(t, err)
+
+ app3 := newApplication(appID3, "default", "root.parent.child3")
+ app3.SetQueue(childQ3)
+ childQ3.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(app3.ApplicationID, childQ3)
+
+ ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ ask4.createTime = time.Now()
+ assert.NilError(t, app3.AddAllocationAsk(ask4))
+
+ alloc4 := newAllocationWithKey("alloc4", appID3, nodeID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ alloc4.createTime = ask4.createTime
+ app3.AddAllocation(alloc4)
+ assert.Check(t, node2.TryAddAllocation(alloc4), "node alloc2 failed")
+ assert.NilError(t, childQ3.TryIncAllocatedResource(ask4.GetAllocatedResource()))
+
+ app2, ask3, err := creatApp2(childQ2, map[string]resources.Quantity{"first": 10}, "alloc3", appQueueMapping)
+ assert.NilError(t, err)
+
+ headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 3})
+ preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
+
+ // Init Queue snapshots before in hand so that victims collection process does not miss "alloc4"
+ preemptor.initQueueSnapshots()
+
+ // Now that this has been included in the victims list, release it so that it preemption fail at the end.
+ err = alloc4.SetReleased(true)
+ assert.NilError(t, err)
+
+ allocs := map[string]string{}
+ allocs["alloc3"] = nodeID2
+
+ plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
+ plugins.RegisterSchedulerPlugin(plugin)
+ defer plugins.UnregisterSchedulerPlugins()
+
+ result, ok := preemptor.TryPreemption()
+
+ assert.Assert(t, result == nil, "unexpected result")
+ assert.Equal(t, ok, false, "no victims found")
+ assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
+ assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
+ assert.Check(t, !alloc4.IsPreempted(), "alloc2 preempted")
+ assertAllocationLog(t, ask3, []string{common.PreemptionVictimsReleased})
+}
+
// TestTryPreemption_VictimsAvailableOnDifferentNodes Test try preemption on queue with simple queue hierarchy. Since Node doesn't have enough resources to accomodate, preemption happens because of node resource constraint.
// Guaranteed and Max resource set on both victim queue path and preemptor queue path. victim and preemptor queue are siblings.
// Request (Preemptor) resource type matches with 1 resource type of the victim. Guaranteed also set on specific resource type. 2 Victims are available, but total resource usage is lesser than ask requirement.
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index 520e7e2d..4308749a 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2072,7 +2072,8 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
alloc2.released = false
// alloc2 has already been preempted and should not be considered a valid victim
- alloc2.MarkPreempted()
+ err = alloc2.MarkPreempted()
+ assert.NilError(t, err, "failed to mark preempted node")
snapshot = leaf1.FindEligiblePreemptionVictims(leaf1.QueuePath, ask)
assert.Equal(t, 1, len(victims(snapshot)), "wrong victim count")
assert.Equal(t, alloc3.allocationKey, victims(snapshot)[0].allocationKey, "wrong alloc")
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go b/pkg/scheduler/objects/quota_change_preemptor.go
index 1d61c7d3..c302dfb3 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -303,6 +303,13 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
for app, victims := range apps {
if len(victims) > 0 {
for _, victim := range victims {
+ err := victim.MarkPreempted()
+ if err != nil {
+ log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released, so not proceeding further on the daemon set preemption process",
+ zap.String("applicationID", victim.applicationID),
+ zap.String("allocationKey", victim.GetAllocationKey()))
+ continue
+ }
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victims for quota change preemption",
zap.String("queue", qcp.queue.GetQueuePath()),
zap.String("victim allocation key", victim.allocationKey),
@@ -311,7 +318,6 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
zap.String("victim node",
victim.GetNodeID()),
)
qcp.queue.IncPreemptingResource(victim.GetAllocatedResource())
- victim.MarkPreempted()
victim.SendPreemptedByQuotaChangeEvent(qcp.queue.GetQueuePath())
}
app.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 1fd3b1ef..d578122a 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -180,12 +180,16 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
asks[1].SetRequiredNode("node2")
}
if tc.irrelevantAllocations[1] {
- asks[2].MarkPreempted()
- asks[3].MarkPreempted()
+ err = asks[2].MarkPreempted()
+ assert.NilError(t, err)
+ err = asks[3].MarkPreempted()
+ assert.NilError(t, err)
}
if tc.irrelevantAllocations[2] {
- asks[4].SetReleased(true)
- asks[5].SetReleased(true)
+ err = asks[4].SetReleased(true)
+ assert.NilError(t, err)
+ err = asks[5].SetReleased(true)
+ assert.NilError(t, err)
}
preemptor := NewQuotaChangePreemptor(tc.queue)
preemptor.preemptableResource = tc.preemptableResource
diff --git a/pkg/scheduler/objects/required_node_preemptor.go b/pkg/scheduler/objects/required_node_preemptor.go
index 015cd9e1..404edae6 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -19,12 +19,20 @@
package objects
import (
+ "time"
+
+ "go.uber.org/zap"
+
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/log"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type PreemptionContext struct {
node *Node
requiredAsk *Allocation
+ application *Application
allocations []*Allocation
}
@@ -37,15 +45,70 @@ type filteringResult struct {
releasedPhAllocations int // number of ph allocations released
}
-func NewRequiredNodePreemptor(node *Node, requiredAsk *Allocation) *PreemptionContext {
+func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
+ initReqNodeLogOnce.Do(func() {
+ rateLimitedReqNodeLog = log.NewRateLimitedLogger(log.SchedRequiredNodePreemption, time.Minute)
+ })
+ return rateLimitedReqNodeLog
+}
+
+func NewRequiredNodePreemptor(node *Node, requiredAsk *Allocation, application *Application) *PreemptionContext {
preemptor := &PreemptionContext{
node: node,
requiredAsk: requiredAsk,
+ application: application,
allocations: make([]*Allocation, 0),
}
return preemptor
}
+func (p *PreemptionContext) tryPreemption() {
+ result := p.filterAllocations()
+ p.sortAllocations()
+
+ // Are there any victims/asks to preempt?
+ victims := p.GetVictims()
+ if len(victims) > 0 {
+ log.Log(log.SchedRequiredNodePreemption).Info("Found victims for required node preemption",
+ zap.String("ds allocation key", p.requiredAsk.GetAllocationKey()),
+ zap.String("allocation name", p.requiredAsk.GetAllocationName()),
+ zap.Int("no.of victims", len(victims)))
+ for _, victim := range victims {
+ err := victim.MarkPreempted()
+ if err != nil {
+ log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released, so not proceeding further on the daemon set preemption process",
+ zap.String("applicationID", p.requiredAsk.GetApplicationID()),
+ zap.String("allocationKey", victim.GetAllocationKey()))
+ continue
+ }
+ if victimQueue := p.application.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
+ victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
+ } else {
+ log.Log(log.SchedRequiredNodePreemption).Warn("BUG: Queue not found for daemon set preemption victim",
+ zap.String("queue", p.application.queue.Name),
+ zap.String("victimApplicationID", victim.GetApplicationID()),
+ zap.String("victimAllocationKey", victim.GetAllocationKey()))
+ }
+ victim.SendPreemptedBySchedulerEvent(p.requiredAsk.GetAllocationKey(), p.requiredAsk.GetApplicationID(), p.application.queuePath)
+ }
+ p.requiredAsk.MarkTriggeredPreemption()
+ p.application.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ "preempting allocations to free up resources to run daemon set ask: "+p.requiredAsk.GetAllocationKey())
+ } else {
+ p.requiredAsk.LogAllocationFailure(common.NoVictimForRequiredNode, true)
+ p.requiredAsk.SendRequiredNodePreemptionFailedEvent(p.node.NodeID)
+ getRateLimitedReqNodeLog().Info("no victim found for required node preemption",
+ zap.String("allocation key", p.requiredAsk.GetAllocationKey()),
+ zap.String("allocation name", p.requiredAsk.GetAllocationName()),
+ zap.String("node", p.node.NodeID),
+ zap.Int("total allocations", result.totalAllocations),
+ zap.Int("requiredNode allocations", result.requiredNodeAllocations),
+ zap.Int("allocations already preempted", result.alreadyPreemptedAllocations),
+ zap.Int("higher priority allocations", result.higherPriorityAllocations),
+ zap.Int("allocations with non-matching resources", result.atLeastOneResNotMatched))
+ }
+}
+
func (p *PreemptionContext) filterAllocations() filteringResult {
var result filteringResult
yunikornAllocations := p.node.GetYunikornAllocations()
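getRateLimitedReqNodeLog above (moved here from application.go) lazily builds its logger once via sync.Once and then drops repeated messages inside the rate-limit window. A small self-contained sketch of that pattern, with a stand-in type instead of the real log.RateLimitedLogger:

package main

import (
	"fmt"
	"sync"
	"time"
)

// rateLimitedLogger is a minimal stand-in: it prints a message at most once
// per configured interval and silently drops the rest.
type rateLimitedLogger struct {
	mu       sync.Mutex
	every    time.Duration
	lastSeen time.Time
}

func (r *rateLimitedLogger) Info(msg string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.lastSeen.IsZero() && time.Since(r.lastSeen) < r.every {
		return // still inside the rate-limit window, drop the message
	}
	r.lastSeen = time.Now()
	fmt.Println(msg)
}

var (
	initReqNodeLogOnce    sync.Once
	rateLimitedReqNodeLog *rateLimitedLogger
)

// getRateLimitedReqNodeLog mirrors the lazy-init shape used in the diff:
// the logger is created on first use and shared afterwards.
func getRateLimitedReqNodeLog() *rateLimitedLogger {
	initReqNodeLogOnce.Do(func() {
		rateLimitedReqNodeLog = &rateLimitedLogger{every: time.Minute}
	})
	return rateLimitedReqNodeLog
}

func main() {
	for i := 0; i < 3; i++ {
		getRateLimitedReqNodeLog().Info("no victim found for required node preemption")
	}
	// only the first call prints; the next two fall inside the one-minute window
}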
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go b/pkg/scheduler/objects/required_node_preemptor_test.go
index 730164b5..01244f58 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -36,10 +36,18 @@ func TestFilterAllocations(t *testing.T) {
},
})
+ rootQ, err := createRootQueue(map[string]string{"first": "100"})
+ assert.NilError(t, err)
+ defaultQ, err := createManagedQueueGuaranteed(rootQ, "default", true, map[string]string{"vcores": "3"}, nil, nil)
+ assert.NilError(t, err)
+
+ app := newApplication("app1", "default", "root.default")
+ app.SetQueue(defaultQ)
+
// case 1: allocations are available but none of its resources are matching with ds request ask, hence no allocations considered
requiredAsk := createAllocationAsk("ask12", "app1", true, true, 20, resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5}))
- p := NewRequiredNodePreemptor(node, requiredAsk)
+ p := NewRequiredNodePreemptor(node, requiredAsk, app)
asks := prepareAllocationAsks(t, node)
result := p.filterAllocations()
verifyFilterResult(t, 10, 0, 10, 0, 0, 0, result)
@@ -52,7 +60,7 @@ func TestFilterAllocations(t *testing.T) {
// case 2: allocations are available but priority is higher than ds request priority, hence no allocations considered
requiredAsk1 := createAllocationAsk("ask12", "app1", true, true, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- p1 := NewRequiredNodePreemptor(node, requiredAsk1)
+ p1 := NewRequiredNodePreemptor(node, requiredAsk1, app)
asks = prepareAllocationAsks(t, node)
result = p1.filterAllocations()
verifyFilterResult(t, 10, 0, 0, 10, 0, 0, result)
@@ -65,7 +73,7 @@ func TestFilterAllocations(t *testing.T) {
// case 3: victims are available as there are allocations with lower priority and resource match
requiredAsk2 := createAllocationAsk("ask12", "app1", true, true, 20, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- p2 := NewRequiredNodePreemptor(node, requiredAsk2)
+ p2 := NewRequiredNodePreemptor(node, requiredAsk2, app)
asks = prepareAllocationAsks(t, node)
result = p2.filterAllocations()
verifyFilterResult(t, 10, 0, 0, 0, 0, 0, result)
@@ -74,9 +82,12 @@ func TestFilterAllocations(t *testing.T) {
removeAllocationAsks(node, asks)
// case 4: allocation has been preempted
- p3 := NewRequiredNodePreemptor(node, requiredAsk2)
+ requiredAsk3 := createAllocationAsk("ask12", "app1", true, true, 20,
+ resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ p3 := NewRequiredNodePreemptor(node, requiredAsk3, app)
asks = prepareAllocationAsks(t, node)
- node.GetAllocation("ask5").MarkPreempted() // "ask5" would be the first and only victim without this
+ err = node.GetAllocation("ask5").MarkPreempted() // "ask5" would be the first and only victim without this
+ assert.NilError(t, err)
result = p3.filterAllocations()
p3.sortAllocations()
@@ -86,7 +97,9 @@ func TestFilterAllocations(t *testing.T) {
removeAllocationAsks(node, asks)
// case 5: existing required node allocation
- p4 := NewRequiredNodePreemptor(node, requiredAsk2)
+ requiredAsk4 := createAllocationAsk("ask12", "app1", true, true, 20,
+ resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ p4 := NewRequiredNodePreemptor(node, requiredAsk4, app)
results := prepareAllocationAsks(t, node)
results[8].requiredNode = "node-3"
@@ -97,7 +110,7 @@ func TestFilterAllocations(t *testing.T) {
removeAllocationAsks(node, asks)
// case 6: release ph allocation
- p5 := NewRequiredNodePreemptor(node, requiredAsk2)
+ p5 := NewRequiredNodePreemptor(node, requiredAsk2, app)
results = prepareAllocationAsks(t, node)
results[9].released = true
@@ -117,11 +130,19 @@ func TestGetVictims(t *testing.T) {
},
})
+ rootQ, err := createRootQueue(map[string]string{"first": "100"})
+ assert.NilError(t, err)
+ defaultQ, err := createManagedQueueGuaranteed(rootQ, "default", true, map[string]string{"vcores": "3"}, nil, nil)
+ assert.NilError(t, err)
+
+ app := newApplication("app1", "default", "root.default")
+ app.SetQueue(defaultQ)
+
// case 1: victims are available and its resources are matching with ds request ask
requiredAsk := createAllocationAsk("ask11", "app1", true, true, 20, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25}))
- p := NewRequiredNodePreemptor(node, requiredAsk)
+ p := NewRequiredNodePreemptor(node, requiredAsk, app)
asks := prepareAllocationAsks(t, node)
p.filterAllocations()
p.sortAllocations()
@@ -140,7 +161,7 @@ func TestGetVictims(t *testing.T) {
// case 2: victims are available and its resources are matching with ds request ask (but with different quantity)
requiredAsk2 := createAllocationAsk("ask13", "app1", true, true, 20, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- p2 := NewRequiredNodePreemptor(node, requiredAsk2)
+ p2 := NewRequiredNodePreemptor(node, requiredAsk2, app)
asks = prepareAllocationAsks(t, node)
p2.filterAllocations()
p2.sortAllocations()
@@ -151,7 +172,7 @@ func TestGetVictims(t *testing.T) {
// case 3: allocations are available and its resources are matching partially with ds request ask (because of different resource types), hence no victims
requiredAsk3 := createAllocationAsk("ask13", "app1", true, true, 20, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 5}))
- p3 := NewRequiredNodePreemptor(node, requiredAsk3)
+ p3 := NewRequiredNodePreemptor(node, requiredAsk3, app)
asks = prepareAllocationAsks(t, node)
p3.filterAllocations()
filteredAllocations := p3.getAllocations()
@@ -164,6 +185,19 @@ func TestGetVictims(t *testing.T) {
victims3 := p3.GetVictims()
assert.Equal(t, len(victims3), 0)
removeAllocationAsks(node, asks)
+
+ // case 4: victim chosen earlier released in the meantime
+ requiredAsk4 := createAllocationAsk("ask14", "app1", true, true, 20,
+ resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25}))
+ p4 := NewRequiredNodePreemptor(node, requiredAsk4, app)
+ asks = prepareAllocationAsks(t, node)
+ p4.filterAllocations()
+ p4.sortAllocations()
+ err = asks[1].SetReleased(true)
+ assert.NilError(t, err)
+ victims = p4.GetVictims()
+ assert.Equal(t, len(victims), 4)
+ removeAllocationAsks(node, asks)
}
func verifyFilterResult(t *testing.T, totalAllocations, requiredNodeAllocations, resourceNotEnough, higherPriorityAllocations, alreadyPreemptedAllocations int, releasedPhAllocations int, result filteringResult) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 1ee9a4b3..ebca07f5 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3312,7 +3312,8 @@ func TestPreemptedPlaceholderSkip(t *testing.T) {
}
// mark the placeholder as preempted (shortcut not interested in usage accounting etc.)
- ph.MarkPreempted()
+ err = ph.MarkPreempted()
+ assert.NilError(t, err, "failed to mark preempted placeholder")
// replace the placeholder should NOT work
result = partition.tryPlaceholderAllocate()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]