This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new d0c1c33c [YUNIKORN-3190] Fix race condition occurring between released and preempted allocations (#1058) (#1059)
d0c1c33c is described below

commit d0c1c33cb39daa72fb16b3bfe92cd157915a1c99
Author: Manikandan R <[email protected]>
AuthorDate: Tue Jan 13 15:36:40 2026 +0530

    [YUNIKORN-3190] Fix race condition occurring between released and preempted allocations (#1058) (#1059)
    
    Closes: #1058
    
    
    (cherry picked from commit 40271160da1b93ff8363521cbbb711efb42f747f)
    
    Signed-off-by: mani <[email protected]>
---
 pkg/common/errors.go                               |   1 +
 pkg/log/logger.go                                  |  61 ++++----
 pkg/scheduler/objects/allocation.go                |  22 ++-
 pkg/scheduler/objects/application.go               | 168 +++++++++++++--------
 pkg/scheduler/objects/application_test.go          | 142 ++++++++++++++++-
 pkg/scheduler/objects/preemption.go                |  25 ++-
 pkg/scheduler/objects/preemption_test.go           |  77 ++++++++++
 pkg/scheduler/objects/queue_test.go                |   3 +-
 pkg/scheduler/objects/quota_change_preemptor.go    |   8 +-
 .../objects/quota_change_preemptor_test.go         |  12 +-
 pkg/scheduler/objects/required_node_preemptor.go   |  65 +++++++-
 .../objects/required_node_preemptor_test.go        |  54 +++++--
 pkg/scheduler/partition_test.go                    |   3 +-
 13 files changed, 528 insertions(+), 113 deletions(-)
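
The core of the change below is a mutual-exclusion guard on the two Allocation flags: an allocation that has already been preempted can no longer be marked released, and vice versa, so the placeholder-release and preemption paths stop racing on the same allocation. A minimal, self-contained Go sketch of that pattern (simplified from pkg/scheduler/objects/allocation.go in this diff; the real struct carries many more fields and the scheduler wiring is omitted):

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // Allocation keeps the two competing state flags behind one lock.
    type Allocation struct {
        sync.Mutex
        released  bool
        preempted bool
    }

    // SetReleased refuses to release an allocation that was already preempted.
    func (a *Allocation) SetReleased(released bool) error {
        a.Lock()
        defer a.Unlock()
        if released && a.preempted {
            return errors.New("allocation is already preempted")
        }
        a.released = released
        return nil
    }

    // MarkPreempted refuses to preempt an allocation that was already released.
    func (a *Allocation) MarkPreempted() error {
        a.Lock()
        defer a.Unlock()
        if a.released {
            return errors.New("allocation is already released")
        }
        a.preempted = true
        return nil
    }

    // MarkUnPreempted reverts the preempted flag, used to roll back victims
    // marked earlier in the same preemption cycle when a later victim fails.
    func (a *Allocation) MarkUnPreempted() {
        a.Lock()
        defer a.Unlock()
        a.preempted = false
    }

    func main() {
        a := &Allocation{}
        if err := a.SetReleased(true); err == nil {
            fmt.Println("released")
        }
        // the competing preemption path now gets an error instead of racing
        if err := a.MarkPreempted(); err != nil {
            fmt.Println("preemption rejected:", err)
        }
    }

In the patch, every caller of these two methods checks the returned error and either skips the allocation (the placeholder timeout paths) or reverts the work it had already done (MarkUnPreempted in preemption.go, ClearRelease/deallocateAsk in tryPlaceholderAllocate).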

diff --git a/pkg/common/errors.go b/pkg/common/errors.go
index 4987ec90..210c4089 100644
--- a/pkg/common/errors.go
+++ b/pkg/common/errors.go
@@ -41,4 +41,5 @@ const (
        PreemptionDoesNotHelp          = "Preemption does not help"
        NoVictimForRequiredNode        = "No fit on required node, preemption does not help"
        PreemptionMaxAttemptsExhausted = "Preemption max attempts exhausted"
+       PreemptionVictimsReleased      = "Victims picked earlier were released at the final stage"
 )
diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index bbfe3ba3..0c1f5388 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -54,36 +54,37 @@ const (
 
 // Defined loggers: when adding new loggers, ids must be sequential, and all must be added to the loggers slice in the same order
 var (
-       Core                       = &LoggerHandle{id: 0, name: "core"}
-       Test                       = &LoggerHandle{id: 1, name: "test"}
-       Deprecation                = &LoggerHandle{id: 2, name: "deprecation"}
-       Config                     = &LoggerHandle{id: 3, name: "core.config"}
-       Entrypoint                 = &LoggerHandle{id: 4, name: "core.entrypoint"}
-       Events                     = &LoggerHandle{id: 5, name: "core.events"}
-       OpenTracing                = &LoggerHandle{id: 6, name: "core.opentracing"}
-       Resources                  = &LoggerHandle{id: 7, name: "core.resources"}
-       REST                       = &LoggerHandle{id: 8, name: "core.rest"}
-       RMProxy                    = &LoggerHandle{id: 9, name: "core.rmproxy"}
-       RPC                        = &LoggerHandle{id: 10, name: "core.rpc"}
-       Metrics                    = &LoggerHandle{id: 11, name: "core.metrics"}
-       Scheduler                  = &LoggerHandle{id: 12, name: "core.scheduler"}
-       SchedAllocation            = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
-       SchedApplication           = &LoggerHandle{id: 14, name: "core.scheduler.application"}
-       SchedAppUsage              = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
-       SchedContext               = &LoggerHandle{id: 16, name: "core.scheduler.context"}
-       SchedFSM                   = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
-       SchedHealth                = &LoggerHandle{id: 18, name: "core.scheduler.health"}
-       SchedNode                  = &LoggerHandle{id: 19, name: "core.scheduler.node"}
-       SchedPartition             = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
-       SchedPreemption            = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
-       SchedQueue                 = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
-       SchedReservation           = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
-       SchedUGM                   = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
-       SchedNodesUsage            = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
-       Security                   = &LoggerHandle{id: 26, name: "core.security"}
-       Utils                      = &LoggerHandle{id: 27, name: "core.utils"}
-       Diagnostics                = &LoggerHandle{id: 28, name: "core.diagnostics"}
-       SchedQuotaChangePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
+       Core                        = &LoggerHandle{id: 0, name: "core"}
+       Test                        = &LoggerHandle{id: 1, name: "test"}
+       Deprecation                 = &LoggerHandle{id: 2, name: "deprecation"}
+       Config                      = &LoggerHandle{id: 3, name: "core.config"}
+       Entrypoint                  = &LoggerHandle{id: 4, name: "core.entrypoint"}
+       Events                      = &LoggerHandle{id: 5, name: "core.events"}
+       OpenTracing                 = &LoggerHandle{id: 6, name: "core.opentracing"}
+       Resources                   = &LoggerHandle{id: 7, name: "core.resources"}
+       REST                        = &LoggerHandle{id: 8, name: "core.rest"}
+       RMProxy                     = &LoggerHandle{id: 9, name: "core.rmproxy"}
+       RPC                         = &LoggerHandle{id: 10, name: "core.rpc"}
+       Metrics                     = &LoggerHandle{id: 11, name: "core.metrics"}
+       Scheduler                   = &LoggerHandle{id: 12, name: "core.scheduler"}
+       SchedAllocation             = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
+       SchedApplication            = &LoggerHandle{id: 14, name: "core.scheduler.application"}
+       SchedAppUsage               = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
+       SchedContext                = &LoggerHandle{id: 16, name: "core.scheduler.context"}
+       SchedFSM                    = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
+       SchedHealth                 = &LoggerHandle{id: 18, name: "core.scheduler.health"}
+       SchedNode                   = &LoggerHandle{id: 19, name: "core.scheduler.node"}
+       SchedPartition              = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
+       SchedPreemption             = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
+       SchedQueue                  = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
+       SchedReservation            = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
+       SchedUGM                    = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
+       SchedNodesUsage             = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
+       Security                    = &LoggerHandle{id: 26, name: "core.security"}
+       Utils                       = &LoggerHandle{id: 27, name: "core.utils"}
+       Diagnostics                 = &LoggerHandle{id: 28, name: "core.diagnostics"}
+       SchedQuotaChangePreemption  = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
+       SchedRequiredNodePreemption = &LoggerHandle{id: 30, name: "core.scheduler.preemption.requirednode"}
 )
 
 // this tracks all the known logger handles, used to preallocate the real logger instances when configuration changes
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 3b60c80b..92d2b900 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -19,6 +19,7 @@
 package objects
 
 import (
+       "errors"
        "fmt"
        "strconv"
        "time"
@@ -294,10 +295,16 @@ func (a *Allocation) IsReleased() bool {
 }
 
 // SetReleased updates the release status of the allocation.
-func (a *Allocation) SetReleased(released bool) {
+func (a *Allocation) SetReleased(released bool) error {
        a.Lock()
        defer a.Unlock()
+       if released {
+               if a.preempted {
+                       return errors.New("allocation is already preempted")
+               }
+       }
        a.released = released
+       return nil
 }
 
 // GetTagsClone returns the copy of the tags for this allocation.
@@ -348,10 +355,21 @@ func (a *Allocation) SetAllocatedResource(allocatedResource *resources.Resource)
 }
 
 // MarkPreempted marks the allocation as preempted.
-func (a *Allocation) MarkPreempted() {
+func (a *Allocation) MarkPreempted() error {
        a.Lock()
        defer a.Unlock()
+       if a.released {
+               return errors.New("allocation is already released")
+       }
        a.preempted = true
+       return nil
+}
+
+// MarkUnPreempted unmarks the allocation as preempted.
+func (a *Allocation) MarkUnPreempted() {
+       a.Lock()
+       defer a.Unlock()
+       a.preempted = false
 }
 
 // IsPreempted returns whether the allocation has been marked for preemption or not.
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 3c065764..986f1bb4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -330,15 +330,30 @@ func (sa *Application) timeoutStateTimer(expectedState string, event application
                                zap.String("state", sa.stateMachine.Current()))
                        // if the app is completing, but there are placeholders left, first do the cleanup
                        if sa.IsCompleting() && !resources.IsZero(sa.allocatedPlaceholder) {
+                               replacing := 0
+                               preempted := 0
                                var toRelease []*Allocation
                                for _, alloc := range sa.getPlaceholderAllocations() {
                                        // skip over the allocations that are already marked for release
                                        if alloc.IsReleased() {
+                                               replacing++
+                                               continue
+                                       }
+                                       err := alloc.SetReleased(true)
+                                       if err != nil {
+                                               log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+                                                       zap.String("applicationID", sa.ApplicationID),
+                                                       zap.String("allocationKey", alloc.GetAllocationKey()))
+                                               preempted++
                                                continue
                                        }
-                                       alloc.SetReleased(true)
                                        toRelease = append(toRelease, alloc)
                                }
+                               log.Log(log.SchedApplication).Info("application is getting timed out, releasing allocated placeholders",
+                                       zap.String("AppID", sa.ApplicationID),
+                                       zap.Int("replaced", replacing),
+                                       zap.Int("preempted", preempted),
+                                       zap.Int("releasing", len(toRelease)))
                                sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
                                sa.clearStateTimer()
                        } else {
@@ -395,19 +410,28 @@ func (sa *Application) timeoutPlaceholderProcessing() {
                // Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
                var toRelease []*Allocation
                replacing := 0
+               preempted := 0
                for _, alloc := range sa.getPlaceholderAllocations() {
                        // skip over the allocations that are already marked for release, they will be replaced soon
                        if alloc.IsReleased() {
                                replacing++
                                continue
                        }
-                       alloc.SetReleased(true)
+                       err := alloc.SetReleased(true)
+                       if err != nil {
+                               log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+                                       zap.String("applicationID", sa.ApplicationID),
+                                       zap.String("allocationKey", alloc.GetAllocationKey()))
+                               preempted++
+                               continue
+                       }
                        toRelease = append(toRelease, alloc)
                }
                log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders",
                        zap.String("AppID", sa.ApplicationID),
-                       zap.Int("placeholders being replaced", replacing),
-                       zap.Int("releasing placeholders", len(toRelease)))
+                       zap.Int("replaced", replacing),
+                       zap.Int("preempted", preempted),
+                       zap.Int("releasing", len(toRelease)))
                // trigger the release of the placeholders: accounting updates when the release is done
                sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
        } else {
@@ -425,22 +449,38 @@ func (sa *Application) timeoutPlaceholderProcessing() {
                }
                // all allocations are placeholders release them all
                var toRelease, pendingRelease []*Allocation
+               preempted := 0
                for _, alloc := range sa.allocations {
-                       alloc.SetReleased(true)
+                       err := alloc.SetReleased(true)
+                       if err != nil {
+                               log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+                                       zap.String("applicationID", sa.ApplicationID),
+                                       zap.String("allocationKey", alloc.GetAllocationKey()))
+                               preempted++
+                               continue
+                       }
                        toRelease = append(toRelease, alloc)
                }
                // get all open requests and remove them all filter out already allocated as they are already released
                for _, alloc := range sa.requests {
                        if !alloc.IsAllocated() {
-                               alloc.SetReleased(true)
+                               err := alloc.SetReleased(true)
+                               if err != nil {
+                                       log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
+                                               zap.String("applicationID", sa.ApplicationID),
+                                               zap.String("allocationKey", alloc.GetAllocationKey()))
+                                       preempted++
+                                       continue
+                               }
                                pendingRelease = append(pendingRelease, alloc)
                                sa.placeholderData[alloc.taskGroupName].TimedOut++
                        }
                }
                log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders",
                        zap.String("AppID", sa.ApplicationID),
-                       zap.Int("releasing placeholders", len(toRelease)),
-                       zap.Int("pending placeholders", len(pendingRelease)),
+                       zap.Int("releasing", len(toRelease)),
+                       zap.Int("pending", len(pendingRelease)),
+                       zap.Int("preempted", preempted),
                        zap.String("gang scheduling style", sa.gangSchedulingStyle))
                sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
                // trigger the release of the allocated placeholders: accounting updates when the release is done
@@ -1202,9 +1242,15 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                                        zap.String("placeholderKey", ph.GetAllocationKey()),
                                        zap.Stringer("placeholder resource", ph.GetAllocatedResource()))
                                // release the placeholder and tell the RM
-                               ph.SetReleased(true)
-                               sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
-                               sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
+                               err := ph.SetReleased(true)
+                               if err != nil {
+                                       log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping placeholder cancellation",
+                                               zap.String("applicationID", sa.ApplicationID),
+                                               zap.String("allocationKey", ph.GetAllocationKey()))
+                               } else {
+                                       sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
+                                       sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
+                               }
                                continue
                        }
                        // placeholder is the same or larger continue processing and difference is handled when the placeholder
@@ -1228,7 +1274,25 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                                // placeholder point to the real one in the releases list
                                ph.SetRelease(request)
                                // mark placeholder as released
-                               ph.SetReleased(true)
+                               err = ph.SetReleased(true)
+                               if err != nil {
+                                       log.Log(log.SchedApplication).Warn("allocation is already preempted, so not proceeding further and reverting to old state",
+                                               zap.String("applicationID", sa.ApplicationID),
+                                               zap.String("allocationKey", ph.GetAllocationKey()))
+
+                                       // revert: allocate ask
+                                       _, err = sa.deallocateAsk(request)
+                                       if err != nil {
+                                               log.Log(log.SchedApplication).Warn("deallocation of ask failed unexpectedly",
+                                                       zap.Error(err))
+                                       }
+                                       // revert: double link to make it easier to find
+                                       // alloc (the real one) releases points to the placeholder in the releases list
+                                       request.ClearRelease()
+                                       // revert: placeholder point to the real one in the releases list
+                                       ph.ClearRelease()
+                                       continue
+                               }
                                // bind node here so it will be handled 
properly upon replacement
                                request.SetBindTime(time.Now())
                                request.SetNodeID(node.NodeID)
@@ -1285,7 +1349,34 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                        // placeholder point to the real one in the releases list
                        phFit.SetRelease(reqFit)
                        // mark placeholder as released
-                       phFit.SetReleased(true)
+                       err = phFit.SetReleased(true)
+
+                       if err != nil {
+                               log.Log(log.SchedApplication).Warn("allocation is already preempted, so not proceeding further and reverting to old state",
+                                       zap.String("applicationID", sa.ApplicationID),
+                                       zap.String("allocationKey", phFit.GetAllocationKey()))
+
+                               // revert: node allocation
+                               if alloc := node.RemoveAllocation(reqFit.GetAllocationKey()); alloc != nil {
+                                       log.Log(log.SchedApplication).Debug("Reverting: Node removal failed unexpectedly",
+                                               zap.String("applicationID", sa.ApplicationID),
+                                               zap.Stringer("alloc", reqFit))
+                               }
+
+                               // revert: allocate ask
+                               _, err = sa.deallocateAsk(reqFit)
+                               if err != nil {
+                                       log.Log(log.SchedApplication).Warn("deallocation of ask failed unexpectedly",
+                                               zap.Error(err))
+                               }
+                               // revert: double link to make it easier to find
+                               // alloc (the real one) releases points to the placeholder in the releases list
+                               reqFit.ClearRelease()
+                               // revert: placeholder point to the real one in the releases list
+                               phFit.ClearRelease()
+                               return false
+                       }
+
                        // bind node here so it will be handled properly upon replacement
                        reqFit.SetBindTime(time.Now())
                        reqFit.SetNodeID(node.NodeID)
@@ -1340,7 +1431,9 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte
                // Do we need a specific node?
                if ask.GetRequiredNode() != "" {
                        if !reserve.node.CanAllocate(ask.GetAllocatedResource()) && !ask.HasTriggeredPreemption() {
-                               sa.tryRequiredNodePreemption(reserve, ask)
+                               // try preemption and see if we can free up resource
+                               preemptor := NewRequiredNodePreemptor(reserve.node, ask, sa)
+                               preemptor.tryPreemption()
                                continue
                        }
                }
@@ -1403,46 +1496,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
        return preemptor.TryPreemption()
 }
 
-func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
-       // try preemption and see if we can free up resource
-       preemptor := NewRequiredNodePreemptor(reserve.node, ask)
-       result := preemptor.filterAllocations()
-       preemptor.sortAllocations()
-
-       // Are there any victims/asks to preempt?
-       victims := preemptor.GetVictims()
-       if len(victims) > 0 {
-               log.Log(log.SchedApplication).Info("Found victims for required node preemption",
-                       zap.String("ds allocation key", ask.GetAllocationKey()),
-                       zap.String("allocation name", ask.GetAllocationName()),
-                       zap.Int("no.of victims", len(victims)))
-               for _, victim := range victims {
-                       if victimQueue := sa.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
-                               victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
-                       }
-                       victim.MarkPreempted()
-                       victim.SendPreemptedBySchedulerEvent(ask.GetAllocationKey(), ask.GetApplicationID(), sa.ApplicationID)
-               }
-               ask.MarkTriggeredPreemption()
-               sa.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
-                       "preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())
-               return true
-       }
-       ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
-       ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
-       getRateLimitedReqNodeLog().Info("no victim found for required node preemption",
-               zap.String("allocation key", ask.GetAllocationKey()),
-               zap.String("allocation name", ask.GetAllocationName()),
-               zap.String("node", reserve.node.NodeID),
-               zap.Int("total allocations", result.totalAllocations),
-               zap.Int("requiredNode allocations", result.requiredNodeAllocations),
-               zap.Int("allocations already preempted", result.alreadyPreemptedAllocations),
-               zap.Int("higher priority allocations", result.higherPriorityAllocations),
-               zap.Int("allocations with non-matching resources", result.atLeastOneResNotMatched),
-               zap.Int("released placeholder allocations", result.releasedPhAllocations))
-       return false
-}
-
 // tryNodesNoReserve tries all the nodes for a reserved request that have not been tried yet.
 // This should never result in a reservation as the allocation is already reserved
 func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator, reservedNode string) *AllocationResult {
@@ -2219,13 +2272,6 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
        return rateLimitedAppLog
 }
 
-func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
-       initReqNodeLogOnce.Do(func() {
-               rateLimitedReqNodeLog = log.NewRateLimitedLogger(log.SchedApplication, time.Minute)
-       })
-       return rateLimitedReqNodeLog
-}
-
 func (sa *Application) updateRunnableStatus(runnableInQueue, runnableByUserLimit bool) {
        sa.Lock()
        defer sa.Unlock()
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 307bcf49..1be8c4ea 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1602,7 +1602,8 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
        assert.NilError(t, err, "Unexpected error when creating resource from map")
        // add the placeholders to the app: one released, one still available.
        phReleased := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
-       phReleased.SetReleased(true)
+       err = phReleased.SetReleased(true)
+       assert.NilError(t, err, "Unexpected error when releasing placeholder")
        app.AddAllocation(phReleased)
        app.addPlaceholderDataWithLocking(phReleased)
        assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
@@ -1658,6 +1659,85 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
        assertUserGroupResource(t, getTestUserGroup(), res)
 }
 
+func TestTimeoutPlaceholderAllocPreempted(t *testing.T) {
+       setupUGM()
+
+       originalPhTimeout := defaultPlaceholderTimeout
+       defaultPlaceholderTimeout = 100 * time.Millisecond
+       defer func() { defaultPlaceholderTimeout = originalPhTimeout }()
+
+       app, testHandler := newApplicationWithHandler(appID1, "default", "root.a")
+       assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create")
+       app.SetState(Accepted.String())
+
+       resMap := map[string]string{"memory": "100", "vcores": "10"}
+       res, err := resources.NewResourceFromConf(resMap)
+       assert.NilError(t, err, "Unexpected error when creating resource from map")
+       // add the placeholders to the app: one released, one still available.
+       phReleased := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
+       err = phReleased.SetReleased(true)
+       assert.NilError(t, err, "Unexpected error when releasing placeholder")
+       app.AddAllocation(phReleased)
+       app.addPlaceholderDataWithLocking(phReleased)
+       assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+
+       assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
+       ph := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
+       app.AddAllocation(ph)
+       app.addPlaceholderDataWithLocking(ph)
+       assertPlaceholderData(t, app, tg1, 2, 0, 0, res)
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
+
+       alloc := newAllocation(appID1, nodeID1, res)
+       app.AddAllocation(alloc)
+       assert.Assert(t, app.IsRunning(), "App should be in running state after the first allocation")
+       err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
+               // Preempt the placeholder in the meantime
+               err = ph.MarkPreempted()
+               assert.NilError(t, err, "Unexpected error when marking placeholder preempted")
+               app.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+                       "preempting allocations to free up resources to run ask: "+ph.GetAllocationKey())
+               return app.getPlaceholderTimer() == nil
+       })
+       assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly")
+       assert.Assert(t, app.IsRunning(), "App should be in running state after the first allocation")
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3))
+       // two state updates and 1 release event
+       events := testHandler.GetEvents()
+       var found bool
+       for _, event := range events {
+               if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok {
+                       assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released")
+                       assert.Equal(t, allocRelease.ReleasedAllocations[0].GetTerminationType(), si.TerminationType_PREEMPTED_BY_SCHEDULER, "")
+                       assert.Equal(t, allocRelease.ReleasedAllocations[0].AllocationKey, ph.allocationKey, "wrong placeholder allocation released on timeout")
+                       found = true
+               }
+       }
+       assert.Assert(t, found, "release allocation event not found in list")
+       assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "Unexpected allocated resources for the app")
+       // a released placeholder still holds the resource until release confirmed by the RM
+       assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
+       // tracking data not updated until confirmed by the RM
+       assertPlaceholderData(t, app, tg1, 2, 0, 0, res)
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3))
+       // do what the RM does and respond to the release
+       removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT)
+       assert.Assert(t, removed != nil, "expected allocation got nil")
+       assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned")
+       assertPlaceholderData(t, app, tg1, 2, 1, 0, res)
+       assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder resources still accounted for on the app")
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
+
+       // process the replacement no real alloc linked account for that
+       removed = app.ReplaceAllocation(phReleased.allocationKey)
+       assert.Assert(t, removed != nil, "expected allocation got nil")
+       assert.Equal(t, phReleased.allocationKey, removed.allocationKey, "expected placeholder to be returned")
+       assertPlaceholderData(t, app, tg1, 2, 1, 1, res)
+       assert.Assert(t, resources.IsZero(app.GetPlaceholderResource()), "placeholder resources still accounted for on the app")
+       assertUserGroupResource(t, getTestUserGroup(), res)
+}
+
 func TestTimeoutPlaceholderCompleting(t *testing.T) {
        setupUGM()
        phTimeout := common.ConvertSITimeout(5)
@@ -1666,6 +1746,59 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
        assert.Equal(t, app.execTimeout, phTimeout, "placeholder timeout not initialised correctly")
        app.SetState(Accepted.String())
 
+       resMap := map[string]string{"memory": "100", "vcores": "10"}
+       res, err := resources.NewResourceFromConf(resMap)
+       assert.NilError(t, err, "Unexpected error when creating resource from map")
+       // add the placeholder to the app
+       ph := newPlaceholderAlloc(appID1, nodeID1, res, tg1)
+       app.AddAllocation(ph)
+       assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
+       app.addPlaceholderDataWithLocking(ph)
+       assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+       // add a real allocation as well
+       alloc := newAllocation(appID1, nodeID1, res)
+       app.AddAllocation(alloc)
+       // move on to running
+       app.SetState(Running.String())
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
+       // remove allocation to trigger state change
+       app.RemoveAllocation(alloc.GetAllocationKey(), si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+       assert.Assert(t, app.IsCompleting(), "App should be in completing state all allocs have been removed")
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+       // make sure the placeholders time out
+       err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
+               return app.getPlaceholderTimer() == nil
+       })
+       assert.NilError(t, err, "Placeholder timer did not time out as expected")
+       events := testHandler.GetEvents()
+       var found bool
+       for _, event := range events {
+               if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok {
+                       assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released")
+                       found = true
+               }
+       }
+       assert.Assert(t, found, "release allocation event not found in list")
+       assert.Assert(t, app.IsCompleting(), "App should be in completing state")
+       assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
+       // tracking data not updated until confirmed by the RM
+       assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
+       // do what the RM does and respond to the release
+       removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT)
+       assert.Assert(t, removed != nil, "expected allocation got nil")
+       assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned")
+       assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
+}
+
+func TestTimeoutPlaceholderCompletingWithPreemptedPh(t *testing.T) {
+       setupUGM()
+       phTimeout := common.ConvertSITimeout(5)
+       app, testHandler := newApplicationWithPlaceholderTimeout(appID1, "default", "root.a", 5)
+       assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create")
+       assert.Equal(t, app.execTimeout, phTimeout, "placeholder timeout not initialised correctly")
+       app.SetState(Accepted.String())
+
        resMap := map[string]string{"memory": "100", "vcores": "10"}
        res, err := resources.NewResourceFromConf(resMap)
        assert.NilError(t, err, "Unexpected error when creating resource from map")
@@ -1689,6 +1822,11 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
        assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
        // make sure the placeholders time out
        err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
+               // Preempt the placeholder in the meantime
+               err = ph.MarkPreempted()
+               assert.NilError(t, err, "Unexpected error when marking placeholder preempted")
+               app.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+                       "preempting allocations to free up resources to run ask: "+ph.GetAllocationKey())
                return app.getPlaceholderTimer() == nil
        })
        assert.NilError(t, err, "Placeholder timer did not time out as expected")
@@ -1697,6 +1835,8 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
        for _, event := range events {
                if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok {
                        assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released")
+                       assert.Equal(t, allocRelease.ReleasedAllocations[0].GetTerminationType(), si.TerminationType_PREEMPTED_BY_SCHEDULER, "")
+                       assert.Equal(t, allocRelease.ReleasedAllocations[0].AllocationKey, ph.allocationKey, "wrong placeholder allocation released on timeout")
                        found = true
                }
        }
diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go
index 555b9cea..f2aa91dc 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -647,11 +647,34 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
                return nil, false
        }
 
+       // Has any victim released?
+       // (Placeholder ?) Allocation chosen as victim earlier in the beginning of preemption cycle but released in the meantime also should be prevented from
+       // proceeding further to avoid choosing the same allocation which is already at the verge of replacement process for this current preemption cycle.
+       var preemptedVictims []*Allocation
+       for _, victim := range finalVictims {
+               err := victim.MarkPreempted()
+               if err != nil {
+                       log.Log(log.SchedPreemption).Info("Victim is already released, so marking earlier allocations as un preempted and not moving forward further on this preemption",
+                               zap.String("askApplicationID", p.ask.applicationID),
+                               zap.String("askAllocationKey", p.ask.allocationKey),
+                               zap.String("victimApplicationID", victim.GetApplicationID()),
+                               zap.String("victimAllocationKey", victim.GetAllocationKey()),
+                               zap.Error(err))
+                       // revert preempted victims (if any) to old state
+                       for _, v := range preemptedVictims {
+                               v.MarkUnPreempted()
+                       }
+                       // victim already released, so preemption doesn't help
+                       p.ask.LogAllocationFailure(common.PreemptionVictimsReleased, true)
+                       return nil, false
+               }
+               preemptedVictims = append(preemptedVictims, victim)
+       }
+
        // preempt the victims
        for _, victim := range finalVictims {
                if victimQueue := p.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
                        victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
-                       victim.MarkPreempted()
                        log.Log(log.SchedPreemption).Info("Preempting task",
                                zap.String("askApplicationID", p.ask.applicationID),
                                zap.String("askAllocationKey", p.ask.allocationKey),
diff --git a/pkg/scheduler/objects/preemption_test.go b/pkg/scheduler/objects/preemption_test.go
index c09884cb..017c9395 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -652,6 +652,83 @@ func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
        assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
 }
 
+// TestTryPreemption_VictimReleased_InsufficientResource Test try preemption on queue with simple queue hierarchy. Since Node has enough resources to accommodate, preemption happens because of queue resource constraint.
+// Guaranteed and Max resource set on both victim queue path and preemptor queue paths. victim and preemptor queue are siblings.
+// Request (Preemptor) resource type matches with all resource types of the victim. Guaranteed set only on that specific resource type.
+// Setup:
+// Nodes are Node1 and Node2. Node has enough space to accommodate the new ask.
+// root.parent. Max set on parent, first: 18
+// root.parent.child1. Guaranteed not set. 2 Allocations (belongs to single app) are running on node1 and node2. Each Allocation usage is first:5. Total usage is first:10.
+// root.parent.child2. Guaranteed set 5. Request of first:5 is waiting for resources.
+// root.parent.child3. Guaranteed not set. 1 Allocation is running on node2. Total usage is first:5.
+// Preemption options are 1. 2 Alloc running on Node 2 but on child 1 and child 3 queues. 2. 2 Alloc running on Node 2 and child 1 queue. 3. All three allocs.
+// option 1 >> option 2 >> option 3. In option 3, preempting the third allocation is unnecessary, should avoid this option.
+// Either option 1 or option 2 is fine, but not option 3.
+// Though victims are available in the beginning, some of those are "released" in the meantime. Hence, the preemption process is halted and cannot proceed with the remaining victims.
+func TestTryPreemption_VictimReleased_InsufficientResource(t *testing.T) {
+       appQueueMapping := NewAppQueueMapping()
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 30})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 30})
+       iterator := getNodeIteratorFn(node1, node2)
+       rootQ, err := createRootQueue(map[string]string{"first": "60"})
+       assert.NilError(t, err)
+       parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "18"}, nil, appQueueMapping)
+       assert.NilError(t, err)
+       childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, nil, appQueueMapping)
+       assert.NilError(t, err)
+       childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "15"}, appQueueMapping)
+       assert.NilError(t, err)
+       childQ3, err := createManagedQueueGuaranteed(parentQ, "child3", false, nil, nil, appQueueMapping)
+       assert.NilError(t, err)
+
+       alloc1, alloc2, err := creatApp1(childQ1, node1, node2, map[string]resources.Quantity{"first": 5}, appQueueMapping)
+       assert.NilError(t, err)
+
+       app3 := newApplication(appID3, "default", "root.parent.child3")
+       app3.SetQueue(childQ3)
+       childQ3.AddApplication(app3)
+       appQueueMapping.AddAppQueueMapping(app3.ApplicationID, childQ3)
+
+       ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       ask4.createTime = time.Now()
+       assert.NilError(t, app3.AddAllocationAsk(ask4))
+
+       alloc4 := newAllocationWithKey("alloc4", appID3, nodeID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       alloc4.createTime = ask4.createTime
+       app3.AddAllocation(alloc4)
+       assert.Check(t, node2.TryAddAllocation(alloc4), "node alloc2 failed")
+       assert.NilError(t, childQ3.TryIncAllocatedResource(ask4.GetAllocatedResource()))
+
+       app2, ask3, err := creatApp2(childQ2, map[string]resources.Quantity{"first": 10}, "alloc3", appQueueMapping)
+       assert.NilError(t, err)
+
+       headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 3})
+       preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
+
+       // Init Queue snapshots before in hand so that victims collection process does not miss "alloc4"
+       preemptor.initQueueSnapshots()
+
+       // Now that this has been included in the victims list, release it so that the preemption fails at the end.
+       err = alloc4.SetReleased(true)
+       assert.NilError(t, err)
+
+       allocs := map[string]string{}
+       allocs["alloc3"] = nodeID2
+
+       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
+       plugins.RegisterSchedulerPlugin(plugin)
+       defer plugins.UnregisterSchedulerPlugins()
+
+       result, ok := preemptor.TryPreemption()
+
+       assert.Assert(t, result == nil, "unexpected result")
+       assert.Equal(t, ok, false, "no victims found")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
+       assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
+       assert.Check(t, !alloc4.IsPreempted(), "alloc4 preempted")
+       assertAllocationLog(t, ask3, []string{common.PreemptionVictimsReleased})
+}
+
 // TestTryPreemption_VictimsAvailableOnDifferentNodes Test try preemption on queue with simple queue hierarchy. Since Node doesn't have enough resources to accomodate, preemption happens because of node resource constraint.
 // Guaranteed and Max resource set on both victim queue path and preemptor queue path. victim and preemptor queue are siblings.
 // Request (Preemptor) resource type matches with 1 resource type of the victim. Guaranteed also set on specific resource type. 2 Victims are available, but total resource usage is lesser than ask requirement.
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index 520e7e2d..4308749a 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2072,7 +2072,8 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
        alloc2.released = false
 
        // alloc2 has already been preempted and should not be considered a valid victim
-       alloc2.MarkPreempted()
+       err = alloc2.MarkPreempted()
+       assert.NilError(t, err, "failed to mark preempted node")
        snapshot = leaf1.FindEligiblePreemptionVictims(leaf1.QueuePath, ask)
        assert.Equal(t, 1, len(victims(snapshot)), "wrong victim count")
        assert.Equal(t, alloc3.allocationKey, victims(snapshot)[0].allocationKey, "wrong alloc")
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go b/pkg/scheduler/objects/quota_change_preemptor.go
index 1d61c7d3..c302dfb3 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -303,6 +303,13 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
        for app, victims := range apps {
                if len(victims) > 0 {
                        for _, victim := range victims {
+                               err := victim.MarkPreempted()
+                               if err != nil {
+                                       log.Log(log.SchedQuotaChangePreemption).Warn("allocation is already released, so not proceeding further on the quota change preemption process",
+                                               zap.String("applicationID", victim.applicationID),
+                                               zap.String("allocationKey", victim.GetAllocationKey()))
+                                       continue
+                               }
                                log.Log(log.SchedQuotaChangePreemption).Info("Preempting victims for quota change preemption",
                                        zap.String("queue", qcp.queue.GetQueuePath()),
                                        zap.String("victim allocation key", victim.allocationKey),
@@ -311,7 +318,6 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
                                        zap.String("victim node", victim.GetNodeID()),
                                )
                                qcp.queue.IncPreemptingResource(victim.GetAllocatedResource())
-                               victim.MarkPreempted()
                                victim.SendPreemptedByQuotaChangeEvent(qcp.queue.GetQueuePath())
                        }
                        app.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 1fd3b1ef..d578122a 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -180,12 +180,16 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
                                asks[1].SetRequiredNode("node2")
                        }
                        if tc.irrelevantAllocations[1] {
-                               asks[2].MarkPreempted()
-                               asks[3].MarkPreempted()
+                               err = asks[2].MarkPreempted()
+                               assert.NilError(t, err)
+                               err = asks[3].MarkPreempted()
+                               assert.NilError(t, err)
                        }
                        if tc.irrelevantAllocations[2] {
-                               asks[4].SetReleased(true)
-                               asks[5].SetReleased(true)
+                               err = asks[4].SetReleased(true)
+                               assert.NilError(t, err)
+                               err = asks[5].SetReleased(true)
+                               assert.NilError(t, err)
                        }
                        preemptor := NewQuotaChangePreemptor(tc.queue)
                        preemptor.preemptableResource = tc.preemptableResource
diff --git a/pkg/scheduler/objects/required_node_preemptor.go b/pkg/scheduler/objects/required_node_preemptor.go
index 015cd9e1..404edae6 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -19,12 +19,20 @@
 package objects
 
 import (
+       "time"
+
+       "go.uber.org/zap"
+
+       "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/log"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 type PreemptionContext struct {
        node        *Node
        requiredAsk *Allocation
+       application *Application
        allocations []*Allocation
 }
 
@@ -37,15 +45,70 @@ type filteringResult struct {
        releasedPhAllocations       int // number of ph allocations released
 }
 
-func NewRequiredNodePreemptor(node *Node, requiredAsk *Allocation) *PreemptionContext {
+func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
+       initReqNodeLogOnce.Do(func() {
+               rateLimitedReqNodeLog = log.NewRateLimitedLogger(log.SchedRequiredNodePreemption, time.Minute)
+       })
+       return rateLimitedReqNodeLog
+}
+
+func NewRequiredNodePreemptor(node *Node, requiredAsk *Allocation, application *Application) *PreemptionContext {
        preemptor := &PreemptionContext{
                node:        node,
                requiredAsk: requiredAsk,
+               application: application,
                allocations: make([]*Allocation, 0),
        }
        return preemptor
 }
 
+func (p *PreemptionContext) tryPreemption() {
+       result := p.filterAllocations()
+       p.sortAllocations()
+
+       // Are there any victims/asks to preempt?
+       victims := p.GetVictims()
+       if len(victims) > 0 {
+               log.Log(log.SchedRequiredNodePreemption).Info("Found victims for required node preemption",
+                       zap.String("ds allocation key", p.requiredAsk.GetAllocationKey()),
+                       zap.String("allocation name", p.requiredAsk.GetAllocationName()),
+                       zap.Int("no.of victims", len(victims)))
+               for _, victim := range victims {
+                       err := victim.MarkPreempted()
+                       if err != nil {
+                               log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released, so not proceeding further on the daemon set preemption process",
+                                       zap.String("applicationID", p.requiredAsk.GetApplicationID()),
+                                       zap.String("allocationKey", victim.GetAllocationKey()))
+                               continue
+                       }
+                       if victimQueue := p.application.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
+                               victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
+                       } else {
+                               log.Log(log.SchedRequiredNodePreemption).Warn("BUG: Queue not found for daemon set preemption victim",
+                                       zap.String("queue", p.application.queue.Name),
+                                       zap.String("victimApplicationID", victim.GetApplicationID()),
+                                       zap.String("victimAllocationKey", victim.GetAllocationKey()))
+                       }
+                       victim.SendPreemptedBySchedulerEvent(p.requiredAsk.GetAllocationKey(), p.requiredAsk.GetApplicationID(), p.application.queuePath)
+               }
+               p.requiredAsk.MarkTriggeredPreemption()
+               p.application.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+                       "preempting allocations to free up resources to run daemon set ask: "+p.requiredAsk.GetAllocationKey())
+       } else {
+               p.requiredAsk.LogAllocationFailure(common.NoVictimForRequiredNode, true)
+               p.requiredAsk.SendRequiredNodePreemptionFailedEvent(p.node.NodeID)
+               getRateLimitedReqNodeLog().Info("no victim found for required node preemption",
+                       zap.String("allocation key", p.requiredAsk.GetAllocationKey()),
+                       zap.String("allocation name", p.requiredAsk.GetAllocationName()),
+                       zap.String("node", p.node.NodeID),
+                       zap.Int("total allocations", result.totalAllocations),
+                       zap.Int("requiredNode allocations", result.requiredNodeAllocations),
+                       zap.Int("allocations already preempted", result.alreadyPreemptedAllocations),
+                       zap.Int("higher priority allocations", result.higherPriorityAllocations),
+                       zap.Int("allocations with non-matching resources", result.atLeastOneResNotMatched))
+       }
+}
+
 func (p *PreemptionContext) filterAllocations() filteringResult {
        var result filteringResult
        yunikornAllocations := p.node.GetYunikornAllocations()
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go b/pkg/scheduler/objects/required_node_preemptor_test.go
index 730164b5..01244f58 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -36,10 +36,18 @@ func TestFilterAllocations(t *testing.T) {
                },
        })
 
+       rootQ, err := createRootQueue(map[string]string{"first": "100"})
+       assert.NilError(t, err)
+       defaultQ, err := createManagedQueueGuaranteed(rootQ, "default", true, map[string]string{"vcores": "3"}, nil, nil)
+       assert.NilError(t, err)
+
+       app := newApplication("app1", "default", "root.default")
+       app.SetQueue(defaultQ)
+
        // case 1: allocations are available but none of its resources are matching with ds request ask, hence no allocations considered
        requiredAsk := createAllocationAsk("ask12", "app1", true, true, 20,
                resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5}))
-       p := NewRequiredNodePreemptor(node, requiredAsk)
+       p := NewRequiredNodePreemptor(node, requiredAsk, app)
        asks := prepareAllocationAsks(t, node)
        result := p.filterAllocations()
        verifyFilterResult(t, 10, 0, 10, 0, 0, 0, result)
@@ -52,7 +60,7 @@ func TestFilterAllocations(t *testing.T) {
        // case 2: allocations are available but priority is higher than ds request priority, hence no allocations considered
        requiredAsk1 := createAllocationAsk("ask12", "app1", true, true, 1,
                resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       p1 := NewRequiredNodePreemptor(node, requiredAsk1)
+       p1 := NewRequiredNodePreemptor(node, requiredAsk1, app)
        asks = prepareAllocationAsks(t, node)
        result = p1.filterAllocations()
        verifyFilterResult(t, 10, 0, 0, 10, 0, 0, result)
@@ -65,7 +73,7 @@ func TestFilterAllocations(t *testing.T) {
        // case 3: victims are available as there are allocations with lower priority and resource match
        requiredAsk2 := createAllocationAsk("ask12", "app1", true, true, 20,
                resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       p2 := NewRequiredNodePreemptor(node, requiredAsk2)
+       p2 := NewRequiredNodePreemptor(node, requiredAsk2, app)
        asks = prepareAllocationAsks(t, node)
        result = p2.filterAllocations()
        verifyFilterResult(t, 10, 0, 0, 0, 0, 0, result)
@@ -74,9 +82,12 @@ func TestFilterAllocations(t *testing.T) {
        removeAllocationAsks(node, asks)
 
        // case 4: allocation has been preempted
-       p3 := NewRequiredNodePreemptor(node, requiredAsk2)
+       requiredAsk3 := createAllocationAsk("ask12", "app1", true, true, 20,
+               resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       p3 := NewRequiredNodePreemptor(node, requiredAsk3, app)
        asks = prepareAllocationAsks(t, node)
-       node.GetAllocation("ask5").MarkPreempted() // "ask5" would be the first and only victim without this
+       err = node.GetAllocation("ask5").MarkPreempted() // "ask5" would be the first and only victim without this
+       assert.NilError(t, err)
        result = p3.filterAllocations()
        p3.sortAllocations()
 
@@ -86,7 +97,9 @@ func TestFilterAllocations(t *testing.T) {
        removeAllocationAsks(node, asks)
 
        // case 5: existing required node allocation
-       p4 := NewRequiredNodePreemptor(node, requiredAsk2)
+       requiredAsk4 := createAllocationAsk("ask12", "app1", true, true, 20,
+               resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       p4 := NewRequiredNodePreemptor(node, requiredAsk4, app)
        results := prepareAllocationAsks(t, node)
        results[8].requiredNode = "node-3"
 
@@ -97,7 +110,7 @@ func TestFilterAllocations(t *testing.T) {
        removeAllocationAsks(node, asks)
 
        // case 6: release ph allocation
-       p5 := NewRequiredNodePreemptor(node, requiredAsk2)
+       p5 := NewRequiredNodePreemptor(node, requiredAsk2, app)
        results = prepareAllocationAsks(t, node)
        results[9].released = true
 
@@ -117,11 +130,19 @@ func TestGetVictims(t *testing.T) {
                },
        })
 
+       rootQ, err := createRootQueue(map[string]string{"first": "100"})
+       assert.NilError(t, err)
+       defaultQ, err := createManagedQueueGuaranteed(rootQ, "default", true, 
map[string]string{"vcores": "3"}, nil, nil)
+       assert.NilError(t, err)
+
+       app := newApplication("app1", "default", "root.default")
+       app.SetQueue(defaultQ)
+
        // case 1: victims are available and its resources are matching with ds 
request ask
        requiredAsk := createAllocationAsk("ask11", "app1", true, true, 20,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25}))
 
-       p := NewRequiredNodePreemptor(node, requiredAsk)
+       p := NewRequiredNodePreemptor(node, requiredAsk, app)
        asks := prepareAllocationAsks(t, node)
        p.filterAllocations()
        p.sortAllocations()
@@ -140,7 +161,7 @@ func TestGetVictims(t *testing.T) {
        // case 2: victims are available and its resources are matching with ds request ask (but with different quantity)
        requiredAsk2 := createAllocationAsk("ask13", "app1", true, true, 20,
                resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       p2 := NewRequiredNodePreemptor(node, requiredAsk2)
+       p2 := NewRequiredNodePreemptor(node, requiredAsk2, app)
        asks = prepareAllocationAsks(t, node)
        p2.filterAllocations()
        p2.sortAllocations()
@@ -151,7 +172,7 @@ func TestGetVictims(t *testing.T) {
        // case 3: allocations are available and its resources are matching partially with ds request ask (because of different resource types), hence no victims
        requiredAsk3 := createAllocationAsk("ask13", "app1", true, true, 20,
                resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 5}))
-       p3 := NewRequiredNodePreemptor(node, requiredAsk3)
+       p3 := NewRequiredNodePreemptor(node, requiredAsk3, app)
        asks = prepareAllocationAsks(t, node)
        p3.filterAllocations()
        filteredAllocations := p3.getAllocations()
@@ -164,6 +185,19 @@ func TestGetVictims(t *testing.T) {
        victims3 := p3.GetVictims()
        assert.Equal(t, len(victims3), 0)
        removeAllocationAsks(node, asks)
+
+       // case 4: victim chosen earlier released in the meantime
+       requiredAsk4 := createAllocationAsk("ask14", "app1", true, true, 20,
+               resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25}))
+       p4 := NewRequiredNodePreemptor(node, requiredAsk4, app)
+       asks = prepareAllocationAsks(t, node)
+       p4.filterAllocations()
+       p4.sortAllocations()
+       err = asks[1].SetReleased(true)
+       assert.NilError(t, err)
+       victims = p4.GetVictims()
+       assert.Equal(t, len(victims), 4)
+       removeAllocationAsks(node, asks)
 }
 
 func verifyFilterResult(t *testing.T, totalAllocations, requiredNodeAllocations, resourceNotEnough, higherPriorityAllocations, alreadyPreemptedAllocations int, releasedPhAllocations int, result filteringResult) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 1ee9a4b3..ebca07f5 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3312,7 +3312,8 @@ func TestPreemptedPlaceholderSkip(t *testing.T) {
        }
 
        // mark the placeholder as preempted (shortcut not interested in usage accounting etc.)
-       ph.MarkPreempted()
+       err = ph.MarkPreempted()
+       assert.NilError(t, err, "failed to mark preempted placeholder")
 
        // replace the placeholder should NOT work
        result = partition.tryPlaceholderAllocate()

