This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new e17eafaa [YUNIKORN-2458] Remove ask repeats from AllocationAsk (#854)
e17eafaa is described below
commit e17eafaab1c80eb15c668cc4241441305a1282f7
Author: Craig Condit <[email protected]>
AuthorDate: Wed Apr 24 09:55:21 2024 -0500
[YUNIKORN-2458] Remove ask repeats from AllocationAsk (#854)
Simplify ask and allocation handling by removing support for repeated
requests in a single ask. This functionality is not used by the shim. By
removing support for repeated asks, we also ensure that there is a 1:1
relationship between ask and allocation.
Closes: #854
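For readers following along: the core of the change is that an AllocationAsk no longer tracks a repeat count (pendingAskRepeat/maxAllocations); it is simply either pending or allocated. Below is a minimal, self-contained sketch of that state machine, simplified from the allocation_ask.go hunks in this commit; the real struct carries many more fields and the methods live in the objects package.

package main

import (
	"fmt"
	"sync"
)

// ask is a trimmed-down stand-in for objects.AllocationAsk: the repeat
// counter is replaced by a single allocated flag guarded by the lock.
type ask struct {
	sync.RWMutex
	allocationKey string
	allocated     bool
}

// allocate marks the ask as allocated; a second attempt fails, which is
// what enforces the one-to-one ask-to-allocation relationship.
func (a *ask) allocate() bool {
	a.Lock()
	defer a.Unlock()
	if a.allocated {
		return false
	}
	a.allocated = true
	return true
}

// deallocate returns the ask to the pending state; it fails if the ask
// was never allocated (for example a double release).
func (a *ask) deallocate() bool {
	a.Lock()
	defer a.Unlock()
	if !a.allocated {
		return false
	}
	a.allocated = false
	return true
}

// IsAllocated reports whether the ask has been satisfied.
func (a *ask) IsAllocated() bool {
	a.RLock()
	defer a.RUnlock()
	return a.allocated
}

func main() {
	a := &ask{allocationKey: "alloc-1"}
	fmt.Println(a.allocate())    // true: first allocation succeeds
	fmt.Println(a.allocate())    // false: an ask can only be allocated once
	fmt.Println(a.IsAllocated()) // true
	fmt.Println(a.deallocate())  // true: back to pending
	fmt.Println(a.deallocate())  // false: cannot deallocate twice
}

Because allocate() can only succeed once per ask, each ask maps to at most one allocation, which is the 1:1 relationship mentioned above.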
---
go.mod | 2 +-
go.sum | 4 +-
pkg/examples/simple_example.go | 3 +-
pkg/scheduler/health_checker_test.go | 2 +-
pkg/scheduler/objects/allocation.go | 5 +-
pkg/scheduler/objects/allocation_ask.go | 47 ++--
pkg/scheduler/objects/allocation_ask_test.go | 47 ++--
pkg/scheduler/objects/allocation_test.go | 6 +-
pkg/scheduler/objects/application.go | 136 ++++++-----
pkg/scheduler/objects/application_test.go | 184 +++++++-------
pkg/scheduler/objects/queue_test.go | 11 +-
.../objects/required_node_preemptor_test.go | 1 -
pkg/scheduler/objects/utilities_test.go | 30 +--
pkg/scheduler/partition.go | 4 +-
pkg/scheduler/partition_test.go | 126 +++++-----
pkg/scheduler/scheduler_test.go | 21 +-
pkg/scheduler/tests/application_tracking_test.go | 3 +-
pkg/scheduler/tests/mockscheduler_test.go | 19 +-
pkg/scheduler/tests/operation_test.go | 26 +-
pkg/scheduler/tests/performance_test.go | 53 +++--
pkg/scheduler/tests/plugin_test.go | 6 +-
pkg/scheduler/tests/recovery_test.go | 90 ++++---
pkg/scheduler/tests/smoke_test.go | 263 +++++++++++----------
pkg/scheduler/utilities_test.go | 44 ++--
pkg/webservice/dao/allocation_ask_info.go | 1 -
pkg/webservice/handlers.go | 3 +-
pkg/webservice/handlers_test.go | 33 ++-
27 files changed, 597 insertions(+), 573 deletions(-)
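At the Application level the change is mostly bookkeeping: AllocateAsk removes the ask's resource from the application and queue pending totals and returns the (negative) delta, while DeallocateAsk adds it back. The following is a hedged sketch of that accounting with resources collapsed to a single integer quantity; the names mirror the application.go hunks below, but the real methods operate on resources.Resource and take the application lock.

package main

import "fmt"

// app collapses Application and its queue to two pending counters.
type app struct {
	pending      int64 // application-level pending resources
	queuePending int64 // queue-level pending resources
}

// allocateAsk mirrors Application.allocateAsk in the diff: once an ask is
// marked allocated, its resource leaves both pending totals and the
// negative delta is returned to the caller.
func (a *app) allocateAsk(askRes int64) int64 {
	delta := -askRes
	a.pending += delta
	a.queuePending += delta
	return delta
}

// deallocateAsk mirrors Application.deallocateAsk: the ask becomes pending
// again, so its resource is added back to both totals.
func (a *app) deallocateAsk(askRes int64) int64 {
	a.pending += askRes
	a.queuePending += askRes
	return askRes
}

func main() {
	a := &app{pending: 5, queuePending: 5} // one pending ask worth 5
	fmt.Println(a.allocateAsk(5), a.pending, a.queuePending)   // -5 0 0
	fmt.Println(a.deallocateAsk(5), a.pending, a.queuePending) // 5 5 5
}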
diff --git a/go.mod b/go.mod
index 39491005..7865af82 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21
require (
- github.com/apache/yunikorn-scheduler-interface v0.0.0-20240422062544-b70081933c38
+ github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index 85f36706..a76cb0cd 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240422062544-b70081933c38 h1:/02cjuc0xpQPZIGezL45QZ6muGI7dfesu9l38U9fbx0=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240422062544-b70081933c38/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a h1:H978zsTL2FvbRFnySO83DOFLO33PwHWFdmHvMoSVXsc=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
diff --git a/pkg/examples/simple_example.go b/pkg/examples/simple_example.go
index 590cf6fd..7cfaf864 100644
--- a/pkg/examples/simple_example.go
+++ b/pkg/examples/simple_example.go
@@ -218,8 +218,7 @@ partitions:
"vcore": {Value: 1},
},
},
- MaxAllocations: 20,
- ApplicationID: "app-1",
+ ApplicationID: "app-1",
},
},
RmID: "rm:123",
diff --git a/pkg/scheduler/health_checker_test.go b/pkg/scheduler/health_checker_test.go
index a3f97f2a..23fc8854 100644
--- a/pkg/scheduler/health_checker_test.go
+++ b/pkg/scheduler/health_checker_test.go
@@ -216,7 +216,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
assert.Assert(t, healthInfo.HealthChecks[9].Succeeded, "The orphan allocation check on the node should be successful")
// remove the allocation from the node, so we will have an orphan allocation assigned to the app
- node.RemoveAllocation("key-0")
+ node.RemoveAllocation("key")
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, healthInfo.HealthChecks[9].Succeeded, "The orphan allocation check on the node should be successful")
assert.Assert(t, !healthInfo.HealthChecks[10].Succeeded, "The orphan allocation check on the app should not be successful")
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 8812a891..ddca67f7 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -92,7 +92,7 @@ func NewAllocation(nodeID string, ask *AllocationAsk) *Allocation {
bindTime: time.Now(),
nodeID: nodeID,
partitionName: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
- allocationID: ask.allocationKey + "-" + strconv.Itoa(ask.completedPendingAsk()),
+ allocationID: ask.allocationKey,
tags: ask.GetTagsClone(),
priority: ask.GetPriority(),
allocatedResource: ask.GetAllocatedResource().Clone(),
@@ -147,8 +147,7 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
allocatedResource: resources.NewResourceFromProto(alloc.ResourcePerAlloc),
tags: CloneAllocationTags(alloc.AllocationTags),
priority: alloc.Priority,
- pendingAskRepeat: 0,
- maxAllocations: 1,
+ allocated: true,
taskGroupName: alloc.TaskGroupName,
placeholder: alloc.Placeholder,
createTime: time.Unix(creationTime, 0),
diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go
index 6b8bce6e..9611b9f0 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -42,7 +42,6 @@ type AllocationAsk struct {
execTimeout time.Duration // execTimeout for the allocation ask
createTime time.Time // the time this ask was created (used in reservations)
priority int32
- maxAllocations int32
requiredNode string
allowPreemptSelf bool
allowPreemptOther bool
@@ -52,7 +51,7 @@ type AllocationAsk struct {
resKeyWithoutNode string // the reservation key without node
// Mutable fields which need protection
- pendingAskRepeat int32
+ allocated bool
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
@@ -90,8 +89,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) *AllocationAsk {
saa := &AllocationAsk{
allocationKey: ask.AllocationKey,
allocatedResource: resources.NewResourceFromProto(ask.ResourceAsk),
- pendingAskRepeat: ask.MaxAllocations,
- maxAllocations: ask.MaxAllocations,
applicationID: ask.ApplicationID,
partitionName: ask.PartitionName,
@@ -124,7 +121,7 @@ func (aa *AllocationAsk) String() string {
if aa == nil {
return "ask is nil"
}
- return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, PendingRepeats %d", aa.allocationKey, aa.applicationID, aa.allocatedResource, aa.GetPendingAskRepeat())
+ return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", aa.allocationKey, aa.applicationID, aa.allocatedResource, aa.IsAllocated())
}
// GetAllocationKey returns the allocation key for this ask
@@ -142,26 +139,35 @@ func (aa *AllocationAsk) GetPartitionName() string {
return aa.partitionName
}
-// updatePendingAskRepeat updates the pending ask repeat with the delta given.
-// Update the pending ask repeat counter with the delta (pos or neg). The pending repeat is always 0 or higher.
-// If the update would cause the repeat to go negative the update is discarded and false is returned.
-// In all other cases the repeat is updated and true is returned.
-func (aa *AllocationAsk) updatePendingAskRepeat(delta int32) bool {
+// allocate marks the ask as allocated and returns true if successful. An ask may not be allocated multiple times.
+func (aa *AllocationAsk) allocate() bool {
aa.Lock()
defer aa.Unlock()
- if aa.pendingAskRepeat+delta >= 0 {
- aa.pendingAskRepeat += delta
- return true
+ if aa.allocated {
+ return false
}
- return false
+ aa.allocated = true
+ return true
}
-// GetPendingAskRepeat gets the number of repeat asks remaining
-func (aa *AllocationAsk) GetPendingAskRepeat() int32 {
+// deallocate marks the ask as pending and returns true if successful. An ask may not be deallocated multiple times.
+func (aa *AllocationAsk) deallocate() bool {
+ aa.Lock()
+ defer aa.Unlock()
+
+ if !aa.allocated {
+ return false
+ }
+ aa.allocated = false
+ return true
+}
+
+// IsAllocated determines if this ask has been allocated yet
+func (aa *AllocationAsk) IsAllocated() bool {
aa.RLock()
defer aa.RUnlock()
- return aa.pendingAskRepeat
+ return aa.allocated
}
// GetCreateTime returns the time this ask was created
@@ -336,13 +342,6 @@ func (aa *AllocationAsk) HasTriggeredScaleUp() bool {
return aa.scaleUpTriggered
}
-// completedPendingAsk How many pending asks has been completed or processed so far?
-func (aa *AllocationAsk) completedPendingAsk() int {
- aa.RLock()
- defer aa.RUnlock()
- return int(aa.maxAllocations - aa.pendingAskRepeat)
-}
-
func (aa *AllocationAsk) setReservationKeyForNode(node, resKey string) {
aa.Lock()
defer aa.Unlock()
diff --git a/pkg/scheduler/objects/allocation_ask_test.go b/pkg/scheduler/objects/allocation_ask_test.go
index 59bc47da..8c913ee0 100644
--- a/pkg/scheduler/objects/allocation_ask_test.go
+++ b/pkg/scheduler/objects/allocation_ask_test.go
@@ -46,43 +46,36 @@ func TestAskToString(t *testing.T) {
func TestNewAsk(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
siAsk := &si.AllocationAsk{
- AllocationKey: "ask-1",
- ApplicationID: "app-1",
- MaxAllocations: 1,
- ResourceAsk: res.ToProto(),
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ ResourceAsk: res.ToProto(),
}
ask := NewAllocationAskFromSI(siAsk)
if ask == nil {
t.Fatal("NewAllocationAskFromSI create failed while it should
not")
}
askStr := ask.String()
- expected := "allocationKey ask-1, applicationID app-1, Resource map[first:10], PendingRepeats 1"
+ expected := "allocationKey ask-1, applicationID app-1, Resource map[first:10], Allocated false"
assert.Equal(t, askStr, expected, "Strings should have been equal")
assert.Equal(t, "app-1|ask-1", ask.resKeyWithoutNode)
//nolint:staticcheck
}
-func TestPendingAskRepeat(t *testing.T) {
+func TestAskAllocateDeallocate(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
ask := newAllocationAsk("alloc-1", "app-1", res)
- assert.Equal(t, ask.GetPendingAskRepeat(), int32(1), "pending ask
repeat should be 1")
- if !ask.updatePendingAskRepeat(1) {
- t.Errorf("increase of pending ask with 1 failed, expected
repeat 2, current repeat: %d", ask.GetPendingAskRepeat())
- }
- if !ask.updatePendingAskRepeat(-1) {
- t.Errorf("decrease of pending ask with 1 failed, expected
repeat 1, current repeat: %d", ask.GetPendingAskRepeat())
- }
- if ask.updatePendingAskRepeat(-2) {
- t.Errorf("decrease of pending ask with 2 did not fail, expected
repeat 1, current repeat: %d", ask.GetPendingAskRepeat())
- }
- if !ask.updatePendingAskRepeat(-1) {
- t.Errorf("decrease of pending ask with 1 failed, expected
repeat 0, current repeat: %d", ask.GetPendingAskRepeat())
- }
+ assert.Assert(t, !ask.IsAllocated(), "pending ask should return false
for IsAllocated()")
+ assert.Assert(t, !ask.deallocate(), "attempt to deallocate pending ask
should fail")
+ assert.Assert(t, ask.allocate(), "attempt to allocate pending ask
should not fail")
+ assert.Assert(t, ask.IsAllocated(), "allocated ask should return true
for IsAllocated()")
+ assert.Assert(t, !ask.allocate(), "attempt to allocate previously
allocated ask should fail")
+ assert.Assert(t, ask.deallocate(), "deallocating previously allocated
ask should succeed")
+ assert.Assert(t, !ask.IsAllocated(), "deallocated ask should return
false for IsAllocated()")
}
// the create time should not be manipulated but we need it for reservation
testing
func TestGetCreateTime(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
- ask := newAllocationAskRepeat("alloc-1", "app-1", res, 2)
+ ask := newAllocationAsk("alloc-1", "app-1", res)
created := ask.GetCreateTime()
// move time 10 seconds back
ask.createTime = created.Add(time.Second * -10)
@@ -192,10 +185,9 @@ func TestGetRequiredNode(t *testing.T) {
func TestAllocationLog(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
siAsk := &si.AllocationAsk{
- AllocationKey: "ask-1",
- ApplicationID: "app-1",
- MaxAllocations: 1,
- ResourceAsk: res.ToProto(),
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ ResourceAsk: res.ToProto(),
}
ask := NewAllocationAskFromSI(siAsk)
@@ -233,10 +225,9 @@ func TestAllocationLog(t *testing.T) {
func TestSendPredicateFailed(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
siAsk := &si.AllocationAsk{
- AllocationKey: "ask-1",
- ApplicationID: "app-1",
- MaxAllocations: 1,
- ResourceAsk: res.ToProto(),
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ ResourceAsk: res.ToProto(),
}
ask := NewAllocationAskFromSI(siAsk)
eventSystem := mock.NewEventSystemDisabled()
diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go
index 4d92ebca..e1c1bfe4 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -53,7 +53,7 @@ func TestNewAlloc(t *testing.T) {
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
- assert.Equal(t, alloc.GetAllocationID(), "ask-1-0")
+ assert.Equal(t, alloc.GetAllocationID(), "ask-1")
assert.Equal(t, alloc.GetResult(), Allocated, "New alloc should default
to result Allocated")
assert.Assert(t, resources.Equals(alloc.GetAllocatedResource(), res),
"Allocated resource not set correctly")
assert.Assert(t, !alloc.IsPlaceholder(), "ask should not have been a
placeholder")
@@ -62,7 +62,7 @@ func TestNewAlloc(t *testing.T) {
alloc.SetInstanceType(instType1)
assert.Equal(t, alloc.GetInstanceType(), instType1, "Instance type not
set as expected")
allocStr := alloc.String()
- expected := "applicationID=app-1, allocationID=ask-1-0,
allocationKey=ask-1, Node=node-1, result=Allocated"
+ expected := "applicationID=app-1, allocationID=ask-1,
allocationKey=ask-1, Node=node-1, result=Allocated"
assert.Equal(t, allocStr, expected, "Strings should have been equal")
assert.Assert(t, !alloc.IsPlaceholderUsed(), fmt.Sprintf("Alloc should
not be placeholder replacement by default: got %t, expected %t",
alloc.IsPlaceholderUsed(), false))
created := alloc.GetCreateTime()
@@ -131,7 +131,7 @@ func TestSIFromAlloc(t *testing.T) {
assert.NilError(t, err, "Resource creation failed")
expectedSI := &si.Allocation{
AllocationKey: "ask-1",
- AllocationID: "ask-1-0",
+ AllocationID: "ask-1",
NodeID: "node-1",
ApplicationID: "app-1",
ResourcePerAlloc: res.ToProto(),
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index cba48664..3c0e0179 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -576,8 +576,10 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord
toRelease += releases
}
if ask := sa.requests[allocKey]; ask != nil {
- deltaPendingResource = resources.MultiplyBy(ask.GetAllocatedResource(), float64(ask.GetPendingAskRepeat()))
- sa.pending = resources.Sub(sa.pending, deltaPendingResource)
+ if !ask.IsAllocated() {
+ deltaPendingResource = ask.GetAllocatedResource()
+ sa.pending = resources.Sub(sa.pending, deltaPendingResource)
+ }
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
sa.appEvents.sendRemoveAskEvent(ask, detail)
@@ -618,14 +620,14 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
if ask == nil {
return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID)
}
- if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.GetAllocatedResource()) {
+ if ask.IsAllocated() || resources.IsZero(ask.GetAllocatedResource()) {
return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
}
- delta := resources.Multiply(ask.GetAllocatedResource(), int64(ask.GetPendingAskRepeat()))
+ delta := ask.GetAllocatedResource().Clone()
var oldAskResource *resources.Resource = nil
- if oldAsk := sa.requests[ask.GetAllocationKey()]; oldAsk != nil {
- oldAskResource = resources.Multiply(oldAsk.GetAllocatedResource(), int64(oldAsk.GetPendingAskRepeat()))
+ if oldAsk := sa.requests[ask.GetAllocationKey()]; oldAsk != nil && !oldAsk.IsAllocated() {
+ oldAskResource = oldAsk.GetAllocatedResource().Clone()
}
// Check if we need to change state based on the ask added, there are two cases:
@@ -683,9 +685,9 @@ func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) {
sa.requests[ask.GetAllocationKey()] = ask
// update app priority
- repeat := ask.GetPendingAskRepeat()
+ allocated := ask.IsAllocated()
priority := ask.GetPriority()
- if repeat > 0 && priority > sa.askMaxPriority {
+ if !allocated && priority > sa.askMaxPriority {
sa.askMaxPriority = priority
sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority)
}
@@ -695,44 +697,60 @@ func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) {
}
}
-func (sa *Application) UpdateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) {
+func (sa *Application) AllocateAsk(allocKey string) (*resources.Resource, error) {
sa.Lock()
defer sa.Unlock()
if ask := sa.requests[allocKey]; ask != nil {
+ return sa.allocateAsk(ask)
+ }
+ return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
+}
- return sa.updateAskRepeatInternal(ask, delta)
+func (sa *Application) DeallocateAsk(allocKey string) (*resources.Resource, error) {
+ sa.Lock()
+ defer sa.Unlock()
+ if ask := sa.requests[allocKey]; ask != nil {
+ return sa.deallocateAsk(ask)
}
return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
}
-func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) {
- // updating with delta does error checking internally
- if !ask.updatePendingAskRepeat(delta) {
- return nil, fmt.Errorf("ask repaeat not updated resulting repeat less than zero for ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID)
+func (sa *Application) allocateAsk(ask *AllocationAsk) (*resources.Resource, error) {
+ if !ask.allocate() {
+ return nil, fmt.Errorf("unable to allocate previously allocated ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID)
+ }
+
+ if ask.GetPriority() >= sa.askMaxPriority {
+ // recalculate downward
+ sa.updateAskMaxPriority()
+ }
+
+ delta := resources.Multiply(ask.GetAllocatedResource(), -1)
+ sa.pending = resources.Add(sa.pending, delta)
+ // update the pending of the queue with the same delta
+ sa.queue.incPendingResource(delta)
+
+ return delta, nil
+}
+
+func (sa *Application) deallocateAsk(ask *AllocationAsk) (*resources.Resource, error) {
+ if !ask.deallocate() {
+ return nil, fmt.Errorf("unable to deallocate pending ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID)
}
askPriority := ask.GetPriority()
- if ask.GetPendingAskRepeat() == 0 {
- // ask removed
- if askPriority >= sa.askMaxPriority {
- // recalculate downward
- sa.updateAskMaxPriority()
- }
- } else {
- // ask added
- if askPriority > sa.askMaxPriority {
- // increase app priority
- sa.askMaxPriority = askPriority
- sa.queue.UpdateApplicationPriority(sa.ApplicationID, askPriority)
- }
+ if askPriority > sa.askMaxPriority {
+ // increase app priority
+ sa.askMaxPriority = askPriority
+ sa.queue.UpdateApplicationPriority(sa.ApplicationID, askPriority)
}
- deltaPendingResource := resources.Multiply(ask.GetAllocatedResource(), int64(delta))
- sa.pending = resources.Add(sa.pending, deltaPendingResource)
+ delta := ask.GetAllocatedResource()
+ sa.pending = resources.Add(sa.pending, delta)
// update the pending of the queue with the same delta
- sa.queue.incPendingResource(deltaPendingResource)
+ sa.queue.incPendingResource(delta)
- return deltaPendingResource, nil
+ return delta, nil
}
// HasReserved returns true if the application has any reservations.
@@ -789,7 +807,11 @@ func (sa *Application) reserveInternal(node *Node, ask *AllocationAsk) error {
return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID)
}
if !sa.canAskReserve(ask) {
- return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat())
+ if ask.IsAllocated() {
+ return fmt.Errorf("ask is already allocated")
+ } else {
+ return fmt.Errorf("ask is already reserved")
+ }
}
// check if we can reserve the node before reserving on the app
if err := node.Reserve(sa, ask); err != nil {
@@ -869,20 +891,21 @@ func (sa *Application) GetAskReservations(allocKey string) []string {
return reservationKeys
}
-// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set
-// larger than 1. It can never reserve more than the repeat number of nodes.
+// Check if the allocation has already been reserved. An ask can never reserve more than one node.
// No locking must be called while holding the lock
func (sa *Application) canAskReserve(ask *AllocationAsk) bool {
allocKey := ask.GetAllocationKey()
- pending := int(ask.GetPendingAskRepeat())
- resNumber := sa.GetAskReservations(allocKey)
- if len(resNumber) >= pending {
- log.Log(log.SchedApplication).Debug("reservation exceeds
repeats",
- zap.String("askKey", allocKey),
- zap.Int("askPending", pending),
- zap.Int("askReserved", len(resNumber)))
+ if ask.IsAllocated() {
+ log.Log(log.SchedApplication).Debug("ask already allocated, no
reservation allowed",
+ zap.String("askKey", allocKey))
+ return false
}
- return pending > len(resNumber)
+ if len(sa.GetAskReservations(allocKey)) > 0 {
+ log.Log(log.SchedApplication).Debug("reservation already
exists",
+ zap.String("askKey", allocKey))
+ return false
+ }
+ return true
}
func (sa *Application) getOutstandingRequests(headRoom *resources.Resource,
userHeadRoom *resources.Resource, total *[]*AllocationAsk) {
@@ -892,7 +915,7 @@ func (sa *Application) getOutstandingRequests(headRoom
*resources.Resource, user
return
}
for _, request := range sa.sortedRequests {
- if request.GetPendingAskRepeat() == 0 ||
!request.IsSchedulingAttempted() {
+ if request.IsAllocated() || !request.IsSchedulingAttempted() {
continue
}
@@ -933,7 +956,7 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath,
sa.ApplicationID, sa.user)
// get all the requests from the app sorted in order
for _, request := range sa.sortedRequests {
- if request.GetPendingAskRepeat() == 0 {
+ if request.IsAllocated() {
continue
}
// check if there is a replacement possible
@@ -1097,7 +1120,7 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
for _, request := range sa.sortedRequests {
// skip placeholders they follow standard allocation
// this should also be part of a task group just make sure it is
- if request.IsPlaceholder() || request.GetTaskGroup() == "" ||
request.GetPendingAskRepeat() == 0 {
+ if request.IsPlaceholder() || request.GetTaskGroup() == "" ||
request.IsAllocated() {
continue
}
// walk over the placeholders, allow for processing all as we
can have multiple task groups
@@ -1144,9 +1167,9 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
alloc.SetResult(Replaced)
// mark placeholder as released
ph.SetReleased(true)
- _, err := sa.updateAskRepeatInternal(request,
-1)
+ _, err := sa.allocateAsk(request)
if err != nil {
- log.Log(log.SchedApplication).Warn("ask
repeat update failed unexpectedly",
+
log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
zap.Error(err))
}
return alloc
@@ -1195,9 +1218,9 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
zap.Stringer("placeholder", phFit))
return false
}
- _, err := sa.updateAskRepeatInternal(reqFit, -1)
+ _, err := sa.allocateAsk(reqFit)
if err != nil {
- log.Log(log.SchedApplication).Warn("ask repeat
update failed unexpectedly",
+ log.Log(log.SchedApplication).Warn("allocation
of ask failed unexpectedly",
zap.Error(err))
}
@@ -1226,7 +1249,7 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
for _, reserve := range sa.reservations {
ask := sa.requests[reserve.askKey]
// sanity check and cleanup if needed
- if ask == nil || ask.GetPendingAskRepeat() == 0 {
+ if ask == nil || ask.IsAllocated() {
var unreserveAsk *AllocationAsk
// if the ask was not found we need to construct one to
unreserve
if ask == nil {
@@ -1372,7 +1395,7 @@ func (sa *Application) tryNodes(ask *AllocationAsk,
iterator NodeIterator) *Allo
// check if the ask is reserved or not
allocKey := ask.GetAllocationKey()
reservedAsks := sa.GetAskReservations(allocKey)
- allowReserve := len(reservedAsks) < int(ask.GetPendingAskRepeat())
+ allowReserve := !ask.IsAllocated() && len(reservedAsks) == 0
var allocResult *Allocation
iterator.ForEachNode(func(node *Node) bool {
// skip the node if the node is not valid for the ask
@@ -1450,8 +1473,7 @@ func (sa *Application) tryNodes(ask *AllocationAsk,
iterator NodeIterator) *Allo
zap.String("appID", sa.ApplicationID),
zap.String("nodeID", nodeToReserve.NodeID),
zap.String("allocationKey", allocKey),
- zap.Int("reservations", len(reservedAsks)),
- zap.Int32("pendingRepeats", ask.GetPendingAskRepeat()))
+ zap.Int("reservations", len(reservedAsks)))
// skip the node if conditions can not be satisfied
if !nodeToReserve.preReserveConditions(ask) {
return nil
@@ -1487,10 +1509,10 @@ func (sa *Application) tryNode(node *Node, ask
*AllocationAsk) *Allocation {
node.RemoveAllocation(alloc.GetAllocationID())
return nil
}
- // mark this ask as allocated by lowering the repeat
- _, err := sa.updateAskRepeatInternal(ask, -1)
+ // mark this ask as allocated
+ _, err := sa.allocateAsk(ask)
if err != nil {
- log.Log(log.SchedApplication).Warn("ask repeat update
failed unexpectedly",
+ log.Log(log.SchedApplication).Warn("allocation of ask
failed unexpectedly",
zap.Error(err))
}
// all is OK, last update for the app
@@ -1816,7 +1838,7 @@ func (sa *Application)
removeAllocationInternal(allocationID string, releaseType
func (sa *Application) updateAskMaxPriority() {
value := configs.MinPriority
for _, v := range sa.requests {
- if v.GetPendingAskRepeat() == 0 {
+ if v.IsAllocated() {
continue
}
value = max(value, v.GetPriority())
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 75e6da5a..83984d34 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -272,11 +272,14 @@ func TestAppAllocReservation(t *testing.T) {
// reserve 1 allocate ask
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 2)
+ ask := newAllocationAsk(aKey, appID1, res)
+ ask2 := newAllocationAsk(aKey2, appID1, res)
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10})
// reserve that works
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
+ err = app.AddAllocationAsk(ask2)
+ assert.NilError(t, err, "ask2 should have been added to app")
err = app.Reserve(node1, ask)
assert.NilError(t, err, "reservation should not have failed")
if len(app.GetAskReservations("")) != 0 {
@@ -290,17 +293,17 @@ func TestAppAllocReservation(t *testing.T) {
nodeID2 := "node-2"
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10})
- err = app.Reserve(node2, ask)
+ err = app.Reserve(node2, ask2)
assert.NilError(t, err, "reservation should not have failed: error %v",
err)
- nodeKey2 := nodeID2 + "|" + aKey
- askReserved = app.GetAskReservations(aKey)
- if len(askReserved) != 2 && (askReserved[0] != nodeKey2 ||
askReserved[1] != nodeKey2) {
+ nodeKey2 := nodeID2 + "|" + aKey2
+ askReserved = app.GetAskReservations(aKey2)
+ if len(askReserved) != 1 && askReserved[0] != nodeKey2 {
t.Errorf("app should have reservations for %s on %s and has
not", aKey, nodeID2)
}
- // check exceeding ask repeat: nothing should change
+ // check duplicate reserve: nothing should change
if app.canAskReserve(ask) {
- t.Error("ask has maximum repeats reserved, reserve check should
have failed")
+ t.Error("ask has already reserved, reserve check should have
failed")
}
node3 := newNode("node-3", map[string]resources.Quantity{"first": 10})
err = app.Reserve(node3, ask)
@@ -308,9 +311,12 @@ func TestAppAllocReservation(t *testing.T) {
t.Errorf("reservation should have failed")
}
askReserved = app.GetAskReservations(aKey)
- if len(askReserved) != 2 && (askReserved[0] != nodeKey1 ||
askReserved[1] != nodeKey1) &&
- (askReserved[0] != nodeKey2 || askReserved[1] != nodeKey2) {
- t.Errorf("app should have reservations for node %s and %s and
has not: %v", nodeID1, nodeID2, askReserved)
+ if len(askReserved) != 1 && askReserved[0] != nodeKey1 {
+ t.Errorf("app should have reservations for node %s and has not:
%v", nodeID1, askReserved)
+ }
+ askReserved = app.GetAskReservations(aKey2)
+ if len(askReserved) != 1 && askReserved[0] != nodeKey2 {
+ t.Errorf("app should have reservations for node %s and has not:
%v", nodeID2, askReserved)
}
// clean up all asks and reservations
reservedAsks := app.RemoveAllocationAsk("")
@@ -319,8 +325,7 @@ func TestAppAllocReservation(t *testing.T) {
}
}
-// test update allocation repeat
-func TestUpdateRepeat(t *testing.T) {
+func TestAllocateDeallocate(t *testing.T) {
app := newApplication(appID1, "default", "root.unknown")
if app == nil || app.ApplicationID != appID1 {
t.Fatalf("app create failed which should not have %v", app)
@@ -331,38 +336,39 @@ func TestUpdateRepeat(t *testing.T) {
// failure cases
var delta *resources.Resource
- delta, err = app.UpdateAskRepeat("", 0)
- if err == nil || delta != nil {
- t.Error("empty ask key should not have been found")
+ if delta, err = app.AllocateAsk(""); err == nil || delta != nil {
+ t.Error("empty ask key should not have been found by
AllocateAsk()")
+ }
+ if delta, err = app.AllocateAsk("unknown"); err == nil || delta != nil {
+ t.Error("unknown ask key should not have been found by
AllocateAsk()")
}
- delta, err = app.UpdateAskRepeat("unknown", 0)
- if err == nil || delta != nil {
- t.Error("unknown ask key should not have been found")
+ if delta, err = app.DeallocateAsk(""); err == nil || delta != nil {
+ t.Error("empty ask key should not have been found by
DeallocateAsk()")
+ }
+ if delta, err = app.DeallocateAsk("unknown"); err == nil || delta !=
nil {
+ t.Error("unknown ask key should not have been found by
DeallocateAsk()")
}
// working cases
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 1)
+ ask := newAllocationAsk(aKey, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
- delta, err = app.UpdateAskRepeat(aKey, 0)
- if err != nil || !resources.IsZero(delta) {
- t.Errorf("0 increase should return zero delta and did not: %v,
err %v", delta, err)
+ // allocate
+ if delta, err := app.AllocateAsk(aKey); err != nil ||
!resources.Equals(resources.Multiply(res, -1), delta) {
+ t.Errorf("AllocateAsk() did not return correct delta, err %v,
expected %v got %v", err, resources.Multiply(res, -1), delta)
}
- delta, err = app.UpdateAskRepeat(aKey, 1)
- if err != nil || !resources.Equals(res, delta) {
- t.Errorf("increase did not return correct delta, err %v,
expected %v got %v", err, res, delta)
+ // allocate again should fail
+ if delta, err := app.AllocateAsk(aKey); err == nil || delta != nil {
+ t.Error("attempt to call Allocate() twice should have failed")
}
-
- // decrease to zero
- delta, err = app.UpdateAskRepeat(aKey, -2)
- if err != nil || !resources.Equals(resources.Multiply(res, -2), delta) {
- t.Errorf("decrease did not return correct delta, err %v,
expected %v got %v", err, resources.Multiply(res, -2), delta)
+ // deallocate
+ if delta, err := app.DeallocateAsk(aKey); err != nil ||
!resources.Equals(res, delta) {
+ t.Errorf("DeallocateAsk() did not return correct delta, err %v,
expected %v got %v", err, res, delta)
}
- // decrease to below zero
- delta, err = app.UpdateAskRepeat(aKey, -1)
- if err == nil || delta != nil {
- t.Errorf("decrease did not return correct delta, err %v, delta
%v", err, delta)
+ // deallocate again should fail
+ if delta, err := app.DeallocateAsk(aKey); err == nil || delta != nil {
+ t.Error("attempt to call Deallocate() twice should have failed")
}
}
@@ -394,16 +400,10 @@ func TestAddAllocAsk(t *testing.T) {
if err == nil {
t.Errorf("zero resource ask should not have been added to app")
}
- res =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask = newAllocationAskRepeat(aKey, appID1, res, 0)
- err = app.AddAllocationAsk(ask)
- if err == nil {
- t.Errorf("ask with zero repeat should not have been added to
app")
- }
// add alloc ask
res =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask = newAllocationAskRepeat(aKey, appID1, res, 1)
+ ask = newAllocationAsk(aKey, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")
assert.Assert(t, app.IsAccepted(), "Application should be in accepted
state")
@@ -433,36 +433,21 @@ func TestAddAllocAsk(t *testing.T) {
assert.Equal(t, si.EventRecord_APP_REQUEST, record.EventChangeDetail,
"incorrect change detail, expected app request")
eventSystem.Stop()
- ask = newAllocationAskRepeat(aKey, appID1, res, 2)
+ // change resource
+ ask = newAllocationAsk(aKey, appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")
pending = app.GetPendingResource()
- if !resources.Equals(resources.Multiply(res, 2), pending) {
+ if !resources.Equals(resources.Multiply(res, 2),
app.GetPendingResource()) {
t.Errorf("pending resource not updated correctly, expected %v
but was: %v", resources.Multiply(res, 2), pending)
}
- // change both resource and count
- ask = newAllocationAskRepeat(aKey, appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}), 5)
- err = app.AddAllocationAsk(ask)
- assert.NilError(t, err, "ask should have been updated on app")
- pending = app.GetPendingResource()
- if !resources.Equals(resources.Multiply(res, 3),
app.GetPendingResource()) {
- t.Errorf("pending resource not updated correctly, expected %v
but was: %v", resources.Multiply(res, 3), pending)
- }
-
- // test a decrease of repeat and back to start
- ask = newAllocationAskRepeat(aKey, appID1, res, 1)
- err = app.AddAllocationAsk(ask)
- assert.NilError(t, err, "ask should have been updated on app")
- if !resources.Equals(res, app.GetPendingResource()) {
- t.Errorf("pending resource not updated correctly, expected %v
but was: %v", res, app.GetPendingResource())
- }
// after all this is must still be in an accepted state
assert.Assert(t, app.IsAccepted(), "Application should have stayed in
accepted state")
// test PlaceholderData
tg1 := "tg-1"
- ask = newAllocationAskTG(aKey, appID1, tg1, res, 1)
+ ask = newAllocationAskTG(aKey, appID1, tg1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")
app.SetTimedOutPlaceholder(tg1, 1)
@@ -477,7 +462,7 @@ func TestAddAllocAsk(t *testing.T) {
assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1))
assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res)
- ask = newAllocationAskTG(aKey, appID1, tg1, res, 1)
+ ask = newAllocationAskTG(aKey, appID1, tg1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")
assert.Equal(t, len(app.placeholderData), 1)
@@ -488,7 +473,7 @@ func TestAddAllocAsk(t *testing.T) {
assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res)
tg2 := "tg-2"
- ask = newAllocationAskTG(aKey, appID1, tg2, res, 1)
+ ask = newAllocationAskTG(aKey, appID1, tg2, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")
assert.Equal(t, len(app.placeholderData), 2)
@@ -511,7 +496,7 @@ func TestAllocAskStateChange(t *testing.T) {
app.queue = queue
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 1)
+ ask := newAllocationAsk(aKey, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
assert.Assert(t, app.IsAccepted(), "Application should be in accepted
state")
@@ -553,20 +538,20 @@ func TestRecoverAllocAsk(t *testing.T) {
assert.Equal(t, len(app.requests), 0, "nil ask should not be added")
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 1)
+ ask := newAllocationAsk(aKey, appID1, res)
app.RecoverAllocationAsk(ask)
assert.Equal(t, len(app.requests), 1, "ask should have been added")
assert.Assert(t, app.IsAccepted(), "Application should be in accepted
state")
assertUserGroupResource(t, getTestUserGroup(), nil)
- ask = newAllocationAskRepeat("ask-2", appID1, res, 1)
+ ask = newAllocationAsk("ask-2", appID1, res)
app.RecoverAllocationAsk(ask)
assert.Equal(t, len(app.requests), 2, "ask should have been added,
total should be 2")
assert.Assert(t, app.IsAccepted(), "Application should have stayed in
accepted state")
assertUserGroupResource(t, getTestUserGroup(), nil)
assert.Equal(t, 0, len(app.placeholderData))
- ask = newAllocationAskTG("ask-3", appID1, "testGroup", res, 1)
+ ask = newAllocationAskTG("ask-3", appID1, "testGroup", res)
app.RecoverAllocationAsk(ask)
phData := app.placeholderData
assert.Equal(t, 1, len(phData))
@@ -677,16 +662,16 @@ func TestRemoveAllocAsk(t *testing.T) {
// setup the allocs
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 2)
+ ask := newAllocationAsk(aKey, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask 1 should have been added to app")
- ask = newAllocationAskRepeat("alloc-2", appID1, res, 2)
+ ask = newAllocationAsk(aKey2, appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask 2 should have been added to app")
if len(app.requests) != 2 {
t.Fatalf("missing asks from app expected 2 got %d",
len(app.requests))
}
- expected := resources.Multiply(res, 4)
+ expected := resources.Multiply(res, 2)
if !resources.Equals(expected, app.GetPendingResource()) {
t.Errorf("pending resource not updated correctly, expected %v
but was: %v", expected, app.GetPendingResource())
}
@@ -699,8 +684,7 @@ func TestRemoveAllocAsk(t *testing.T) {
delta := app.GetPendingResource().Clone()
reservedAsks = app.RemoveAllocationAsk(aKey)
delta.SubFrom(app.GetPendingResource())
- expected = resources.Multiply(res, 2)
- if !resources.Equals(delta, expected) || reservedAsks != 0 {
+ if !resources.Equals(delta, res) || reservedAsks != 0 {
t.Errorf("ask should have been removed from app, err %v,
expected delta %v but was: %v, (reserved released = %d)", err, expected, delta,
reservedAsks)
}
reservedAsks = app.RemoveAllocationAsk("")
@@ -723,12 +707,12 @@ func TestRemoveAllocAskWithPlaceholders(t *testing.T) {
app.queue = queue
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 2)
+ ask := newAllocationAsk(aKey, appID1, res)
ask.placeholder = true
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask 1 should have been added to app")
- ask = newAllocationAskRepeat("alloc-2", appID1, res, 2)
+ ask = newAllocationAsk("alloc-2", appID1, res)
ask.placeholder = true
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask 2 should have been added to app")
@@ -749,14 +733,14 @@ func
TestRemovePlaceholderAllocationWithNoRealAllocation(t *testing.T) {
t.Fatalf("app create failed which should not have %v", app)
}
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
- ask := newAllocationAskRepeat(aKey, appID1, res, 2)
+ ask := newAllocationAsk(aKey, appID1, res)
ask.placeholder = true
allocInfo := NewAllocation(nodeID1, ask)
app.AddAllocation(allocInfo)
err := app.handleApplicationEventWithLocking(RunApplication)
assert.NilError(t, err, "no error expected new to accepted")
- app.RemoveAllocation("alloc-1-0",
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ app.RemoveAllocation("alloc-1",
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
assert.Equal(t, app.stateMachine.Current(), Completing.String())
assertUserGroupResource(t, getTestUserGroup(), nil)
}
@@ -813,7 +797,7 @@ func TestStateChangeOnUpdate(t *testing.T) {
assert.Assert(t, app.IsRunning(), "Application should have stayed same,
changed unexpectedly: %s", app.CurrentState())
// remove the allocation, ask has been removed so nothing left
- app.RemoveAllocation(askID+"-0",
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ app.RemoveAllocation(askID, si.TerminationType_UNKNOWN_TERMINATION_TYPE)
assert.Assert(t, app.IsCompleting(), "Application did not change as
expected: %s", app.CurrentState())
assertUserGroupResource(t, getTestUserGroup(), nil)
@@ -839,7 +823,7 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
assert.Assert(t, app.IsNew(), "New application did not return new
state: %s", app.CurrentState())
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
askID := "ask-1"
- ask := newAllocationAskTG(askID, appID1, "TG1", res, 1)
+ ask := newAllocationAskTG(askID, appID1, "TG1", res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
// app with ask, even for placeholder, should be accepted
@@ -873,7 +857,7 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
assertUserGroupResource(t, getTestUserGroup(), res)
// first we have to remove the allocation itself
- alloc := app.RemoveAllocation(askID+"-0",
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ alloc := app.RemoveAllocation(askID,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
assert.Assert(t, alloc != nil, "Nil allocation was returned")
assert.Assert(t, app.IsAccepted(), "Application should have stayed in
Accepted, changed unexpectedly: %s", app.CurrentState())
// removing the ask should move the application into the waiting state,
because the allocation is only a placeholder allocation
@@ -1416,7 +1400,7 @@ func runTimeoutPlaceholderTest(t *testing.T,
expectedState string, gangSchedulin
assert.NilError(t, err, "Unexpected error when creating resource from
map")
// add the placeholder ask to the app
tg1 := "tg-1"
- phAsk := newAllocationAskTG("ask-1", appID1, tg1, res, 1)
+ phAsk := newAllocationAskTG("ask-1", appID1, tg1, res)
err = app.AddAllocationAsk(phAsk)
assert.NilError(t, err, "Application ask should have been added")
assert.Assert(t, app.IsAccepted(), "Application should be in accepted
state")
@@ -1749,9 +1733,9 @@ func TestCanReplace(t *testing.T) {
want bool
}{
{"nil", nil, false},
- {"placeholder", newAllocationAskTG(aKey, appID1, tg1, res, 1),
false},
+ {"placeholder", newAllocationAskTG(aKey, appID1, tg1, res),
false},
{"no TG", newAllocationAsk(aKey, appID1, res), false},
- {"no placeholder data", newAllocationAskAll(aKey, appID1, tg1,
res, 1, false, 0), false},
+ {"no placeholder data", newAllocationAskAll(aKey, appID1, tg1,
res, false, 0), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -1760,14 +1744,14 @@ func TestCanReplace(t *testing.T) {
}
// add the placeholder data
// available tg has one replacement open
- app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1,
res, 1))
+ app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1,
res))
// unavailable tg has NO replacement open (replaced)
tg2 := "unavailable"
- app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2,
res, 1))
+ app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2,
res))
app.placeholderData[tg2].Replaced++
// unavailable tg has NO replacement open (timedout)
tg3 := "timedout"
- app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3,
res, 1))
+ app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3,
res))
app.placeholderData[tg3].TimedOut++
tests = []struct {
name string
@@ -1775,10 +1759,10 @@ func TestCanReplace(t *testing.T) {
want bool
}{
{"no TG", newAllocationAsk(aKey, appID1, res), false},
- {"TG mismatch", newAllocationAskAll(aKey, appID1, "unknown",
res, 1, false, 0), false},
- {"TG placeholder used", newAllocationAskAll(aKey, appID1, tg2,
res, 1, false, 0), false},
- {"TG placeholder timed out", newAllocationAskAll(aKey, appID1,
tg3, res, 1, false, 0), false},
- {"TG placeholder available", newAllocationAskAll(aKey, appID1,
tg1, res, 1, false, 0), true},
+ {"TG mismatch", newAllocationAskAll(aKey, appID1, "unknown",
res, false, 0), false},
+ {"TG placeholder used", newAllocationAskAll(aKey, appID1, tg2,
res, false, 0), false},
+ {"TG placeholder timed out", newAllocationAskAll(aKey, appID1,
tg3, res, false, 0), false},
+ {"TG placeholder available", newAllocationAskAll(aKey, appID1,
tg1, res, false, 0), true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -2009,25 +1993,25 @@ func TestMaxAskPriority(t *testing.T) {
assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority
after re-adding asks")
- // update repeat to zero for p=15
- _, err = app.UpdateAskRepeat(ask3.GetAllocationKey(), -1)
+ // update to allocated for p=15
+ _, err = app.AllocateAsk(ask3.GetAllocationKey())
assert.NilError(t, err, "ask should have been updated")
- assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority
after updating p=15 repeat to 0")
+ assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority
after updating p=15 to allocated")
- // update repeat to zero for p=5
- _, err = app.UpdateAskRepeat(ask2.GetAllocationKey(), -1)
+ // update to allocated for p=5
+ _, err = app.AllocateAsk(ask2.GetAllocationKey())
assert.NilError(t, err, "ask should have been updated")
- assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority
after updating p=5 repeat to 0")
+ assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority
after updating p=5 to allocated")
- // update repeat to 1 for p=5
- _, err = app.UpdateAskRepeat(ask2.GetAllocationKey(), 1)
+ // update to unallocated for p=5
+ _, err = app.DeallocateAsk(ask2.GetAllocationKey())
assert.NilError(t, err, "ask should have been updated")
- assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority
after updating p=5 repeat to 1")
+ assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority
after updating p=5 to unallocated")
- // update repeat to 1 for p=15
- _, err = app.UpdateAskRepeat(ask3.GetAllocationKey(), 1)
+ // update to unallocated for p=15
+ _, err = app.DeallocateAsk(ask3.GetAllocationKey())
assert.NilError(t, err, "ask should have been updated")
- assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority
after updating p=15 repeat to 1")
+ assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority
after updating p=15 to unallocated")
}
func TestAskEvents(t *testing.T) {
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 0cd30393..6ced5642 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -544,10 +544,10 @@ func TestSortApplications(t *testing.T) {
if len(sortedApp) != 1 || sortedApp[0].ApplicationID != appID1 {
t.Errorf("sorted application is missing expected app: %v",
sortedApp)
}
- // set 0 repeat
- _, err = app.UpdateAskRepeat("alloc-1", -1)
+ // set allocated
+ _, err = app.AllocateAsk("alloc-1")
if err != nil || len(leaf.sortApplications(false)) != 0 {
- t.Errorf("app with ask but 0 pending resources should not be in
sorted apps: %v (err = %v)", app, err)
+ t.Errorf("app with ask but no pending resources should not be
in sorted apps: %v (err = %v)", app, err)
}
}
@@ -1666,12 +1666,9 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
parentMax := map[string]string{siCommon.Memory: "200"}
parentGuar := map[string]string{siCommon.Memory: "100"}
ask := createAllocationAsk("ask1", appID1, true, true, 0, res)
- ask.pendingAskRepeat = 1
ask2 := createAllocationAsk("ask2", appID2, true, true, -1000, res)
- ask2.pendingAskRepeat = 1
alloc2 := NewAllocation(nodeID1, ask2)
ask3 := createAllocationAsk("ask3", appID2, true, true, -1000, res)
- ask3.pendingAskRepeat = 1
alloc3 := NewAllocation(nodeID1, ask3)
root, err := createRootQueue(map[string]string{siCommon.Memory: "1000"})
assert.NilError(t, err, "failed to create queue")
@@ -2522,7 +2519,7 @@ func TestQueueRunningAppsForSingleAllocationApp(t
*testing.T) {
assert.Equal(t, app.CurrentState(), Running.String(), "app state should
be running")
assert.Equal(t, leaf.runningApps, uint64(1), "leaf should have 1 app
running")
- _, err = app.updateAskRepeatInternal(ask, -1)
+ _, err = app.allocateAsk(ask)
assert.NilError(t, err, "failed to decrease pending resources")
app.RemoveAllocation(alloc.GetAllocationID(),
si.TerminationType_STOPPED_BY_RM)
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go
b/pkg/scheduler/objects/required_node_preemptor_test.go
index 9b852831..32bcdb2d 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -124,7 +124,6 @@ func TestSortAllocations(t *testing.T) {
requiredAsk := createAllocationAsk("ask", "app1", true, true, 20,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- requiredAsk.pendingAskRepeat = 5
p := NewRequiredNodePreemptor(node, requiredAsk)
prepareAllocationAsks(node)
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index 577e26ff..3c325046 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -40,6 +40,7 @@ const (
appID1 = "app-1"
appID2 = "app-2"
aKey = "alloc-1"
+ aKey2 = "alloc-2"
aAllocationID = "alloc-allocationid-1"
nodeID1 = "node-1"
nodeID2 = "node-2"
@@ -225,31 +226,26 @@ func newPlaceholderAlloc(appID, nodeID string, res
*resources.Resource) *Allocat
}
func newAllocationAsk(allocKey, appID string, res *resources.Resource)
*AllocationAsk {
- return newAllocationAskAll(allocKey, appID, "", res, 1, false, 0)
+ return newAllocationAskAll(allocKey, appID, "", res, false, 0)
}
func newAllocationAskPriority(allocKey, appID string, res *resources.Resource,
priority int32) *AllocationAsk {
- return newAllocationAskAll(allocKey, appID, "", res, 1, false, priority)
+ return newAllocationAskAll(allocKey, appID, "", res, false, priority)
}
-func newAllocationAskRepeat(allocKey, appID string, res *resources.Resource,
repeat int) *AllocationAsk {
- return newAllocationAskAll(allocKey, appID, "", res, repeat, false, 0)
+func newAllocationAskTG(allocKey, appID, taskGroup string, res
*resources.Resource) *AllocationAsk {
+ return newAllocationAskAll(allocKey, appID, taskGroup, res, taskGroup
!= "", 0)
}
-func newAllocationAskTG(allocKey, appID, taskGroup string, res
*resources.Resource, repeat int) *AllocationAsk {
- return newAllocationAskAll(allocKey, appID, taskGroup, res, repeat,
taskGroup != "", 0)
-}
-
-func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, repeat int, placeholder bool, priority int32)
*AllocationAsk {
+func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, placeholder bool, priority int32) *AllocationAsk {
ask := &si.AllocationAsk{
- AllocationKey: allocKey,
- ApplicationID: appID,
- PartitionName: "default",
- ResourceAsk: res.ToProto(),
- MaxAllocations: int32(repeat),
- TaskGroupName: taskGroup,
- Placeholder: placeholder,
- Priority: priority,
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "default",
+ ResourceAsk: res.ToProto(),
+ TaskGroupName: taskGroup,
+ Placeholder: placeholder,
+ Priority: priority,
}
return NewAllocationAskFromSI(ask)
}
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index a72dc75c..3ac6a872 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -756,8 +756,8 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
// unlink the placeholder and allocation
release.ClearReleases()
alloc.ClearReleases()
- // update the repeat on the real alloc to get it re-scheduled
- _, err := app.UpdateAskRepeat(askAlloc.GetAsk().GetAllocationKey(), 1)
+ // mark ask as unallocated to get it re-scheduled
+ _, err := app.DeallocateAsk(askAlloc.GetAsk().GetAllocationKey())
if err == nil {
log.Log(log.SchedPartition).Info("inflight placeholder replacement reversed due to node removal",
zap.String("appID", askAlloc.GetApplicationID()),
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index e8f0e001..00a980a4 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -21,7 +21,6 @@ package scheduler
import (
"fmt"
"strconv"
- "strings"
"testing"
"time"
@@ -257,7 +256,7 @@ func TestAddNodeWithAllocations(t *testing.T) {
// fail with a broken alloc
ask = newAllocationAsk("alloc-1-allocationid", appID1, appRes)
alloc = objects.NewAllocation(nodeID1, ask)
- assert.Equal(t, alloc.GetAllocationID(), "alloc-1-allocationid-0")
+ assert.Equal(t, alloc.GetAllocationID(), "alloc-1-allocationid")
// reset allocationID to empty
alloc.SetAllocationID("")
assert.Equal(t, alloc.GetAllocationID(), "")
@@ -376,10 +375,10 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active
placeholders")
// fake an ask that is used
- ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1,
false)
+ ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, false)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
- _, err = app.UpdateAskRepeat(allocID, -1)
+ _, err = app.AllocateAsk(allocID)
assert.NilError(t, err, "ask should have been updated without error")
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending resources")
assertLimits(t, getTestUserGroup(), appRes)
@@ -470,7 +469,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t
*testing.T) {
newRes.MultiplyTo(4)
phRes.MultiplyTo(7)
- ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 0, 1,
false)
+ ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
alloc := objects.NewAllocation(nodeID1, ask)
allocs := []*objects.Allocation{alloc}
@@ -596,7 +595,7 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
phRes.MultiplyTo(7)
// add a node with allocation: must have the correct app1 added already
- ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 0, 1,
false)
+ ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
alloc := objects.NewAllocation(nodeID1, ask)
allocs := []*objects.Allocation{alloc}
@@ -679,7 +678,7 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
phRes.MultiplyTo(7)
// add a node with allocation: must have the correct app1 added already
- ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 0, 1,
false)
+ ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
alloc := objects.NewAllocation(nodeID1, ask)
allocs := []*objects.Allocation{alloc}
@@ -764,7 +763,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 0,
1, true)
+ ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1,
true)
ph := objects.NewAllocation(nodeID1, ask)
allocs := []*objects.Allocation{ph}
err = partition.AddNode(node1, allocs)
@@ -779,10 +778,10 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
// fake an ask that is used
- ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1,
false)
+ ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, false)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
- _, err = app.UpdateAskRepeat(allocID, -1)
+ _, err = app.AllocateAsk(allocID)
assert.NilError(t, err, "ask should have been updated without error")
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending resources")
@@ -836,7 +835,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 0,
1, true)
+ ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1,
true)
ph := objects.NewAllocation(nodeID1, ask)
allocs := []*objects.Allocation{ph}
err = partition.AddNode(node1, allocs)
@@ -851,10 +850,10 @@ func TestRemoveNodeWithReal(t *testing.T) {
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
// fake an ask that is used
- ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1,
false)
+ ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, false)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
- _, err = app.UpdateAskRepeat(allocID, -1)
+ _, err = app.AllocateAsk(allocID)
assert.NilError(t, err, "ask should have been updated without error")
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending resources")
@@ -1144,7 +1143,7 @@ func TestRemoveAppAllocs(t *testing.T) {
assertLimits(t, getTestUserGroup(), appRes)
ask = newAllocationAsk("alloc-1", appNotRemoved, appRes)
- allocationID := "alloc-1-0"
+ allocationID := "alloc-1"
alloc = objects.NewAllocation(nodeID1, ask)
err = partition.addAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
@@ -1552,14 +1551,14 @@ func TestTryAllocate(t *testing.T) {
assert.NilError(t, err, "failed to add app-1 to partition")
err = app.AddAllocationAsk(newAllocationAsk(allocID, appID1, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")
- err = app.AddAllocationAsk(newAllocationAskPriority("alloc-2", appID1,
res, 1, 2))
+ err = app.AddAllocationAsk(newAllocationAskPriority("alloc-2", appID1,
res, 2))
assert.NilError(t, err, "failed to add ask alloc-2 to app-1")
app = newApplication(appID2, "default", "root.leaf")
// add to the partition
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-2 to partition")
- err = app.AddAllocationAsk(newAllocationAskPriority(allocID, appID2,
res, 1, 2))
+ err = app.AddAllocationAsk(newAllocationAskPriority(allocID, appID2,
res, 2))
assert.NilError(t, err, "failed to add ask alloc-1 to app-2")
expectedQueuesMaxLimits := make(map[string]map[string]interface{})
@@ -1699,7 +1698,10 @@ func TestRequiredNodeCancelNonDSReservations(t
*testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
- ask := newAllocationAskRepeat("alloc-1", appID1, res, 2)
+ ask := newAllocationAsk("alloc-1", appID1, res)
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "failed to add ask to app")
+ ask = newAllocationAsk("alloc-2", appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask to app")
// calculate the resource size for the two asks
@@ -1774,10 +1776,14 @@ func TestRequiredNodeCancelDSReservations(t *testing.T)
{
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
- ask := newAllocationAskRepeat("alloc-1", appID1, res, 2)
+ ask := newAllocationAsk("alloc-1", appID1, res)
ask.SetRequiredNode(nodeID1)
err = app.AddAllocationAsk(ask)
- assert.NilError(t, err, "failed to add ask to app")
+ assert.NilError(t, err, "failed to add ask 1 to app")
+ ask = newAllocationAsk("alloc-2", appID1, res)
+ ask.SetRequiredNode(nodeID1)
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "failed to add ask 2 to app")
// calculate the resource size for the two asks
res.MultiplyTo(2)
assert.Assert(t, resources.Equals(res, app.GetPendingResource()),
"pending resource not set as expected")
@@ -2309,10 +2315,13 @@ func TestAllocReserveNewNode(t *testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
- ask := newAllocationAskRepeat("alloc-1", appID1, res, 2)
+ ask := newAllocationAsk("alloc-1", appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask to app")
- // calculate the resource size using the repeat request (reuse is
possible using proto conversions in ask)
+ ask2 := newAllocationAsk("alloc-2", appID1, res)
+ err = app.AddAllocationAsk(ask2)
+ assert.NilError(t, err, "failed to add ask2 to app")
+ // calculate the resource size for two asks
res.MultiplyTo(2)
assert.Assert(t, resources.Equals(res, app.GetPendingResource()),
"pending resource not set as expected")
assert.Assert(t, resources.Equals(res,
partition.root.GetPendingResource()), "pending resource not set as expected on
root queue")
@@ -2438,10 +2447,14 @@ func TestTryAllocateWithReserved(t *testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app-1 to partition")
- ask := newAllocationAskRepeat("alloc-1", appID1, res, 2)
+ ask := newAllocationAsk("alloc-1", appID1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask alloc-1 to app")
+ ask2 := newAllocationAsk("alloc-2", appID1, res)
+ err = app.AddAllocationAsk(ask2)
+ assert.NilError(t, err, "failed to add ask alloc-2 to app")
+
// reserve one node: scheduling should happen on the other
node2 := partition.GetNode(nodeID2)
if node2 == nil {
@@ -2491,9 +2504,11 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
app := newApplication(appID1, "default", "root.parent.sub-leaf")
err = partition.AddApplication(app)
assert.NilError(t, err, "failed to add app app-1 to partition")
- ask := newAllocationAskRepeat("alloc-1", appID1, res, 4)
- err = app.AddAllocationAsk(ask)
- assert.NilError(t, err, "failed to add ask alloc-1 to app")
+ for i := 1; i <= 4; i++ {
+ ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res)
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, fmt.Sprintf("failed to add ask alloc-%d
to app", i))
+ }
// calculate the resource size for the four asks
pending := resources.Multiply(res, 4)
@@ -2509,12 +2524,12 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
// add two asks which should reserve
- ask = newAllocationAskRepeat("alloc-2", appID1, res, 1)
+ ask := newAllocationAsk("alloc-5", appID1, res)
err = app.AddAllocationAsk(ask)
- assert.NilError(t, err, "failed to add ask alloc-2 to app")
- ask = newAllocationAskRepeat("alloc-3", appID1, res, 1)
+ assert.NilError(t, err, "failed to add ask alloc-5 to app")
+ ask = newAllocationAsk("alloc-6", appID1, res)
err = app.AddAllocationAsk(ask)
- assert.NilError(t, err, "failed to add ask alloc-3 to app")
+ assert.NilError(t, err, "failed to add ask alloc-6 to app")
pending = resources.Multiply(res, 2)
assert.Assert(t, resources.Equals(pending, app.GetPendingResource()),
"pending resource not set as expected")
// allocate so we get reservations
@@ -2543,9 +2558,9 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
// before confirming remove the ask: do what the scheduler does when it
gets a request from a
// shim in processAllocationReleaseByAllocationKey()
// make sure we are counting correctly and leave the other reservation
intact
- removeAskID := "alloc-2"
- if alloc.GetAllocationKey() == "alloc-3" {
- removeAskID = "alloc-3"
+ removeAskID := "alloc-5"
+ if alloc.GetAllocationKey() == "alloc-6" {
+ removeAskID = "alloc-6"
}
released := app.RemoveAllocationAsk(removeAskID)
assert.Equal(t, released, 1, "expected one reservation to be released")
@@ -3521,18 +3536,10 @@ func TestAddAllocationAsk(t *testing.T) {
assert.NilError(t, err, "failed to create resource")
askKey := "ask-key-1"
ask := si.AllocationAsk{
- AllocationKey: askKey,
- ApplicationID: appID1,
- ResourceAsk: res.ToProto(),
- MaxAllocations: 0,
- }
- err = partition.addAllocationAsk(&ask)
- if err == nil || !strings.Contains(err.Error(), "invalid") {
- t.Fatalf("0 repeat ask should have returned invalid ask error:
%v", err)
+ AllocationKey: askKey,
+ ApplicationID: appID1,
+ ResourceAsk: res.ToProto(),
}
-
- // set the repeat and retry this should work
- ask.MaxAllocations = 1
err = partition.addAllocationAsk(&ask)
assert.NilError(t, err, "failed to add ask to app")
if !resources.Equals(app.GetPendingResource(), res) {
@@ -3915,7 +3922,7 @@ func TestUserHeadroom(t *testing.T) {
res, err = resources.NewResourceFromConf(map[string]string{"memory":
"3", "vcores": "3"})
assert.NilError(t, err, "failed to create resource")
- ask := newAllocationAskRepeat("alloc-1", "app-5", res, 2)
+ ask := newAllocationAsk("alloc-1", "app-5", res)
err = app5.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask to app")
@@ -3936,7 +3943,7 @@ func TestUserHeadroom(t *testing.T) {
assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(),
"allocation result should have been allocated")
// create a reservation and ensure reservation has not been allocated
because there is no headroom for the user
- ask = newAllocationAskRepeat("alloc-2", "app-5", res, 2)
+ ask = newAllocationAsk("alloc-2", "app-5", res)
err = app5.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask to app")
partition.reserve(app5, node2, ask)
@@ -4418,22 +4425,19 @@ func TestCalculateOutstandingRequests(t *testing.T) {
"memory": 1,
})
siAsk1 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-1",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
- MaxAllocations: 1,
+ AllocationKey: "ask-uuid-1",
+ ApplicationID: appID1,
+ ResourceAsk: askResource.ToProto(),
}
siAsk2 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-2",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
- MaxAllocations: 1,
+ AllocationKey: "ask-uuid-2",
+ ApplicationID: appID1,
+ ResourceAsk: askResource.ToProto(),
}
siAsk3 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-3",
- ApplicationID: appID2,
- ResourceAsk: askResource.ToProto(),
- MaxAllocations: 1,
+ AllocationKey: "ask-uuid-3",
+ ApplicationID: appID2,
+ ResourceAsk: askResource.ToProto(),
}
err = partition.addAllocationAsk(siAsk1)
assert.NilError(t, err)
@@ -4511,18 +4515,18 @@ func
TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
assert.Equal(t, objects.Replaced, alloc.GetResult())
assert.Equal(t, "real-alloc", alloc.GetAllocationKey())
assert.Equal(t, "tg-1", alloc.GetTaskGroup())
- assert.Equal(t, "real-alloc-0", alloc.GetAllocationID())
+ assert.Equal(t, "real-alloc", alloc.GetAllocationID())
// remove the terminated placeholder allocation
released, confirmed := partition.removeAllocation(&si.AllocationRelease{
ApplicationID: appID1,
TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
- AllocationKey: "real-alloc-0",
- AllocationID: "placeholder-0",
+ AllocationKey: "real-alloc",
+ AllocationID: "placeholder",
})
assert.Assert(t, released == nil, "unexpected released allocation")
assert.Assert(t, confirmed != nil, "expected to have a confirmed
allocation")
assert.Equal(t, "real-alloc", confirmed.GetAllocationKey())
assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
- assert.Equal(t, "real-alloc-0", confirmed.GetAllocationID())
+ assert.Equal(t, "real-alloc", confirmed.GetAllocationID())
}
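
A note for reviewers tracing the assertion changes above: with repeats removed, an ask no
longer carries MaxAllocations and produces exactly one allocation, whose ID now mirrors the
allocation key (hence the dropped "-0" suffix in the expected values). A minimal sketch of
that 1:1 pattern, illustrative only and assuming the import paths used elsewhere in this
repository:

package example

import (
	"github.com/apache/yunikorn-core/pkg/common/resources"
	"github.com/apache/yunikorn-core/pkg/scheduler/objects"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

func singleAskSketch() string {
	res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
	// no MaxAllocations field: one ask maps to exactly one allocation
	ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
		AllocationKey: "alloc-1",
		ApplicationID: "app-1",
		PartitionName: "test",
		ResourceAsk:   res.ToProto(),
	})
	alloc := objects.NewAllocation("node-1:1234", ask)
	// returns "alloc-1": the allocation ID equals the allocation key
	return alloc.GetAllocationID()
}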
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 5dc7cba5..dac29a04 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -46,22 +46,19 @@ func TestInspectOutstandingRequests(t *testing.T) {
"memory": 1,
})
siAsk1 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-1",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
- MaxAllocations: 1,
+ AllocationKey: "ask-uuid-1",
+ ApplicationID: appID1,
+ ResourceAsk: askResource.ToProto(),
}
siAsk2 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-2",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
- MaxAllocations: 1,
+ AllocationKey: "ask-uuid-2",
+ ApplicationID: appID1,
+ ResourceAsk: askResource.ToProto(),
}
siAsk3 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-3",
- ApplicationID: appID2,
- ResourceAsk: askResource.ToProto(),
- MaxAllocations: 1,
+ AllocationKey: "ask-uuid-3",
+ ApplicationID: appID2,
+ ResourceAsk: askResource.ToProto(),
}
err = partition.addAllocationAsk(siAsk1)
assert.NilError(t, err)
diff --git a/pkg/scheduler/tests/application_tracking_test.go
b/pkg/scheduler/tests/application_tracking_test.go
index d14df297..9243b3dc 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -119,8 +119,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
"vcore": {Value: 1000},
},
},
- MaxAllocations: 1,
- ApplicationID: appID1,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
diff --git a/pkg/scheduler/tests/mockscheduler_test.go
b/pkg/scheduler/tests/mockscheduler_test.go
index 05e93c1f..222aa390 100644
--- a/pkg/scheduler/tests/mockscheduler_test.go
+++ b/pkg/scheduler/tests/mockscheduler_test.go
@@ -146,16 +146,17 @@ func (m *mockScheduler) removeApp(appID, partition
string) error {
})
}
-func (m *mockScheduler) addAppRequest(appID, allocID string, resource
*si.Resource, repeat int32) error {
+func (m *mockScheduler) addAppRequest(appID, allocID string, resource
*si.Resource, repeat int) error {
+ asks := make([]*si.AllocationAsk, repeat)
+ for i := 0; i < repeat; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("%s-%d", allocID, i),
+ ApplicationID: appID,
+ ResourceAsk: resource,
+ }
+ }
return m.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: allocID,
- ApplicationID: appID,
- ResourceAsk: resource,
- MaxAllocations: repeat,
- },
- },
+ Asks: asks,
RmID: m.rmID,
})
}
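
A hypothetical usage of the reworked helper above (the mockScheduler value ms, appID1 and t
are assumed from the surrounding tests): a repeat of 3 now fans out into three independent
asks rather than a single ask with MaxAllocations set.

resource := &si.Resource{
	Resources: map[string]*si.Quantity{
		"memory": {Value: 10000000},
		"vcore":  {Value: 1000},
	},
}
// creates asks "alloc-0", "alloc-1" and "alloc-2" for the same application
if err := ms.addAppRequest(appID1, "alloc", resource, 3); err != nil {
	t.Fatal(err)
}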
diff --git a/pkg/scheduler/tests/operation_test.go
b/pkg/scheduler/tests/operation_test.go
index 1e7f94c7..49e55a19 100644
--- a/pkg/scheduler/tests/operation_test.go
+++ b/pkg/scheduler/tests/operation_test.go
@@ -96,8 +96,17 @@ partitions:
"vcore": {Value: 1000},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
+ },
+ },
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -224,8 +233,17 @@ partitions:
"vcore": {Value: 1000},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
+ },
+ },
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
diff --git a/pkg/scheduler/tests/performance_test.go
b/pkg/scheduler/tests/performance_test.go
index 4749b5c5..bfbdc852 100644
--- a/pkg/scheduler/tests/performance_test.go
+++ b/pkg/scheduler/tests/performance_test.go
@@ -126,40 +126,43 @@ partitions:
// Request pods
app1NumPods := numPods / 2
- err = proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value:
int64(requestMem)},
- "vcore": {Value:
int64(requestVcore)},
- },
+ app1Asks := make([]*si.AllocationAsk, app1NumPods)
+ for i := 0; i < app1NumPods; i++ {
+ app1Asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-1-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: int64(requestMem)},
+ "vcore": {Value: int64(requestVcore)},
},
- MaxAllocations: int32(app1NumPods),
- ApplicationID: appID1,
},
- },
+ ApplicationID: appID1,
+ }
+ }
+ err = proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: app1Asks,
RmID: "rm:123",
})
if err != nil {
b.Error(err.Error())
}
- err = proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value:
int64(requestMem)},
- "vcore": {Value:
int64(requestVcore)},
- },
+ app2NumPods := numPods - app1NumPods
+ app2Asks := make([]*si.AllocationAsk, app2NumPods)
+ for i := 0; i < app2NumPods; i++ {
+ app2Asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-2-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: int64(requestMem)},
+ "vcore": {Value: int64(requestVcore)},
},
- MaxAllocations: int32(numPods - app1NumPods),
- ApplicationID: appID2,
},
- },
+ ApplicationID: appID2,
+ }
+ }
+ err = proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: app2Asks,
RmID: "rm:123",
})
if err != nil {
@@ -167,6 +170,8 @@ partitions:
}
// Reset timer for this benchmark
+ duration = time.Since(startTime)
+ b.Logf("Total time to add %d pods in %s, %f per second", numPods,
duration, float64(numPods)/duration.Seconds())
startTime = time.Now()
b.ResetTimer()
diff --git a/pkg/scheduler/tests/plugin_test.go
b/pkg/scheduler/tests/plugin_test.go
index c45421eb..467ffebc 100644
--- a/pkg/scheduler/tests/plugin_test.go
+++ b/pkg/scheduler/tests/plugin_test.go
@@ -97,8 +97,7 @@ partitions:
"memory": {Value: 8},
},
},
- MaxAllocations: 1,
- ApplicationID: appID1,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -122,8 +121,7 @@ partitions:
"memory": {Value: 5},
},
},
- MaxAllocations: 1,
- ApplicationID: appID1,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
diff --git a/pkg/scheduler/tests/recovery_test.go
b/pkg/scheduler/tests/recovery_test.go
index 09e0f26e..75c1f470 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -19,6 +19,7 @@
package tests
import (
+ "fmt"
"testing"
"gotest.tools/v3/assert"
@@ -125,6 +126,16 @@ func TestSchedulerRecovery(t *testing.T) {
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
+ {
+ AllocationKey: "alloc-0",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10},
+ "vcore": {Value: 1},
+ },
+ },
+ ApplicationID: appID1,
+ },
{
AllocationKey: "alloc-1",
ResourceAsk: &si.Resource{
@@ -133,8 +144,7 @@ func TestSchedulerRecovery(t *testing.T) {
"vcore": {Value: 1},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -170,32 +180,23 @@ func TestSchedulerRecovery(t *testing.T) {
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(),
"[rm:123]default",
[]string{"node-1:1234", "node-2:1234"}, 20, 1000)
- // ask for two more resources
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 50},
- "vcore": {Value: 5},
- },
- },
- MaxAllocations: 2,
- ApplicationID: appID1,
- },
- {
- AllocationKey: "alloc-3",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 5},
- },
+ // ask for 4 more allocations
+ asks := make([]*si.AllocationAsk, 4)
+ mem := [4]int64{50, 100, 50, 100}
+ for i := 0; i < 4; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d", i+2),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: mem[i]},
+ "vcore": {Value: 5},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
},
- },
+ ApplicationID: appID1,
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -391,8 +392,17 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
"vcore": {Value: 1},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10},
+ "vcore": {Value: 1},
+ },
+ },
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -800,8 +810,17 @@ partitions:
"vcore": {Value: 1},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10},
+ "vcore": {Value: 1},
+ },
+ },
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -1006,7 +1025,6 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
"vcore": {Value: 1},
},
},
- MaxAllocations: 1,
ApplicationID: appID1,
TaskGroupName: "tg-2",
Placeholder: true,
@@ -1028,7 +1046,6 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
"vcore": {Value: 1},
},
},
- MaxAllocations: 1,
ApplicationID: appID1,
TaskGroupName: "tg-1",
},
@@ -1040,7 +1057,6 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
"vcore": {Value: 1},
},
},
- MaxAllocations: 1,
ApplicationID: appID1,
TaskGroupName: "tg-2",
},
@@ -1057,13 +1073,13 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
{
ApplicationID: appID1,
PartitionName: "default",
- AllocationID: "ph-alloc-1-0",
+ AllocationID: "ph-alloc-1",
TerminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
},
{
ApplicationID: appID1,
PartitionName: "default",
- AllocationID: "ph-alloc-2-0",
+ AllocationID: "ph-alloc-2",
TerminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
},
},
@@ -1079,13 +1095,13 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
{
ApplicationID: appID1,
PartitionName: "default",
- AllocationID: "real-alloc-1-0",
+ AllocationID: "real-alloc-1",
TerminationType:
si.TerminationType_STOPPED_BY_RM,
},
{
ApplicationID: appID1,
PartitionName: "default",
- AllocationID: "real-alloc-2-0",
+ AllocationID: "real-alloc-2",
TerminationType:
si.TerminationType_STOPPED_BY_RM,
},
},
diff --git a/pkg/scheduler/tests/smoke_test.go
b/pkg/scheduler/tests/smoke_test.go
index 72f61fee..e25269dd 100644
--- a/pkg/scheduler/tests/smoke_test.go
+++ b/pkg/scheduler/tests/smoke_test.go
@@ -19,6 +19,7 @@
package tests
import (
+ "fmt"
"strconv"
"testing"
"time"
@@ -232,15 +233,24 @@ func TestBasicScheduler(t *testing.T) {
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
{
- AllocationKey: "alloc-1",
+ AllocationKey: "alloc-1a",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
+ },
+ },
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-1b",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -274,30 +284,48 @@ func TestBasicScheduler(t *testing.T) {
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(),
ms.partitionName, []string{"node-1:1234", "node-2:1234"}, 20000000, 1000)
- // ask for two more resources
+ // ask for 4 more tasks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
{
- AllocationKey: "alloc-2",
+ AllocationKey: "alloc-2a",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 50000000},
+ "vcore": {Value: 5000},
+ },
+ },
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-2b",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 50000000},
"vcore": {Value: 5000},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-3a",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 100000000},
+ "vcore": {Value: 5000},
+ },
+ },
+ ApplicationID: appID1,
},
{
- AllocationKey: "alloc-3",
+ AllocationKey: "alloc-3b",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 5000},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -432,20 +460,21 @@ func TestBasicSchedulerAutoAllocation(t *testing.T) {
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
+ asks := make([]*si.AllocationAsk, 20)
+ for i := 0; i < 20; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
},
- MaxAllocations: 20,
- ApplicationID: appID,
},
- },
+ ApplicationID: appID,
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -542,31 +571,22 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
- },
- MaxAllocations: 20,
- ApplicationID: app1ID,
- },
- {
- AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
+ asks := make([]*si.AllocationAsk, 40)
+ appIDs := []string{app1ID, app2ID}
+ for i := 0; i < 40; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
},
- MaxAllocations: 20,
- ApplicationID: app2ID,
},
- },
+ ApplicationID: appIDs[(i / 20)],
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -664,31 +684,22 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
- },
- MaxAllocations: 20,
- ApplicationID: app1ID,
- },
- {
- AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
+ asks := make([]*si.AllocationAsk, 40)
+ appIDs := []string{app1ID, app2ID}
+ for i := 0; i < 40; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
},
- MaxAllocations: 20,
- ApplicationID: app2ID,
},
- },
+ ApplicationID: appIDs[i/20],
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -862,20 +873,21 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources:
map[string]*si.Quantity{
- "memory":
{Value: param.askMemory},
- "vcore":
{Value: param.askCPU},
- },
+ asks := make([]*si.AllocationAsk, param.numOfAsk)
+ for i := int32(0); i < param.numOfAsk; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d",
i),
+ ResourceAsk: &si.Resource{
+ Resources:
map[string]*si.Quantity{
+ "memory": {Value:
param.askMemory},
+ "vcore": {Value:
param.askCPU},
},
- MaxAllocations: param.numOfAsk,
- ApplicationID: appID1,
},
- },
+ ApplicationID: appID1,
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -1112,31 +1124,22 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
- },
- MaxAllocations: 20,
- ApplicationID: app1ID,
- },
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
+ asks := make([]*si.AllocationAsk, 40)
+ appIDs := []string{app1ID, app2ID}
+ for i := 0; i < 40; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
},
- MaxAllocations: 20,
- ApplicationID: app2ID,
},
- },
+ ApplicationID: appIDs[i/20],
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -1230,21 +1233,22 @@ partitions:
waitForNewNode(t, context, node.NodeID, partition, 1000)
}
- // Request ask with 20 allocations
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
- {
- AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10000000},
- "vcore": {Value: 1000},
- },
+ // Request 20 allocations
+ asks := make([]*si.AllocationAsk, 20)
+ for i := 0; i < 20; i++ {
+ asks[i] = &si.AllocationAsk{
+ AllocationKey: fmt.Sprintf("alloc-%d", i),
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
},
- MaxAllocations: 20,
- ApplicationID: appID,
},
- },
+ ApplicationID: appID,
+ }
+ }
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: asks,
RmID: "rm:123",
})
@@ -1351,10 +1355,9 @@ func TestDupReleasesInGangScheduling(t *testing.T) {
"vcore": {Value: 1000},
},
},
- TaskGroupName: "tg",
- Placeholder: true,
- MaxAllocations: 1,
- ApplicationID: appID1,
+ TaskGroupName: "tg",
+ Placeholder: true,
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
@@ -1386,10 +1389,9 @@ func TestDupReleasesInGangScheduling(t *testing.T) {
"vcore": {Value: 1000},
},
},
- MaxAllocations: 1,
- ApplicationID: appID1,
- Placeholder: false,
- TaskGroupName: "tg",
+ ApplicationID: appID1,
+ Placeholder: false,
+ TaskGroupName: "tg",
},
},
RmID: "rm:123",
@@ -1549,8 +1551,17 @@ partitions:
"vcore": {Value: 1000},
},
},
- MaxAllocations: 2,
- ApplicationID: appID1,
+ ApplicationID: appID1,
+ },
+ {
+ AllocationKey: "alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10000000},
+ "vcore": {Value: 1000},
+ },
+ },
+ ApplicationID: appID1,
},
},
RmID: "rm:123",
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index e37a9380..5736c524 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -497,44 +497,38 @@ func newApplicationTGTagsWithPhTimeout(appID, partition,
queueName string, task
}
func newAllocationAskTG(allocKey, appID, taskGroup string, res
*resources.Resource, placeHolder bool) *objects.AllocationAsk {
- return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, 1,
placeHolder)
+ return newAllocationAskAll(allocKey, appID, taskGroup, res, 1,
placeHolder)
}
func newAllocationAsk(allocKey, appID string, res *resources.Resource)
*objects.AllocationAsk {
- return newAllocationAskRepeat(allocKey, appID, res, 1)
+ return newAllocationAskAll(allocKey, appID, "", res, 1, false)
}
-func newAllocationAskRepeat(allocKey, appID string, res *resources.Resource,
repeat int32) *objects.AllocationAsk {
- return newAllocationAskPriority(allocKey, appID, res, repeat, 1)
+func newAllocationAskPriority(allocKey, appID string, res *resources.Resource,
prio int32) *objects.AllocationAsk {
+ return newAllocationAskAll(allocKey, appID, "", res, prio, false)
}
-func newAllocationAskPriority(allocKey, appID string, res *resources.Resource,
repeat int32, prio int32) *objects.AllocationAsk {
- return newAllocationAskAll(allocKey, appID, "", res, repeat, prio,
false)
-}
-
-func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, repeat int32, prio int32, placeHolder bool)
*objects.AllocationAsk {
+func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, prio int32, placeHolder bool) *objects.AllocationAsk {
return objects.NewAllocationAskFromSI(&si.AllocationAsk{
- AllocationKey: allocKey,
- ApplicationID: appID,
- PartitionName: "test",
- ResourceAsk: res.ToProto(),
- MaxAllocations: repeat,
- Priority: prio,
- TaskGroupName: taskGroup,
- Placeholder: placeHolder,
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "test",
+ ResourceAsk: res.ToProto(),
+ Priority: prio,
+ TaskGroupName: taskGroup,
+ Placeholder: placeHolder,
})
}
func newAllocationAskPreempt(allocKey, appID string, prio int32, res
*resources.Resource) *objects.AllocationAsk {
return objects.NewAllocationAskFromSI(&si.AllocationAsk{
- AllocationKey: allocKey,
- ApplicationID: appID,
- PartitionName: "default",
- ResourceAsk: res.ToProto(),
- MaxAllocations: 1,
- Priority: prio,
- TaskGroupName: taskGroup,
- Placeholder: false,
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "default",
+ ResourceAsk: res.ToProto(),
+ Priority: prio,
+ TaskGroupName: taskGroup,
+ Placeholder: false,
PreemptionPolicy: &si.PreemptionPolicy{
AllowPreemptSelf: true,
AllowPreemptOther: true,
diff --git a/pkg/webservice/dao/allocation_ask_info.go
b/pkg/webservice/dao/allocation_ask_info.go
index 07fbd073..8097c5bc 100644
--- a/pkg/webservice/dao/allocation_ask_info.go
+++ b/pkg/webservice/dao/allocation_ask_info.go
@@ -29,7 +29,6 @@ type AllocationAskDAOInfo struct {
AllocationTags map[string]string
`json:"allocationTags,omitempty"`
RequestTime int64
`json:"requestTime,omitempty"`
ResourcePerAlloc map[string]int64
`json:"resource,omitempty"`
- PendingCount int32
`json:"pendingCount,omitempty"`
Priority string
`json:"priority,omitempty"`
RequiredNodeID string
`json:"requiredNodeId,omitempty"`
ApplicationID string
`json:"applicationId,omitempty"`
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 7da91622..cab7fb77 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -326,7 +326,6 @@ func getAllocationAskDAO(ask *objects.AllocationAsk)
*dao.AllocationAskDAOInfo {
AllocationTags: ask.GetTagsClone(),
RequestTime: ask.GetCreateTime().UnixNano(),
ResourcePerAlloc: ask.GetAllocatedResource().DAOMap(),
- PendingCount: ask.GetPendingAskRepeat(),
Priority: strconv.Itoa(int(ask.GetPriority())),
RequiredNodeID: ask.GetRequiredNode(),
ApplicationID: ask.GetApplicationID(),
@@ -345,7 +344,7 @@ func getAllocationAskDAO(ask *objects.AllocationAsk)
*dao.AllocationAskDAOInfo {
func getAllocationAsksDAO(asks []*objects.AllocationAsk)
[]*dao.AllocationAskDAOInfo {
asksDAO := make([]*dao.AllocationAskDAOInfo, 0, len(asks))
for _, ask := range asks {
- if ask.GetPendingAskRepeat() > 0 {
+ if !ask.IsAllocated() {
asksDAO = append(asksDAO, getAllocationAskDAO(ask))
}
}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 4d8ca9ed..6c908e85 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1302,15 +1302,15 @@ func TestGetPartitionNodes(t *testing.T) {
if node.NodeID == node1ID {
assert.Equal(t, node.NodeID, node1ID)
assert.Equal(t, "alloc-1",
node.Allocations[0].AllocationKey)
- assert.Equal(t, "alloc-1-0", node.Allocations[0].UUID)
- assert.Equal(t, "alloc-1-0",
node.Allocations[0].AllocationID)
+ assert.Equal(t, "alloc-1", node.Allocations[0].UUID)
+ assert.Equal(t, "alloc-1",
node.Allocations[0].AllocationID)
assert.DeepEqual(t, attributesOfnode1, node.Attributes)
assert.DeepEqual(t, map[string]int64{"memory": 50,
"vcore": 30}, node.Utilized)
} else {
assert.Equal(t, node.NodeID, node2ID)
assert.Equal(t, "alloc-2",
node.Allocations[0].AllocationKey)
- assert.Equal(t, "alloc-2-0", node.Allocations[0].UUID)
- assert.Equal(t, "alloc-2-0",
node.Allocations[0].AllocationID)
+ assert.Equal(t, "alloc-2", node.Allocations[0].UUID)
+ assert.Equal(t, "alloc-2",
node.Allocations[0].AllocationID)
assert.DeepEqual(t, attributesOfnode2, node.Attributes)
assert.DeepEqual(t, map[string]int64{"memory": 30,
"vcore": 50}, node.Utilized)
}
@@ -1385,12 +1385,11 @@ func TestGetQueueApplicationsHandler(t *testing.T) {
Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
}
ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
- ApplicationID: "app-1",
- PartitionName: part.Name,
- TaskGroupName: tg,
- ResourceAsk: res,
- Placeholder: true,
- MaxAllocations: 1})
+ ApplicationID: "app-1",
+ PartitionName: part.Name,
+ TaskGroupName: tg,
+ ResourceAsk: res,
+ Placeholder: true})
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
app.SetTimedOutPlaceholder(tg, 1)
@@ -1565,10 +1564,9 @@ func TestGetApplicationHandler(t *testing.T) {
Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
}
ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
- ApplicationID: "app-1",
- PartitionName: part.Name,
- ResourceAsk: res,
- MaxAllocations: 1})
+ ApplicationID: "app-1",
+ PartitionName: part.Name,
+ ResourceAsk: res})
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
@@ -2473,10 +2471,9 @@ func prepareUserAndGroupContext(t *testing.T, config
string) {
Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
}
ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
- ApplicationID: "app-1",
- PartitionName: part.Name,
- ResourceAsk: res,
- MaxAllocations: 1})
+ ApplicationID: "app-1",
+ PartitionName: part.Name,
+ ResourceAsk: res})
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]