This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 1ee27745 [YUNIKORN-2778] Core: Use unified UpdateAllocation API for
both asks and allocations (#946)
1ee27745 is described below
commit 1ee277455be8b1e7b98eb92a0ecce788e2388b9d
Author: Craig Condit <[email protected]>
AuthorDate: Thu Aug 15 10:16:36 2024 -0500
[YUNIKORN-2778] Core: Use unified UpdateAllocation API for both asks and
allocations (#946)
Handle both asks and allocations using a single method. This allows for
idempotency and simplified shim interactions.
Closes: #946
---
go.mod | 2 +-
go.sum | 4 +-
pkg/examples/simple_example.go | 4 +-
pkg/rmproxy/rmevent/events.go | 10 -
pkg/rmproxy/rmproxy.go | 33 ---
pkg/scheduler/context.go | 78 +------
pkg/scheduler/health_checker_test.go | 2 +-
pkg/scheduler/objects/allocation.go | 72 ++-----
pkg/scheduler/objects/allocation_test.go | 78 +++----
pkg/scheduler/objects/application.go | 26 +--
pkg/scheduler/objects/application_test.go | 20 +-
pkg/scheduler/objects/preemption_test.go | 12 +-
.../objects/required_node_preemptor_test.go | 8 +-
pkg/scheduler/objects/utilities_test.go | 18 +-
pkg/scheduler/partition.go | 226 ++++++++++++++-------
pkg/scheduler/partition_test.go | 187 ++++++++---------
pkg/scheduler/scheduler_test.go | 34 ++--
pkg/scheduler/tests/application_tracking_test.go | 6 +-
pkg/scheduler/tests/mockscheduler_test.go | 29 +--
pkg/scheduler/tests/operation_test.go | 12 +-
pkg/scheduler/tests/performance_test.go | 20 +-
pkg/scheduler/tests/plugin_test.go | 8 +-
pkg/scheduler/tests/recovery_test.go | 43 ++--
pkg/scheduler/tests/reservation_test.go | 2 +-
pkg/scheduler/tests/smoke_test.go | 92 ++++-----
pkg/scheduler/utilities_test.go | 60 +++---
pkg/webservice/handlers_test.go | 106 +++++-----
27 files changed, 550 insertions(+), 642 deletions(-)
diff --git a/go.mod b/go.mod
index 7bbe0319..0d0122cc 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21
require (
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586
+ github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240815142741-38a38685cd4e
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index 9b2672af..2f488345 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586
h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240815142741-38a38685cd4e
h1:ZOLst6ROwUrgoUQbEdYaz28iKuiU5YNYGtelKsTFhqw=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240815142741-38a38685cd4e/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0
h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
diff --git a/pkg/examples/simple_example.go b/pkg/examples/simple_example.go
index d47ef59d..0ca1cd4b 100644
--- a/pkg/examples/simple_example.go
+++ b/pkg/examples/simple_example.go
@@ -209,10 +209,10 @@ partitions:
// Send request
err = proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
diff --git a/pkg/rmproxy/rmevent/events.go b/pkg/rmproxy/rmevent/events.go
index 39ae125e..6517a414 100644
--- a/pkg/rmproxy/rmevent/events.go
+++ b/pkg/rmproxy/rmevent/events.go
@@ -81,11 +81,6 @@ type RMApplicationUpdateEvent struct {
UpdatedApplications []*si.UpdatedApplication
}
-type RMRejectedAllocationAskEvent struct {
- RmID string
- RejectedAllocationAsks []*si.RejectedAllocationAsk
-}
-
type RMRejectedAllocationEvent struct {
RmID string
RejectedAllocations []*si.RejectedAllocation
@@ -97,11 +92,6 @@ type RMReleaseAllocationEvent struct {
Channel chan *Result `json:"-"`
}
-type RMReleaseAllocationAskEvent struct {
- RmID string
- ReleasedAllocationAsks []*si.AllocationAskRelease
-}
-
type RMNodeUpdateEvent struct {
RmID string
AcceptedNodes []*si.AcceptedNode
diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index 97ccbcdd..6816c10c 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -162,27 +162,6 @@ func (rmp *RMProxy) triggerUpdateAllocation(rmID string,
response *si.Allocation
}
}
-func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event
*rmevent.RMReleaseAllocationAskEvent) {
- if len(event.ReleasedAllocationAsks) == 0 {
- return
- }
- response := &si.AllocationResponse{
- ReleasedAsks: event.ReleasedAllocationAsks,
- }
- rmp.triggerUpdateAllocation(event.RmID, response)
-}
-
-func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event
*rmevent.RMRejectedAllocationAskEvent) {
- if len(event.RejectedAllocationAsks) == 0 {
- return
- }
- response := &si.AllocationResponse{
- Rejected: event.RejectedAllocationAsks,
- }
- rmp.triggerUpdateAllocation(event.RmID, response)
-
metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocationAsks))
-}
-
func (rmp *RMProxy) processRMRejectedAllocationEvent(event
*rmevent.RMRejectedAllocationEvent) {
if len(event.RejectedAllocations) == 0 {
return
@@ -224,14 +203,10 @@ func (rmp *RMProxy) handleRMEvents() {
rmp.processApplicationUpdateEvent(v)
case *rmevent.RMReleaseAllocationEvent:
rmp.processRMReleaseAllocationEvent(v)
- case *rmevent.RMRejectedAllocationAskEvent:
- rmp.processRMRejectedAllocationAskEvent(v)
case *rmevent.RMRejectedAllocationEvent:
rmp.processRMRejectedAllocationEvent(v)
case *rmevent.RMNodeUpdateEvent:
rmp.processRMNodeUpdateEvent(v)
- case *rmevent.RMReleaseAllocationAskEvent:
- rmp.processRMReleaseAllocationAskEvent(v)
default:
panic(fmt.Sprintf("%s is not an acceptable type
for RM event.", reflect.TypeOf(v).String()))
}
@@ -304,19 +279,11 @@ func (rmp *RMProxy) UpdateAllocation(request
*si.AllocationRequest) error {
alloc.PartitionName =
common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
}
- // Update asks
- for _, ask := range request.Asks {
- ask.PartitionName =
common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
- }
-
// Update releases
if request.Releases != nil {
for _, rel := range request.Releases.AllocationsToRelease {
rel.PartitionName =
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
- for _, rel := range request.Releases.AllocationAsksToRelease {
- rel.PartitionName =
common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
- }
}
rmp.schedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request:
request})
return nil
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index fe42c62d..ee7257b7 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -713,13 +713,7 @@ func (cc *ClusterContext)
handleRMUpdateAllocationEvent(event *rmevent.RMUpdateA
if len(request.Allocations) != 0 {
cc.processAllocations(request)
}
- if len(request.Asks) != 0 {
- cc.processAsks(request)
- }
if request.Releases != nil {
- if len(request.Releases.AllocationAsksToRelease) > 0 {
-
cc.processAskReleases(request.Releases.AllocationAsksToRelease)
- }
if len(request.Releases.AllocationsToRelease) > 0 {
cc.processAllocationReleases(request.Releases.AllocationsToRelease,
request.RmID)
}
@@ -750,13 +744,15 @@ func (cc *ClusterContext) processAllocations(request
*si.AllocationRequest) {
}
alloc := objects.NewAllocationFromSI(siAlloc)
- if err := partition.AddAllocation(alloc); err != nil {
+
+ _, newAlloc, err := partition.UpdateAllocation(alloc)
+ if err != nil {
rejectedAllocs = append(rejectedAllocs,
&si.RejectedAllocation{
AllocationKey: siAlloc.AllocationKey,
ApplicationID: siAlloc.ApplicationID,
Reason: err.Error(),
})
- log.Log(log.SchedContext).Error("Invalid allocation add
requested by shim",
+ log.Log(log.SchedContext).Error("Invalid allocation
update requested by shim",
zap.String("partition", siAlloc.PartitionName),
zap.String("nodeID", siAlloc.NodeID),
zap.String("applicationID",
siAlloc.ApplicationID),
@@ -764,7 +760,10 @@ func (cc *ClusterContext) processAllocations(request
*si.AllocationRequest) {
zap.Error(err))
continue
}
- cc.notifyRMNewAllocation(request.RmID, alloc)
+ // at some point, we may need to handle new requests as well
+ if newAlloc {
+ cc.notifyRMNewAllocation(request.RmID, alloc)
+ }
}
// Reject allocs returned to RM proxy for the apps and partitions not
found
@@ -776,67 +775,6 @@ func (cc *ClusterContext) processAllocations(request
*si.AllocationRequest) {
}
}
-func (cc *ClusterContext) processAsks(request *si.AllocationRequest) {
- // Send rejects back to RM
- rejectedAsks := make([]*si.RejectedAllocationAsk, 0)
-
- // Send to scheduler
- for _, siAsk := range request.Asks {
- // try to get ApplicationInfo
- partition := cc.GetPartition(siAsk.PartitionName)
- if partition == nil {
- msg := fmt.Sprintf("Failed to find partition %s, for
application %s and allocation %s", siAsk.PartitionName, siAsk.ApplicationID,
siAsk.AllocationKey)
- log.Log(log.SchedContext).Error("Invalid ask add
requested by shim, partition not found",
- zap.String("partition", siAsk.PartitionName),
- zap.String("applicationID",
siAsk.ApplicationID),
- zap.String("askKey", siAsk.AllocationKey))
- rejectedAsks = append(rejectedAsks,
&si.RejectedAllocationAsk{
- AllocationKey: siAsk.AllocationKey,
- ApplicationID: siAsk.ApplicationID,
- Reason: msg,
- })
- continue
- }
-
- // try adding to app
- if err := partition.addAllocationAsk(siAsk); err != nil {
- rejectedAsks = append(rejectedAsks,
- &si.RejectedAllocationAsk{
- AllocationKey: siAsk.AllocationKey,
- ApplicationID: siAsk.ApplicationID,
- Reason: err.Error(),
- })
- log.Log(log.SchedContext).Error("Invalid ask add
requested by shim",
- zap.String("partition", siAsk.PartitionName),
- zap.String("applicationID",
siAsk.ApplicationID),
- zap.String("askKey", siAsk.AllocationKey),
- zap.Error(err))
- }
- }
-
- // Reject asks returned to RM Proxy for the apps and partitions not
found
- if len(rejectedAsks) > 0 {
-
cc.rmEventHandler.HandleEvent(&rmevent.RMRejectedAllocationAskEvent{
- RmID: request.RmID,
- RejectedAllocationAsks: rejectedAsks,
- })
- }
-}
-
-func (cc *ClusterContext) processAskReleases(releases
[]*si.AllocationAskRelease) {
- for _, toRelease := range releases {
- partition := cc.GetPartition(toRelease.PartitionName)
- if partition == nil {
- log.Log(log.SchedContext).Error("Invalid ask release
requested by shim, partition not found",
- zap.String("partition",
toRelease.PartitionName),
- zap.String("applicationID",
toRelease.ApplicationID),
- zap.String("askKey", toRelease.AllocationKey))
- continue
- }
- partition.removeAllocationAsk(toRelease)
- }
-}
-
func (cc *ClusterContext) processAllocationReleases(releases
[]*si.AllocationRelease, rmID string) {
for _, toRelease := range releases {
partition := cc.GetPartition(toRelease.PartitionName)
diff --git a/pkg/scheduler/health_checker_test.go
b/pkg/scheduler/health_checker_test.go
index c69f5a08..6d3e50ad 100644
--- a/pkg/scheduler/health_checker_test.go
+++ b/pkg/scheduler/health_checker_test.go
@@ -205,7 +205,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
// add orphan allocation to a node
node := schedulerContext.partitions[partName].nodes.GetNode("node")
- alloc := markAllocated("node", newAllocationAsk("key", "appID",
resources.NewResource()))
+ alloc := newAllocation("key", "appID", "node", resources.NewResource())
node.AddAllocation(alloc)
healthInfo = GetSchedulerHealthStatus(schedulerMetrics,
schedulerContext)
assert.Assert(t, !healthInfo.Healthy, "Scheduler should not be healthy")
diff --git a/pkg/scheduler/objects/allocation.go
b/pkg/scheduler/objects/allocation.go
index f75039bc..ce7abca3 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -83,62 +83,7 @@ type AllocationLogEntry struct {
Count int32
}
-// NewAllocationAsk creates an Allocation which is not yet satisfied.
-// Visible by tests
-func NewAllocationAsk(allocationKey string, applicationID string,
allocatedResource *resources.Resource) *Allocation {
- return &Allocation{
- allocationKey: allocationKey,
- applicationID: applicationID,
- allocatedResource: allocatedResource,
- allocLog: make(map[string]*AllocationLogEntry),
- resKeyPerNode: make(map[string]string),
- resKeyWithoutNode: reservationKeyWithoutNode(applicationID,
allocationKey),
- askEvents:
schedEvt.NewAskEvents(events.GetEventSystem()),
- }
-}
-
-// NewAllocationAskFromSI creates an Allocation which is not yet satisfied,
populating it from an SI AllocationAsk.
-// If the incoming ask is invalid, nil is returned.
-func NewAllocationAskFromSI(ask *si.AllocationAsk) *Allocation {
- var createTime time.Time
- siCreationTime, err :=
strconv.ParseInt(ask.Tags[siCommon.CreationTime], 10, 64)
- if err != nil {
- log.Log(log.SchedAllocation).Debug("CreationTime is not set on
the Allocation object or invalid",
- zap.String("creationTime",
ask.Tags[siCommon.CreationTime]))
- createTime = time.Now()
- } else {
- createTime = time.Unix(siCreationTime, 0)
- }
-
- a := &Allocation{
- allocationKey: ask.AllocationKey,
- applicationID: ask.ApplicationID,
- allocatedResource:
resources.NewResourceFromProto(ask.ResourceAsk),
- tags: CloneAllocationTags(ask.Tags),
- createTime: createTime,
- priority: ask.Priority,
- placeholder: ask.Placeholder,
- taskGroupName: ask.TaskGroupName,
- requiredNode: common.GetRequiredNodeFromTag(ask.Tags),
- allowPreemptSelf:
common.IsAllowPreemptSelf(ask.PreemptionPolicy),
- allowPreemptOther:
common.IsAllowPreemptOther(ask.PreemptionPolicy),
- originator: ask.Originator,
- allocLog: make(map[string]*AllocationLogEntry),
- resKeyPerNode: make(map[string]string),
- resKeyWithoutNode: reservationKeyWithoutNode(ask.ApplicationID,
ask.AllocationKey),
- askEvents:
schedEvt.NewAskEvents(events.GetEventSystem()),
- }
- // this is a safety check placeholder and task group name must be set
as a combo
- // order is important as task group can be set without placeholder but
not the other way around
- if a.placeholder && a.taskGroupName == "" {
- log.Log(log.SchedAllocation).Debug("ask cannot be a placeholder
without a TaskGroupName",
- zap.Stringer("SI ask", ask))
- return nil
- }
- return a
-}
-
-// NewAllocatoinFromSI Create a new Allocation which has already been placed
on a node, populating it with info from
+// NewAllocationFromSI Create a new Allocation which has already been placed
on a node, populating it with info from
// the SI Allocation object. If the input object is invalid, nil is returned.
func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
if alloc == nil {
@@ -162,6 +107,15 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation
{
createTime = time.Unix(siCreationTime, 0)
}
+ var allocated bool
+ var nodeID string
+ var bindTime time.Time
+ if alloc.NodeID != "" {
+ allocated = true
+ nodeID = alloc.NodeID
+ bindTime = time.Now()
+ }
+
return &Allocation{
allocationKey: alloc.AllocationKey,
applicationID: alloc.ApplicationID,
@@ -179,9 +133,9 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
resKeyPerNode: make(map[string]string),
resKeyWithoutNode:
reservationKeyWithoutNode(alloc.ApplicationID, alloc.AllocationKey),
askEvents:
schedEvt.NewAskEvents(events.GetEventSystem()),
- allocated: true,
- bindTime: time.Now(),
- nodeID: alloc.NodeID,
+ allocated: allocated,
+ nodeID: nodeID,
+ bindTime: bindTime,
}
}
diff --git a/pkg/scheduler/objects/allocation_test.go
b/pkg/scheduler/objects/allocation_test.go
index 3fe805c9..c41de0af 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -39,12 +39,12 @@ const past = 1640995200 // 2022-1-1 00:00:00
func TestNewAsk(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
- siAsk := &si.AllocationAsk{
- AllocationKey: "ask-1",
- ApplicationID: "app-1",
- ResourceAsk: res.ToProto(),
+ siAsk := &si.Allocation{
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ ResourcePerAlloc: res.ToProto(),
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
if ask == nil {
t.Fatal("NewAllocationAskFromSI create failed while it should
not")
}
@@ -80,14 +80,14 @@ func TestGetCreateTime(t *testing.T) {
}
func TestPreemptionPolicy(t *testing.T) {
- ask1 := NewAllocationAskFromSI(&si.AllocationAsk{
+ ask1 := NewAllocationFromSI(&si.Allocation{
AllocationKey: "allow-self-deny-other",
ApplicationID: "allow-self-deny-other",
PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: true,
AllowPreemptOther: false}})
assert.Check(t, ask1.IsAllowPreemptSelf(), "preempt self not allowed")
assert.Check(t, !ask1.IsAllowPreemptOther(), "preempt other allowed")
- ask2 := NewAllocationAskFromSI(&si.AllocationAsk{
+ ask2 := NewAllocationFromSI(&si.Allocation{
AllocationKey: "deny-self-allow-other",
ApplicationID: "deny-self-allow-other",
PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: false,
AllowPreemptOther: true}})
@@ -96,12 +96,12 @@ func TestPreemptionPolicy(t *testing.T) {
}
func TestPreemptCheckTime(t *testing.T) {
- siAsk := &si.AllocationAsk{
+ siAsk := &si.Allocation{
AllocationKey: "ask1",
ApplicationID: "app1",
PartitionName: "default",
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
assert.Equal(t, ask.GetPreemptCheckTime(), time.Time{},
"preemptCheckTime was not default")
now := time.Now().Add(-1 * time.Second)
@@ -110,28 +110,28 @@ func TestPreemptCheckTime(t *testing.T) {
}
func TestPlaceHolder(t *testing.T) {
- siAsk := &si.AllocationAsk{
+ siAsk := &si.Allocation{
AllocationKey: "ask1",
ApplicationID: "app1",
PartitionName: "default",
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
assert.Assert(t, !ask.IsPlaceholder(), "standard ask should not be a
placeholder")
assert.Equal(t, ask.GetTaskGroup(), "", "standard ask should not have a
TaskGroupName")
- siAsk = &si.AllocationAsk{
+ siAsk = &si.Allocation{
AllocationKey: "ask1",
ApplicationID: "app1",
PartitionName: "default",
TaskGroupName: "",
Placeholder: true,
}
- ask = NewAllocationAskFromSI(siAsk)
+ ask = NewAllocationFromSI(siAsk)
var nilAsk *Allocation
assert.Equal(t, ask, nilAsk, "placeholder ask created without a
TaskGroupName")
siAsk.TaskGroupName = "TestPlaceHolder"
- ask = NewAllocationAskFromSI(siAsk)
+ ask = NewAllocationFromSI(siAsk)
assert.Assert(t, ask != nilAsk, "placeholder ask creation failed
unexpectedly")
assert.Assert(t, ask.IsPlaceholder(), "ask should have been a
placeholder")
assert.Equal(t, ask.GetTaskGroup(), siAsk.TaskGroupName, "TaskGroupName
not set as expected")
@@ -140,34 +140,34 @@ func TestPlaceHolder(t *testing.T) {
func TestGetRequiredNode(t *testing.T) {
tag := make(map[string]string)
// unset case
- siAsk := &si.AllocationAsk{
- AllocationKey: "ask1",
- ApplicationID: "app1",
- PartitionName: "default",
- Tags: tag,
+ siAsk := &si.Allocation{
+ AllocationKey: "ask1",
+ ApplicationID: "app1",
+ PartitionName: "default",
+ AllocationTags: tag,
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
assert.Equal(t, ask.GetRequiredNode(), "", "required node is empty as
expected")
// set case
tag[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = "NodeName"
- siAsk = &si.AllocationAsk{
- AllocationKey: "ask1",
- ApplicationID: "app1",
- PartitionName: "default",
- Tags: tag,
+ siAsk = &si.Allocation{
+ AllocationKey: "ask1",
+ ApplicationID: "app1",
+ PartitionName: "default",
+ AllocationTags: tag,
}
- ask = NewAllocationAskFromSI(siAsk)
+ ask = NewAllocationFromSI(siAsk)
assert.Equal(t, ask.GetRequiredNode(), "NodeName", "required node
should be NodeName")
}
func TestAllocationLog(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
- siAsk := &si.AllocationAsk{
- AllocationKey: "ask-1",
- ApplicationID: "app-1",
- ResourceAsk: res.ToProto(),
+ siAsk := &si.Allocation{
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ ResourcePerAlloc: res.ToProto(),
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
// log a reservation event
ask.LogAllocationFailure("reserve1", false)
@@ -202,12 +202,12 @@ func TestAllocationLog(t *testing.T) {
func TestSendPredicateFailed(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
- siAsk := &si.AllocationAsk{
- AllocationKey: "ask-1",
- ApplicationID: "app-1",
- ResourceAsk: res.ToProto(),
+ siAsk := &si.Allocation{
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ ResourcePerAlloc: res.ToProto(),
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
eventSystem := mock.NewEventSystemDisabled()
ask.askEvents = schedEvt.NewAskEvents(eventSystem)
ask.SendPredicatesFailedEvent(map[string]int{})
@@ -224,13 +224,13 @@ func TestSendPredicateFailed(t *testing.T) {
}
func TestCreateTime(t *testing.T) {
- siAsk := &si.AllocationAsk{
+ siAsk := &si.Allocation{
AllocationKey: "ask1",
ApplicationID: "app1",
PartitionName: "default",
}
- siAsk.Tags = map[string]string{siCommon.CreationTime: "1622530800"}
- ask := NewAllocationAskFromSI(siAsk)
+ siAsk.AllocationTags = map[string]string{siCommon.CreationTime:
"1622530800"}
+ ask := NewAllocationFromSI(siAsk)
assert.Equal(t, ask.GetTag(siCommon.CreationTime), "1622530800")
assert.Equal(t, ask.GetTag("unknown"), "")
assert.Equal(t, ask.GetCreateTime(), time.Unix(1622530800, 0))
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 350abf3a..b796f831 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -451,7 +451,7 @@ func (sa *Application) timeoutPlaceholderProcessing() {
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
- sa.notifyRMAllocationAskReleased(sa.getAllRequestsInternal(),
si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder
timeout")
+ sa.notifyRMAllocationReleased(sa.getAllRequestsInternal(),
si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder
timeout")
sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
// all allocations are placeholders but GetAllAllocations is
locked and cannot be used
sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(),
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
@@ -1984,30 +1984,6 @@ func (sa *Application)
notifyRMAllocationReleased(released []*Allocation, termin
}
}
-// notifyRMAllocationAskReleased send an ask release event to the RM to if the
event handler is configured
-// and at least one ask has been released.
-// No locking must be called while holding the lock
-func (sa *Application) notifyRMAllocationAskReleased(released []*Allocation,
terminationType si.TerminationType, message string) {
- // only generate event if needed
- if len(released) == 0 || sa.rmEventHandler == nil {
- return
- }
- releaseEvent := &rmevent.RMReleaseAllocationAskEvent{
- ReleasedAllocationAsks: make([]*si.AllocationAskRelease, 0),
- RmID: sa.rmID,
- }
- for _, alloc := range released {
- releaseEvent.ReleasedAllocationAsks =
append(releaseEvent.ReleasedAllocationAsks, &si.AllocationAskRelease{
- ApplicationID: alloc.GetApplicationID(),
- PartitionName:
common.GetPartitionNameWithoutClusterID(sa.Partition),
- AllocationKey: alloc.GetAllocationKey(),
- TerminationType: terminationType,
- Message: message,
- })
- }
- sa.rmEventHandler.HandleEvent(releaseEvent)
-}
-
func (sa *Application) IsAllocationAssignedToApp(alloc *Allocation) bool {
sa.RLock()
defer sa.RUnlock()
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 9dfcdc56..055f89bf 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1426,14 +1426,14 @@ func TestReplaceAllocationTracking(t *testing.T) {
}
func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
- runTimeoutPlaceholderTest(t, Resuming.String(), Soft)
+ runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2})
}
func TestTimeoutPlaceholderAllocAsk(t *testing.T) {
- runTimeoutPlaceholderTest(t, Failing.String(), Hard)
+ runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2})
}
-func runTimeoutPlaceholderTest(t *testing.T, expectedState string,
gangSchedulingStyle string) {
+func runTimeoutPlaceholderTest(t *testing.T, expectedState string,
gangSchedulingStyle string, expectedReleases []int) {
setupUGM()
// create a fake queue
queue, err := createRootQueue(nil)
@@ -1487,14 +1487,12 @@ func runTimeoutPlaceholderTest(t *testing.T,
expectedState string, gangSchedulin
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
events := testHandler.GetEvents()
var found int
+ idx := 0
for _, event := range events {
if allocRelease, ok :=
event.(*rmevent.RMReleaseAllocationEvent); ok {
- assert.Equal(t, len(allocRelease.ReleasedAllocations),
2, "two allocations should have been released")
- found++
- }
- if askRelease, ok :=
event.(*rmevent.RMReleaseAllocationAskEvent); ok {
- assert.Equal(t, len(askRelease.ReleasedAllocationAsks),
1, "one allocation ask should have been released")
+ assert.Equal(t, len(allocRelease.ReleasedAllocations),
expectedReleases[idx], "wrong number of allocations released")
found++
+ idx++
}
}
// check if the Replaced of PlaceHolderData is 0
@@ -1568,9 +1566,6 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
assert.Equal(t,
allocRelease.ReleasedAllocations[0].AllocationKey, ph.allocationKey, "wrong
placeholder allocation released on timeout")
found = true
}
- if _, ok := event.(*rmevent.RMReleaseAllocationAskEvent); ok {
- t.Fatal("unexpected release allocation ask event found
in list of events")
- }
}
assert.Assert(t, found, "release allocation event not found in list")
assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res),
"Unexpected allocated resources for the app")
@@ -1619,9 +1614,6 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
assert.Equal(t, len(allocRelease.ReleasedAllocations),
1, "one allocation should have been released")
found = true
}
- if _, ok := event.(*rmevent.RMReleaseAllocationAskEvent); ok {
- t.Fatal("unexpected release allocation ask event found
in list of events")
- }
}
assert.Assert(t, found, "release allocation event not found in list")
assert.Assert(t, app.IsCompleting(), "App should still be in completing
state")
diff --git a/pkg/scheduler/objects/preemption_test.go
b/pkg/scheduler/objects/preemption_test.go
index e928f207..6235b253 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
const alloc = "alloc"
@@ -1565,8 +1566,11 @@ func scoreMap(nodeID string, orig, self []bool)
map[string][]*Allocation {
}
func allocForScore(originator bool, allowPreemptSelf bool) *Allocation {
- ask := NewAllocationAsk("alloc1", appID1, resources.NewResource())
- ask.originator = originator
- ask.allowPreemptSelf = allowPreemptSelf
- return markAllocated(nodeID1, ask)
+ return NewAllocationFromSI(&si.Allocation{
+ AllocationKey: "alloc1",
+ ApplicationID: appID1,
+ Originator: originator,
+ NodeID: nodeID1,
+ PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf:
allowPreemptSelf},
+ })
}
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go
b/pkg/scheduler/objects/required_node_preemptor_test.go
index 8da78ebf..51b71493 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -30,17 +30,17 @@ import (
func createAllocationAsk(allocationKey string, app string, allowPreemption
bool, isOriginator bool, priority int32, res *resources.Resource) *Allocation {
tags := map[string]string{}
- siAsk := &si.AllocationAsk{
+ siAsk := &si.Allocation{
AllocationKey: allocationKey,
ApplicationID: app,
PartitionName: "default",
Priority: priority,
- ResourceAsk: res.ToProto(),
+ ResourcePerAlloc: res.ToProto(),
Originator: isOriginator,
PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf:
allowPreemption, AllowPreemptOther: true},
- Tags: tags,
+ AllocationTags: tags,
}
- ask := NewAllocationAskFromSI(siAsk)
+ ask := NewAllocationFromSI(siAsk)
return ask
}
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index 6224d71c..5da6d7bb 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -240,16 +240,16 @@ func newAllocationAskTG(allocKey, appID, taskGroup
string, res *resources.Resour
}
func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, placeholder bool, priority int32) *Allocation {
- ask := &si.AllocationAsk{
- AllocationKey: allocKey,
- ApplicationID: appID,
- PartitionName: "default",
- ResourceAsk: res.ToProto(),
- TaskGroupName: taskGroup,
- Placeholder: placeholder,
- Priority: priority,
+ ask := &si.Allocation{
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "default",
+ ResourcePerAlloc: res.ToProto(),
+ TaskGroupName: taskGroup,
+ Placeholder: placeholder,
+ Priority: priority,
}
- return NewAllocationAskFromSI(ask)
+ return NewAllocationFromSI(ask)
}
func getTestUserGroup() security.UserGroup {
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 3a261822..214c0756 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1113,65 +1113,170 @@ func (pc *PartitionContext) GetNodes() []*objects.Node
{
return pc.nodes.GetNodes()
}
-// Add an already bound allocation to the partition/node/application/queue.
-// Queue max allocation is not checked as the allocation came from the RM.
-//
+// UpdateAllocation adds or updates an Allocation. If the Allocation has no
NodeID specified, it is considered a
pending allocation and processed appropriately. This call is idempotent, and
can be called multiple times with the
+// same allocation (such as on change updates from the shim)
+// Upon successful processing, two flags are returned: requestCreated (if a
new request was added) and allocCreated (if an allocation was satisfied).
+// This can be used by callers that need this information to take further
action.
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) AddAllocation(alloc *objects.Allocation) error {
+func (pc *PartitionContext) UpdateAllocation(alloc *objects.Allocation)
(requestCreated bool, allocCreated bool, err error) { //nolint:funlen
// cannot do anything with a nil alloc, should only happen if the shim
broke things badly
if alloc == nil {
- return nil
+ return false, false, nil
}
if pc.isStopped() {
- return fmt.Errorf("partition %s is stopped cannot add new
allocation %s", pc.Name, alloc.GetAllocationKey())
+ return false, false, fmt.Errorf("partition %s is stopped;
cannot process allocation %s", pc.Name, alloc.GetAllocationKey())
}
- log.Log(log.SchedPartition).Info("adding recovered allocation",
- zap.String("partitionName", pc.Name),
- zap.String("appID", alloc.GetApplicationID()),
- zap.String("allocationKey", alloc.GetAllocationKey()))
+ allocationKey := alloc.GetAllocationKey()
+ applicationID := alloc.GetApplicationID()
+ nodeID := alloc.GetNodeID()
- // Check if allocation violates any resource restriction, or allocate
on a
- // non-existent application or nodes.
- node := pc.GetNode(alloc.GetNodeID())
- if node == nil {
- metrics.GetSchedulerMetrics().IncSchedulingError()
- return fmt.Errorf("failed to find node %s", alloc.GetNodeID())
- }
+ log.Log(log.SchedPartition).Info("processing allocation",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+ // find application
app := pc.getApplication(alloc.GetApplicationID())
if app == nil {
metrics.GetSchedulerMetrics().IncSchedulingError()
- return fmt.Errorf("failed to find application %s",
alloc.GetApplicationID())
+ return false, false, fmt.Errorf("failed to find application
%s", applicationID)
}
queue := app.GetQueue()
- // Do not check if the new allocation goes beyond the queue's max
resource (recursive).
- // still handle a returned error but they should never happen.
- if err := queue.IncAllocatedResource(alloc.GetAllocatedResource(),
true); err != nil {
- metrics.GetSchedulerMetrics().IncSchedulingError()
- return fmt.Errorf("cannot allocate resource from application
%s: %v ",
- alloc.GetApplicationID(), err)
+ // find node if one is specified
+ var node *objects.Node = nil
+ allocated := alloc.IsAllocated()
+ if allocated {
+ node = pc.GetNode(alloc.GetNodeID())
+ if node == nil {
+ metrics.GetSchedulerMetrics().IncSchedulingError()
+ return false, false, fmt.Errorf("failed to find node
%s", nodeID)
+ }
}
- metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
- node.AddAllocation(alloc)
- alloc.SetInstanceType(node.GetInstanceType())
- app.RecoverAllocationAsk(alloc)
- app.AddAllocation(alloc)
+ // check to see if allocation exists already on app
+ existing := app.GetAllocationAsk(allocationKey)
- // track the number of allocations
- pc.updateAllocationCount(1)
- if alloc.IsPlaceholder() {
- pc.incPhAllocationCount()
+ // CASE 1/2: no existing allocation
+ if existing == nil {
+ // CASE 1: new request
+ if node == nil {
+ log.Log(log.SchedPartition).Info("handling new request",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+
+ if err := app.AddAllocationAsk(alloc); err != nil {
+ log.Log(log.SchedPartition).Info("failed to add
request",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey",
allocationKey),
+ zap.Error(err))
+ return false, false, err
+ }
+
+ log.Log(log.SchedPartition).Info("added new request",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+ return true, false, nil
+ }
+
+ // CASE 2: new allocation already assigned
+ log.Log(log.SchedPartition).Info("handling existing allocation",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+
+ // Do not check if the new allocation goes beyond the queue's
max resource (recursive).
+ // still handle a returned error but they should never happen.
+ if err :=
queue.IncAllocatedResource(alloc.GetAllocatedResource(), true); err != nil {
+ metrics.GetSchedulerMetrics().IncSchedulingError()
+ return false, false, fmt.Errorf("cannot allocate
resource from application %s: %v ",
+ alloc.GetApplicationID(), err)
+ }
+
+
metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
+ node.AddAllocation(alloc)
+ alloc.SetInstanceType(node.GetInstanceType())
+ app.RecoverAllocationAsk(alloc)
+ app.AddAllocation(alloc)
+ pc.updateAllocationCount(1)
+ if alloc.IsPlaceholder() {
+ pc.incPhAllocationCount()
+ }
+
+ log.Log(log.SchedPartition).Info("added existing allocation",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey),
+ zap.Bool("placeholder", alloc.IsPlaceholder()))
+ return false, true, nil
+ }
+
+ // CASE 3: updating an existing allocation
+ if existing.IsAllocated() {
+ // this is a placeholder for eventual resource updates; nothing
to do yet
+ log.Log(log.SchedPartition).Info("handling allocation update",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+ return false, false, nil
}
- log.Log(log.SchedPartition).Info("recovered allocation",
+ // CASE 4: transitioning from requested to allocated
+ if allocated {
+ log.Log(log.SchedPartition).Info("handling allocation
placement",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+
+ existing.SetNodeID(nodeID)
+ existing.SetBindTime(alloc.GetBindTime())
+ if _, err := app.AllocateAsk(allocationKey); err != nil {
+ log.Log(log.SchedPartition).Info("failed to allocate
ask for allocation placement",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey),
+ zap.Error(err))
+ return false, false, err
+ }
+
+ // Do not check if the new allocation goes beyond the queue's
max resource (recursive).
+ // still handle a returned error but they should never happen.
+ if err :=
queue.IncAllocatedResource(alloc.GetAllocatedResource(), true); err != nil {
+ metrics.GetSchedulerMetrics().IncSchedulingError()
+ return false, false, fmt.Errorf("cannot allocate
resource from application %s: %v ",
+ alloc.GetApplicationID(), err)
+ }
+
+
metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
+ node.AddAllocation(existing)
+ existing.SetInstanceType(node.GetInstanceType())
+ app.AddAllocation(existing)
+ pc.updateAllocationCount(1)
+ if existing.IsPlaceholder() {
+ pc.incPhAllocationCount()
+ }
+
+ log.Log(log.SchedPartition).Info("external allocation placed",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey),
+ zap.Bool("placeholder", alloc.IsPlaceholder()))
+ return false, true, nil
+ }
+
+ // CASE 5: updating existing request
+ log.Log(log.SchedPartition).Info("handling request update",
zap.String("partitionName", pc.Name),
- zap.String("appID", alloc.GetApplicationID()),
- zap.String("allocationKey", alloc.GetAllocationKey()),
- zap.Bool("placeholder", alloc.IsPlaceholder()))
- return nil
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+
+ // this is a placeholder for eventual resource updates; nothing to do
yet
+ return false, false, nil
}
func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced
bool) (security.UserGroup, error) {
@@ -1255,7 +1360,7 @@ func (pc *PartitionContext) generateReleased(release
*si.AllocationRelease, app
// removeAllocation removes the referenced allocation(s) from the applications
and nodes
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) {
+func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
if release == nil {
return nil, nil
}
@@ -1375,48 +1480,13 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
if release.TerminationType == si.TerminationType_TIMEOUT ||
release.TerminationType == si.TerminationType_PREEMPTED_BY_SCHEDULER {
released = nil
}
- return released, confirmed
-}
-// Remove the allocation ask from the specified application
-// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) removeAllocationAsk(release
*si.AllocationAskRelease) {
- if release == nil {
- return
+ if release.TerminationType != si.TerminationType_TIMEOUT {
+ // handle ask releases as well
+ _ = app.RemoveAllocationAsk(allocationKey)
}
- appID := release.ApplicationID
- allocKey := release.AllocationKey
- // A timeout termination is send by the core to the shim, ignore on
return.
- if release.TerminationType == si.TerminationType_TIMEOUT {
- log.Log(log.SchedPartition).Debug("Ignoring ask release with
termination type Timeout",
- zap.String("appID", appID),
- zap.String("ask", allocKey))
- return
- }
- app := pc.getApplication(appID)
- if app == nil {
- log.Log(log.SchedPartition).Info("Invalid ask release requested
by shim",
- zap.String("appID", appID),
- zap.String("ask", allocKey),
- zap.Stringer("terminationType",
release.TerminationType))
- return
- }
- // remove the allocation asks from the app
- _ = app.RemoveAllocationAsk(allocKey)
-}
-// Add the allocation ask to the specified application
-// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) addAllocationAsk(siAsk *si.AllocationAsk) error {
- if siAsk == nil {
- return nil
- }
- app := pc.getApplication(siAsk.ApplicationID)
- if app == nil {
- return fmt.Errorf("failed to find application %s, for
allocation ask %s", siAsk.ApplicationID, siAsk.AllocationKey)
- }
- // add the allocation asks to the app
- return app.AddAllocationAsk(objects.NewAllocationAskFromSI(siAsk))
+ return released, confirmed
}
func (pc *PartitionContext) GetCurrentState() string {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 24f51fad..bc46b72f 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -255,24 +255,22 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000})
node := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})
- ask := newAllocationAsk("alloc-1", appID1, appRes)
- alloc := markAllocated(nodeID1, ask)
+ alloc := newAllocation("alloc-1", appID1, nodeID1, appRes)
allocAllocationKey := alloc.GetAllocationKey()
err = partition.AddNode(node)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddAllocation(alloc)
+ _, allocCreated, err := partition.UpdateAllocation(alloc)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// get what was allocated
allocated := node.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly")
assertLimits(t, getTestUserGroup(), appRes)
// add broken allocations
- ask = newAllocationAsk("alloc-na", "not-an-app", appRes)
- alloc = markAllocated(nodeID1, ask)
+ alloc = newAllocation("alloc-na", "not-an-app", nodeID1, appRes)
node.AddAllocation(alloc)
- ask = newAllocationAsk("alloc-2", appID1, appRes)
- alloc = markAllocated(nodeID1, ask)
+ alloc = newAllocation("alloc-2", appID1, nodeID1, appRes)
node.AddAllocation(alloc)
assertLimits(t, getTestUserGroup(), appRes)
@@ -303,12 +301,12 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- ask := newAllocationAskTG("placeholder", appID1, taskGroup, appRes,
true)
- ph := markAllocated(nodeID1, ask)
+ ph := newAllocationTG("placeholder", appID1, nodeID1, taskGroup,
appRes, true)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
- err = partition.AddAllocation(ph)
+ _, allocCreated, err := partition.UpdateAllocation(ph)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1 expected 1 got: %v", allocated)
@@ -317,7 +315,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active
placeholders")
// fake an ask that is used
- ask = newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false)
+ ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
_, err = app.AllocateAsk(allocKey)
@@ -326,7 +324,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assertLimits(t, getTestUserGroup(), appRes)
// add real allocation that is replacing the placeholder
- alloc := markAllocated(nodeID1, ask)
+ alloc := newAllocationAll(allocKey, appID1, nodeID1, taskGroup, appRes,
1, false)
alloc.SetRelease(ph)
// double link as if the replacement is ongoing
ph.SetRelease(alloc)
@@ -360,14 +358,14 @@ func TestCalculateNodesResourceUsage(t *testing.T) {
assert.NilError(t, err)
occupiedResources :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
- alloc := markAllocated(nodeID1, newAllocationAsk("key", "appID",
occupiedResources))
+ alloc := newAllocation("key", "appID", nodeID1, occupiedResources)
node.AddAllocation(alloc)
usageMap := partition.calculateNodesResourceUsage()
assert.Equal(t, node.GetAvailableResource().Resources["first"],
resources.Quantity(50))
assert.Equal(t, usageMap["first"][4], 1)
occupiedResources =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
- alloc = markAllocated(nodeID1, newAllocationAsk("key", "appID",
occupiedResources))
+ alloc = newAllocation("key", "appID", nodeID1, occupiedResources)
node.AddAllocation(alloc)
usageMap = partition.calculateNodesResourceUsage()
assert.Equal(t, node.GetAvailableResource().Resources["first"],
resources.Quantity(0))
@@ -411,15 +409,15 @@ func TestPlaceholderDataWithPlaceholderPreemption(t
*testing.T) {
newRes.MultiplyTo(4)
phRes.MultiplyTo(7)
- ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
- alloc := markAllocated(nodeID1, ask)
+ alloc := newAllocationAll("ask-1", appID1, nodeID1, taskGroup, appRes,
1, false)
node1 := newNodeMaxResource(nodeID1, newRes)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
- err = partition.AddAllocation(alloc)
+ _, allocCreated, err := partition.UpdateAllocation(alloc)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// get what was allocated
allocated := node1.GetAllAllocations()
@@ -441,7 +439,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t
*testing.T) {
for i := 1; i <= 6; i++ {
// add an ask for a placeholder and allocate
lastPh = phID + strconv.Itoa(i)
- ask = newAllocationAskTG(lastPh, appID2, taskGroup, res, true)
+ ask := newAllocationAskTG(lastPh, appID2, taskGroup, res, true)
err = gangApp.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add placeholder ask %s to
app1", lastPh)
// try to allocate a placeholder via normal allocate
@@ -538,14 +536,14 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
phRes.MultiplyTo(7)
// add a node with allocation: must have the correct app1 added already
- ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
- alloc := markAllocated(nodeID1, ask)
+ alloc := newAllocationAll("ask-1", appID1, nodeID1, taskGroup, appRes,
1, false)
node1 := newNodeMaxResource(nodeID1, newRes)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
- err = partition.AddAllocation(alloc)
+ _, allocCreated, err := partition.UpdateAllocation(alloc)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// get what was allocated
allocated := node1.GetAllAllocations()
@@ -565,7 +563,7 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
for i := 1; i <= 6; i++ {
// add an ask for a placeholder and allocate
- ask = newAllocationAskTG(phID+strconv.Itoa(i+1), appID2,
taskGroup, res, true)
+ ask := newAllocationAskTG(phID+strconv.Itoa(i+1), appID2,
taskGroup, res, true)
err = gangApp.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add placeholder ask ph-1 to
app1")
// try to allocate a placeholder via normal allocate
@@ -577,7 +575,7 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
// add an ask for a last placeholder and allocate
lastPh := phID + strconv.Itoa(7)
- ask = newAllocationAskTG(lastPh, appID2, taskGroup, res, true)
+ ask := newAllocationAskTG(lastPh, appID2, taskGroup, res, true)
err = gangApp.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add placeholder ask ph-1 to app1")
@@ -621,15 +619,15 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
phRes.MultiplyTo(7)
// add a node with allocation: must have the correct app1 added already
- ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
- alloc := markAllocated(nodeID1, ask)
+ alloc := newAllocationAll("ask-1", appID1, nodeID1, taskGroup, appRes,
1, false)
node1 := newNodeMaxResource(nodeID1, newRes)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
- err = partition.AddAllocation(alloc)
+ _, allocCreated, err := partition.UpdateAllocation(alloc)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// get what was allocated
allocated := node1.GetAllAllocations()
@@ -650,7 +648,7 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
var lastPhAllocationKey string
for i := 1; i <= 6; i++ {
// add an ask for a placeholder and allocate
- ask = newAllocationAskTG(phID+strconv.Itoa(i+1), appID2,
taskGroup, res, true)
+ ask := newAllocationAskTG(phID+strconv.Itoa(i+1), appID2,
taskGroup, res, true)
err = gangApp.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add placeholder ask ph-1 to
app1")
// try to allocate a placeholder via normal allocate
@@ -663,7 +661,7 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
// add an ask for a last placeholder and allocate
lastPh := phID + strconv.Itoa(7)
- ask = newAllocationAskTG(lastPh, appID2, taskGroup, res, true)
+ ask := newAllocationAskTG(lastPh, appID2, taskGroup, res, true)
err = gangApp.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add placeholder ask ph-1 to app1")
@@ -707,12 +705,13 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1,
true)
- ph := markAllocated(nodeID1, ask)
+ ph := newAllocationAll("placeholder", appID1, nodeID1, taskGroup,
appRes, 1, true)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
- err = partition.AddAllocation(ph)
+ _, allocCreated, err := partition.UpdateAllocation(ph)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
+
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1")
@@ -723,7 +722,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
// fake an ask that is used
- ask = newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false)
+ ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
_, err = app.AllocateAsk(allocKey)
@@ -731,7 +730,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending resources")
// add real allocation that is replacing the placeholder on the 2nd node
- alloc := markAllocated(nodeID2, ask)
+ alloc := newAllocationAll(allocKey, appID1, nodeID2, taskGroup, appRes,
1, false)
alloc.SetRelease(ph)
node2.AddAllocation(alloc)
allocated = node2.GetAllAllocations()
@@ -778,12 +777,12 @@ func TestRemoveNodeWithReal(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1,
true)
- ph := markAllocated(nodeID1, ask)
+ ph := newAllocationAll("placeholder", appID1, nodeID1, taskGroup,
appRes, 1, true)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
- err = partition.AddAllocation(ph)
+ _, allocCreated, err := partition.UpdateAllocation(ph)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1")
@@ -794,7 +793,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
// fake an ask that is used
- ask = newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false)
+ ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
_, err = app.AllocateAsk(allocKey)
@@ -802,7 +801,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending resources")
// add real allocation that is replacing the placeholder on the 2nd node
- alloc := markAllocated(nodeID2, ask)
+ alloc := newAllocationAll(allocKey, appID1, nodeID2, taskGroup, appRes,
1, false)
alloc.SetRelease(ph)
node2.AddAllocation(alloc)
allocated = node2.GetAllAllocations()
@@ -1034,10 +1033,10 @@ func TestRemoveApp(t *testing.T) {
setupNode(t, nodeID1, partition,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000}))
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})
- ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes)
- alloc := markAllocated(nodeID1, ask)
- err = partition.AddAllocation(alloc)
+ alloc := newAllocation("alloc-nr", appNotRemoved, nodeID1, appRes)
+ _, allocCreated, err := partition.UpdateAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
+ assert.Check(t, allocCreated, "alloc should have been created")
assertLimits(t, getTestUserGroup(), appRes)
allocs := partition.removeApplication("does_not_exist")
@@ -1061,10 +1060,10 @@ func TestRemoveApp(t *testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "add application to partition should not have
failed")
- ask = newAllocationAsk("alloc-1", appID1, appRes)
- alloc = markAllocated(nodeID1, ask)
- err = partition.AddAllocation(alloc)
+ alloc = newAllocation("alloc-1", appID1, nodeID1, appRes)
+ _, allocCreated, err = partition.UpdateAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
+ assert.Check(t, allocCreated)
assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
// remove the newly added app
@@ -1095,17 +1094,17 @@ func TestRemoveAppAllocs(t *testing.T) {
setupNode(t, nodeID1, partition,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000}))
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})
- ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes)
- alloc := markAllocated(nodeID1, ask)
- err = partition.AddAllocation(alloc)
+ alloc := newAllocation("alloc-nr", appNotRemoved, nodeID1, appRes)
+ _, allocCreated, err := partition.UpdateAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
+ assert.Check(t, allocCreated)
assertLimits(t, getTestUserGroup(), appRes)
- ask = newAllocationAsk("alloc-1", appNotRemoved, appRes)
allocationKey := "alloc-1"
- alloc = markAllocated(nodeID1, ask)
- err = partition.AddAllocation(alloc)
+ alloc = newAllocation("alloc-1", appNotRemoved, nodeID1, appRes)
+ _, allocCreated, err = partition.UpdateAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
+ assert.Check(t, allocCreated, "alloc should have been created")
assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
release := &si.AllocationRelease{
PartitionName: "default",
@@ -1156,14 +1155,14 @@ func TestRemoveAllPlaceholderAllocs(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"10"})
assert.NilError(t, err, "failed to create resource")
- phAsk1 := newAllocationAskTG(phID, appID1, taskGroup, res, true)
- phAlloc1 := markAllocated(nodeID1, phAsk1)
- err = partition.AddAllocation(phAlloc1)
+ phAlloc1 := newAllocationTG(phID, appID1, nodeID1, taskGroup, res, true)
+ _, allocCreated, err := partition.UpdateAllocation(phAlloc1)
assert.NilError(t, err, "could not add allocation to partition")
- phAsk2 := newAllocationAskTG(phID2, appID1, taskGroup, res, true)
- phAlloc2 := markAllocated(nodeID1, phAsk2)
- err = partition.AddAllocation(phAlloc2)
+ assert.Check(t, allocCreated)
+ phAlloc2 := newAllocationTG(phID2, appID1, nodeID1, taskGroup, res,
true)
+ _, allocCreated, err = partition.UpdateAllocation(phAlloc2)
assert.NilError(t, err, "could not add allocation to partition")
+ assert.Check(t, allocCreated)
partition.removeAllocation(&si.AllocationRelease{
PartitionName: "default",
ApplicationID: appID1,
@@ -3517,16 +3516,18 @@ func TestFailReplacePlaceholder(t *testing.T) {
assertLimits(t, getTestUserGroup(), res)
}
-func TestAddAllocationAsk(t *testing.T) {
+func TestUpdateAllocationWithAsk(t *testing.T) {
setupUGM()
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
- err = partition.addAllocationAsk(nil)
+ askCreated, _, err := partition.UpdateAllocation(nil)
assert.NilError(t, err, "nil ask should not return an error")
- err = partition.addAllocationAsk(&si.AllocationAsk{})
+ assert.Check(t, !askCreated, "ask should not have been created")
+ askCreated, _, err =
partition.UpdateAllocation(objects.NewAllocationFromSI(&si.Allocation{}))
if err == nil {
t.Fatal("empty ask should have returned application not found
error")
}
+ assert.Check(t, !askCreated, "ask should not have been created")
// add the app to add an ask to
app := newApplication(appID1, "default", "root.default")
@@ -3537,13 +3538,14 @@ func TestAddAllocationAsk(t *testing.T) {
res, err = resources.NewResourceFromConf(map[string]string{"vcore":
"10"})
assert.NilError(t, err, "failed to create resource")
askKey := "ask-key-1"
- ask := si.AllocationAsk{
- AllocationKey: askKey,
- ApplicationID: appID1,
- ResourceAsk: res.ToProto(),
+ ask := si.Allocation{
+ AllocationKey: askKey,
+ ApplicationID: appID1,
+ ResourcePerAlloc: res.ToProto(),
}
- err = partition.addAllocationAsk(&ask)
+ askCreated, _, err =
partition.UpdateAllocation(objects.NewAllocationFromSI(&ask))
assert.NilError(t, err, "failed to add ask to app")
+ assert.Check(t, askCreated, "ask should have been created")
if !resources.Equals(app.GetPendingResource(), res) {
t.Fatal("app not updated by adding ask, no error thrown")
}
@@ -3567,15 +3569,15 @@ func TestRemoveAllocationAsk(t *testing.T) {
assert.NilError(t, err, "failed to ask to application")
// we should not panic on nil
- partition.removeAllocationAsk(nil)
+ partition.removeAllocation(nil)
// we don't care about the partition name as we test just the partition
code
- release := &si.AllocationAskRelease{
+ release := &si.AllocationRelease{
ApplicationID: "fake-app",
AllocationKey: askKey,
TerminationType: si.TerminationType_STOPPED_BY_RM,
}
// unknown app should do nothing
- partition.removeAllocationAsk(release)
+ partition.removeAllocation(release)
if !resources.Equals(app.GetPendingResource(), res) {
t.Fatal("wrong app updated removing ask")
}
@@ -3583,7 +3585,7 @@ func TestRemoveAllocationAsk(t *testing.T) {
// known app, unknown ask no change
release.ApplicationID = appID1
release.AllocationKey = "fake"
- partition.removeAllocationAsk(release)
+ partition.removeAllocation(release)
if !resources.Equals(app.GetPendingResource(), res) {
t.Fatal("app updated removing unknown ask")
}
@@ -3591,14 +3593,14 @@ func TestRemoveAllocationAsk(t *testing.T) {
// known app, known ask, ignore timeout as it originates in the core
release.AllocationKey = askKey
release.TerminationType = si.TerminationType_TIMEOUT
- partition.removeAllocationAsk(release)
+ partition.removeAllocation(release)
if !resources.Equals(app.GetPendingResource(), res) {
t.Fatal("app updated removing timed out ask, should not have
changed")
}
// correct remove of a known ask
release.TerminationType = si.TerminationType_STOPPED_BY_RM
- partition.removeAllocationAsk(release)
+ partition.removeAllocation(release)
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending asks")
assertLimits(t, getTestUserGroup(), nil)
}
@@ -4429,27 +4431,30 @@ func TestCalculateOutstandingRequests(t *testing.T) {
"vcores": 1,
"memory": 1,
})
- siAsk1 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-1",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
- }
- siAsk2 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-2",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
- }
- siAsk3 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-3",
- ApplicationID: appID2,
- ResourceAsk: askResource.ToProto(),
- }
- err = partition.addAllocationAsk(siAsk1)
+ siAsk1 := &si.Allocation{
+ AllocationKey: "ask-uuid-1",
+ ApplicationID: appID1,
+ ResourcePerAlloc: askResource.ToProto(),
+ }
+ siAsk2 := &si.Allocation{
+ AllocationKey: "ask-uuid-2",
+ ApplicationID: appID1,
+ ResourcePerAlloc: askResource.ToProto(),
+ }
+ siAsk3 := &si.Allocation{
+ AllocationKey: "ask-uuid-3",
+ ApplicationID: appID2,
+ ResourcePerAlloc: askResource.ToProto(),
+ }
+ addedAsk, _, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(siAsk1))
assert.NilError(t, err)
- err = partition.addAllocationAsk(siAsk2)
+ assert.Check(t, addedAsk)
+ addedAsk, _, err =
partition.UpdateAllocation(objects.NewAllocationFromSI(siAsk2))
assert.NilError(t, err)
- err = partition.addAllocationAsk(siAsk3)
+ assert.Check(t, addedAsk)
+ addedAsk, _, err =
partition.UpdateAllocation(objects.NewAllocationFromSI(siAsk3))
assert.NilError(t, err)
+ assert.Check(t, addedAsk)
requests = partition.calculateOutstandingRequests()
assert.Equal(t, 0, len(requests))
@@ -4491,12 +4496,12 @@ func
TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- phAsk := newAllocationAskTG("placeholder", appID1, taskGroup, appRes,
true)
- ph := markAllocated(nodeID1, phAsk)
+ ph := newAllocationTG("placeholder", appID1, nodeID1, taskGroup,
appRes, true)
err = partition.AddNode(node1)
assert.NilError(t, err)
- err = partition.AddAllocation(ph)
+ _, allocCreated, err := partition.UpdateAllocation(ph)
assert.NilError(t, err)
+ assert.Check(t, allocCreated)
// add a placeholder ask with a different taskgroup
phAsk2 := newAllocationAskTG("placeholder2", appID1, "tg-2", appRes,
true)
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index dac29a04..d3108dba 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -23,6 +23,7 @@ import (
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -45,27 +46,30 @@ func TestInspectOutstandingRequests(t *testing.T) {
"vcores": 1,
"memory": 1,
})
- siAsk1 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-1",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
+ siAsk1 := &si.Allocation{
+ AllocationKey: "ask-uuid-1",
+ ApplicationID: appID1,
+ ResourcePerAlloc: askResource.ToProto(),
}
- siAsk2 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-2",
- ApplicationID: appID1,
- ResourceAsk: askResource.ToProto(),
+ siAsk2 := &si.Allocation{
+ AllocationKey: "ask-uuid-2",
+ ApplicationID: appID1,
+ ResourcePerAlloc: askResource.ToProto(),
}
- siAsk3 := &si.AllocationAsk{
- AllocationKey: "ask-uuid-3",
- ApplicationID: appID2,
- ResourceAsk: askResource.ToProto(),
+ siAsk3 := &si.Allocation{
+ AllocationKey: "ask-uuid-3",
+ ApplicationID: appID2,
+ ResourcePerAlloc: askResource.ToProto(),
}
- err = partition.addAllocationAsk(siAsk1)
+ askCreated, _, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(siAsk1))
assert.NilError(t, err)
- err = partition.addAllocationAsk(siAsk2)
+ assert.Check(t, askCreated)
+ askCreated, _, err =
partition.UpdateAllocation(objects.NewAllocationFromSI(siAsk2))
assert.NilError(t, err)
- err = partition.addAllocationAsk(siAsk3)
+ assert.Check(t, askCreated)
+ askCreated, _, err =
partition.UpdateAllocation(objects.NewAllocationFromSI(siAsk3))
assert.NilError(t, err)
+ assert.Check(t, askCreated)
// mark asks as attempted
expectedTotal :=
resources.NewResourceFromMap(map[string]resources.Quantity{
diff --git a/pkg/scheduler/tests/application_tracking_test.go
b/pkg/scheduler/tests/application_tracking_test.go
index cda2a1ce..11d6a943 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -111,10 +111,10 @@ func TestApplicationHistoryTracking(t *testing.T) {
// Add allocation ask & check events
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -166,7 +166,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
eventsDao, err = client.GetBatchEvents()
assert.NilError(t, err)
- assert.Equal(t, 17, len(eventsDao.EventRecords), "number of events
generated")
+ assert.Equal(t, 18, len(eventsDao.EventRecords), "number of events
generated")
verifyAllocationCancelledEvents(t, eventsDao.EventRecords[13:])
events, _ = getEventsFromStream(t, false, stream, 4)
assert.NilError(t, err)
diff --git a/pkg/scheduler/tests/mockscheduler_test.go
b/pkg/scheduler/tests/mockscheduler_test.go
index 51289c1a..95614044 100644
--- a/pkg/scheduler/tests/mockscheduler_test.go
+++ b/pkg/scheduler/tests/mockscheduler_test.go
@@ -147,17 +147,17 @@ func (m *mockScheduler) removeApp(appID, partition
string) error {
}
func (m *mockScheduler) addAppRequest(appID, allocKeyPrefix string, resource
*si.Resource, repeat int) error {
- asks := make([]*si.AllocationAsk, repeat)
+ asks := make([]*si.Allocation, repeat)
for i := 0; i < repeat; i++ {
- asks[i] = &si.AllocationAsk{
- AllocationKey: fmt.Sprintf("%s-%d", allocKeyPrefix, i),
- ApplicationID: appID,
- ResourceAsk: resource,
+ asks[i] = &si.Allocation{
+ AllocationKey: fmt.Sprintf("%s-%d", allocKeyPrefix,
i),
+ ApplicationID: appID,
+ ResourcePerAlloc: resource,
}
}
return m.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: m.rmID,
+ Allocations: asks,
+ RmID: m.rmID,
})
}
@@ -176,21 +176,6 @@ func (m *mockScheduler) releaseAllocRequest(appID,
allocationKey string) error {
})
}
-func (m *mockScheduler) releaseAskRequest(appID, allocKey string) error {
- return m.proxy.UpdateAllocation(&si.AllocationRequest{
- Releases: &si.AllocationReleasesRequest{
- AllocationAsksToRelease: []*si.AllocationAskRelease{
- {
- ApplicationID: appID,
- AllocationKey: allocKey,
- PartitionName: m.partitionName,
- },
- },
- },
- RmID: m.rmID,
- })
-}
-
// simple wrapper to limit the repeating code getting the queue
func (m *mockScheduler) getNode(nodeName string) *objects.Node {
return m.scheduler.GetClusterContext().GetNode(nodeName,
m.partitionName)
diff --git a/pkg/scheduler/tests/operation_test.go
b/pkg/scheduler/tests/operation_test.go
index 17f51a81..98e3e742 100644
--- a/pkg/scheduler/tests/operation_test.go
+++ b/pkg/scheduler/tests/operation_test.go
@@ -89,10 +89,10 @@ partitions:
// App asks for 2 allocations
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -102,7 +102,7 @@ partitions:
},
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -227,10 +227,10 @@ partitions:
// App asks for 2 allocations
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -240,7 +240,7 @@ partitions:
},
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
diff --git a/pkg/scheduler/tests/performance_test.go
b/pkg/scheduler/tests/performance_test.go
index 9cda5bf8..0dbf2eb8 100644
--- a/pkg/scheduler/tests/performance_test.go
+++ b/pkg/scheduler/tests/performance_test.go
@@ -127,11 +127,11 @@ partitions:
// Request pods
app1NumPods := numPods / 2
- app1Asks := make([]*si.AllocationAsk, app1NumPods)
+ app1Asks := make([]*si.Allocation, app1NumPods)
for i := 0; i < app1NumPods; i++ {
- app1Asks[i] = &si.AllocationAsk{
+ app1Asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-1-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: int64(requestMem)},
"vcore": {Value: int64(requestVcore)},
@@ -141,19 +141,19 @@ partitions:
}
}
err = proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: app1Asks,
- RmID: "rm:123",
+ Allocations: app1Asks,
+ RmID: "rm:123",
})
if err != nil {
b.Error(err.Error())
}
app2NumPods := numPods - app1NumPods
- app2Asks := make([]*si.AllocationAsk, app2NumPods)
+ app2Asks := make([]*si.Allocation, app2NumPods)
for i := 0; i < app2NumPods; i++ {
- app2Asks[i] = &si.AllocationAsk{
+ app2Asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-2-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: int64(requestMem)},
"vcore": {Value: int64(requestVcore)},
@@ -163,8 +163,8 @@ partitions:
}
}
err = proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: app2Asks,
- RmID: "rm:123",
+ Allocations: app2Asks,
+ RmID: "rm:123",
})
if err != nil {
b.Error(err.Error())
diff --git a/pkg/scheduler/tests/plugin_test.go
b/pkg/scheduler/tests/plugin_test.go
index 4eb4062e..d0186b44 100644
--- a/pkg/scheduler/tests/plugin_test.go
+++ b/pkg/scheduler/tests/plugin_test.go
@@ -89,10 +89,10 @@ partitions:
// now submit a request, that uses 8/10 memory from the node
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 8},
},
@@ -113,10 +113,10 @@ partitions:
// - queue has plenty of resources
// we expect the plugin to be called to trigger an update
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 5},
},
diff --git a/pkg/scheduler/tests/recovery_test.go
b/pkg/scheduler/tests/recovery_test.go
index e1393999..eeb9b8f4 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -126,10 +126,10 @@ func TestSchedulerRecovery(t *testing.T) {
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-0",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -139,7 +139,7 @@ func TestSchedulerRecovery(t *testing.T) {
},
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -182,12 +182,12 @@ func TestSchedulerRecovery(t *testing.T) {
[]string{"node-1:1234", "node-2:1234"}, 20, 1000)
// ask for 4 more allocations
- asks := make([]*si.AllocationAsk, 4)
+ asks := make([]*si.Allocation, 4)
mem := [4]int64{50, 100, 50, 100}
for i := 0; i < 4; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d", i+2),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: mem[i]},
"vcore": {Value: 5},
@@ -197,8 +197,8 @@ func TestSchedulerRecovery(t *testing.T) {
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "UpdateRequest further alloc on existing app
failed")
@@ -389,10 +389,10 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -402,7 +402,7 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
},
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -823,10 +823,10 @@ partitions:
assert.Assert(t, appQueue != nil, "application queue was not created")
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -836,7 +836,7 @@ partitions:
},
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -1047,10 +1047,10 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
// Add a new placeholder ask with a different task group
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "ph-alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -1068,10 +1068,10 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
// Add two real asks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "real-alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -1082,7 +1082,7 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
},
{
AllocationKey: "real-alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10},
"vcore": {Value: 1},
@@ -1146,10 +1146,13 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
func registerAllocations(partition *scheduler.PartitionContext, allocs
[]*si.Allocation) error {
for _, alloc := range allocs {
- err :=
partition.AddAllocation(objects.NewAllocationFromSI(alloc))
+ _, allocCreated, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(alloc))
if err != nil {
return err
}
+ if !allocCreated {
+ return fmt.Errorf("no alloc created")
+ }
}
return nil
}
diff --git a/pkg/scheduler/tests/reservation_test.go
b/pkg/scheduler/tests/reservation_test.go
index 442d94ea..4a037572 100644
--- a/pkg/scheduler/tests/reservation_test.go
+++ b/pkg/scheduler/tests/reservation_test.go
@@ -413,7 +413,7 @@ func TestUnReservationAndDeletion(t *testing.T) {
// delete pending asks
for _, ask := range app.GetReservations() {
askID := ask[strings.Index(ask, "|")+1:]
- err = ms.releaseAskRequest(appID1, askID)
+ err = ms.releaseAllocRequest(appID1, askID)
assert.NilError(t, err, "ask release update failed")
}
// delete existing allocations
diff --git a/pkg/scheduler/tests/smoke_test.go
b/pkg/scheduler/tests/smoke_test.go
index 546e8ed6..97f6a713 100644
--- a/pkg/scheduler/tests/smoke_test.go
+++ b/pkg/scheduler/tests/smoke_test.go
@@ -235,10 +235,10 @@ func TestBasicScheduler(t *testing.T) {
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1a",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -248,7 +248,7 @@ func TestBasicScheduler(t *testing.T) {
},
{
AllocationKey: "alloc-1b",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -290,10 +290,10 @@ func TestBasicScheduler(t *testing.T) {
// ask for 4 more tasks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-2a",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 50000000},
"vcore": {Value: 5000},
@@ -303,7 +303,7 @@ func TestBasicScheduler(t *testing.T) {
},
{
AllocationKey: "alloc-2b",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 50000000},
"vcore": {Value: 5000},
@@ -313,7 +313,7 @@ func TestBasicScheduler(t *testing.T) {
},
{
AllocationKey: "alloc-3a",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 5000},
@@ -323,7 +323,7 @@ func TestBasicScheduler(t *testing.T) {
},
{
AllocationKey: "alloc-3b",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 5000},
@@ -395,7 +395,7 @@ func TestBasicScheduler(t *testing.T) {
// Release asks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
- AllocationAsksToRelease: []*si.AllocationAskRelease{
+ AllocationsToRelease: []*si.AllocationRelease{
{
ApplicationID: appID1,
PartitionName: "default",
@@ -463,11 +463,11 @@ func TestBasicSchedulerAutoAllocation(t *testing.T) {
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- asks := make([]*si.AllocationAsk, 20)
+ asks := make([]*si.Allocation, 20)
for i := 0; i < 20; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -477,8 +477,8 @@ func TestBasicSchedulerAutoAllocation(t *testing.T) {
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "AllocationRequest 2 failed")
@@ -574,12 +574,12 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- asks := make([]*si.AllocationAsk, 40)
+ asks := make([]*si.Allocation, 40)
appIDs := []string{app1ID, app2ID}
for i := 0; i < 40; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -589,8 +589,8 @@ partitions:
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "AllocationRequest 2 failed")
@@ -686,12 +686,12 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- asks := make([]*si.AllocationAsk, 40)
+ asks := make([]*si.Allocation, 40)
appIDs := []string{app1ID, app2ID}
for i := 0; i < 40; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -701,8 +701,8 @@ partitions:
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "AllocationRequest 2 failed")
@@ -876,11 +876,11 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- asks := make([]*si.AllocationAsk, param.numOfAsk)
+ asks := make([]*si.Allocation, param.numOfAsk)
for i := int32(0); i < param.numOfAsk; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d",
i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources:
map[string]*si.Quantity{
"memory": {Value:
param.askMemory},
"vcore": {Value:
param.askCPU},
@@ -890,8 +890,8 @@ partitions:
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "AllocationRequest 2 failed in
run %s", param.name)
@@ -1127,12 +1127,12 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- asks := make([]*si.AllocationAsk, 40)
+ asks := make([]*si.Allocation, 40)
appIDs := []string{app1ID, app2ID}
for i := 0; i < 40; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -1142,8 +1142,8 @@ partitions:
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "UpdateRequest 2 failed")
@@ -1237,11 +1237,11 @@ partitions:
}
// Request 20 allocations
- asks := make([]*si.AllocationAsk, 20)
+ asks := make([]*si.Allocation, 20)
for i := 0; i < 20; i++ {
- asks[i] = &si.AllocationAsk{
+ asks[i] = &si.Allocation{
AllocationKey: fmt.Sprintf("alloc-%d", i),
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -1251,8 +1251,8 @@ partitions:
}
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: asks,
- RmID: "rm:123",
+ Allocations: asks,
+ RmID: "rm:123",
})
assert.NilError(t, err, "AllocationRequest failed")
@@ -1348,10 +1348,10 @@ func TestDupReleasesInGangScheduling(t *testing.T) {
// shim side creates a placeholder, and send the UpdateRequest
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -1382,10 +1382,10 @@ func TestDupReleasesInGangScheduling(t *testing.T) {
// shim submits the actual pod for scheduling
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -1544,10 +1544,10 @@ partitions:
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
- Asks: []*si.AllocationAsk{
+ Allocations: []*si.Allocation{
{
AllocationKey: "alloc-1",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
@@ -1557,7 +1557,7 @@ partitions:
},
{
AllocationKey: "alloc-2",
- ResourceAsk: &si.Resource{
+ ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index ea1192cc..a87b26bd 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -21,7 +21,6 @@ package scheduler
import (
"strconv"
"testing"
- "time"
"gotest.tools/v3/assert"
@@ -568,26 +567,47 @@ func newAllocationAskPriority(allocKey, appID string, res
*resources.Resource, p
}
func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, prio int32, placeHolder bool) *objects.Allocation {
- return objects.NewAllocationAskFromSI(&si.AllocationAsk{
- AllocationKey: allocKey,
- ApplicationID: appID,
- PartitionName: "test",
- ResourceAsk: res.ToProto(),
- Priority: prio,
- TaskGroupName: taskGroup,
- Placeholder: placeHolder,
+ return objects.NewAllocationFromSI(&si.Allocation{
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "test",
+ ResourcePerAlloc: res.ToProto(),
+ Priority: prio,
+ TaskGroupName: taskGroup,
+ Placeholder: placeHolder,
+ })
+}
+
+func newAllocationTG(allocKey, appID, nodeID, taskGroup string, res
*resources.Resource, placeHolder bool) *objects.Allocation {
+ return newAllocationAll(allocKey, appID, nodeID, taskGroup, res, 1,
placeHolder)
+}
+
+func newAllocation(allocKey, appID, nodeID string, res *resources.Resource)
*objects.Allocation {
+ return newAllocationAll(allocKey, appID, nodeID, "", res, 1, false)
+}
+
+func newAllocationAll(allocKey, appID, nodeID, taskGroup string, res
*resources.Resource, prio int32, placeHolder bool) *objects.Allocation {
+ return objects.NewAllocationFromSI(&si.Allocation{
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "test",
+ NodeID: nodeID,
+ ResourcePerAlloc: res.ToProto(),
+ Priority: prio,
+ TaskGroupName: taskGroup,
+ Placeholder: placeHolder,
})
}
func newAllocationAskPreempt(allocKey, appID string, prio int32, res
*resources.Resource) *objects.Allocation {
- return objects.NewAllocationAskFromSI(&si.AllocationAsk{
- AllocationKey: allocKey,
- ApplicationID: appID,
- PartitionName: "default",
- ResourceAsk: res.ToProto(),
- Priority: prio,
- TaskGroupName: taskGroup,
- Placeholder: false,
+ return objects.NewAllocationFromSI(&si.Allocation{
+ AllocationKey: allocKey,
+ ApplicationID: appID,
+ PartitionName: "default",
+ ResourcePerAlloc: res.ToProto(),
+ Priority: prio,
+ TaskGroupName: taskGroup,
+ Placeholder: false,
PreemptionPolicy: &si.PreemptionPolicy{
AllowPreemptSelf: true,
AllowPreemptOther: true,
@@ -726,9 +746,3 @@ func getMaxApplications(usage *dao.ResourceUsageDAOInfo,
maxApplications map[str
}
return maxApplications
}
-
-func markAllocated(nodeID string, alloc *objects.Allocation)
*objects.Allocation {
- alloc.SetBindTime(time.Now())
- alloc.SetNodeID(nodeID)
- return alloc
-}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 00660a32..d63e63ba 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -619,16 +619,16 @@ func TestGetClusterUtilJSON(t *testing.T) {
resAlloc1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
500, siCommon.CPU: 300})
resAlloc2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
300, siCommon.CPU: 200})
- ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1)
- ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2)
- alloc1 := markAllocated(nodeID, ask1)
- alloc2 := markAllocated(nodeID, ask2)
+ alloc1 := newAlloc("alloc-1", appID, nodeID, resAlloc1)
+ alloc2 := newAlloc("alloc-2", appID, nodeID, resAlloc2)
err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddAllocation(alloc1)
+ _, allocAdded, err := partition.UpdateAllocation(alloc1)
assert.NilError(t, err, "failed to add alloc1")
- err = partition.AddAllocation(alloc2)
+ assert.Check(t, allocAdded)
+ _, allocAdded, err = partition.UpdateAllocation(alloc2)
assert.NilError(t, err, "failed to add alloc2")
+ assert.Check(t, allocAdded)
// set expected result
utilMem := &dao.ClusterUtilDAOInfo{
@@ -684,18 +684,20 @@ func TestGetNodesUtilJSON(t *testing.T) {
// create test allocations
resAlloc1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
500, siCommon.CPU: 300})
resAlloc2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
300, siCommon.CPU: 500, "GPU": 5})
- ask1 := objects.NewAllocationAsk("alloc-1", app.ApplicationID,
resAlloc1)
- ask2 := objects.NewAllocationAsk("alloc-2", app.ApplicationID,
resAlloc2)
- allocs := []*objects.Allocation{markAllocated(node1.NodeID, ask1)}
+ alloc1 := newAlloc("alloc-1", app.ApplicationID, node1.NodeID,
resAlloc1)
+ allocs := []*objects.Allocation{alloc1}
err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddAllocation(allocs[0])
+ _, allocCreated, err := partition.UpdateAllocation(allocs[0])
assert.NilError(t, err, "add alloc-1 should not have failed")
- allocs = []*objects.Allocation{markAllocated(node2.NodeID, ask2)}
+ assert.Check(t, allocCreated)
+ alloc2 := newAlloc("alloc-2", app.ApplicationID, node2.NodeID,
resAlloc2)
+ allocs = []*objects.Allocation{alloc2}
err = partition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddAllocation(allocs[0])
+ _, allocCreated, err = partition.UpdateAllocation(allocs[0])
assert.NilError(t, err, "add alloc-2 should not have failed")
+ assert.Check(t, allocCreated)
err = partition.AddNode(node3)
assert.NilError(t, err, "add node to partition should not have failed")
@@ -770,8 +772,7 @@ func TestGetNodeUtilisation(t *testing.T) {
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0),
"unexpected number of nodes returned should be 0")
resAlloc :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
- ask := objects.NewAllocationAsk("alloc-1", "app", resAlloc)
- alloc := markAllocated(node1.NodeID, ask)
+ alloc := newAlloc("alloc-1", "app", node1.NodeID, resAlloc)
assert.Assert(t, node1.TryAddAllocation(alloc), "unexpected failure
adding allocation to node")
rootQ := partition.GetQueue("root")
err = rootQ.IncAllocatedResource(resAlloc, false)
@@ -788,8 +789,7 @@ func TestGetNodeUtilisation(t *testing.T) {
// make second type dominant by using all
resAlloc =
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5})
- ask = objects.NewAllocationAsk("alloc-2", "app", resAlloc)
- alloc = markAllocated(node2.NodeID, ask)
+ alloc = newAlloc("alloc-2", "app", node2.NodeID, resAlloc)
assert.Assert(t, node2.TryAddAllocation(alloc), "unexpected failure
adding allocation to node")
err = rootQ.IncAllocatedResource(resAlloc, false)
assert.NilError(t, err, "unexpected error returned setting allocated
resource on queue")
@@ -815,8 +815,7 @@ func addNode(t *testing.T, partition
*scheduler.PartitionContext, nodeId string,
func addAllocatedResource(t *testing.T, node *objects.Node, allocationKey
string, appID string, quantityMap map[string]resources.Quantity) {
t.Helper()
resAlloc := resources.NewResourceFromMap(quantityMap)
- ask := objects.NewAllocationAsk(allocationKey, appID, resAlloc)
- alloc := markAllocated(node.NodeID, ask)
+ alloc := newAlloc(allocationKey, appID, node.NodeID, resAlloc)
assert.Assert(t, node.TryAddAllocation(alloc), "unexpected failure
adding allocation to node")
}
@@ -958,7 +957,7 @@ func getNodesUtilByType(t *testing.T, nodesUtilList
[]*dao.NodesUtilDAOInfo, res
return nil
}
-func TestPartitions(t *testing.T) {
+func TestPartitions(t *testing.T) { //nolint:funlen
schedulerContext.Store(&scheduler.ClusterContext{})
var req *http.Request
@@ -1015,18 +1014,20 @@ func TestPartitions(t *testing.T) {
// create test allocations
resAlloc1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
100, siCommon.CPU: 400})
resAlloc2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
200, siCommon.CPU: 300})
- ask1 := objects.NewAllocationAsk("alloc-1", app5.ApplicationID,
resAlloc1)
- ask2 := objects.NewAllocationAsk("alloc-2", app2.ApplicationID,
resAlloc2)
- allocs := []*objects.Allocation{markAllocated(node1ID, ask1)}
+ alloc1 := newAlloc("alloc-1", app5.ApplicationID, node1ID, resAlloc1)
+ allocs := []*objects.Allocation{alloc1}
err = defaultPartition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
- err = defaultPartition.AddAllocation(allocs[0])
+ _, allocCreated, err := defaultPartition.UpdateAllocation(allocs[0])
assert.NilError(t, err, "add alloc-1 should not have failed")
- allocs = []*objects.Allocation{markAllocated(node2ID, ask2)}
+ assert.Check(t, allocCreated)
+ alloc2 := newAlloc("alloc-2", app2.ApplicationID, node1ID, resAlloc2)
+ allocs = []*objects.Allocation{alloc2}
err = defaultPartition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
- err = defaultPartition.AddAllocation(allocs[0])
+ _, allocCreated, err = defaultPartition.UpdateAllocation(allocs[0])
assert.NilError(t, err, "add alloc-2 should not have failed")
+ assert.Check(t, allocCreated)
req, err = http.NewRequest("GET", "/ws/v1/partitions",
strings.NewReader(""))
assert.NilError(t, err, "App Handler request failed")
@@ -1295,19 +1296,21 @@ func TestGetPartitionNodes(t *testing.T) {
// create test allocations
resAlloc1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
500, siCommon.CPU: 300})
resAlloc2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
300, siCommon.CPU: 500})
- ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1)
- ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2)
- allocs := []*objects.Allocation{markAllocated(node1ID, ask1)}
+ alloc1 := newAlloc("alloc-1", appID, node1ID, resAlloc1)
+ allocs := []*objects.Allocation{alloc1}
err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddAllocation(allocs[0])
+ _, allocCreated, err := partition.UpdateAllocation(allocs[0])
assert.NilError(t, err, "add alloc-1 should not have failed")
+ assert.Check(t, allocCreated)
- allocs = []*objects.Allocation{markAllocated(node2ID, ask2)}
+ alloc2 := newAlloc("alloc-2", appID, node2ID, resAlloc2)
+ allocs = []*objects.Allocation{alloc2}
err = partition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddAllocation(allocs[0])
+ _, allocCreated, err = partition.UpdateAllocation(allocs[0])
assert.NilError(t, err, "add alloc-2 should not have failed")
+ assert.Check(t, allocCreated)
NewWebApp(schedulerContext.Load(), nil)
@@ -1409,12 +1412,12 @@ func TestGetQueueApplicationsHandler(t *testing.T) {
res := &si.Resource{
Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
}
- ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
- ApplicationID: "app-1",
- PartitionName: part.Name,
- TaskGroupName: tg,
- ResourceAsk: res,
- Placeholder: true})
+ ask := objects.NewAllocationFromSI(&si.Allocation{
+ ApplicationID: "app-1",
+ PartitionName: part.Name,
+ TaskGroupName: tg,
+ ResourcePerAlloc: res,
+ Placeholder: true})
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
app.SetTimedOutPlaceholder(tg, 1)
@@ -1721,10 +1724,10 @@ func TestGetApplicationHandler(t *testing.T) {
res := &si.Resource{
Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
}
- ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
- ApplicationID: "app-1",
- PartitionName: part.Name,
- ResourceAsk: res})
+ ask := objects.NewAllocationFromSI(&si.Allocation{
+ ApplicationID: "app-1",
+ PartitionName: part.Name,
+ ResourcePerAlloc: res})
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
@@ -2674,15 +2677,15 @@ func prepareUserAndGroupContext(t *testing.T, config
string) {
res := &si.Resource{
Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
}
- ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
- ApplicationID: "app-1",
- PartitionName: part.Name,
- ResourceAsk: res})
+ ask := objects.NewAllocationFromSI(&si.Allocation{
+ ApplicationID: "app-1",
+ PartitionName: part.Name,
+ ResourcePerAlloc: res})
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
// add an alloc
- allocInfo := markAllocated("node-1", ask)
+ allocInfo := newAlloc(ask.GetAllocationKey(), ask.GetApplicationID(),
"node-1", ask.GetAllocatedResource())
app.AddAllocation(allocInfo)
assert.Assert(t, app.IsRunning(), "Application did not return running
state after alloc: %s", app.CurrentState())
@@ -2884,8 +2887,11 @@ func NewResponseRecorderWithDeadline()
*ResponseRecorderWithDeadline {
}
}
-func markAllocated(nodeID string, alloc *objects.Allocation)
*objects.Allocation {
- alloc.SetBindTime(time.Now())
- alloc.SetNodeID(nodeID)
- return alloc
+func newAlloc(allocationKey string, appID string, nodeID string, resAlloc
*resources.Resource) *objects.Allocation {
+ return objects.NewAllocationFromSI(&si.Allocation{
+ AllocationKey: allocationKey,
+ ApplicationID: appID,
+ NodeID: nodeID,
+ ResourcePerAlloc: resAlloc.ToProto(),
+ })
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]