This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef347b3 [YUNIKORN-2953] Placeholder release count incorrect (#991)
6ef347b3 is described below

commit 6ef347b31c1cd61a2d8b681a6159893839b3697b
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Mon Nov 4 12:48:02 2024 -0600

    [YUNIKORN-2953] Placeholder release count incorrect (#991)
    
    While releasing placeholders, the allocated placeholders are counted
    twice in the tracking information. With YUNIKORN-2926 in place, this
    only happens if some, but not all, placeholders are allocated.
    
    Mark unallocated placeholders that time out as released to prevent
    scheduling issues.
    
    Closes: #991
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/scheduler/objects/application.go      | 48 +++++++++++++++++++------------
 pkg/scheduler/objects/application_test.go | 46 +++++++++++++++++++----------
 2 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 4de35b3e..98bbcdb7 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -409,9 +409,8 @@ func (sa *Application) clearPlaceholderTimer() {
 func (sa *Application) timeoutPlaceholderProcessing() {
        sa.Lock()
        defer sa.Unlock()
-       switch {
-       // Case 1: if all app's placeholders are allocated, only part of them 
gets replaced, just delete the remaining placeholders
-       case (sa.IsRunning() || sa.IsCompleting()) && 
!resources.IsZero(sa.allocatedPlaceholder):
+       if (sa.IsRunning() || sa.IsCompleting()) && 
!resources.IsZero(sa.allocatedPlaceholder) {
+               // Case 1: if all app's placeholders are allocated, only part 
of them gets replaced, just delete the remaining placeholders
                var toRelease []*Allocation
                replacing := 0
                for _, alloc := range sa.getPlaceholderAllocations() {
@@ -423,19 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() {
                        alloc.SetReleased(true)
                        toRelease = append(toRelease, alloc)
                }
-               log.Log(log.SchedApplication).Info("Placeholder timeout, 
releasing placeholders",
+               log.Log(log.SchedApplication).Info("Placeholder timeout, 
releasing allocated placeholders",
                        zap.String("AppID", sa.ApplicationID),
                        zap.Int("placeholders being replaced", replacing),
                        zap.Int("releasing placeholders", len(toRelease)))
                // trigger the release of the placeholders: accounting updates 
when the release is done
                sa.notifyRMAllocationReleased(toRelease, 
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder 
timeout")
-       // Case 2: in every other case progress the application, and notify the 
context about the expired placeholder asks
-       default:
-               log.Log(log.SchedApplication).Info("Placeholder timeout, 
releasing asks and placeholders",
-                       zap.String("AppID", sa.ApplicationID),
-                       zap.Int("releasing placeholders", len(sa.allocations)),
-                       zap.Int("releasing asks", len(sa.requests)),
-                       zap.String("gang scheduling style", 
sa.gangSchedulingStyle))
+       } else {
+               // Case 2: in every other case progress the application, and 
notify the context about the expired placeholders
                // change the status of the app based on gang style: soft 
resume normal allocations, hard fail the app
                event := ResumeApplication
                if sa.gangSchedulingStyle == Hard {
@@ -447,14 +441,30 @@ func (sa *Application) timeoutPlaceholderProcessing() {
                                zap.String("currentState", sa.CurrentState()),
                                zap.Error(err))
                }
-               sa.notifyRMAllocationReleased(sa.getAllRequestsInternal(), 
si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder 
timeout")
-               sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
-               // all allocations are placeholders but GetAllAllocations is 
locked and cannot be used
-               sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(), 
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder 
timeout")
-               // we are in an accepted or new state so nothing can be 
replaced yet: mark everything as timedout
-               for _, phData := range sa.placeholderData {
-                       phData.TimedOut = phData.Count
+               // all allocations are placeholders release them all
+               var toRelease, pendingRelease []*Allocation
+               for _, alloc := range sa.allocations {
+                       alloc.SetReleased(true)
+                       toRelease = append(toRelease, alloc)
+               }
+               // get all open requests and remove them all filter out already 
allocated as they are already released
+               for _, alloc := range sa.requests {
+                       if !alloc.IsAllocated() {
+                               alloc.SetReleased(true)
+                               pendingRelease = append(pendingRelease, alloc)
+                               
sa.placeholderData[alloc.taskGroupName].TimedOut++
+                       }
                }
+               log.Log(log.SchedApplication).Info("Placeholder timeout, 
releasing allocated and pending placeholders",
+                       zap.String("AppID", sa.ApplicationID),
+                       zap.Int("releasing placeholders", len(toRelease)),
+                       zap.Int("pending placeholders", len(pendingRelease)),
+                       zap.String("gang scheduling style", 
sa.gangSchedulingStyle))
+               sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
+               // trigger the release of the allocated placeholders: 
accounting updates when the release is done
+               sa.notifyRMAllocationReleased(toRelease, 
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder 
timeout")
+               // trigger the release of the pending placeholders: accounting 
has been done
+               sa.notifyRMAllocationReleased(pendingRelease, 
si.TerminationType_TIMEOUT, "releasing pending placeholders on placeholder 
timeout")
        }
        sa.clearPlaceholderTimer()
 }
@@ -1906,7 +1916,7 @@ func (sa *Application) 
removeAllocationInternal(allocationKey string, releaseTyp
                if sa.hasZeroAllocations() {
                        removeApp = true
                        event = CompleteApplication
-                       eventWarning = "Application state not changed to 
Waiting while removing an allocation"
+                       eventWarning = "Application state not changed to 
Completing while removing an allocation"
                }
                sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
        }
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 91db298f..0b77db56 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1516,14 +1516,14 @@ func TestReplaceAllocationTracking(t *testing.T) {
 }
 
 func TestTimeoutPlaceholderSoft(t *testing.T) {
-       runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2})
+       runTimeoutPlaceholderTest(t, Resuming.String(), Soft)
 }
 
 func TestTimeoutPlaceholderHard(t *testing.T) {
-       runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2})
+       runTimeoutPlaceholderTest(t, Failing.String(), Hard)
 }
 
-func runTimeoutPlaceholderTest(t *testing.T, expectedState string, 
gangSchedulingStyle string, expectedReleases []int) {
+func runTimeoutPlaceholderTest(t *testing.T, expectedState string, 
gangSchedulingStyle string) {
        const (
                tg1 = "tg-1"
                tg2 = "tg-2"
@@ -1555,16 +1555,16 @@ func runTimeoutPlaceholderTest(t *testing.T, 
expectedState string, gangSchedulin
        assert.Assert(t, app.IsAccepted(), "Application should be in accepted 
state")
 
        // add the placeholder to the app
-       ph := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
-       app.AddAllocation(ph)
-       app.addPlaceholderDataWithLocking(ph)
+       ph1 := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
+       app.AddAllocation(ph1)
+       app.addPlaceholderDataWithLocking(ph1)
        assertPlaceholderData(t, app, tg2, 1, 0, 0, res)
        assertUserGroupResource(t, getTestUserGroup(), res)
        assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer 
should be initiated after the first placeholder allocation")
        // add a second one to check the filter
-       ph = newPlaceholderAlloc(appID1, nodeID1, res, tg2)
-       app.AddAllocation(ph)
-       app.addPlaceholderDataWithLocking(ph)
+       ph2 := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
+       app.AddAllocation(ph2)
+       app.addPlaceholderDataWithLocking(ph2)
        assertPlaceholderData(t, app, tg2, 2, 0, 0, res)
        assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 
2))
        assert.Assert(t, app.IsAccepted(), "Application should be in accepted 
state")
@@ -1574,11 +1574,22 @@ func runTimeoutPlaceholderTest(t *testing.T, 
expectedState string, gangSchedulin
                return app.placeholderTimer == nil
        })
        assert.NilError(t, err, "Placeholder timeout cleanup did not trigger 
unexpectedly")
-       // When the app was in the accepted state, timeout should equal to count
+       // pending updates immediately
        assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
-       assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
+       // No changes until the removals are confirmed
+       assertPlaceholderData(t, app, tg2, 2, 0, 0, res)
+
        assert.Equal(t, app.stateMachine.Current(), expectedState, "Application 
did not progress into expected state")
+       log := app.GetStateLog()
+       assert.Equal(t, len(log), 2, "wrong number of app events")
+       assert.Equal(t, log[0].ApplicationState, Accepted.String())
+       assert.Equal(t, log[1].ApplicationState, expectedState)
+
        assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 
2))
+       // ordering of events is based on the order in which we call the 
release in the application
+       // first are the allocated placeholders then the pending placeholders, 
always same values
+       // See the timeoutPlaceholderProcessing() function
+       expectedReleases := []int{2, 1}
        events := testHandler.GetEvents()
        var found int
        idx := 0
@@ -1597,10 +1608,15 @@ func runTimeoutPlaceholderTest(t *testing.T, 
expectedState string, gangSchedulin
        assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), 
resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
        assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 
2))
 
-       log := app.GetStateLog()
-       assert.Equal(t, len(log), 2, "wrong number of app events")
-       assert.Equal(t, log[0].ApplicationState, Accepted.String())
-       assert.Equal(t, log[1].ApplicationState, expectedState)
+       removed := app.RemoveAllocation(ph1.allocationKey, 
si.TerminationType_TIMEOUT)
+       assert.Assert(t, removed != nil, "expected allocation got nil")
+       assert.Equal(t, ph1.allocationKey, removed.allocationKey, "expected 
placeholder to be returned")
+       removed = app.RemoveAllocation(ph2.allocationKey, 
si.TerminationType_TIMEOUT)
+       assert.Assert(t, removed != nil, "expected allocation got nil")
+       assert.Equal(t, ph2.allocationKey, removed.allocationKey, "expected 
placeholder to be returned")
+
+       // Removals are confirmed: timeout should equal to count
+       assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
 }
 
 func TestTimeoutPlaceholderAllocReleased(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to