This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 65d35fc9 [YUNIKORN-3219] Cancel reservations after the wait time
expires (#1067)
65d35fc9 is described below
commit 65d35fc9f0b061a7e9e420acb6a099d4be552f5e
Author: mani <[email protected]>
AuthorDate: Mon Mar 30 16:25:18 2026 +0530
[YUNIKORN-3219] Cancel reservations after the wait time expires (#1067)
Closes: #1067
Signed-off-by: mani <[email protected]>
---
pkg/scheduler/objects/application.go | 14 ++++++++++++++
pkg/scheduler/objects/application_test.go | 20 ++++++++++++++++++++
pkg/scheduler/objects/reservation.go | 18 +++++++++++-------
pkg/scheduler/objects/reservation_test.go | 5 +++--
4 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 986f1bb4..1bf020c4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1425,6 +1425,20 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
}
if !sa.checkHeadRooms(ask, userHeadroom, headRoom) {
+ // Cancel the reservation after wait time expires
+ createTime := reserve.createTime
+
+ askAge :=
time.Since(createTime.Add(reservationWaitTimeout))
+
+ // Has the wait time expired? (askAge is the time elapsed since createTime + reservationWaitTimeout)
+ if askAge > reservationWaitTimeout {
+ num := sa.unReserveInternal(reserve)
+ sa.queue.UnReserve(sa.ApplicationID, num)
+ log.Log(log.SchedApplication).Info("Cancelled
reservation as wait time expired",
+ zap.String("reserve app",
reserve.appID),
+ zap.String("reserve alloc key",
reserve.allocKey),
+ zap.String("node", reserve.nodeID))
+ }
continue
}
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 1be8c4ea..663ca2ee 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2543,6 +2543,9 @@ func TestTryAllocatePreemptNodeWithReservations(t
*testing.T) {
alloc3 := result3.Request
assert.Assert(t, alloc3 != nil, "alloc3 expected")
assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
+
+ // reset wait timeout
+ reservationWaitTimeout = 60 * time.Minute
}
func TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) {
@@ -2568,6 +2571,9 @@ func
TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) {
alloc3 := result4.Request
assert.Assert(t, alloc3 != nil, "alloc3 expected")
assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
+
+ // reset wait timeout
+ reservationWaitTimeout = 60 * time.Minute
}
// TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel Ensures
reservations cannot be cancelled because of the following constraints:
@@ -2631,6 +2637,9 @@ func
TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t *testing.T)
alloc3 := result5.Request
assert.Assert(t, alloc3 != nil, "alloc3 expected")
assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
+
+ // reset wait timeout
+ reservationWaitTimeout = 60 * time.Minute
}
func TestMaxAskPriority(t *testing.T) {
@@ -3285,6 +3294,7 @@ func TestGetRateLimitedAppLog(t *testing.T) {
}
func TestTryAllocateWithReservedHeadRoomChecking(t *testing.T) {
+ setupUGM()
defer func() {
if r := recover(); r != nil {
t.Fatalf("reserved headroom test regression: %v", r)
@@ -3315,6 +3325,16 @@ func TestTryAllocateWithReservedHeadRoomChecking(t
*testing.T) {
iter := getNodeIteratorFn(node1, node2)
result := app.tryReservedAllocate(headRoom, iter)
assert.Assert(t, result == nil, "result is expected to be nil due to
insufficient headroom")
+ assert.Equal(t, len(app.reservations), 1)
+
+ // make the reservation look expired (negative wait timeout) and try again
+ reservationWaitTimeout = -60 * time.Second
+ result = app.tryReservedAllocate(headRoom, iter)
+ assert.Assert(t, result == nil, "result is expected to be nil due to
insufficient headroom")
+ assert.Equal(t, len(app.reservations), 0)
+
+ // reset wait timeout
+ reservationWaitTimeout = 60 * time.Minute
}
func TestUpdateRunnableStatus(t *testing.T) {
diff --git a/pkg/scheduler/objects/reservation.go
b/pkg/scheduler/objects/reservation.go
index 3001372d..764d434d 100644
--- a/pkg/scheduler/objects/reservation.go
+++ b/pkg/scheduler/objects/reservation.go
@@ -19,6 +19,8 @@
package objects
import (
+ "time"
+
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/log"
@@ -30,9 +32,10 @@ type reservation struct {
allocKey string
// these references must ONLY be used for alloc, node and application
removal otherwise
// the reservations cannot be removed and scheduling might be impacted.
- app *Application
- node *Node
- alloc *Allocation
+ app *Application
+ node *Node
+ alloc *Allocation
+ createTime time.Time
}
// The reservation inside the scheduler. A reservation object is never mutated
and does not use locking.
@@ -47,10 +50,11 @@ func newReservation(node *Node, app *Application, alloc
*Allocation, appBased bo
return nil
}
res := &reservation{
- allocKey: alloc.GetAllocationKey(),
- alloc: alloc,
- app: app,
- node: node,
+ allocKey: alloc.GetAllocationKey(),
+ alloc: alloc,
+ app: app,
+ node: node,
+ createTime: time.Now(),
}
if appBased {
res.nodeID = node.NodeID
diff --git a/pkg/scheduler/objects/reservation_test.go
b/pkg/scheduler/objects/reservation_test.go
index 69716332..56457ec2 100644
--- a/pkg/scheduler/objects/reservation_test.go
+++ b/pkg/scheduler/objects/reservation_test.go
@@ -20,6 +20,7 @@ package objects
import (
"testing"
+ "time"
"gotest.tools/v3/assert"
@@ -45,8 +46,8 @@ func TestNewReservation(t *testing.T) {
{"nil alloc", node, app, nil, true, nil},
{"nil app", node, nil, ask, true, nil},
{"nil node", nil, app, ask, true, nil},
- {"node based", node, app, ask, false, &reservation{"app-1", "",
"alloc-1", app, node, ask}},
- {"app based", node, app, ask, true, &reservation{"", "node-1",
"alloc-1", app, node, ask}},
+ {"node based", node, app, ask, false, &reservation{"app-1", "",
"alloc-1", app, node, ask, time.Now()}},
+ {"app based", node, app, ask, true, &reservation{"", "node-1",
"alloc-1", app, node, ask, time.Now()}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]