This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef347b3 [YUNIKORN-2953] Placeholder release count incorrect (#991)
6ef347b3 is described below
commit 6ef347b31c1cd61a2d8b681a6159893839b3697b
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Mon Nov 4 12:48:02 2024 -0600
[YUNIKORN-2953] Placeholder release count incorrect (#991)
While releasing placeholders, the allocated placeholders are counted
twice in the tracking information. With YUNIKORN-2926 in place, this
happens only if some, but not all, placeholders are allocated.
Mark unallocated placeholders that time out as released to prevent
scheduling issues.
Closes: #991
Signed-off-by: Craig Condit <[email protected]>
---
pkg/scheduler/objects/application.go | 48 +++++++++++++++++++------------
pkg/scheduler/objects/application_test.go | 46 +++++++++++++++++++----------
2 files changed, 60 insertions(+), 34 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 4de35b3e..98bbcdb7 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -409,9 +409,8 @@ func (sa *Application) clearPlaceholderTimer() {
func (sa *Application) timeoutPlaceholderProcessing() {
sa.Lock()
defer sa.Unlock()
- switch {
- // Case 1: if all app's placeholders are allocated, only part of them
gets replaced, just delete the remaining placeholders
- case (sa.IsRunning() || sa.IsCompleting()) &&
!resources.IsZero(sa.allocatedPlaceholder):
+ if (sa.IsRunning() || sa.IsCompleting()) &&
!resources.IsZero(sa.allocatedPlaceholder) {
+ // Case 1: if all app's placeholders are allocated, only part
of them gets replaced, just delete the remaining placeholders
var toRelease []*Allocation
replacing := 0
for _, alloc := range sa.getPlaceholderAllocations() {
@@ -423,19 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() {
alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
}
- log.Log(log.SchedApplication).Info("Placeholder timeout,
releasing placeholders",
+ log.Log(log.SchedApplication).Info("Placeholder timeout,
releasing allocated placeholders",
zap.String("AppID", sa.ApplicationID),
zap.Int("placeholders being replaced", replacing),
zap.Int("releasing placeholders", len(toRelease)))
// trigger the release of the placeholders: accounting updates
when the release is done
sa.notifyRMAllocationReleased(toRelease,
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
- // Case 2: in every other case progress the application, and notify the
context about the expired placeholder asks
- default:
- log.Log(log.SchedApplication).Info("Placeholder timeout,
releasing asks and placeholders",
- zap.String("AppID", sa.ApplicationID),
- zap.Int("releasing placeholders", len(sa.allocations)),
- zap.Int("releasing asks", len(sa.requests)),
- zap.String("gang scheduling style",
sa.gangSchedulingStyle))
+ } else {
+ // Case 2: in every other case progress the application, and
notify the context about the expired placeholders
// change the status of the app based on gang style: soft
resume normal allocations, hard fail the app
event := ResumeApplication
if sa.gangSchedulingStyle == Hard {
@@ -447,14 +441,30 @@ func (sa *Application) timeoutPlaceholderProcessing() {
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
- sa.notifyRMAllocationReleased(sa.getAllRequestsInternal(),
si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder
timeout")
- sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
- // all allocations are placeholders but GetAllAllocations is
locked and cannot be used
- sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(),
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
- // we are in an accepted or new state so nothing can be
replaced yet: mark everything as timedout
- for _, phData := range sa.placeholderData {
- phData.TimedOut = phData.Count
+ // all allocations are placeholders release them all
+ var toRelease, pendingRelease []*Allocation
+ for _, alloc := range sa.allocations {
+ alloc.SetReleased(true)
+ toRelease = append(toRelease, alloc)
+ }
+ // get all open requests and remove them all filter out already
allocated as they are already released
+ for _, alloc := range sa.requests {
+ if !alloc.IsAllocated() {
+ alloc.SetReleased(true)
+ pendingRelease = append(pendingRelease, alloc)
+
sa.placeholderData[alloc.taskGroupName].TimedOut++
+ }
}
+ log.Log(log.SchedApplication).Info("Placeholder timeout,
releasing allocated and pending placeholders",
+ zap.String("AppID", sa.ApplicationID),
+ zap.Int("releasing placeholders", len(toRelease)),
+ zap.Int("pending placeholders", len(pendingRelease)),
+ zap.String("gang scheduling style",
sa.gangSchedulingStyle))
+ sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
+ // trigger the release of the allocated placeholders:
accounting updates when the release is done
+ sa.notifyRMAllocationReleased(toRelease,
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
+ // trigger the release of the pending placeholders: accounting
has been done
+ sa.notifyRMAllocationReleased(pendingRelease,
si.TerminationType_TIMEOUT, "releasing pending placeholders on placeholder
timeout")
}
sa.clearPlaceholderTimer()
}
@@ -1906,7 +1916,7 @@ func (sa *Application)
removeAllocationInternal(allocationKey string, releaseTyp
if sa.hasZeroAllocations() {
removeApp = true
event = CompleteApplication
- eventWarning = "Application state not changed to
Waiting while removing an allocation"
+ eventWarning = "Application state not changed to
Completing while removing an allocation"
}
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
}
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 91db298f..0b77db56 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1516,14 +1516,14 @@ func TestReplaceAllocationTracking(t *testing.T) {
}
func TestTimeoutPlaceholderSoft(t *testing.T) {
- runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2})
+ runTimeoutPlaceholderTest(t, Resuming.String(), Soft)
}
func TestTimeoutPlaceholderHard(t *testing.T) {
- runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2})
+ runTimeoutPlaceholderTest(t, Failing.String(), Hard)
}
-func runTimeoutPlaceholderTest(t *testing.T, expectedState string,
gangSchedulingStyle string, expectedReleases []int) {
+func runTimeoutPlaceholderTest(t *testing.T, expectedState string,
gangSchedulingStyle string) {
const (
tg1 = "tg-1"
tg2 = "tg-2"
@@ -1555,16 +1555,16 @@ func runTimeoutPlaceholderTest(t *testing.T,
expectedState string, gangSchedulin
assert.Assert(t, app.IsAccepted(), "Application should be in accepted
state")
// add the placeholder to the app
- ph := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
- app.AddAllocation(ph)
- app.addPlaceholderDataWithLocking(ph)
+ ph1 := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
+ app.AddAllocation(ph1)
+ app.addPlaceholderDataWithLocking(ph1)
assertPlaceholderData(t, app, tg2, 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), res)
assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer
should be initiated after the first placeholder allocation")
// add a second one to check the filter
- ph = newPlaceholderAlloc(appID1, nodeID1, res, tg2)
- app.AddAllocation(ph)
- app.addPlaceholderDataWithLocking(ph)
+ ph2 := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
+ app.AddAllocation(ph2)
+ app.addPlaceholderDataWithLocking(ph2)
assertPlaceholderData(t, app, tg2, 2, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
assert.Assert(t, app.IsAccepted(), "Application should be in accepted
state")
@@ -1574,11 +1574,22 @@ func runTimeoutPlaceholderTest(t *testing.T,
expectedState string, gangSchedulin
return app.placeholderTimer == nil
})
assert.NilError(t, err, "Placeholder timeout cleanup did not trigger
unexpectedly")
- // When the app was in the accepted state, timeout should equal to count
+ // pending updates immediately
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
- assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
+ // No changes until the removals are confirmed
+ assertPlaceholderData(t, app, tg2, 2, 0, 0, res)
+
assert.Equal(t, app.stateMachine.Current(), expectedState, "Application
did not progress into expected state")
+ log := app.GetStateLog()
+ assert.Equal(t, len(log), 2, "wrong number of app events")
+ assert.Equal(t, log[0].ApplicationState, Accepted.String())
+ assert.Equal(t, log[1].ApplicationState, expectedState)
+
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ // ordering of events is based on the order in which we call the
release in the application
+ // first are the allocated placeholders then the pending placeholders,
always same values
+ // See the timeoutPlaceholderProcessing() function
+ expectedReleases := []int{2, 1}
events := testHandler.GetEvents()
var found int
idx := 0
@@ -1597,10 +1608,15 @@ func runTimeoutPlaceholderTest(t *testing.T,
expectedState string, gangSchedulin
assert.Assert(t, resources.Equals(app.GetPlaceholderResource(),
resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
- log := app.GetStateLog()
- assert.Equal(t, len(log), 2, "wrong number of app events")
- assert.Equal(t, log[0].ApplicationState, Accepted.String())
- assert.Equal(t, log[1].ApplicationState, expectedState)
+ removed := app.RemoveAllocation(ph1.allocationKey,
si.TerminationType_TIMEOUT)
+ assert.Assert(t, removed != nil, "expected allocation got nil")
+ assert.Equal(t, ph1.allocationKey, removed.allocationKey, "expected
placeholder to be returned")
+ removed = app.RemoveAllocation(ph2.allocationKey,
si.TerminationType_TIMEOUT)
+ assert.Assert(t, removed != nil, "expected allocation got nil")
+ assert.Equal(t, ph2.allocationKey, removed.allocationKey, "expected
placeholder to be returned")
+
+ // Removals are confirmed: timeout should equal to count
+ assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
}
func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]