This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 65d35fc9 [YUNIKORN-3219] Cancel reservations after the wait time expires (#1067)
65d35fc9 is described below

commit 65d35fc9f0b061a7e9e420acb6a099d4be552f5e
Author: mani <[email protected]>
AuthorDate: Mon Mar 30 16:25:18 2026 +0530

    [YUNIKORN-3219] Cancel reservations after the wait time expires (#1067)
    
    Closes: #1067
    
    Signed-off-by: mani <[email protected]>
---
 pkg/scheduler/objects/application.go      | 14 ++++++++++++++
 pkg/scheduler/objects/application_test.go | 20 ++++++++++++++++++++
 pkg/scheduler/objects/reservation.go      | 18 +++++++++++-------
 pkg/scheduler/objects/reservation_test.go |  5 +++--
 4 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 986f1bb4..1bf020c4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1425,6 +1425,20 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                }
 
                if !sa.checkHeadRooms(ask, userHeadroom, headRoom) {
+                       // Cancel the reservation after wait time expires
+                       createTime := reserve.createTime
+
+                       askAge := 
time.Since(createTime.Add(reservationWaitTimeout))
+
+                       // Has wait time reached?
+                       if askAge > reservationWaitTimeout {
+                               num := sa.unReserveInternal(reserve)
+                               sa.queue.UnReserve(sa.ApplicationID, num)
+                               log.Log(log.SchedApplication).Info("Cancelled 
reservation as wait time expired",
+                                       zap.String("reserve app", 
reserve.appID),
+                                       zap.String("reserve alloc key", 
reserve.allocKey),
+                                       zap.String("node", reserve.nodeID))
+                       }
                        continue
                }
 
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 1be8c4ea..663ca2ee 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2543,6 +2543,9 @@ func TestTryAllocatePreemptNodeWithReservations(t 
*testing.T) {
        alloc3 := result3.Request
        assert.Assert(t, alloc3 != nil, "alloc3 expected")
        assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
+
+       // reset wait timeout
+       reservationWaitTimeout = 60 * time.Minute
 }
 
 func TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) {
@@ -2568,6 +2571,9 @@ func 
TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) {
        alloc3 := result4.Request
        assert.Assert(t, alloc3 != nil, "alloc3 expected")
        assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
+
+       // reset wait timeout
+       reservationWaitTimeout = 60 * time.Minute
 }
 
 // TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel Ensures 
reservations cannot be cancelled because of the following constraints:
@@ -2631,6 +2637,9 @@ func 
TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t *testing.T)
        alloc3 := result5.Request
        assert.Assert(t, alloc3 != nil, "alloc3 expected")
        assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
+
+       // reset wait timeout
+       reservationWaitTimeout = 60 * time.Minute
 }
 
 func TestMaxAskPriority(t *testing.T) {
@@ -3285,6 +3294,7 @@ func TestGetRateLimitedAppLog(t *testing.T) {
 }
 
 func TestTryAllocateWithReservedHeadRoomChecking(t *testing.T) {
+       setupUGM()
        defer func() {
                if r := recover(); r != nil {
                        t.Fatalf("reserved headroom test regression: %v", r)
@@ -3315,6 +3325,16 @@ func TestTryAllocateWithReservedHeadRoomChecking(t 
*testing.T) {
        iter := getNodeIteratorFn(node1, node2)
        result := app.tryReservedAllocate(headRoom, iter)
        assert.Assert(t, result == nil, "result is expected to be nil due to 
insufficient headroom")
+       assert.Equal(t, len(app.reservations), 1)
+
+       // pass the time and try again
+       reservationWaitTimeout = -60 * time.Second
+       result = app.tryReservedAllocate(headRoom, iter)
+       assert.Assert(t, result == nil, "result is expected to be nil due to 
insufficient headroom")
+       assert.Equal(t, len(app.reservations), 0)
+
+       // reset wait timeout
+       reservationWaitTimeout = 60 * time.Minute
 }
 
 func TestUpdateRunnableStatus(t *testing.T) {
diff --git a/pkg/scheduler/objects/reservation.go b/pkg/scheduler/objects/reservation.go
index 3001372d..764d434d 100644
--- a/pkg/scheduler/objects/reservation.go
+++ b/pkg/scheduler/objects/reservation.go
@@ -19,6 +19,8 @@
 package objects
 
 import (
+       "time"
+
        "go.uber.org/zap"
 
        "github.com/apache/yunikorn-core/pkg/log"
@@ -30,9 +32,10 @@ type reservation struct {
        allocKey string
        // these references must ONLY be used for alloc, node and application 
removal otherwise
        // the reservations cannot be removed and scheduling might be impacted.
-       app   *Application
-       node  *Node
-       alloc *Allocation
+       app        *Application
+       node       *Node
+       alloc      *Allocation
+       createTime time.Time
 }
 
 // The reservation inside the scheduler. A reservation object is never mutated 
and does not use locking.
@@ -47,10 +50,11 @@ func newReservation(node *Node, app *Application, alloc 
*Allocation, appBased bo
                return nil
        }
        res := &reservation{
-               allocKey: alloc.GetAllocationKey(),
-               alloc:    alloc,
-               app:      app,
-               node:     node,
+               allocKey:   alloc.GetAllocationKey(),
+               alloc:      alloc,
+               app:        app,
+               node:       node,
+               createTime: time.Now(),
        }
        if appBased {
                res.nodeID = node.NodeID
diff --git a/pkg/scheduler/objects/reservation_test.go b/pkg/scheduler/objects/reservation_test.go
index 69716332..56457ec2 100644
--- a/pkg/scheduler/objects/reservation_test.go
+++ b/pkg/scheduler/objects/reservation_test.go
@@ -20,6 +20,7 @@ package objects
 
 import (
        "testing"
+       "time"
 
        "gotest.tools/v3/assert"
 
@@ -45,8 +46,8 @@ func TestNewReservation(t *testing.T) {
                {"nil alloc", node, app, nil, true, nil},
                {"nil app", node, nil, ask, true, nil},
                {"nil node", nil, app, ask, true, nil},
-               {"node based", node, app, ask, false, &reservation{"app-1", "", 
"alloc-1", app, node, ask}},
-               {"app based", node, app, ask, true, &reservation{"", "node-1", 
"alloc-1", app, node, ask}},
+               {"node based", node, app, ask, false, &reservation{"app-1", "", 
"alloc-1", app, node, ask, time.Now()}},
+               {"app based", node, app, ask, true, &reservation{"", "node-1", 
"alloc-1", app, node, ask, time.Now()}},
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to